flink使用mysql作为输入与输出

使用mysql作为flink的输入与输出

背景

  • 输入表结构 quote
    f2000, f2400, f2401, f2402, f2403, f2404 股票代码 日期 收盘价 开盘价 高价 低价
  • 输出表结构 stock_result
    id,pub,modify_time 股票代码 日期 更新时间

    mysql输入源

    数据结构pojo StockInfo.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    import java.util.Date;

    public class StockInfo {
    private String id;
    private Date pub;
    private Double close;
    private Double open;
    private Double high;
    private Double low;

    public StockInfo() {
    }

    public StockInfo(String id, Date pub, Double close, Double open, Double high, Double low) {
    this.id = id;
    this.pub = pub;
    this.close = close;
    this.open = open;
    this.high = high;
    this.low = low;
    }

    public String getId() {
    return id;
    }

    public void setId(String id) {
    this.id = id;
    }

    public Date getPub() {
    return pub;
    }

    public void setPub(Date pub) {
    this.pub = pub;
    }

    public Double getClose() {
    return close;
    }

    public void setClose(Double close) {
    this.close = close;
    }

    public Double getOpen() {
    return open;
    }

    public void setOpen(Double open) {
    this.open = open;
    }

    public Double getHigh() {
    return high;
    }

    public void setHigh(Double high) {
    this.high = high;
    }

    public Double getLow() {
    return low;
    }

    public void setLow(Double low) {
    this.low = low;
    }

    @Override
    public String toString() {
    return "StockInfo{" +
    "id='" + id + '\'' +
    ", pub=" + pub +
    ", close=" + close +
    ", open=" + open +
    ", high=" + high +
    ", low=" + low +
    '}';
    }

    @Override
    public int hashCode() {
    return getId().hashCode();
    }
    }

自定义输入源 MysqlReader.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MySQLReader extends RichSourceFunction<StockInfo> {

private static final Logger logegr = LoggerFactory.getLogger(MySQLReader.class);

private Connection connection;
private PreparedStatement preparedStatement;

/**
* open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/jrtz_hg?useSSL=false&serverTimezone=UTC";
String username = "root";
String password = "123456";

//1.加载驱动
Class.forName(driver);
//2.创建连接
connection = DriverManager.getConnection(url, username, password);
//3.获得执行语句
String sql = "select f2000, f2400, f2401, f2402, f2403, f2404 from jrtz_hg.quote where f2000 = '200012' limit 20";
preparedStatement = connection.prepareStatement(sql);
}

/**
* DataStream调用一次run()方法用来获取数据
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<StockInfo> ctx) throws Exception {
try {
//4.执行查询,封装数据
ResultSet rs = preparedStatement.executeQuery();
while (rs.next()) {
StockInfo info = new StockInfo(
rs.getString("f2000"),
rs.getDate("f2400"),
rs.getDouble("f2401"),
rs.getDouble("f2402"),
rs.getDouble("f2403"),
rs.getDouble("f2404"));
ctx.collect(info);
}
} catch (Exception e) {
e.printStackTrace();
logegr.error(e.getMessage());
}
}

@Override
public void cancel() {

}

/**
* 程序执行完毕就可以进行,关闭连接和释放资源的动作了
* @throws Exception
*/
@Override
public void close() throws Exception {
//5.关闭连接和释放资源
super.close();
if (connection != null) {
connection.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
}
}

mysql输出源

输出结构

数据结构pojo StockResult.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

import java.util.Date;

public class StockResult {
private String id;
private Date pub;
private Date modifyTime;

public StockResult() {
}

public StockResult(String id, Date pub, Date modifyTime) {
this.id = id;
this.pub = pub;
this.modifyTime = modifyTime;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public Date getPub() {
return pub;
}

public void setPub(Date pub) {
this.pub = pub;
}

public Date getModifyTime() {
return modifyTime;
}

public void setModifyTime(Date modifyTime) {
this.modifyTime = modifyTime;
}

@Override
public String toString() {
return "StockResult{" +
"id='" + id + '\'' +
", pub=" + pub +
", modifyTime=" + modifyTime +
'}';
}

@Override
public int hashCode() {
return getId().hashCode();
}
}

扩展RichSinkFunction

自定义输出源 MySQLWriter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MySQLWriter extends RichSinkFunction<StockResult> {
private static final Logger logegr = LoggerFactory.getLogger(MySQLReader.class);

private Connection connection;
private PreparedStatement preparedStatement;

/**
* open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/jrtz_hg?useSSL=false&serverTimezone=UTC";
String username = "root";
String password = "123456";

//1.加载驱动
Class.forName(driver);
//2.创建连接
connection = DriverManager.getConnection(url, username, password);
//3.获得执行语句
String sql = "replace into stock_result (id, pub, modify_time)values(?,?,?);";
//3.获得执行语句
preparedStatement = connection.prepareStatement(sql);
}

@Override
public void close() throws Exception {
super.close();
//5.关闭连接和释放资源
if (connection != null) {
connection.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
}

/**
* 每个元素的插入都要调用一次invoke()方法,这里主要进行插入操作
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(StockResult value, Context context) throws Exception {
try {
//4.组装数据,执行插入操作
preparedStatement.setString(1, value.getId());
preparedStatement.setDate(2, new java.sql.Date(value.getPub().getTime()));
preparedStatement.setDate(3, new java.sql.Date(value.getModifyTime().getTime()));
preparedStatement.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
logegr.error(e.getMessage());
}
}
}

主函数操作

main.java

1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<StockInfo> source = env.addSource(new MySQLReader());

source.map(value->
new StockResult(value.getId(), value.getPub(), new Date())
).addSink(new MySQLWriter());

env.execute("testTableApi");

逻辑是将StockInfo里的id,pub输出,同时打印更新时间到StockResult表。

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<hadoop.version>2.7.7</hadoop.version>
<flink.version>1.8.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

执行效果

数据写入到mysql库表里

进阶篇

支持范型数据作为输入跟输出

原创技术分享,您的支持将鼓励我继续创作