背景
- 输入表结构 quote
  f2000(股票代码), f2400(日期), f2401(收盘价), f2402(开盘价), f2403(高价), f2404(低价)
- 输出表结构 stock_result
  id(股票代码), pub(日期), modify_time(更新时间)
mysql输入源
数据结构pojo StockInfo.java
import java.util.Date;
public class StockInfo {
private String id;
private Date pub;
private Double close;
private Double open;
private Double high;
private Double low;
public StockInfo() {
}
public StockInfo(String id, Date pub, Double close, Double open, Double high, Double low) {
this.id = id;
this.pub = pub;
this.close = close;
this.open = open;
this.high = high;
this.low = low;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Date getPub() {
return pub;
}
public void setPub(Date pub) {
this.pub = pub;
}
public Double getClose() {
return close;
}
public void setClose(Double close) {
this.close = close;
}
public Double getOpen() {
return open;
}
public void setOpen(Double open) {
this.open = open;
}
public Double getHigh() {
return high;
}
public void setHigh(Double high) {
this.high = high;
}
public Double getLow() {
return low;
}
public void setLow(Double low) {
this.low = low;
}
public String toString() {
return "StockInfo{" +
"id='" + id + '\'' +
", pub=" + pub +
", close=" + close +
", open=" + open +
", high=" + high +
", low=" + low +
'}';
}
public int hashCode() {
return getId().hashCode();
}
}
自定义输入源 MySQLReader.java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
 * Flink source that runs one fixed JDBC query against the MySQL {@code quote} table and
 * emits every returned row as a {@link StockInfo}.
 *
 * <p>The JDBC connection and statement are created in {@link #open} and released in
 * {@link #close}. {@link #run} returns once the result set is exhausted or the source
 * has been cancelled.
 */
public class MySQLReader extends RichSourceFunction<StockInfo> {
    private static final Logger logger = LoggerFactory.getLogger(MySQLReader.class);

    // JDBC handles are created in open(); transient keeps them out of the serialized function.
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    /** Cleared by cancel() so the read loop in run() stops emitting. */
    private volatile boolean running = true;

    /**
     * Loads the JDBC driver, opens the connection and prepares the query, so the
     * connection is set up once rather than on every read.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the driver cannot be loaded or the connection/statement
     *                   cannot be created
     */
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://127.0.0.1:3306/jrtz_hg?useSSL=false&serverTimezone=UTC";
        String username = "root";
        String password = "123456";
        // 1. Load the driver.
        Class.forName(driver);
        // 2. Open the connection.
        connection = DriverManager.getConnection(url, username, password);
        // 3. Prepare the query.
        String sql = "select f2000, f2400, f2401, f2402, f2403, f2404 from jrtz_hg.quote where f2000 = '200012' limit 20";
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * Executes the prepared query once and emits one {@link StockInfo} per row.
     *
     * @param ctx context used to emit the records
     * @throws Exception if the query or the row mapping fails; the failure is logged
     *                   with its cause and rethrown instead of being swallowed, so the
     *                   job does not finish silently without data
     */
    public void run(SourceContext<StockInfo> ctx) throws Exception {
        // 4. Run the query; try-with-resources closes the ResultSet on every path.
        try (ResultSet rs = preparedStatement.executeQuery()) {
            while (running && rs.next()) {
                // NOTE(review): getDouble yields 0 for SQL NULL, so a NULL price is
                // emitted as 0.0 - confirm whether the price columns are nullable.
                StockInfo info = new StockInfo(
                        rs.getString("f2000"),
                        rs.getDate("f2400"),
                        rs.getDouble("f2401"),
                        rs.getDouble("f2402"),
                        rs.getDouble("f2403"),
                        rs.getDouble("f2404"));
                ctx.collect(info);
            }
        } catch (Exception e) {
            logger.error("Failed to read rows from jrtz_hg.quote", e);
            throw e;
        }
    }

    /** Asks the read loop in {@link #run} to stop before fetching the next row. */
    public void cancel() {
        running = false;
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection that owns it.
     * Each step runs even if an earlier one throws.
     *
     * @throws Exception if the superclass or a JDBC resource fails to close
     */
    public void close() throws Exception {
        // 5. Close and release resources.
        try {
            super.close();
        } finally {
            try {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
mysql输出源
输出结构
数据结构pojo StockResult.java
import java.util.Date;
public class StockResult {
private String id;
private Date pub;
private Date modifyTime;
public StockResult() {
}
public StockResult(String id, Date pub, Date modifyTime) {
this.id = id;
this.pub = pub;
this.modifyTime = modifyTime;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Date getPub() {
return pub;
}
public void setPub(Date pub) {
this.pub = pub;
}
public Date getModifyTime() {
return modifyTime;
}
public void setModifyTime(Date modifyTime) {
this.modifyTime = modifyTime;
}
public String toString() {
return "StockResult{" +
"id='" + id + '\'' +
", pub=" + pub +
", modifyTime=" + modifyTime +
'}';
}
public int hashCode() {
return getId().hashCode();
}
}
扩展RichSinkFunction
自定义输出源 MySQLWriter.java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Flink sink that writes each {@link StockResult} to the MySQL {@code stock_result}
 * table with a {@code replace into} statement.
 *
 * <p>The JDBC connection and statement are created in {@link #open} and released in
 * {@link #close}; {@link #invoke} reuses the prepared statement for every record.
 */
public class MySQLWriter extends RichSinkFunction<StockResult> {
    // Logger is bound to this class so sink log lines are not attributed to the reader.
    private static final Logger logger = LoggerFactory.getLogger(MySQLWriter.class);

    // JDBC handles are created in open(); transient keeps them out of the serialized function.
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    /**
     * Loads the JDBC driver, opens the connection and prepares the write statement, so
     * the connection is set up once rather than on every {@link #invoke} call.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the driver cannot be loaded or the connection/statement
     *                   cannot be created
     */
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://127.0.0.1:3306/jrtz_hg?useSSL=false&serverTimezone=UTC";
        String username = "root";
        String password = "123456";
        // 1. Load the driver.
        Class.forName(driver);
        // 2. Open the connection.
        connection = DriverManager.getConnection(url, username, password);
        // 3. Prepare the write statement.
        String sql = "replace into stock_result (id, pub, modify_time)values(?,?,?);";
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection that owns it.
     * Each step runs even if an earlier one throws.
     *
     * @throws Exception if the superclass or a JDBC resource fails to close
     */
    public void close() throws Exception {
        // 5. Close and release resources.
        try {
            super.close();
        } finally {
            try {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Writes one record; called once per element.
     *
     * <p>A {@code null} {@code pub} or {@code modifyTime} is bound as SQL NULL instead of
     * raising a {@link NullPointerException}.
     *
     * @param value   the record to write
     * @param context sink context supplied by the caller (unused)
     * @throws Exception if binding or executing the statement fails; the failure is
     *                   logged with its cause and rethrown, so a lost write is not
     *                   silently ignored
     */
    public void invoke(StockResult value, Context context) throws Exception {
        try {
            // 4. Bind the parameters and execute the write.
            preparedStatement.setString(1, value.getId());
            if (value.getPub() == null) {
                preparedStatement.setNull(2, java.sql.Types.DATE);
            } else {
                preparedStatement.setDate(2, new java.sql.Date(value.getPub().getTime()));
            }
            if (value.getModifyTime() == null) {
                preparedStatement.setNull(3, java.sql.Types.TIMESTAMP);
            } else {
                // setDate would keep only the calendar date; a timestamp keeps the
                // time of day of the modification time.
                // NOTE(review): assumes modify_time is a DATETIME/TIMESTAMP column -
                // confirm against the table DDL.
                preparedStatement.setTimestamp(3, new java.sql.Timestamp(value.getModifyTime().getTime()));
            }
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            logger.error("Failed to write record to stock_result: {}", value, e);
            throw e;
        }
    }
}
主函数操作
main.java
// Build the pipeline: read StockInfo rows from MySQL, map each one to a StockResult
// stamped with the current time, and write the results back to MySQL.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<StockInfo> source = env.addSource(new MySQLReader());
source.map(value ->
        new StockResult(value.getId(), value.getPub(), new Date())
).addSink(new MySQLWriter());
// Nothing runs until execute() is called.
env.execute("testTableApi");
逻辑是取出StockInfo里的id、pub,并附上当前时间作为更新时间,封装成StockResult写入stock_result表。
maven依赖
1 | <properties> |
执行效果
数据写入到mysql库表里
进阶篇
支持泛型数据作为输入跟输出