使用canal来监听mysql binlog

使用cannal来监听mysql binlog

mysql环境准备

开启binlog

修改配置文件my.cnf,修改如下内容

1
2
3
4
5
log_bin=master-bin
log_bin_index=master-bin.index
binlog_do_db=test
binlog-format=ROW
server-id=1

注意binlog_do_db非必须,其余缺省照填即可,具体字段含义或者高级配置,请自行查询官方文档

准备canal账号

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';    
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

这里创建账号canal同时密码也设置一样的,这里权限只要上面的权限足已,不用开全局权限

部署单节点

本机部署

  1. 通过https://github.com/alibaba/canal/releases找个合适版本下载对应的canal.deployer版本
  2. 解压到任意目录
  3. 然后配置配置conf/canal.properties文件,直接打开注释并修改对应的mysql的地址与账号

    1
    2
    3
    4
    anal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8
    canal.manager.jdbc.username=canal
    canal.manager.jdbc.password=canal
    canal.destinations = test
  4. 执行bin/startup.sh如果是windows就执行bat后缀的文件

  5. 观察到logs/canal/canal.log以及logs/test/test/log是否存在the canal server is running now ......表示已经跑起来了

使用docker来部署

  1. 输入如下命令来启动
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    docker run --name canal-server \
    -e canal.auto.scan=false \
    -e canal.destinations=test \
    -e canal.instance.master.address=172.17.0.2:3306 \
    -e canal.instance.dbUsername=canal \
    -e canal.instance.dbPassword=canal \
    -e canal.instance.connectionCharset=UTF-8 \
    -e canal.instance.tsdb.enable=false \
    -e canal.instance.gtidon=false \
    -p 11111:11111 \
    -d canal/canal-server:v1.1.3

里面需要注意的是destinations的名字,类似实例名,以及对应的mysql的host,username,password修改即可,这里镜像用可以选用最新的,目前是1.1.3,具体参数可以通过AdminGuide来了解。

  1. 进入容器,检查日志是否正常 /home/admin/canal-server/logs/canal/canal.log是否存在the canal server is running now ......表示已经跑起来了
  2. 进入容器,检查日志是否正常 /home/admin/canal-server/logs/test/test.log是否存在the canal server is running now ......表示已经跑起来了

客户端验证

pom依赖

1
2
3
4
5
6
<!-- canal-client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>

注意版本要对齐canal-server的版本

client单点实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class TestCanalClient {
public static void main(String[] args) {
// 创建链接
String ip = "127.0.0.1";
// String destination = "example";
String destination = "test";
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
destination, "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
// printEntry(message.getEntries());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}

System.out.println("empty too many times, exit");
} catch (Exception exp) {
exp.printStackTrace();
System.out.println(exp.getMessage());
} finally {
connector.disconnect();
}
}
}

跑起来后会不停打印empty count这个表示已经连接上canal-server了
后面

接着提交mysql的dml语句,修改下数据并提交,将会出现如下提示

1
2
3
4
5
6
7
empty count : 15
empty count : 16
empty count : 17
message[batchId=4,size=3]
empty count : 1
empty count : 2
empty count : 3

这样就可以表示已经接收到mysql的数据被修改了。

错误

Misconfigured master - server_id was not set

这里是因为mysql没有设置serveri_id参考前面的配置来修改my.cnf文件。

原创技术分享,您的支持将鼓励我继续创作