背景
canal默认在1.1.1后默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:kafka跟rocketmq
为了方便操作,这里使用docker-compose来搭建环境
docker搭建kafka环境
kafka依赖zookeeper不管是kafka是要单节点还是集群,都需要依赖他,这里提供一个简单的docker_file来快速搭建kafka跟zookeeper环境
kafka_docekr_compose.yml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24version: '3'
# <!-- docker-compose的兼容版本,可以按实际的docker的大版本号配置 -->
services:
# <!--定义zk层服务-->
zookeeper:
image: zookeeper
container_name: zoo
ports:
- "2181:2181"
# <!--定义Kafka层-->
kafka:
image: wurstmeister/kafka:2.11-1.1.1
container_name: kafka
# <!--优先使用基础镜像,而不是本地Dockerfile的重新构建方式,实际情况可以与build配置项二选一,因此下面一行注释掉-->
# <!-- build: .-->
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_NO: 1
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
通过命令拉起容器1
docker-compose -f kafka_docekr_compose.yml up -d
打开两个登陆到kafka容器窗口,来模拟生产者跟消费者来验证kafka的环境完整性
生产者1
2
3kafka-topics.sh --zookeeper zookeeper:2181 --create --replication-factor 1 --partitions 1 --topic kafkatest # 创建topic
kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest # 拉起kafka生产者命令行
# 删除topic
消费者1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning # 拉起kafka的消费者
在生产者里输入任何信息查看下消费者窗口是否打印出来
docker搭建跑canal
这里容器需要指定跟kafka通网络下指定--network,不然连接不上,这里已经认为你的mysql已经跑起来,具体配置参考使用canal来监听mysql binlog文章。1
2
3
4
5
6
7
8
9
10
11
12
13
14docker run --name canal-server \
-e canal.destinations=test \
-e canal.instance.master.address=172.18.0.2:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=false \
-e canal.instance.gtidon=false \
-e canal.serverMode=kafka \
-e canal.mq.servers=172.18.0.4:9092 \
-e canal.mq.topic=kafkatest \
-e canal.mq.partition=0 \
--net bigdata_default \
-d canal/canal-server:v1.1.3
把容器跑起来
测试binlog到kafka
- 登陆到kafka容器中打开消费者命令行
1 | kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning |
修改数据库提交更新
查看消费者命令行打印如下内容:
1
2bash-4.4# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning
{"data":[{"name":"lucy","reg_date":"2019-07-12 04:28:59"}],"database":"test","es":1562921873000,"id":1,"isDdl":false,"mysqlType":{"name":"varchar(20)","reg_date":"datetime"},"old":[{"reg_date":"2019-07-12 04:26:59"}],"pkNames":null,"sql":"","sqlType":{"name":12,"reg_date":93},"table":"userinfo","ts":1562921874466,"type":"UPDATE"}
kafka跟canal已经联通了
错误
kakfa错误
Failed to elect leader for partition kafkatest-0 under strategy OfflinePartitionLeaderElectionStrategy
检查下是否连接到zk服务器
Connection to node -1 could not be established. Broker may not be available.
检查下kafka运行配置参数KAFKA_ADVERTISED_HOST_NAME与KAFKA_ADVERTISED_LISTENERS是否正确,如果是本地测试搭建的单机模式,可以使用127.0.0.1;如果是使用容器,就要改成kafka在容器网络中的ip,例如改成成172.18.0.x等,如果是远程服务器,就改成对应服务器ip