debezium 的 oracle 连接器在第一次启动时,默认会执行数据库的初始一致性快照,相当于导出全量数据再导入到kafka。
可以通过设置连接器配置属性 snapshot.mode (默认:initial) 的值来自定义连接器创建快照的方式。
当 snapshot.mode 设置为 (默认:initial) 时,连接器完成以下任务来创建快照:
- 确定要捕获的表
- 获取每个要捕获表的 row share mode 锁,以防止在创建快照期间更改表结构,debezium 持有锁的时间很短。
- 从数据库的 redo 日志中读取当前系统更改号(scn)位置。
- 捕获所有相关表的表结构。
- 释放 步骤2 中获得的锁。
- 在 步骤3 中读取的 scn 位置扫描所有相关的数据库表(select * from … as of scn 123),为每一行生成一个 read 事件,然后将事件记录写入到 kafka 主题(topic)。
- 在连接器偏移(offsets)中记录快照的成功完成。
执行创建快照的进程开始后,如果进程因连接器故障、重新平衡或其他原因而中断,连接器重启后快照进程也会重新启动。连接器完成初始一致性快照后,它会继续从 步骤3 中读取的 scn 位置进行流式传输,以免丢失任何数据。如果连接器由于某种原因再次停止,则在连接器重新启动后,它将从之前停止的位置继续恢复数据的流式传输。
snapshot.mode 支持的参数配置,这个参数只在连接器在第一次启动时起作用
参数值 | 描述 |
---|---|
initial(默认) | 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。 |
initial_only | 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。 |
schema_only | 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录 |
schema_only_recovery | 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。 |
snapshot.mode = initial
- 向 kafka connect 注册并启动一个新的 debezium oracle connector,添加选项 “snapshot.mode”: “initial”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端数据库里的数据情况,scott 用户下有5张表,其中一张表bonus里面没有数据
-
debezium oracle connector 第一次启动后执行初始一致性快照,将全量数据导出转换写入到kafka,可以看到 kafka 中为每张有数据的表建了一个 topics,topics 的名称格式是
,其中表 bonus 没有数据,所以没有建立 topics,但是在 topics initial 中记录的表的ddl语句,同时还建立了一个历史topics schema-changes.initial,表里的每条数据在 kafka 中存储为一条消息(messages),以下截图也可以看出每个 topics 的 messages 个数与 oracle 中表的数据行数一致。
-
源端数据库执行 dml 操作,自动同步到 kafka
sql> insert into dept values (50,'aaaa','a');
sql> commit;
sql> update dept set dname='bbbb' where deptno=50;
sql> commit;
sql> delete from dept where deptno=50;
sql> commit;
snapshot.mode = initial_only
- 向 kafka connect 注册并启动一个新的 debezium oracle connector,添加选项 “snapshot.mode”: “initial_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "initial-only",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "initial_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial_only"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端数据库里的数据情况
-
debezium oracle connector 第一次启动后执行初始一致性快照
-
源端数据库执行 dml 操作,此时不会同步一致性快照以后的变更数据
sql> insert into dept values (60,'bbbb','b');
sql> commit;
- 可以看到 snapshot.mode = initial 的表已经同步数据了,但是 snapshot.mode = initial_only 的表并没有同步数据
- 连接器的状态还是running
curl -s -x get localhost:8083/connectors/snapshot-mode-initial-only/status | jq
- topics my_connect_offsets 中也记录了快照信息
snapshot.mode = schema_only
- 向 kafka connect 注册并启动一个新的 debezium oracle connector,添加选项 “snapshot.mode”: “schema_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-schema-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "schema-only",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "schema_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.schema_only"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 源端数据库里的数据情况
- debezium oracle connector 第一次启动后不会执行初始一致性快照,只将表的ddl表结构写入到kafka,可以看到 kafka 的没有存放数据的 topics,只有一个存放表的ddl的 topics
[kafka@4c24d79ab670 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list |grep schema-only schema-only
- 源端数据库执行 dml 操作,自动同步到 kafka
sql> insert into dept values (70,'cccc','c');
sql> commit;
snapshot.mode = schema_only_recovery
当 database.history.kafka.topic 被删除了,可以使用 snapshot.mode = schema_only_recovery 来恢复
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial
- 向 kafka connect 注册并启动一个新的 debezium oracle connector,添加选项 “snapshot.mode”: “schema_only_recovery”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "schema_only_recovery",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
# 删除连接器
curl -s -x delete localhost:8083/connectors/snapshot-mode-initial
# 重新注册连接器
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
总结一点
功能可以实现,但是捕获延迟很严重,变更一条记录要好长时间才能捕获到,不知道是配置的问题还是连接器本身的问题,就是感觉对oracle的兼容不是太友好。