
Kafka Connect JSON Converter Includes the Record's Message Schema

Original article by 张玉龙 · 2022-04-20


Background

Debezium connectors run inside the Kafka Connect framework and capture every row-level change in a database by producing change event records and writing them to Kafka topics. Before a Debezium connector writes a change event record to a topic, the captured record has to be converted into a form Kafka can store. Kafka Connect ships with a JSON converter that serializes the record keys and values to JSON, but by default the JSON converter embeds the record's message schema, which makes every record extremely verbose.
(screenshot: the change event record as stored in the Kafka topic, schema included)

  • key
{ "schema": { "type": "struct", "fields": [ { "type": "int8", "optional": false, "field": "deptno" } ], "optional": false, "name": "ora19c.scott.dept.key" }, "payload": { "deptno": 10 } }
  • values
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int8", "optional": false, "field": "deptno" }, { "type": "string", "optional": true, "field": "dname" }, { "type": "string", "optional": true, "field": "loc" } ], "optional": true, "name": "ora19c.scott.dept.value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int8", "optional": false, "field": "deptno" }, { "type": "string", "optional": true, "field": "dname" }, { "type": "string", "optional": true, "field": "loc" } ], "optional": true, "name": "ora19c.scott.dept.value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "txid" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "string", "optional": true, "field": "lcr_position" } ], "optional": false, "name": "io.debezium.connector.oracle.source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "field": "transaction" } ], "optional": false, "name": "ora19c.scott.dept.envelope" }, "payload": { "before": null, "after": { "deptno": 10, "dname": "accounting", "loc": "new york" }, "source": { "version": "1.9.0.final", "connector": "oracle", "name": "ora19c", "ts_ms": 1650415436277, "snapshot": "true", "db": "pdbtt", "sequence": null, "schema": "scott", "table": "dept", "txid": null, "scn": "6853525", "commit_scn": null, "lcr_position": null }, "op": "r", "ts_ms": 1650415436280, "transaction": null } }

The above is how a single record is stored in Kafka. Both the key and the value carry the message schema, which makes each message extremely verbose and adds both storage and network-transfer overhead. Exactly what the message schema is needed for downstream is not yet clear to me, but it is still worth considering disabling it for these records. Two converter properties control this:

key.converter.schemas.enable
value.converter.schemas.enable
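For reference, the stock connect-distributed.properties shipped with Kafka declares the JSON converter and enables schemas explicitly, so the lines to look for are roughly the following (the exact comments and defaults can differ between Kafka versions):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true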

Disable the message schema via configuration parameters

  • Set the two parameters in the Kafka Connect worker configuration file (config/connect-distributed.properties); a per-connector alternative is sketched after these steps:
# The Kafka Connect worker runs in a Docker container that has no vi editor,
# so edit the file from the Docker host instead.
# Find where the container's /kafka/config directory is mounted on the host:
[root@docker ~]# docker inspect connect | grep "/kafka/config" -B1
                "Source": "/var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data",
                "Destination": "/kafka/config",
# Edit connect-distributed.properties and change the two parameters:
[root@docker ~]# cd /var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data
[root@docker _data]# ll
total 68
-rw-r--r--. 1 1001 1001  906 Apr 19 14:51 connect-console-sink.properties
-rw-r--r--. 1 1001 1001  909 Apr 19 14:51 connect-console-source.properties
-rw-r--r--. 1 1001 1001 5608 Apr 19 14:52 connect-distributed.properties
-rw-r--r--. 1 1001 1001  883 Apr 19 14:51 connect-file-sink.properties
-rw-r--r--. 1 1001 1001  881 Apr 19 14:51 connect-file-source.properties
-rw-r--r--. 1 1001 1001 2103 Apr 19 14:51 connect-log4j.properties
-rw-r--r--. 1 1001 1001 2540 Apr 19 14:51 connect-mirror-maker.properties
-rw-r--r--. 1 1001 1001 2262 Apr 19 14:51 connect-standalone.properties
-rw-r--r--. 1 1001 1001 1221 Apr 19 14:51 consumer.properties
drwxr-xr-x. 2 1001 1001  102 Apr 19 14:51 kraft
-rw-rw-r--. 1 1001 1001  850 Apr 19 10:03 log4j.properties
-rw-r--r--. 1 1001 1001 1925 Apr 19 14:51 producer.properties
-rw-r--r--. 1 1001 1001 6849 Apr 19 14:51 server.properties
-rw-r--r--. 1 1001 1001 1032 Apr 19 14:51 tools-log4j.properties
-rw-r--r--. 1 1001 1001 1169 Apr 19 14:51 trogdor.conf
-rw-r--r--. 1 1001 1001 1205 Apr 19 14:51 zookeeper.properties
[root@docker _data]# vi connect-distributed.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  • Restart the Connect container so the new parameter values take effect
# docker restart connect
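Changing connect-distributed.properties switches the schema off for every connector running on this worker. If that is too broad, Kafka Connect also accepts converter settings in an individual connector's configuration, which override the worker defaults. A minimal sketch (assuming the stock JsonConverter) adds entries like these to the connector's "config" block instead of editing the worker file:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"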

Create a new connector and observe the effect

When creating the new connector, keep the name, database.server.name, and database.history.kafka.topic parameters unique so they do not clash with the existing connector.

[root@docker ~]# cat oracle-scott-connector.json
{
    "name": "oracle-scott-connector2",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "database.hostname" : "172.17.0.2",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "orcl",
        "database.pdb.name" : "pdbtt",
        "database.server.name" : "ora1",
        "tasks.max" : "1",
        "schema.include.list": "scott",
        "database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
        "database.history.kafka.topic": "schema-changes.inventory1"
    }
}
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
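After the POST returns, it is worth confirming that the connector and its task actually reached the RUNNING state before inspecting the topic. The Kafka Connect REST API (same host and port as above) exposes a status endpoint for this; a quick check looks like:

# List registered connectors, then show the status of the new one
[root@docker ~]# curl -s 192.168.0.40:8083/connectors
[root@docker ~]# curl -s 192.168.0.40:8083/connectors/oracle-scott-connector2/status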

(screenshot: the change event record as stored in the Kafka topic, schema disabled)

  • key
{ "deptno": 10 }
  • values
{ "before": null, "after": { "deptno": 10, "dname": "accounting", "loc": "new york" }, "source": { "version": "1.9.0.final", "connector": "oracle", "name": "ora1", "ts_ms": 1650414625783, "snapshot": "true", "db": "pdbtt", "sequence": null, "schema": "scott", "table": "dept", "txid": null, "scn": "6732490", "commit_scn": null, "lcr_position": null }, "op": "r", "ts_ms": 1650414625799, "transaction": null }
  • Comparison of the storage size of the same data in Kafka: the first screenshot is with the schema enabled, the second with the schema disabled (a command-line way to make the same comparison is sketched below).
    (screenshots: Kafka topic storage size with schema enabled vs. disabled)
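The screenshots come from a GUI, but the same size comparison can be made from the command line with the kafka-log-dirs tool, which reports the on-disk size of each topic partition. A sketch, assuming the Kafka CLI scripts are available (for example inside the broker container) and that the change-event topics from the two runs follow the <server.name>.SCOTT.DEPT naming, i.e. ora19c.SCOTT.DEPT and ora1.SCOTT.DEPT:

# Report the size in bytes of each partition of the two topics
bin/kafka-log-dirs.sh --bootstrap-server 192.168.0.40:9092 --describe \
    --topic-list ora19c.SCOTT.DEPT,ora1.SCOTT.DEPT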

Last modified: 2022-04-25 09:24:24