使用docker快速搭建Flink实时获取MysqlBinLog更新

flink作业

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- Build-wide version pins: Java 8 bytecode, Scala 2.11 binary compatibility
     (must match the _${scala.compat.version} suffix on Flink artifacts below),
     Flink 1.8.0 and Kafka 1.1.1 client libraries. -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<hadoop.version>2.7.7</hadoop.version>
<flink.version>1.8.0</flink.version>
<kafka.version>1.1.1</kafka.version>
</properties>

....

<!-- Flink Kafka connector; artifact name carries the Scala binary version
     and the release tracks ${flink.version} (1.8.0 per the properties above). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
</dependency>

测试客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Minimal Flink streaming job that consumes the canal-format binlog messages
 * published to the {@code kafkatest} topic and prints each record to stdout.
 *
 * <p>Runs until cancelled; requires a Kafka broker reachable at
 * {@code localhost:9092}.
 */
public class TestKafkaConnector {
    public static void main(String[] args) throws Exception {

        // Kafka consumer settings.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // group.id is required by FlinkKafkaConsumer (consumer-group
        // bookkeeping / offset committing); without it the job fails at startup.
        properties.setProperty("group.id", "flink-binlog-test");

        // Build the streaming pipeline: Kafka source -> stdout sink.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dsSrc = env.addSource(
                new FlinkKafkaConsumer<>("kafkatest", new SimpleStringSchema(), properties)
        );

        dsSrc.print();

        // Blocks while the streaming job runs.
        env.execute("run at " + TestKafkaConnector.class);
    }
}

在 MySQL 中更新一条记录后,测试客户端从 Kafka 消费到的 binlog 变更消息如下:

1
1> {"data":[{"name":"lucy","reg_date":"2019-07-12 05:30:59"}],"database":"test","es":1562922545000,"id":3,"isDdl":false,"mysqlType":{"name":"varchar(20)","reg_date":"datetime"},"old":[{"reg_date":"2019-07-12 04:30:59"}],"pkNames":null,"sql":"","sqlType":{"name":12,"reg_date":93},"table":"userinfo","ts":1562922545507,"type":"UPDATE"}

原创技术分享,您的支持将鼓励我继续创作