| tags: [ MySQL Kafka Debezium ] categories: [ Development ]
使用 Debezium 复制 MySQL binlog 到 Kafka
Debezium 是一个支持 Kafka Connect 框架的数据库变更捕获(Change Data Capture)服务,它可以消费 MySQL、PostgreSQL、SQL Server、MongoDB 的 binlog 写入到 Apache Kafka 里,下文是针对 MySQL 的简单配置步骤。
-
使用 docker 创建数据库:
docker network create mysql docker run -dt -p 13306:3306 \ -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \ --net mysql --name mysql_1 mysql:5.7 \ --server_id=1 --log_bin=mysql-bin \ --gtid_mode=ON --enforce_gtid_consistency=ON # 这个例子里 Debezium 用不到 mysql_2 这个从库 docker run -dt -p 23306:3306 \ -e MYSQL_ALLOW_EMPTY_PASSWORD=yes \ --net mysql --name mysql_2 mysql:5.7 \ --server_id=2 --log_bin=mysql-bin \ --gtid_mode=ON --enforce_gtid_consistency=ON docker exec mysql-2 mysql -Be "STOP SLAVE; CHANGE MASTER TO MASTER_HOST='mysql_1', MASTER_USER='root', MASTER_AUTO_POSITION=1; START SLAVE" docker exec mysql-2 mysql -Be "SHOW SLAVE STATUS\G"
-
在 mysql_1 里新建帐号
CREATE USER 'debezium-user' IDENTIFIED BY '123456'; GRANT SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium-user'; GRANT SELECT ON test.* TO 'debezium-user'; FLUSH PRIVILEGES;
这里没有授予
RELOAD ON *.*
以及LOCK TABLES ON test.*
是为了避免 Debezium 在获取 table schema 时锁表。特别注意的是一定需要授予SELECT
权限,Debezium 依赖这个权限执行SHOW FULL TABLES IN some_db WHERE Table_Type = 'BASE TABLE'
以及SHOW CREATE TABLE some_db.some_table
获取每个表的列名,没有这个权限,查询information_schema.tables
和information_schema.columns
也查不到对应表的信息,这是 MySQL 的奇葩权限机制导致。 -
下载 Confluent Platform,解开到某个目录下,记作
$CONFLUENT_ROOT
; -
从 https://www.confluent.io/hub/debezium/debezium-connector-mysql/ 手动下载 debezium 插件的 zip 压缩包,解压缩到
$CONFLUENT_ROOT/share/java
; -
执行
export CONFLUENT_CURRENT=$CONFLUENT/data; $CONFLUENT_ROOT/bin/confluent local start
启动 Confluent Platform,启动完后grep plugin.path data/confluent.*/connect/connect.properties
可以确认 Kafka Connect 插件应该放到什么位置。 -
编辑
test-connector.json
文件如下:{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "127.0.0.1", "database.port": "13306", "database.user": "debezium-user", "database.password": "123456", "database.server.id": "1", "database.server.name": "mysql_1", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "debezium-mysql_1", "include.schema.changes": "true", "snapshot.mode": "schema_only", "snapshot.locking.mode": "none", "tombstones.on.delete": "false" }
-
database.history.kafka.topic 和 include.schema.changes
Debezium 会将 DML binlog 以 AVRO 格式(也支持 JSON 格式)写入 Kafka topic
SERVER.DB.TABLE
,一份 JSON 格式的 DDL 写入debezium-mysql_1
,一份 AVRO 格式的 DDL 写入SERVER
(include.schema.changes=true)。 -
snapshot.mode
Debezium 支持五种模式:
initial
:默认模式,在没有找到 offset 时(记录在 Kafka topic 的connect-offsets
中,Kafka connect 框架维护),做一次 snapshot——遍历有 SELECT 权限的表,收集列名,并且将每个表的所有行 select 出来写入 Kafka;when_needed
: 跟initial
类似,只是增加了一种情况,当记录的 offset 对应的 binlog 位置已经在 MySQL 服务端被 purge 了时,就重新做一个 snapshot。never
: 不做 snapshot,也就是不拿所有表的列名,也不导出表数据到 Kafka,这个模式下,要求从最开头消费 binlog,以获取完整的 DDL 信息,MySQL 服务端的 binlog 不能被 purge 过,否则由于 DML binlog 里只有 database name、table name、column type 却没有 column name,Debezium 会报错Encountered change event for table some_db.some_table whose schema isn't known to this connector
;schema_only
: 这种情况下会拿所有表的列名信息,但不会导出表数据到 Kafka,而且只从 Debezium 启动那刻起的 binlog 末尾开始消费,所以很适合不关心历史数据,只关心最近变更的场合。schema_only_recovery
: 在 Debezium 的 schema_only 模式出错时,用这个模式恢复,一般不会用到。
-
snapshot.locking.mode
设置为 “none” 是为了避免获取表的元信息时锁表(要么是 RELOAD 权限用 flush tables with read lock,要么是 LOCK TABLES 权限锁单个表),此时要求 Debezium 启动或者重启时没有 DDL 语句执行,否则 Debezium 抓取到的元信息跟并发执行的 DML 之间不一致。
-
tombstones.on.delete
设置成 “false” 是为了避免因 delete 消息额外写入一条不符合 AVRO schema 的 tombstone 消息。
-
-
执行如下命令创建 Kafka connector 并启动:
curl -s http://localhost:8083/connectors/test-connector/config -XPUT --data-binary @test-connector.json -H 'Content-Type: application/json' curl -s http://localhost:8083/connectors/test-connector/status | json_pp
之后就可以在 Kafka 的
SERVER.DB.TABLE
,SERVER
,debezium-SERVER
三类 topic 里看到 binlog 消息了。
在 Debezium 文档里还有更多选项,可以做到 database、table、column 级别的黑白名单,可以对 column 脱敏等等。