3

kafka 维护笔记-m6米乐安卓版下载

原创 张玉龙 2022-04-18
3239

一个问题引出本篇文章,作为工具类,方便维护 kafka

基于这篇文章:在docker环境上使用kafka connect jdbc将变更数据从kafka应用到postgresql

  1. 我把目标端 postgresql 里的表给删除了,怎么让 kafka 重新同步这个表呢?
    需要修改或删除消费者组的偏移量offset
    image.png
  2. 怎么修改或删除消费者组的偏移量offset? --见文章末尾
  3. 这一个问题研究了一天,太难了

kafka connect rest api

参考文档:

获取 connect 集群的基本信息

# curl -s -x get localhost:8083/ | jq

列出 kafka connect worker 上安装的插件

# curl -s -x get localhost:8083/connector-plugins | jq

image.png

创建一个连接器

# vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslmode=require", "tasks.max": "1", "topics": "oracle19c.scott.dept", "table.name.format": "dept", "dialect.name": "postgresqldatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } } # curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

获取所有现有的连接器名称

# curl -s -x get localhost:8083/connectors/ | jq

获取连接器的配置信息

# curl -s -x get localhost:8083/connectors/pgsql-scott-jdbc-sink | jq

image.png

获取连接器的状态信息

# curl -s -x get localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq

image.png

获取当前为连接器运行的任务列表

# curl -s -x get localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks | jq

image.png

获取任务的当前状态

# curl -s -x get localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq

image.png

获取连接器使用的主题(topics)列表

# curl -s -x get localhost:8083/connectors/oracle-scott-connector/topics | jq

清空连接器的活动主题(topics)列表

# curl -s -x put localhost:8083/connectors/oracle-scott-connector/topics/reset

暂停连接器任务

# curl -s -x put localhost:8083/connectors/pgsql-scott-jdbc-sink/pause

恢复连接器任务

# curl -s -x put localhost:8083/connectors/pgsql-scott-jdbc-sink/resume

删除连接器

# curl -s -x delete localhost:8083/connectors/pgsql-scott-jdbc-sink

更新连接器

# cat pgsql-scott-jdbc-sink.json { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslmode=require", "tasks.max": "1", "topics": "oracle19c.scott.dept", "table.name.format": "dept", "dialect.name": "postgresqldatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } # curl -i -x put -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/pgsql-scott-jdbc-sink/config -d @pgsql-scott-jdbc-sink.json

重启连接器和任务(tasks)

  • 语法
post /connectors/{name}/restart?includetasks=<true|false>&onlyfailed=<true|false> # "includetasks=true": 重新启动连接器实例和任务实例 # "includetasks=false"(默认): 仅重新启动连接器实例 # "onlyfailed=true": 仅重新启动具有 failed 状态的实例 # "onlyfailed=false"(默认): 重新所有实例
  • 示例
# curl -s -x post localhost:8083/connectors/pgsql-scott-jdbc-sink/restart
  • 默认只重新启动连接器并不会重新启动其所有任务。因此,您也可以重新启动失败的单个任务,然后再次获取其状态:
# curl -s -x post localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/restart # curl -s -x get localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq

kafka-consumer-groups.sh 消费者组管理

参考文章:

相关可选参数

参数 描述
–bootstrap-server 连接到指定的kafka服务
–list 列出所有消费者组名称
–describe 查询消费者描述信息
–group 指定消费者组
–all-groups 指定所有消费者组
–members 查询消费者组的成员信息
–state 查询消费者的状态信息
–offsets 列出消息的偏移量信息
–delete 删除消费者组
–reset-offsets 重置消费组的偏移量
–dry-run 重置偏移量的命令预执行
–excute 真正的执行重置偏移量的操作
–delete-offsets 删除偏移量

查看消费者列表 --list

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

查看消费者组详情 --describe --group/all-groups

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink group topic partition current-offset log-end-offset lag consumer-id host client-id connect-pgsql-scott-jdbc-sink oracle19c.scott.dept 0 7 7 0 connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0 # 查看所有消费者组信息 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --all-groups

查看消费者成员信息 --members

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --group connect-pgsql-scott-jdbc-sink group consumer-id host client-id #partitions connect-pgsql-scott-jdbc-sink connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0 1 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --all-groups

消费者状态信息 --state

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --group connect-pgsql-scott-jdbc-sink group coordinator (id) assignment-strategy state #members connect-pgsql-scott-jdbc-sink 172.17.0.4:9092 (1) range stable 1 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --all-groups

删除消费者组 --delete

  • 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --group pgsql-scott-jdbc-sink deletion of requested consumer groups ('pgsql-scott-jdbc-sink') was successful. # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --all-groups

重置消费组的偏移量 --reset-offsets

  • 能够执行成功的一个前提是 消费组是不可用状态
  • 相关重置 offset 的模式
参数 描述
–to-earliest 重置 offset 到最开始的那条 offset (找到还未被删除最早的那个 offset )
–to-current 直接重置 offset 到当前的 offset,也就是 loe
–to-latest 重置到最后一个 offset
–to-datetime 重置到指定时间的 offset,格式为:yyyy-mm-ddthh:mm:ss.sss;
- 例如: --to-datetime “2021-6-26t00:00:00.000”
–to-offset 重置到指定的 offset,一般不用这个,例如:–to-offset 3465
–shift-by 按照偏移量增加或者减少多少个 offset,例如:–shift-by 100 、–shift-by -100
–from-file 根据cvs文档来重置
  • 上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的 offset;不过 --from-file 可以让我们更灵活一点
# 先配置cvs文档,格式为: topic:分区号:重置目标偏移量 oracle19c.scott.dept:0:100 oracle19c.scott.dept:1:200 oracle19c.scott.dept:2:300 # 执行命令 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --from-file config/reset-offset.csv --group test2_consumer_group --dry-run
  • 示例
# --dry-run 预执行,不会真正执行命令,为了看看效果 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --dry-run --group connect-pgsql-scott-jdbc-sink # --execute 真正执行命令 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --execute --group connect-pgsql-scott-jdbc-sink # --topic 指定主题 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.scott.dept --to-earliest --execute --group connect-pgsql-scott-jdbc-sink # --to-offset 指定偏移量 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.scott.dept --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink # --topic :0 指定分区 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.scott.dept:0 --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink

删除偏移量 --delete-offsets

  • 能够执行成功的一个前提是 消费组是不可用状态
  • 偏移量被删除了之后,consumer group下次启动的时候,会从头消费
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.scott.dept --group connect-pgsql-scott-jdbc-sink

解决问题

  1. 怎么修改或删除消费者组的偏移量offset?
# 查询需要修改的主题名 bin/kafka-topics.sh --list --bootstrap-server kafka:9092 # 查询需要修改的消费者组名 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list # 确认修改的主题名和消费者组名是否正确 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink
  1. 修改或删除消费组的偏移量的前提是:消费组是不可用状态
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink

如果 consumer-id、host、client-id 存在信息,则消费组是处于活动状态,修改或删除消费组的偏移量将会失败,报出以下错误信息:

error: assignments can only be reset if the group 'connect-pgsql-scott-jdbc-sink' is inactive, but the current state is stable.
  1. 尝试暂停连接器任务,消费者组还是处于活动状态
curl -s -x put localhost:8083/connectors/pgsql-scott-jdbc-sink/pause
  1. 先删除连接器,消费者组处于非活动状态,不知道这个地方有什么好的方法能让消费者组处于非活动状态
curl -s -x delete localhost:8083/connectors/pgsql-scott-jdbc-sink

image.png
5. 修改或删除消费组的偏移量

# 修改消费组的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.scott.dept --to-earliest --execute --group connect-pgsql-scott-jdbc-sink group topic partition new-offset connect-pgsql-scott-jdbc-sink oracle19c.scott.dept 0 0 # 删除消费者组的偏移量 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.scott.dept --group connect-pgsql-scott-jdbc-sink request succeed for deleting offsets with topic oracle19c.scott.dept group connect-pgsql-scott-jdbc-sink topic partition status oracle19c.scott.dept 0 successful
  1. 添加连接器,此时 postgresql 上的目标表已重新同步
vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslmode=require", "tasks.max": "1", "topics": "oracle19c.scott.dept", "table.name.format": "scott.dept", "dialect.name": "postgresqldatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } } curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
最后修改时间:2022-04-19 09:04:16
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

网站地图