一个问题困扰我好长时间,重建同名连接器不能重新执行一致性快照
我在测试 debezium connector for oracle ,将 oracle 中的数据同步到 kafka 中,创建了一个源端连接器,用于捕获 oracle 中的数据,连接器的名称为 snapshot-mode-initial,出于一些测试的原因,我删除了这个连接器,并且删除了这个连接器在 kafka 创建的所有 topic,再次使用相同的连接器名称(snapshot-mode-initial)创建连接器时,数据不能重新初始化,以下是测试示例:
- 创建源端连接器,连接器命名 snapshot-mode-initial
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 连接器创建并自动启动后执行一致性快照,进行数据初始化,自动在kafka中创建了以下几个 topic
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list __consumer_offsets my_connect_configs my_connect_offsets my_connect_statuses initial initial.scott.dept initial.scott.emp initial.scott.salgrade schema-changes.initial
- 出于测试原因,删除了连接器和topic
# 删除连接器
curl -s -x delete localhost:8083/connectors/snapshot-mode-initial
# 删除topic
docker exec -it connect bash
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.scott.dept
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.scott.emp
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.scott.salgrade
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial
- 重建连接器,还以为可以重新创建 topic 并初始化数据呢,结果报出以下错误
# 重新注册连接器
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
# kafka connect 日志报以下错误
2022-04-20t12:13:45.826691242z 2022-04-20 12:13:45,826 error || workersourcetask{id=snapshot-mode-initial-0} task threw an uncaught and unrecoverable exception. task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.workertask]
2022-04-20t12:13:45.826696807z io.debezium.debeziumexception: the db history topic is missing. you may attempt to recover it by reconfiguring the connector to schema_only_recovery
- 没想象中的重新初始化数据,确报出找不到 history topic,这里的 history topic 就是注册连接器时配置的 “database.history.kafka.topic”: “schema-changes.initial”,至于是干什么用的,这里先不提了,然而 schema-changes.initial topic 已经被我 delete 了,这可以证明一点,新注册的连接器还在用之前的信息。
那么如果重新执行一致性快照呢?–两种方法
- 第一种方法:重新注册一个其他名称的连接器,例如:snapshot-mode-initial2,但是这个方法总是有一种不舒适的感觉。
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial2",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "scott",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 第二种方法:如标题所说,删除连接器上的 committed offsets,这种方法需要使用 kafkacat
如何删除连接器上的 committed offsets,没有测试成功,不知道啥情况
- 有时在进行实验时(或者当连接器在开始时配置错误时),有必要删除连接器偏移(offsets)以从干净状态开始。
- 第一步是找出包含连接器偏移量的主题(topic)的名称,这是在 offset.storage.topic 选项中配置的,一般默认是 my_connect_offsets,也可以在 kafka connect 日志找到。
- 下一步是找出给定连接器的最后一个偏移量,存储它的键,并确定用于存储偏移量的分区,需要使用 kafkacat 工具。
# docker 运行 kafkacat
alias kafkacat='docker run --rm edenhill/kcat:1.7.1 kcat'
kafkacat -b 172.17.0.5:9092 -c -t my_connect_offsets -f 'partition(%p) %k %s\n'
连接器 snapshot-mode-initial,分区号是13,但是 {“snapshot_scn”:“14208424”,“snapshot”:true,“scn”:“14208424”,“snapshot_completed”:true} 不像是最后的偏移量,这像是记录第一次启动源端连接器执行的一致性快照的信息,执行快照的scn已经快照是否执行完成等。
{“commit_scn”:“14854345”,“transaction_id”:null,“snapshot_scn”:“14676262”,“scn”:“14844029”}像是最后的偏移量。
- 修改连接器的信息,应停止连接器并且发出以下命令:
# 停止连接器,不知道咋停止单个连接器,还是先删了吧
curl -s -x delete localhost:8083/connectors/snapshot-mode-initial
# 修改连接器的信息,不起作用
echo '["snapshot-mode-initial",{"server":"initial"}]#' | kafkacat -p -z -b 172.17.0.5:9092 -t my_connect_offsets -k# -p 13
echo '["snapshot-mode-initial",{"server":"initial"}]#{"snapshot_scn":"14208424","snapshot":false,"scn":"14208424","snapshot_completed":false}' |kafkacat -p -b 172.17.0.5:9092 -t my_connect_offsets -k# -p 13
- 查找了好多资料,还是没有搞明白,太难了,这个问题先放着吧,但是我怀疑可能跟 debezium connector for oracle 连接器有关系,因为记录偏移量的 topic my_connect_offsets 是由连接器管理,而 debezium connector for oracle 在第一次启动执行完成一致性快照后只支持增量快照,如果使用 debezium connector for mysql 可能会成功。
- 参考文章:
最后修改时间:2022-04-21 22:11:43
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。