Background
Debezium connectors run inside the Kafka Connect framework and capture every row-level change in a database by emitting change event records to Kafka topics. Before a Debezium connector writes a change event record to a topic, the record must be converted into a form Kafka can store. Kafka Connect ships with a JSON converter that serializes record keys and values to JSON; by default, however, the JSON converter embeds the message schema in every record, which makes each record very verbose.
- Key
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int8",
        "optional": false,
        "field": "deptno"
      }
    ],
    "optional": false,
    "name": "ora19c.SCOTT.DEPT.Key"
  },
  "payload": {
    "deptno": 10
  }
}
- Value
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int8",
            "optional": false,
            "field": "deptno"
          },
          {
            "type": "string",
            "optional": true,
            "field": "dname"
          },
          {
            "type": "string",
            "optional": true,
            "field": "loc"
          }
        ],
        "optional": true,
        "name": "ora19c.SCOTT.DEPT.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int8",
            "optional": false,
            "field": "deptno"
          },
          {
            "type": "string",
            "optional": true,
            "field": "dname"
          },
          {
            "type": "string",
            "optional": true,
            "field": "loc"
          }
        ],
        "optional": true,
        "name": "ora19c.SCOTT.DEPT.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "txid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "scn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_scn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "lcr_position"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.oracle.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "ora19c.SCOTT.DEPT.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "deptno": 10,
      "dname": "ACCOUNTING",
      "loc": "NEW YORK"
    },
    "source": {
      "version": "1.9.0.Final",
      "connector": "oracle",
      "name": "ora19c",
      "ts_ms": 1650415436277,
      "snapshot": "true",
      "db": "PDBTT",
      "sequence": null,
      "schema": "SCOTT",
      "table": "DEPT",
      "txid": null,
      "scn": "6853525",
      "commit_scn": null,
      "lcr_position": null
    },
    "op": "r",
    "ts_ms": 1650415436280,
    "transaction": null
  }
}
The above is how a single record is stored in Kafka: both the key and the value carry the message schema, which makes every message extremely verbose and adds both storage and network-transfer overhead. The embedded schema describes the structure of the payload so that schema-aware consumers (sink converters, for example) can interpret it; if your consumers do not need it, Debezium suggests considering disabling it, which involves the following two parameters:
- key.converter.schemas.enable
- value.converter.schemas.enable
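For context, in a stock connect-distributed.properties these two flags sit right next to the converter settings they modify, and they only take effect when the corresponding converter is the JSON converter; a minimal sketch:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false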
- Testing shows that in some scenarios disabling the record schema causes problems on the consumer side, so consider the Avro converter instead. Here is one problem case:
A consumer-side problem after disabling the schema with Kafka's JSON serialization
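Since the Avro converter is the recommended alternative, here is a minimal worker-level sketch using Confluent's Avro converter; the Schema Registry address (192.168.0.40:8081) is a hypothetical value for this environment. Messages stay compact because the schema is stored in the registry instead of being embedded in every record:
# hypothetical Schema Registry address; adjust to your environment
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.0.40:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.0.40:8081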
Disabling the message schema via configuration parameters
- Set the parameter values in the Kafka Connect worker configuration file (config/connect-distributed.properties):
# Kafka Connect runs in a Docker container that has no vi editor, so edit the file from the Docker host instead
# Find where the container's config directory is mounted on the Docker host
[root@docker ~]# docker inspect connect | grep "/kafka/config" -B1
"Source": "/var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data",
"Destination": "/kafka/config",
# Edit connect-distributed.properties and change the parameters
[root@docker ~]# cd /var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data
[root@docker _data]# ll
total 68
-rw-r--r--. 1 1001 1001 906 Apr 19 14:51 connect-console-sink.properties
-rw-r--r--. 1 1001 1001 909 Apr 19 14:51 connect-console-source.properties
-rw-r--r--. 1 1001 1001 5608 Apr 19 14:52 connect-distributed.properties
-rw-r--r--. 1 1001 1001 883 Apr 19 14:51 connect-file-sink.properties
-rw-r--r--. 1 1001 1001 881 Apr 19 14:51 connect-file-source.properties
-rw-r--r--. 1 1001 1001 2103 Apr 19 14:51 connect-log4j.properties
-rw-r--r--. 1 1001 1001 2540 Apr 19 14:51 connect-mirror-maker.properties
-rw-r--r--. 1 1001 1001 2262 Apr 19 14:51 connect-standalone.properties
-rw-r--r--. 1 1001 1001 1221 Apr 19 14:51 consumer.properties
drwxr-xr-x. 2 1001 1001 102 Apr 19 14:51 kraft
-rw-rw-r--. 1 1001 1001 850 Apr 19 10:03 log4j.properties
-rw-r--r--. 1 1001 1001 1925 Apr 19 14:51 producer.properties
-rw-r--r--. 1 1001 1001 6849 Apr 19 14:51 server.properties
-rw-r--r--. 1 1001 1001 1032 Apr 19 14:51 tools-log4j.properties
-rw-r--r--. 1 1001 1001 1169 Apr 19 14:51 trogdor.conf
-rw-r--r--. 1 1001 1001 1205 Apr 19 14:51 zookeeper.properties
[root@docker _data]# vi connect-distributed.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
- Restart the Connect worker container so the parameters take effect
# docker restart connect
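After the restart, a quick sanity check that the worker's REST interface is reachable again (the same 8083 endpoint used later in this article); the root path returns the Connect version and the Kafka cluster id as JSON:
# should return a small JSON document with "version" and "kafka_cluster_id"
[root@docker ~]# curl -s 192.168.0.40:8083/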
Create a new connector and observe the effect
When creating the connector, note that the parameters name, database.server.name, and database.history.kafka.topic should each be kept unique.
[root@docker ~]# cat oracle-scott-connector.json
{
  "name": "oracle-scott-connector2",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "172.17.0.2",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "orcl",
    "database.pdb.name": "pdbtt",
    "database.server.name": "ora1",
    "tasks.max": "1",
    "schema.include.list": "scott",
    "database.history.kafka.bootstrap.servers": "192.168.0.40:9092",
    "database.history.kafka.topic": "schema-changes.inventory1"
  }
}
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
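Before inspecting the topic, it is worth confirming that the connector and its task actually started; the Connect REST API exposes this under /connectors/<name>/status (connector name as defined in the JSON above):
# both the connector and its single task should report "state": "RUNNING"
[root@docker ~]# curl -s 192.168.0.40:8083/connectors/oracle-scott-connector2/status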
- Key
{
  "deptno": 10
}
- Value
{
  "before": null,
  "after": {
    "deptno": 10,
    "dname": "ACCOUNTING",
    "loc": "NEW YORK"
  },
  "source": {
    "version": "1.9.0.Final",
    "connector": "oracle",
    "name": "ora1",
    "ts_ms": 1650414625783,
    "snapshot": "true",
    "db": "PDBTT",
    "sequence": null,
    "schema": "SCOTT",
    "table": "DEPT",
    "txid": null,
    "scn": "6732490",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "r",
  "ts_ms": 1650414625799,
  "transaction": null
}
- Comparison of the storage size of the same data in Kafka: the first screenshot shows the topic with the schema enabled, the second with the schema disabled.
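The screenshots aside, the same comparison can be made from the command line with the stock kafka-log-dirs.sh tool; this sketch assumes the default Debezium topic naming <server>.<schema>.<table>, i.e. ora19c.SCOTT.DEPT for the schema-enabled connector and ora1.SCOTT.DEPT for the schema-disabled one:
# report the on-disk size of every partition of the two topics
bin/kafka-log-dirs.sh --describe --bootstrap-server 192.168.0.40:9092 \
  --topic-list ora19c.SCOTT.DEPT,ora1.SCOTT.DEPT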