【Canal从入门到放弃】(一)Canal 基础使用 | Eddie'Blog
【Canal从入门到放弃】(一)Canal 基础使用

【Canal从入门到放弃】(一)Canal 基础使用

eddie 684 2022-07-12

目录

介绍 Canal 基础使用, TCP模式与Kafka模式的

版本说明

  • canal-1.1.2
  • MySQL:5.7
  • Zookeeper:3.4.6
  • Kafka:2.13-2.8.1
  • kafka Manager:latest

前置条件

1、MySQL 开启 Binlog

my.cnf 指定 开启 Binglog 的库, 配置好需要重启

[client]
port = 3306
socket = /var/lib/mysql/data/mysql.sock
[mysqld]
 # 针对5.7版本执行group by字句出错问题解决
sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'
 # 一般配置选项
basedir = /var/lib/mysql
datadir = /var/lib/mysql/data
port = 3306
socket = /var/lib/mysql/data/mysql.sock
lc-messages-dir = /usr/share/mysql # 务必配置此项,否则执行sql出错时,只能显示错误代码而不显示具体错误消息
character-set-server=utf8mb4
back_log = 300
max_connections = 3000
max_connect_errors = 50
table_open_cache = 4096
max_allowed_packet = 32M
#binlog_cache_size = 4M
max_heap_table_size = 128M
read_rnd_buffer_size = 16M
sort_buffer_size = 16M
join_buffer_size = 16M
thread_cache_size = 16
query_cache_size = 64M
query_cache_limit = 4M
ft_min_word_len = 8
thread_stack = 512K
#tx_isolation = READ-COMMITTED
tmp_table_size = 64M 
#log-bin=mysql-bin
long_query_time = 6 
server_id=1
innodb_buffer_pool_size = 1024M 
innodb_thread_concurrency = 16
innodb_log_buffer_size = 16M
wait_timeout= 31536000
interactive_timeout= 31536000
lower_case_table_names = 1
log-bin=mysql-bin # 开启 binlog
binlog_format=row # 选择 ROW 模式
# 指定需要设置binlog的库名称,比如 test 库
binlog-do-db=test

2、 查看是否开启 binlog

mysql> show variables like 'log_%';
+----------------------------------------+-------------------------------------+
| Variable_name                          | Value                               |
+----------------------------------------+-------------------------------------+
| log_bin                                | ON                                  |
| log_bin_basename                       | /var/lib/mysql/data/mysql-bin       |
| log_bin_index                          | /var/lib/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators        | OFF                                 |
| log_bin_use_v1_row_events              | OFF                                 |
| log_builtin_as_identified_by_password  | OFF                                 |
| log_error                              | stderr                              |
| log_error_verbosity                    | 3                                   |
| log_output                             | FILE                                |
| log_queries_not_using_indexes          | OFF                                 |
| log_slave_updates                      | OFF                                 |
| log_slow_admin_statements              | OFF                                 |
| log_slow_slave_statements              | OFF                                 |
| log_statements_unsafe_for_binlog       | ON                                  |
| log_syslog                             | OFF                                 |
| log_syslog_facility                    | daemon                              |
| log_syslog_include_pid                 | ON                                  |
| log_syslog_tag                         |                                     |
| log_throttle_queries_not_using_indexes | 0                                   |
| log_timestamps                         | UTC                                 |
| log_warnings                           | 2                                   |
+----------------------------------------+-------------------------------------+
21 rows in set (0.12 sec)

3、创建 Mysql 账户与授权

canal 用户只需要查的权限,其余增删改不需要

CREATE USER `canal`@`%` IDENTIFIED BY 'canal'
GRANT Replication Client, Replication Slave, Select ON *.* TO `canal`@`%`

Canal 下载与启动

1、Github 网址下载

canal.deployer-1.1.2.tar.gz

2、 创建存放目录并解压

mkdir -p /opt/module/canal  
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal

3、修改 example 下的配置文件 (默认:TCP)

cd /opt/module/canal/
vim conf/example/instance.properties 

# 不能与mysql service-id 一样
canal.instance.mysql.slaveId=20

# 对应的 mysql 地址
canal.instance.master.address=120.79.196.173:3306

# 刚刚创建的mysql用户名,可以直接修改为root
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

TCP 连接方式 (Java编码方式)

pom.xml

<dependencies>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.1</version>
</dependency>
</dependencies>

Main方法

  package org.example;

  import com.alibaba.fastjson.JSONObject;
  import com.alibaba.otter.canal.client.CanalConnector;
  import com.alibaba.otter.canal.client.CanalConnectors;
  import com.alibaba.otter.canal.protocol.CanalEntry;
  import com.alibaba.otter.canal.protocol.CanalEntry.Column;
  import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
  import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
  import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
  import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
  import com.alibaba.otter.canal.protocol.Message;
  import com.google.protobuf.ByteString;
  import com.google.protobuf.InvalidProtocolBufferException;
  import java.net.InetSocketAddress;
  import java.util.List;

  /**
   * @author eddie.lee
   * @description
   */
  public class CanalClient {

    public static void main(String[] args)
        throws InterruptedException, InvalidProtocolBufferException {
      // 获取连接
      CanalConnector connector = CanalConnectors.newSingleConnector(
          new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

      while (true) {
        // 连接
        connector.connect();
        
        // 订阅数据库的表
        connector.subscribe("test\\..*");

        // 获取数据 (单次拉取100条,并不会阻塞)
        Message message = connector.get(100);

        // 获取Entry集合
        List<Entry> entries = message.getEntries();

        // 判断集合是否为空, 如果为空,则等待一会继续拉取数据
        if (entries.size() <= 0) {
          System.out.println("当前没有数据,休息一会。。。");
          Thread.sleep(1000);
        } else {

          // 遍历 entries 单条解析
          for (CanalEntry.Entry entry : entries) {
            // 1. 获取表名
            String tableName = entry.getHeader().getTableName();

            // 2. 获取类型
            EntryType entryType = entry.getEntryType();

            // 3. 获取序列化后的数据
            ByteString storeValue = entry.getStoreValue();

            // 4. 判断当前entryType类型是否为ROWDATA
            if (EntryType.ROWDATA.equals(entryType)) {

              // 5. 反序列化数据
              RowChange rowChange = RowChange.parseFrom(storeValue);

              // 6. 获取当前时间的操作类型
              EventType eventType = rowChange.getEventType();

              // 7. 获取数据及
              List<RowData> rowDatasList = rowChange.getRowDatasList();

              // 8. 遍历 rowDatasList 并且打印
              for (RowData rowData : rowDatasList) {

                // 9. 打印更新前数据
                JSONObject beforeData = new JSONObject();
                List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
                for (Column column : beforeColumnsList) {
                  beforeData.put(column.getName(), column.getValue());
                }

                // 10. 打印更新后数据
                JSONObject afterData = new JSONObject();
                List<Column> afterColumnsList = rowData.getAfterColumnsList();
                for (Column column : afterColumnsList) {
                  afterData.put(column.getName(), column.getValue());
                }

                System.out.println("Table:" + tableName +
                    ", EventType: " + entryType +
                    ", Before: " + beforeData +
                    ", After: " + afterData);
              }
            } else {
              System.out.println("当前操作类型为:" + entryType);
            }
          }
        }
      }
    }
  }

运行Main方法,一直挂着,不要关!!!

启动 Canal 服务端

[root@VM-0-3-centos canal]# pwd
/opt/module/canal
[root@VM-0-3-centos canal]# bin/startup.sh
[root@VM-0-3-centos canal]# jps
20745 CanalLauncher
20761 Jps

DB.Tbale 新增修改数据

新增一条SQL

INSERT INTO user_info VALUES ('1002','lisi','female')

用工具也行,不一定要语句的

IDEA控制台打印信息:

Table:user_info, EventType: ROWDATA, Before: {"sex":"female","id":"1002","NAME":"abc"}, After: {"sex":"female","id":"1002","NAME":"lisi"}
当前操作类型为:TRANSACTIONEND
当前没有数据,休息一会。。。
当前没有数据,休息一会。。。

更新一条SQL

UPDATE `test`.`user_info` SET `NAME` = 'wangwu' WHERE `id` = '1002' AND `NAME` = 'lisi' AND `sex` = 'female' LIMIT 1

IDEA控制台打印信息:

当前没有数据,休息一会。。。
当前操作类型为:TRANSACTIONBEGIN
Table:user_info, EventType: ROWDATA, Before: {"sex":"female","id":"1002","NAME":"lisi"}, After: {"sex":"female","id":"1002","NAME":"wangwu"}
当前操作类型为:TRANSACTIONEND
当前没有数据,休息一会。。。
当前没有数据,休息一会。。。

Kafka 连接方式 (非编码方式)

修改配置文件

修改:canal.properties 文件

[root@VM-0-3-centos canal]# vim conf/canal.properties 


# TCP 修改为 kafak
# tcp, kafka, RocketMQ
#canal.serverMode = tcp
canal.serverMode = kafka


##################################################
#########                    MQ      #############
##################################################
# 修改为kafka地址
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 127.0.0.1:9092

修改:instance.properties 文件

[root@VM-0-3-centos conf]# vim conf/example/instance.properties 

# 修改 mq 的主题
# mq config
canal.mq.topic=canal_kafka
canal.mq.partition=0

修改后,就不能使用IDEA请求,因为连接不到,现在连接模式是Kafka

Kafka 启动消费者

## 进入容器
[root@VM-0-3-centos ~]# docker exec -it kafka bash

## 启动消费者
root@a9ea8b546c2d:/# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic example_canal_kafka

## 在数据库随意的增删改,查看能不能消费

{"data":[{"id":"1003","NAME":"lisi","sex":"male"}],"database":"test","es":1656645220000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645220451,"type":"INSERT"}

{"data":[{"id":"1003","NAME":"lisi","sex":"female"}],"database":"test","es":1656645272000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":[{"sex":"male"}],"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645272803,"type":"UPDATE"}

{"data":[{"id":"1003","NAME":"lisi","sex":"female"}],"database":"test","es":1656645388000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(255)","NAME":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"NAME":12,"sex":12},"table":"user_info","ts":1656645388216,"type":"DELETE"}

Q&A

1、提示 found canal.pid , Please run stop.sh first ,then startup.sh

该文件是记录了canal运行的pid号,用户stop的时候kill对应的pid。应该是没有执行stop.sh需要执行一下stop.sh如果执行了stop.sh,该文件还没删除,提示:存在canal.pid,但是进程又没有。可以直接删除canal.pid,在启动就可以了

参考网址:https://www.jianshu.com/p/83929b8e6712