debezium 的 postgresql 连接器在第一次启动时,默认会执行数据库的初始一致性快照,相当于导出全量数据再导入到kafka。
可以通过设置连接器配置属性 snapshot.mode (默认:initial) 的值来自定义连接器执行快照的方式。
当 snapshot.mode 设置为 (默认:initial) 时,连接器完成以下任务来创建快照:
- 使用 serializable、read only、deferrable 隔离级别启动事务,以确保此事务中的后续读取针对数据的单个一致版本。 由于其他客户端的后续 insert、update 和 delete 操作而对数据进行的任何更改对此事务不可见。
- 读取数据库事务日志中的当前位置(lsn)。
- 扫描数据库表和模式,为每一行生成一个 read 事件并将该事件写入适当的特定于表的 kafka 主题(topic)。
- 提交事务。
- 在连接器偏移(offsets)中记录快照的成功完成。
如果连接器出现故障、重新平衡或在 步骤1 开始之后但在 步骤5 完成之前停止,则在重新启动时连接器将开始一个新的快照。 连接器完成其初始快照后,postgresql 连接器会继续从其在 步骤2 中读取的位置进行流式传输。这可确保连接器不会错过任何更新。 如果连接器由于任何原因再次停止,则在重新启动时,连接器会继续从之前停止的位置流式传输更改。
snapshot.mode 支持的参数配置
参数值 | 描述 |
---|---|
initial(默认) | 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。 |
always | 连接器在启动时始终执行一致性快照。快照完成后,连接器继续执行流式传输更改。此模式用于以下情况: |
- | 已知一些 wal 段已被删除,不再可用。 |
- | 集群发生故障后,主备库发生切换。此快照模式确保连接器不会错过在新主节点提升之后但在新主节点上重新启动连接器之前所做的任何更改。 |
initial_only | 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。 |
never | 不执行初始一致性快照,但是会同步后续数据库的更改记录 |
- | 如果 kafka offsets topic 中存在先前存储的 lsn,则连接器会继续从该位置流式传输更改。 |
- | 如果没有存储 lsn,则连接器从在服务器上创建 postgresql 逻辑复制槽的时间点开始流式传输更改。 |
exported | 已弃用,所有模式都是无锁的。 |
custom | 自定义接口 io.debezium.connector.postgresql.spi.snapshotter |
图示
按自己的理解,总结了一张图,不一定准确
snapshot.mode = initial
- 向 kafka connect 注册并启动一个新的 debezium postgresql connector,添加选项 “snapshot.mode”: “initial”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
-
源端数据库里的数据情况,inventory schema下有6张表,其中一张表 spatial_ref_sys 不同步
-
debezium postgresql connector 第一次启动后执行初始一致性快照,将全量数据导出转换写入到kafka,可以看到 kafka 中为每张有数据的表建了一个 topics,topics 的名称格式是
,其中表 spatial_ref_sys 不同步,还不知道啥原因,表里的每条数据在 kafka 中存储为一条消息(messages),以下截图也可以看出每个 topics 的 messages 个数与 postgresql 中表的数据行数一致。
-
源端数据库执行 dml 操作,自动同步到 kafka
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
postgres=# update inventory.orders set quantity=2 where id=11001;
postgres=# delete from inventory.orders where id = 11001;
snapshot.mode = always
- 更新上面创建的连接器 snapshot-mode-initial,更改选项 “snapshot.mode”: “always”,每次启动都会执行初始化快照,快照完成后会继续捕获变更数据
[root@docker ~]# cat pgsql-snapshot-mode-update.json
{
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
curl -i -x put -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/snapshot-mode-initial/config -d @pgsql-snapshot-mode-update.json
# 需要重新启动连接器实例和任务实例,只重启连接器实例是不起作用的
curl -s localhost:8083/connectors/snapshot-mode-initial/status | jq
curl -s -x post localhost:8083/connectors/snapshot-mode-initial/restart?includetasks=true
- 更新前 kafka 里面的数据情况
- 更新后 kafka 里面的数据情况,很明显新增了初始化的数据
snapshot.mode = always 恢复被删除的 topics
- 删除现有的 topics
# 先把连接器停了,不然有时删了 topics 还会自动创建
curl -s -x delete localhost:8083/connectors/snapshot-mode-initial
# 删除 topics
# docker exec -it connect bash
bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.customers
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.geom
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.orders
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.products
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.products_on_hand
- 重洗启动连接器 snapshot-mode-initial,包含选项 “snapshot.mode”: “always”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
- 源端数据库执行 dml 操作,同步正常,忽略测试过程
- 需要注意,一切正常了以后,要考虑是否将 snapshot.mode 改成默认的 initial ,否则每次启动连接器都会执行初始化任务。
[root@docker ~]# cat pgsql-snapshot-mode-update.json
{
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
curl -i -x put -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/snapshot-mode-initial/config -d @pgsql-snapshot-mode-update.json
curl -s -x post localhost:8083/connectors/snapshot-mode-initial/restart?includetasks=true
# 已测试,更改 "snapshot.mode": "initial",重启连接器和任务后数据不会重新初始化,dml 捕获正常。
snapshot.mode = initial_only
- 向 kafka connect 注册并启动一个新的 debezium postgresql connector,添加选项 “snapshot.mode”: “initial_only”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial_only",
"config": {
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial_only",
"slot.name": "initial_only_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial_only",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
- 源端数据库里的数据情况,inventory schema下有6张表,其中一张表 spatial_ref_sys 不同步
- debezium oracle connector 第一次启动后执行初始一致性快照
- 源端数据库执行 dml 操作,此时不会同步一致性快照以后的变更数据
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
postgres=# update inventory.orders set quantity=2 where id=11001;
postgres=# delete from inventory.orders where id = 11001;
- 可以看到 snapshot.mode = initial 的表已经同步数据了,但是 snapshot.mode = initial_only 的表并没有同步数据
snapshot.mode = never
- 向 kafka connect 注册并启动一个新的 debezium postgresql connector,添加选项 “snapshot.mode”: “never”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-never",
"config": {
"connector.class" : "io.debezium.connector.postgresql.postgresconnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "never",
"slot.name": "never_slot",
"schema.include.list": "inventory",
"snapshot.mode": "never",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
- debezium postgresql connector 第一次启动后不会执行初始一致性快照,可以看到 kafka 的没有存放数据的 topics
- 但是源端数据库执行 dml 操作,debezium postgresql connector 会自动捕获变更数据在 kafka 上创建 topics 并写入消息
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
postgres=# update inventory.orders set quantity=2 where id=11001;
postgres=# delete from inventory.orders where id = 11001;
总结一点
debezium postgresql connector 比 debezium oracle connector 要好用多了,同步延迟很小,测试还算比较顺利,可以看出 debezium 对 postgresql 兼容要比 oracle 要好很多。