Debezium 是一个支持 Kafka Connect 框架的数据库变更捕获(Change Data Capture)服务,它可以消费 MySQL、PostgreSQL、SQL Server、MongoDB 的 binlog 写入到 Apache Kafka 里,下文是针对 MySQL 的简单配置步骤。

  1. 使用 docker 创建数据库:

    docker network create mysql
    
    docker run -dt -p 13306:3306 \
      -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \
      --net mysql --name mysql_1 mysql:5.7 \
      --server_id=1 --log_bin=mysql-bin \
      --gtid_mode=ON --enforce_gtid_consistency=ON
    
    # 这个例子里 Debezium 用不到 mysql_2 这个从库
    docker run -dt -p 23306:3306 \
      -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \
      --net mysql --name mysql_2 mysql:5.7 \
      --server_id=2 --log_bin=mysql-bin \
      --gtid_mode=ON --enforce_gtid_consistency=ON
    
    docker exec mysql-2 mysql -Be "STOP SLAVE; CHANGE MASTER TO MASTER_HOST='mysql_1', MASTER_USER='root', MASTER_AUTO_POSITION=1; START SLAVE"
    
    docker exec mysql-2 mysql -Be "SHOW SLAVE STATUS\G"
    
  2. 在 mysql_1 里新建帐号

    CREATE USER 'debezium-user' IDENTIFIED BY '123456';
    GRANT SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium-user';
    GRANT SELECT ON test.* TO 'debezium-user';
    FLUSH PRIVILEGES;
    

    这里没有授予 RELOAD ON *.* 以及 LOCK TABLES ON test.* 是为了避免 Debezium 在获取 table schema 时锁表。特别注意的是一定需要授予 SELECT 权限,Debezium 依赖这个权限执行 SHOW FULL TABLES IN some_db WHERE Table_Type = 'BASE TABLE' 以及 SHOW CREATE TABLE some_db.some_table获取每个表的列名,没有这个权限,查询 information_schema.tablesinformation_schema.columns 也查不到对应表的信息,这是 MySQL 的奇葩权限机制导致。

  3. 下载 Confluent Platform,解开到某个目录下,记作 $CONFLUENT_ROOT

  4. https://www.confluent.io/hub/debezium/debezium-connector-mysql/ 手动下载 debezium 插件的 zip 压缩包,解压缩到 $CONFLUENT_ROOT/share/java

  5. 执行 export CONFLUENT_CURRENT=$CONFLUENT/data; $CONFLUENT_ROOT/bin/confluent local start 启动 Confluent Platform,启动完后 grep plugin.path data/confluent.*/connect/connect.properties 可以确认 Kafka Connect 插件应该放到什么位置。

  6. 编辑 test-connector.json 文件如下:

    {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "127.0.0.1",
        "database.port": "13306",
        "database.user": "debezium-user",
        "database.password": "123456",
        "database.server.id": "1",
        "database.server.name": "mysql_1",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "debezium-mysql_1",
        "include.schema.changes": "true",
        "snapshot.mode": "schema_only",
        "snapshot.locking.mode": "none",
        "tombstones.on.delete": "false"
    }
    
    • database.history.kafka.topic 和 include.schema.changes

      Debezium 会将 DML binlog 以 AVRO 格式(也支持 JSON 格式)写入 Kafka topic SERVER.DB.TABLE,一份 JSON 格式的 DDL 写入 debezium-mysql_1,一份 AVRO 格式的 DDL 写入 SERVER(include.schema.changes=true)。

    • snapshot.mode

      Debezium 支持五种模式:

      1. initial :默认模式,在没有找到 offset 时(记录在 Kafka topic 的 connect-offsets 中,Kafka connect 框架维护),做一次 snapshot——遍历有 SELECT 权限的表,收集列名,并且将每个表的所有行 select 出来写入 Kafka;
      2. when_needed: 跟 initial 类似,只是增加了一种情况,当记录的 offset 对应的 binlog 位置已经在 MySQL 服务端被 purge 了时,就重新做一个 snapshot。
      3. never: 不做 snapshot,也就是不拿所有表的列名,也不导出表数据到 Kafka,这个模式下,要求从最开头消费 binlog,以获取完整的 DDL 信息,MySQL 服务端的 binlog 不能被 purge 过,否则由于 DML binlog 里只有 database name、table name、column type 却没有 column name,Debezium 会报错 Encountered change event for table some_db.some_table whose schema isn't known to this connector
      4. schema_only: 这种情况下会拿所有表的列名信息,但不会导出表数据到 Kafka,而且只从 Debezium 启动那刻起的 binlog 末尾开始消费,所以很适合不关心历史数据,只关心最近变更的场合。
      5. schema_only_recovery: 在 Debezium 的 schema_only 模式出错时,用这个模式恢复,一般不会用到。
    • snapshot.locking.mode

      设置为 “none” 是为了避免获取表的元信息时锁表(要么是 RELOAD 权限用 flush tables with read lock,要么是 LOCK TABLES 权限锁单个表),此时要求 Debezium 启动或者重启时没有 DDL 语句执行,否则 Debezium 抓取到的元信息跟并发执行的 DML 之间不一致。

    • tombstones.on.delete

      设置成 “false” 是为了避免因 delete 消息额外写入一条不符合 AVRO schema 的 tombstone 消息。

  7. 执行如下命令创建 Kafka connector 并启动:

    curl -s http://localhost:8083/connectors/test-connector/config -XPUT --data-binary @test-connector.json -H 'Content-Type: application/json'
    
    curl -s http://localhost:8083/connectors/test-connector/status | json_pp
    

    之后就可以在 Kafka 的 SERVER.DB.TABLE, SERVER, debezium-SERVER 三类 topic 里看到 binlog 消息了。

Debezium 文档里还有更多选项,可以做到 database、table、column 级别的黑白名单,可以对 column 脱敏等等。