Debezium synchronization test of DML operations on the source database

Original article by 张玉龙, 2022-04-29

Question: how can delete operations on a table without a primary key be synchronized?

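  • For Kafka to propagate a delete operation to the target database, a tombstone event is required: a Kafka message whose key is not null and whose value is null.
  • For a table with a primary key, the Kafka message key is built from the table's primary key columns.
  • For a table without a primary key, the Kafka message key is null by default, so the delete message is skipped.
  • For tables without a primary key, the Debezium connector provides the message.key.columns parameter, which builds the Kafka message key from a specified column or combination of columns. The chosen columns must never contain null values, much as OGG uses all columns for tables without a primary key.
  • Source: PostgreSQL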
postgres=# create database test_dml;
postgres=# \c test_dml
test_dml=# create schema inventory;
test_dml=# create table inventory.orders (
             id integer not null,
             order_date date not null,
             purchaser integer not null,
             quantity integer not null,
             product_id integer not null
           );
test_dml=# alter table only inventory.orders add constraint orders_pkey primary key (id);
test_dml=# insert into inventory.orders values (10001,now(),1001,1,102);
insert into inventory.orders values (10002,now(),1002,2,105);
insert into inventory.orders values (10003,now(),1003,2,106);
insert into inventory.orders values (10004,now(),1004,1,107);
  • Target: Oracle 19c PDB
# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba
SQL> create user test identified by test;
SQL> grant connect,resource,create view to test;
SQL> grant unlimited tablespace to test;
-- The date column has a type conversion issue, to be investigated later
SQL> create table test.orders (
       id number not null,
       order_date number not null,
       purchaser number not null,
       quantity number not null,
       product_id number not null
     );
SQL> alter table test.orders add constraint orders_pkey primary key(id);
# Initial configuration of the source connector that captures changes
[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "schema.include.list": "inventory",
    "slot.name": "test_dml_slot"
  }
}

curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json

# Initial configuration of the sink connector that writes to the target
[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders",
    "table.name.format": "orders",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}

curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
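
The two curl calls above (drop the connector, then register it again) are repeated throughout this test. A minimal Python sketch of the same two requests is shown here, assuming Kafka Connect listens on localhost:8083 as in the curl commands. The function names build_reregister_requests and reregister are made up for this illustration and are not part of any Debezium or Kafka tooling.

```python
import json
import urllib.error
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # same endpoint as the curl calls


def build_reregister_requests(definition: dict, base_url: str = CONNECT_URL):
    """Return (delete_request, create_request) for one connector definition."""
    name = definition["name"]
    delete_req = urllib.request.Request(f"{base_url}/{name}", method="DELETE")
    create_req = urllib.request.Request(
        f"{base_url}/",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    return delete_req, create_req


def reregister(definition: dict, base_url: str = CONNECT_URL) -> int:
    """Drop the connector if it exists, then create it; returns the HTTP status."""
    delete_req, create_req = build_reregister_requests(definition, base_url)
    try:
        urllib.request.urlopen(delete_req).close()
    except urllib.error.HTTPError as exc:
        if exc.code != 404:  # 404 only means there was nothing to delete
            raise
    with urllib.request.urlopen(create_req) as resp:
        return resp.status


# Abbreviated definition; the full "config" is the JSON shown above.
sink = {"name": "oracle-testdml-sink", "config": {"tasks.max": "1"}}
delete_req, create_req = build_reregister_requests(sink)
print(delete_req.get_method(), delete_req.full_url)
print(create_req.get_method(), create_req.full_url)
```

Only build_reregister_requests runs in this snippet, so it can be tried without a running Connect worker; reregister(sink) performs the real calls.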

insert into inventory.orders values (11001,now(),1003,1,102);

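  • By default the Kafka message key is built from the table's primary key columns.
  • before: the old values of the record. An insert has no old values, while update and delete operations record them. With PostgreSQL as the source, however, old values can only be recorded if the synchronized table has replica identity configured, as described later.
  • after: the new values of the record. For a delete operation, after is null.
  • op: the type of the message: c = create, u = update, d = delete, r = read (applies only to snapshots), t = truncate, m = message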
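
To make the three fields concrete, here is a small Python sketch that reads before, after and op from a change-event value. The payloads are abbreviated, hand-written stand-ins for the events in this test (only id and quantity are kept), and summarize is a hypothetical helper, not a Debezium API.

```python
OP_NAMES = {
    "c": "create", "u": "update", "d": "delete",
    "r": "read", "t": "truncate", "m": "message",
}


def summarize(envelope: dict) -> dict:
    """Turn a change-event value into a small, readable summary."""
    before = envelope.get("before")
    after = envelope.get("after")
    changed = []
    if before and after:
        # Only meaningful when the old values were actually captured.
        changed = sorted(c for c in after if before.get(c) != after[c])
    return {"op": OP_NAMES[envelope["op"]], "old": before, "new": after, "changed": changed}


insert_event = {"op": "c", "before": None, "after": {"id": 11001, "quantity": 1}}
update_event = {"op": "u", "before": {"id": 11001, "quantity": 1},
                "after": {"id": 11001, "quantity": 2}}
delete_event = {"op": "d", "before": {"id": 11001, "quantity": 2}, "after": None}

for event in (insert_event, update_event, delete_event):
    print(summarize(event))
```

When before is null, as happens for updates without a suitable replica identity, the changed list stays empty because there is nothing to compare against.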
update inventory.orders set quantity=2 where id=11001;

  • before carries no previous values because the table's replica identity has not been configured.
delete from inventory.orders where id = 11001;

  • A delete operation produces two message events in Kafka. One of them has a null value; this is the tombstone event, which is used to remove the record on the target and thereby synchronize the delete.
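
The pair of records can be sketched in Python as follows. This is a toy model only: the target table is a dict, the sink deletes on the tombstone, and the delete event itself is deliberately not applied, which is an assumption made here to mirror the role the tombstone plays in this setup. The names records_for_delete and apply_to_target are hypothetical.

```python
def records_for_delete(key: dict, old_row: dict) -> list:
    """The two Kafka records described above for one source DELETE."""
    delete_event = {"op": "d", "before": old_row, "after": None}
    tombstone = None  # same key, null value
    return [(key, delete_event), (key, tombstone)]


def apply_to_target(target: dict, key: dict, value) -> None:
    """Toy sink: a null value deletes by key, a row image is written by key."""
    if key is None:
        raise ValueError("a record without a key cannot address a target row")
    pk = tuple(sorted(key.items()))
    if value is None:
        target.pop(pk, None)         # tombstone: remove the row
    elif value["op"] == "d":
        return                       # assumption: the delete event itself is not applied
    else:
        target[pk] = value["after"]  # create/update/read: write the new row


target = {}
key = {"id": 11001}
apply_to_target(target, key, {"op": "c", "before": None, "after": {"id": 11001, "quantity": 1}})
rows_after_insert = len(target)
for k, v in records_for_delete(key, {"id": 11001, "quantity": 1}):
    apply_to_target(target, k, v)
print(rows_after_insert, len(target))
```

The ValueError branch is the reason a null key is a problem: without a key the tombstone has nothing to identify the row by.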


  • In before, every column except the primary key is 0, because the table's replica identity has not been configured.

replica identity

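  • A table-level setting specific to PostgreSQL that only takes effect when logical replication is used.
  • It controls what information about a table's changes is written to the WAL so that the rows affected by update or delete events can be identified.
  • Whenever an update or delete event occurs, the replica identity setting determines which previous column values, if any, are available for the table.
  • Four possible settings
    • default: records the old values of the primary key columns, if any. This is the default for non-system tables.
    • using index index_name: records the old values of the columns covered by the named index.
    • full: records the old values of all columns in the row.
    • nothing: records no old values. This is the default for system tables.
  • With the default setting, if the table has no primary key, the connector does not emit update or delete events for it; it only emits create events.
  • How to set it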
alter table only inventory.orders replica identity full;
  • After this change, update and delete events carry the old values of all columns in before.
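
The four settings can be modelled with a short Python sketch that returns the part of the old row available for a delete. It is a simplification for illustration only: it ignores the fact that, for an update under default, PostgreSQL logs the old key only when a key column changes. The helper name old_image_for_delete and the sample row are assumptions.

```python
def old_image_for_delete(row: dict, identity: str,
                         pk_columns=(), index_columns=()) -> dict:
    """Columns of the deleted row that are available as old values."""
    if identity == "full":
        return dict(row)                 # every column
    if identity == "default":
        columns = pk_columns             # primary key columns, if any
    elif identity == "using index":
        columns = index_columns          # columns of the named index
    elif identity == "nothing":
        columns = ()
    else:
        raise ValueError(f"unknown replica identity: {identity}")
    return {c: row[c] for c in columns}


row = {"id": 11001, "purchaser": 1003, "quantity": 2, "product_id": 102}

print(old_image_for_delete(row, "default", pk_columns=("id",)))  # table with a primary key
print(old_image_for_delete(row, "default"))                      # table without a primary key
print(old_image_for_delete(row, "full"))
```

The second call returns an empty dict, which corresponds to the case where the connector has no old values to work with and skips the event.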

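Consumption on the target side

  • The sink connector uses the default configuration from the <default connector configuration> section at the beginning of this article. Three parameters matter here, and their defaults are delete.enabled=false, pk.mode=none, insert.mode=insert.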

Problem 1: delete operations cannot be synchronized

  • Error reported in the Kafka Connect log
tutorial-connect-1 | org.apache.kafka.connect.errors.connectexception: sink connector 'oracle-testdml-sink' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null struct value and non-null struct schema, but found record at (topic='test_dml.inventory.orders',partition=0,offset=7,timestamp=1651152793683) with a null value and null value schema.
  • To synchronize delete operations, add "delete.enabled": "true" and "pk.mode": "record_key" to the sink connector.
  • delete.enabled defaults to false and pk.mode defaults to none.

Problem 2: unique constraint violated?

ora-00001: unique constraint (test.sys_c007736) violated
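  • Both the source and target tables have a primary key, and all data on the target was synchronized from the source, so why is the unique constraint violated?
  • The cause is not discussed here, but the fix is to add "insert.mode": "upsert" on top of the Problem 1 settings.
  • insert.mode defaults to insert.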
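
The article leaves the cause open. One likely explanation, stated here as an assumption, is that in insert mode every record becomes a plain INSERT, so a record for a key that already exists on the target (for example the update of row 11001) collides with the primary key. The Python sketch below reproduces that effect with an in-memory SQLite table standing in for the Oracle target; the upsert is written in SQLite syntax, and the write helper is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table orders (id integer primary key, quantity integer not null)")


def write(conn, row: dict, insert_mode: str) -> None:
    """Write one row the way the chosen insert.mode would (toy version)."""
    if insert_mode == "insert":
        sql = "insert into orders (id, quantity) values (:id, :quantity)"
    elif insert_mode == "upsert":
        sql = ("insert into orders (id, quantity) values (:id, :quantity) "
               "on conflict(id) do update set quantity = excluded.quantity")
    else:
        raise ValueError(f"unsupported insert.mode: {insert_mode}")
    conn.execute(sql, row)


write(conn, {"id": 11001, "quantity": 1}, "insert")       # the original insert
try:
    write(conn, {"id": 11001, "quantity": 2}, "insert")   # the update, sent as an INSERT
except sqlite3.IntegrityError as exc:
    print("insert mode:", exc)

write(conn, {"id": 11001, "quantity": 2}, "upsert")       # same record in upsert mode
print(conn.execute("select id, quantity from orders").fetchall())
```

In upsert mode the second record updates the existing row instead of colliding with it, which matches the observed fix.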

  • The insert, update and delete operations above are now synchronized correctly.

  • For a table without a primary key, the key of the messages written to Kafka is null by default.
insert into inventory.orders2 values (11001,now(),1003,1,102);

  • The insert is captured normally, but the update and delete records are skipped. With PostgreSQL as the source, this shows how important the table's replica identity setting is.
update inventory.orders2 set quantity=2 where id=11001;
tutorial-connect-1 | 2022-04-28 13:41:53,005 warn postgres|test_dml|streaming no new values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.value", "type" : "struct", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.date", "type" : "int32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}]} }' from update message at 'struct{version=1.9.0.final,connector=postgresql,name=test_dml,ts_ms=1651153312826,db=test_dml,sequence=["152831320","152831376"],schema=inventory,table=orders2,txid=833,lsn=152831376}'; skipping record [io.debezium.relational.relationalchangerecordemitter]
delete from inventory.orders2 where id = 11001;
tutorial-connect-1 | 2022-04-28 13:43:12,205 warn postgres|test_dml|streaming no old values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.value", "type" : "struct", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.date", "type" : "int32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "int32", "optional" : "false", "default" : null}}]} }' from delete message at 'struct{version=1.9.0.final,connector=postgresql,name=test_dml,ts_ms=1651153391920,db=test_dml,sequence=["152831512","152831800"],schema=inventory,table=orders2,txid=834,lsn=152831800}'; skipping record [io.debezium.relational.relationalchangerecordemitter]

replica identity

  • Once replica identity is configured for the table, update and delete operations on a table without a primary key reach Kafka normally.
alter table only inventory.orders2 replica identity full;

Consumption on the target side

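  • Reinitialize the environment: delete the source connector, delete the topics, re-register the connector, and keep the table without a primary key in its original state.
  • Attempt 1: on top of the default configuration from the <default connector configuration> section at the beginning of this article, the sink connector adds "delete.enabled": "true", "pk.mode": "record_key", "insert.mode": "upsert"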
tutorial-connect-1 | org.apache.kafka.connect.errors.connectexception: sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null struct or primitive key schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687) with a null key and null key schema.
  • Attempt 2: on top of the same default configuration, the sink connector adds "delete.enabled": "false", "pk.mode": "record_key", "insert.mode": "upsert"
tutorial-connect-1 | org.apache.kafka.connect.errors.connectexception: sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null struct or primitive key schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687) with a null key and null key schema.
  • Attempt 3: on top of the same default configuration, the sink connector adds "delete.enabled": "false", "pk.mode": "none", "insert.mode": "upsert"
tutorial-connect-1 | org.apache.kafka.connect.errors.connectexception: write to table '"orders2"' in upsert mode requires key field names to be known, check the primary key configuration
  • Attempt 4: on top of the same default configuration, the sink connector adds "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert"
# insert and update are both supported, but delete is still not supported
tutorial-connect-1 | org.apache.kafka.connect.errors.connectexception: sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null struct value and non-null struct schema, but found record at (topic='test_dml.inventory.orders2',partition=0,offset=9,timestamp=1651160261336) with a null value and null value schema.
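
The outcomes of these attempts follow a pattern that can be written down as a small decision function. The Python sketch below is reconstructed purely from the error messages quoted in this article; it is not the JDBC sink connector's actual code, the order of the checks is an assumption, and the name decide is hypothetical.

```python
def decide(config: dict, key, value) -> str:
    """Toy model of how the sink reacts to one record under a given config."""
    delete_enabled = config.get("delete.enabled", "false") == "true"
    pk_mode = config.get("pk.mode", "none")
    insert_mode = config.get("insert.mode", "insert")

    if delete_enabled and pk_mode != "record_key":
        return "config rejected: deletes are only supported for pk.mode record_key"
    if pk_mode == "record_key" and key is None:
        return "record rejected: null key"
    if value is None and not delete_enabled:
        return "record rejected: null value"      # a tombstone the sink may not apply
    if insert_mode == "upsert" and pk_mode == "none":
        return "write rejected: upsert requires key field names"
    return "delete" if value is None else insert_mode


row = {"id": 11001, "quantity": 1}
upsert = {"insert.mode": "upsert"}

# Table without a primary key: every record arrives with a null key.
print(decide({**upsert, "delete.enabled": "true", "pk.mode": "record_key"}, None, row))
print(decide({**upsert, "pk.mode": "record_key"}, None, row))
print(decide({**upsert, "pk.mode": "none"}, None, row))
print(decide({**upsert, "pk.mode": "record_value", "pk.fields": "id"}, None, row))
print(decide({**upsert, "pk.mode": "record_value", "pk.fields": "id"}, None, None))

# Table with a primary key and the Problem 1 and Problem 2 settings: the tombstone deletes.
print(decide({**upsert, "delete.enabled": "true", "pk.mode": "record_key"}, {"id": 11001}, None))
```

Every path that ends in a delete requires both pk.mode record_key and a non-null key, which is exactly what a table without a primary key cannot provide by default.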

Several problems with tables without a primary key

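  • Problem 1: the key of a table without a primary key is null, so "delete.enabled": "true" with "pk.mode": "record_key" cannot be used.
  • Changing to "pk.mode": "record_value" and adding "pk.fields": "id", so that the key is taken from a field in the value, is rejected when the connector is registered: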
{ "error_code": 400, "message": "connector configuration is invalid and contains the following 2 error(s): deletes are only supported for pk.mode record_key you can also find the above list of errors at the endpoint `/connector-plugins/{connectortype}/config/validate`" }
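  • "delete.enabled": "true" can only be combined with "pk.mode": "record_key". To synchronize delete operations, the key must not be null.

  • Problem 2: "insert.mode": "upsert" needs a primary key. For a table without one, configure "pk.mode": "record_value" and "pk.fields": "id".

  • Problem 3: "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" still does not support delete operations. To synchronize delete operations, the key must not be null.

  • The message.key.columns parameter covered in the next section solves all of these problems.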

[root@docker tutorial]# cat register-postgres-key.json
{
  "name": "inventory-connector-key",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test_dml",
    "database.server.name": "test_dml",
    "snapshot.mode": "always",
    "table.include.list": "inventory.orders3",
    "slot.name": "test_dml_slot",
    "message.key.columns": "inventory.orders3:id,product_id"
  }
}

curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json

[root@docker tutorial]# cat oracle-testdml-sink.json
{
  "name": "oracle-testdml-sink3",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
    "connection.user": "test",
    "connection.password": "test",
    "tasks.max": "1",
    "topics": "test_dml.inventory.orders3",
    "table.name.format": "orders3",
    "quote.sql.identifiers": "never",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "insert.mode": "upsert"
  }
}

curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink3
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
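
To illustrate what "message.key.columns": "inventory.orders3:id,product_id" asks for, the Python sketch below parses a value of that shape and builds a message key from a row. It is a simplified model, not Debezium's implementation: table names are matched literally, several tables are assumed to be separated by semicolons, and raising an error on a null key column is a choice made here to reflect the earlier advice that the chosen columns must never be null. Both function names are hypothetical.

```python
def parse_message_key_columns(spec: str) -> dict:
    """Parse '<table>:<col>,<col>;<table>:<col>' into {table: [columns]}."""
    mapping = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        table, columns = entry.split(":", 1)
        mapping[table.strip()] = [c.strip() for c in columns.split(",")]
    return mapping


def build_key(table: str, row: dict, mapping: dict):
    """Message key for a table without a primary key, or None if unmapped."""
    columns = mapping.get(table)
    if columns is None:
        return None  # no mapping: the key stays null, as for orders2
    missing = [c for c in columns if row.get(c) is None]
    if missing:
        raise ValueError(f"key columns must not be null: {missing}")
    return {c: row[c] for c in columns}


mapping = parse_message_key_columns("inventory.orders3:id,product_id")
row = {"id": 11001, "purchaser": 1003, "quantity": 1, "product_id": 102}

print(build_key("inventory.orders3", row, mapping))
print(build_key("inventory.orders2", row, mapping))
```

With a non-null key in place, the sink settings "delete.enabled": "true" and "pk.mode": "record_key" become usable for the table without a primary key.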
insert into inventory.orders3 values (11001,now(),1003,1,102);

update inventory.orders3 set quantity=2 where id=11001;

delete from inventory.orders3 where id = 11001;



