背景
平时我们在mysql客户端或者使用flink来查询mysql都是属于静态查询,所谓静态查询意思是每次根据语句查询全量数据,例如我们统计用户注册表,库表结果如下1
2
3
4
5CREATE TABLE `userinfo` (
`name` varchar(20) DEFAULT NULL,
`reg_date` datetime DEFAULT NULL,
UNIQUE KEY `userinfo_pk` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
假如目前数据量如下1
2
3
4
5
6+--------+---------------------+
| name | reg_date |
+--------+---------------------+
| roger | 2019-07-11 02:26:04 |
| joe | 2019-07-11 02:26:15 |
+--------+---------------------+
如果统计当前有多少人注册了,可以使用如下语句1
2
3
4
5
6mysql> select count(1) from userinfo;
+----------+
| count(1) |
+----------+
| 5 |
+----------+
假如这时增加一个用户peter,如果对于传统直观方式,我们会再次发起执行count聚合函数来统计,但是这个对于流式处理来说相当于之前处理过的数据又重新处理,一来浪费资源(带宽,计算等),二来影响吞吐,每次都是需要重新处理。
方案1 最土的方法
如果数据量不大,且不在乎资源,只想每次都做静态查询的场景下,可以使用如下Sourcer来解决
jdbc解析器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
public class UserInfoReader extends RichSourceFunction<Tuple2<String, Date>> {
private static final Logger logger = LoggerFactory.getLogger(UserInfoReader.class);
private Connection connection;
private PreparedStatement preparedStatement;
private volatile boolean isRunning = true;
/**
* open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
* @param parameters
* @throws Exception
*/
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=UTC";
String username = "root";
String password = "123456";
//1.加载驱动
Class.forName(driver);
//2.创建连接
connection = DriverManager.getConnection(url, username, password);
//3.获得执行语句
String sql = "select * from test.userinfo";
//3.获得执行语句
preparedStatement = connection.prepareStatement(sql);
}
public void run(SourceContext<Tuple2<String, Date>> ctx) throws Exception {
Tuple2<String, java.sql.Date> DeviceMap = new Tuple2<>();
try {
while (isRunning) {
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
String name = resultSet.getString(1);
java.sql.Date regDate = resultSet.getDate(2);
if (!(name.isEmpty() || regDate == null)) {
ctx.collect(new Tuple2<>(name, regDate));//发送更新
}
}
logger.info("DeviceMap>>>>>>" + DeviceMap); // 一个批次
Thread.sleep(1000);
}
} catch (Exception e) {
logger.error("runException:{}", e);
}
}
public void cancel() {
isRunning = false; // 停止监听
}
/**
* 程序执行完毕就可以进行,关闭连接和释放资源的动作了
* @throws Exception
*/
public void close() throws Exception {
//5.关闭连接和释放资源
super.close();
if (connection != null) {
connection.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
}
}
主函数1
2
3
4
5
6
7
8
9public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Date>> dsSrc = env.addSource(new UserInfoReader());
dsSrc.print();
env.execute("run at " + TestStreamMysqlQuery.class);
}
上述代码就是sourcer没隔1秒查询数据,同时将每个数据都输出,这样就没有用到flink的强大的功能。
方案2 业务耦合
简单暴力
基于之前的UserInfo表来说,每次注册用户都通过消息中间件来传输告诉flink,也就是数据埋点,这个不管是通过修改业务代码或者使用数据库的触发器来说,都是需要修改代码或者提供存储过程来实现,缺点可想而知,重!!!
触发器模式
graph LR;
业务 --> 数据库;
数据库 --> 触发器;
触发器 --> 消息中间件;
消息中间件--> Flink;
**数据埋点**
graph LR;
业务 --> 数据库;
业务 --> 异步同步业务;
异步同步业务 --> 消息中间件;
消息中间件--> Flink;
上面的方案不管是性能吞吐,或者一致性的都不应完美
### 退一步,调整表结构
要不修改`userinfo`表来增加自增序列主键,这样通过`Sourcer`来记录对应的游标位置来判断哪些是新数据,这个代码就演示了,这里问题是,业务耦合性重,每个表都需要遵守这个规则,同时存在作业故障恢复机制,不就是通过写文件或者使用flink的state来存储,不过怎么说业务代码支持。
graph LR;
业务 --> 数据库;
数据库 --> Sourcer-Offset;
Sourcer-Offset --> Flink;
变形的kafka,自己维护offset,需要考虑故障问题
## 采用非侵入式处理数据
采用canal读取mysql的binlog来写入到kafka/rmq来解决,具体搭建本机测试环境参考博客里的文章。
graph LR;
业务-->数据库
数据库-->canal
canal-->
数据库 --> Sourcer-Offset;
Sourcer-Offset --> Flink;