Question: how can DELETE operations on tables without a primary key be synchronized?
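- To synchronize a DELETE to the target database through Kafka, a tombstone event is required: a Kafka message whose key is non-null and whose value is null.
- For a table with a primary key, the Kafka message key is built from the table's primary-key columns.
- For a table without a primary key, the Kafka message key is null by default, so the DELETE message is skipped.
- For tables without a primary key, the Debezium connector provides the message.key.columns option, which builds the Kafka message key from the specified column (or set of columns). The specified columns must never contain NULL values, much as Oracle GoldenGate (OGG) uses all columns as the key for tables without a primary key.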
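The key rules above can be modeled in a few lines of Python. This is a hypothetical sketch, not Debezium code: message_key and delete_record are illustrative names, and the sketch assumes that message.key.columns, when set, replaces the primary key as the message key (Debezium documents it as an override of the default key).

```python
from typing import Optional


def message_key(row: dict, pk_cols: list, key_cols: Optional[list] = None) -> Optional[dict]:
    """Hypothetical model of how the Kafka message key for a row is chosen."""
    # message.key.columns (key_cols) takes precedence over the primary key when set
    cols = key_cols or pk_cols
    if not cols:
        return None  # no primary key and no message.key.columns: the key is null
    if any(row.get(c) is None for c in cols):
        # this sketch rejects NULLs to mirror the "no NULL values" requirement above
        raise ValueError(f"key columns {cols} must not contain NULL values")
    return {c: row[c] for c in cols}


def delete_record(key: Optional[dict]):
    """A DELETE can only travel as a tombstone: non-null key, null value."""
    if key is None:
        return None  # nothing identifies the row, so the DELETE is lost
    return (key, None)


row = {"id": 11001, "quantity": 1, "product_id": 102}
print(message_key(row, ["id"]))                    # {'id': 11001}  (table with a primary key)
print(message_key(row, []))                        # None           (no primary key)
print(message_key(row, [], ["id", "product_id"]))  # {'id': 11001, 'product_id': 102}
```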
- Source: PostgreSQL
postgres=# create database test_dml;
postgres=# \c test_dml
test_dml=# create schema inventory;
test_dml=# create table inventory.orders (
id integer not null,
order_date date not null,
purchaser integer not null,
quantity integer not null,
product_id integer not null
);
test_dml=# alter table only inventory.orders add constraint orders_pkey primary key (id);
test_dml=# insert into inventory.orders values (10001,now(),1001,1,102);
insert into inventory.orders values (10002,now(),1002,2,105);
insert into inventory.orders values (10003,now(),1003,2,106);
insert into inventory.orders values (10004,now(),1004,1,107);
- Target: Oracle 19c PDB
# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba
SQL> create user test identified by test;
SQL> grant connect,resource,create view to test;
SQL> grant unlimited tablespace to test;
-- order_date is declared as NUMBER to sidestep a date-conversion issue (Debezium sends DATE values as io.debezium.time.Date, an INT32 count of days since the epoch); to be investigated later
SQL> create table test.orders (
id number not null,
order_date number not null,
purchaser number not null,
quantity number not null,
product_id number not null
);
SQL> alter table test.orders add constraint orders_pkey primary key(id);
# Initial configuration of the source connector that captures changes from the source database
[root@docker tutorial]# cat register-postgres-key.json
{
"name": "inventory-connector-key",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test_dml",
"database.server.name": "test_dml",
"snapshot.mode": "always",
"schema.include.list": "inventory",
"slot.name": "test_dml_slot"
}
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
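The two curl calls first remove any connector with the same name and then POST the JSON file to the Kafka Connect REST API (curl's method and header options are the uppercase -X and -H). As a sketch, the same requests can be built with Python's standard library; build_requests and send are hypothetical helpers, localhost:8083 comes from the commands above, and actually sending the requests needs a running Connect worker.

```python
import json
import urllib.error
import urllib.request

# Kafka Connect REST endpoint used by the curl commands above
CONNECT_URL = "http://localhost:8083/connectors"


def build_requests(connector: dict, base_url: str = CONNECT_URL):
    """Return (delete_request, create_request) equivalent to the two curl calls."""
    delete_req = urllib.request.Request(f"{base_url}/{connector['name']}", method="DELETE")
    create_req = urllib.request.Request(
        f"{base_url}/",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    return delete_req, create_req


def send(req: urllib.request.Request) -> int:
    """Send one request and return the HTTP status (needs a running Connect worker)."""
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code  # e.g. 404 from DELETE when the connector does not exist yet


# Usage (not executed here):
#   with open("register-postgres-key.json", encoding="utf-8") as f:
#       for req in build_requests(json.load(f)):
#           print(req.get_method(), req.full_url, send(req))
```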
# Initial configuration of the sink connector that writes the changes to the target
[root@docker tutorial]# cat oracle-testdml-sink.json
{
"name": "oracle-testdml-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "test",
"connection.password": "test",
"tasks.max": "1",
"topics": "test_dml.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
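The sink relies on Debezium's ExtractNewRecordState transform (unwrap) to flatten each change event. Below is a simplified Python model of how it treats the records in this setup, assuming delete.handling.mode is left at its default (drop) and drop.tombstones is false as configured; unwrap here is a hypothetical function that ignores schemas, headers and the transform's other options.

```python
from typing import Optional


def unwrap(key, value: Optional[dict]):
    """Simplified model of ExtractNewRecordState as configured above.

    Assumes delete.handling.mode=drop (its default) and drop.tombstones=false.
    Returns the flattened (key, value) record, or None if the record is dropped.
    """
    if value is None:
        return (key, None)        # tombstone: kept, because drop.tombstones=false
    if value["op"] == "d":
        return None               # delete event: dropped by the default delete handling
    return (key, value["after"])  # c / u / r: only the new row state is forwarded


# Illustrative events for one row (not captured output)
events = [
    ({"id": 11001}, {"op": "c", "before": None, "after": {"id": 11001, "quantity": 1}}),
    ({"id": 11001}, {"op": "u", "before": None, "after": {"id": 11001, "quantity": 2}}),
    ({"id": 11001}, {"op": "d", "before": {"id": 11001, "quantity": 0}, "after": None}),
    ({"id": 11001}, None),
]
flattened = [rec for rec in (unwrap(k, v) for k, v in events) if rec is not None]
print(flattened)
```

Under these assumptions the sink never sees the op = d event itself; the tombstone is what carries the DELETE, which is why transforms.unwrap.drop.tombstones is set to false.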
insert into inventory.orders values (11001,now(),1003,1,102);
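- By default, the Kafka message key is the table's primary key.
- before: the old value of the row. An INSERT has no old value, while UPDATE and DELETE record one; with a PostgreSQL source, however, the old value is only available if the table's REPLICA IDENTITY is configured (covered below).
- after: the new value of the row; for a DELETE, after is null.
- op: the type of the event: c = create, u = update, d = delete, r = read (snapshots only), t = truncate, m = message.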
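To make the before, after and op fields concrete, here is a small Python sketch that summarizes a change-event value. The field names and op codes are those listed above; describe is a hypothetical helper, and the sample event is a trimmed illustration rather than captured output.

```python
OPS = {"c": "create", "u": "update", "d": "delete",
       "r": "read (snapshot)", "t": "truncate", "m": "message"}


def describe(value):
    """Summarize the op/before/after fields of a Debezium change-event value."""
    if value is None:
        return "tombstone"
    return f"{OPS[value['op']]}: before={value.get('before')} after={value.get('after')}"


# Illustrative value for the UPDATE below, with REPLICA IDENTITY not configured
update_event = {"op": "u", "before": None, "after": {"id": 11001, "quantity": 2}}
print(describe(update_event))
# update: before=None after={'id': 11001, 'quantity': 2}
```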
update inventory.orders set quantity=2 where id=11001;
- before contains no previous data because the table's REPLICA IDENTITY has not been configured.
delete from inventory.orders where id = 11001;
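- A DELETE produces two events in Kafka: the delete event itself, followed by an event whose value is null. That second event is the tombstone event; it is what deletes the row on the target and so synchronizes the DELETE.
- In before, every column except the primary key is 0, again because the table's REPLICA IDENTITY has not been configured.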
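Inside Kafka the tombstone has a meaning of its own: if the topic is compacted, Kafka eventually keeps only the latest record per key, and a tombstone (non-null key, null value) removes that key altogether. The hypothetical compact function below models only that end state; real compaction runs asynchronously and retains tombstones for delete.retention.ms before dropping them.

```python
def compact(records):
    """End state of Kafka log compaction over (key, value) records.

    Keeps the latest value per key; a tombstone (value None) removes the key.
    """
    latest = {}
    for key, value in records:
        if key is None:
            # compacted topics do not accept records without a key
            raise ValueError("record without a key")
        if value is None:
            latest.pop(key, None)  # tombstone
        else:
            latest[key] = value
    return latest


# Keys are tuples of key-column values; values are shortened for illustration
log = [
    ((10001,), {"op": "r", "quantity": 1}),
    ((11001,), {"op": "c", "quantity": 1}),
    ((11001,), {"op": "u", "quantity": 2}),
    ((11001,), {"op": "d"}),
    ((11001,), None),  # tombstone emitted after the delete event
]
print(compact(log))  # {(10001,): {'op': 'r', 'quantity': 1}}
```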
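REPLICA IDENTITY
- A PostgreSQL-specific, table-level setting that only takes effect when logical replication is used.
- It controls what is written to the WAL about a table's changes, so that the rows affected by UPDATE or DELETE events can be identified.
- Whenever an UPDATE or DELETE occurs, the REPLICA IDENTITY setting determines which information, if any, is available about the previous values of the affected table columns.
- Four possible settings:
  - DEFAULT: records the old values of the primary-key columns, if there are any. This is the default for non-system tables.
  - USING INDEX index_name: records the old values of the columns contained in the named index.
  - FULL: records the old values of all columns in the row.
  - NOTHING: records no old values. This is the default for system tables.
- With DEFAULT, if a table has no primary key, the connector does not emit UPDATE or DELETE events for that table; for such a table it emits only create events.
- How to set it:
alter table only inventory.orders replica identity full;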
- update
- delete
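The four settings can be summarized as a mapping from the REPLICA IDENTITY value to the columns whose old values reach before. This is a simplified, hypothetical Python model of the descriptions above (it ignores details such as TOAST columns); old_value_columns is an illustrative name.

```python
from typing import Optional


def old_value_columns(identity: str, columns: list, pk: list,
                      index_cols: Optional[list] = None) -> list:
    """Columns whose old values reach `before` for UPDATE/DELETE (simplified).

    An empty list means no old values are available; under DEFAULT on a table
    without a primary key the connector then skips UPDATE/DELETE entirely.
    """
    identity = identity.upper()
    if identity == "DEFAULT":
        return list(pk)
    if identity == "USING INDEX":
        return list(index_cols or [])
    if identity == "FULL":
        return list(columns)
    if identity == "NOTHING":
        return []
    raise ValueError(f"unknown REPLICA IDENTITY setting: {identity}")


cols = ["id", "order_date", "purchaser", "quantity", "product_id"]
print(old_value_columns("DEFAULT", cols, pk=["id"]))  # ['id']  (orders)
print(old_value_columns("DEFAULT", cols, pk=[]))      # []      (table without a primary key)
print(old_value_columns("FULL", cols, pk=[]))         # all five columns
```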
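Consuming on the target side
- The sink connector uses the initial sink configuration shown at the beginning of this article. Three parameters are involved here, with these defaults: delete.enabled=false, pk.mode=none, insert.mode=insert.
Problem 1: DELETE operations are not synchronized
- The Kafka Connect log shows this error: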
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink' is configured with 'delete.enabled=false' and 'pk.mode=none'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders',partition=0,offset=7,timestamp=1651152793683)
with a null value and null value schema.
- To synchronize DELETE operations, add "delete.enabled": "true" and "pk.mode": "record_key" to the sink connector configuration.
- delete.enabled defaults to false, and pk.mode defaults to none.
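The error comes from a per-record check in the JDBC sink: unless deletes are enabled together with pk.mode=record_key, every record must carry a non-null value, so the tombstone is rejected. The Python sketch below reconstructs that behavior only from the errors shown in this article; check_record and SinkError are hypothetical names, not the connector's code.

```python
class SinkError(Exception):
    """Stands in for the ConnectException raised by the JDBC sink."""


def check_record(delete_enabled: bool, pk_mode: str, key, value) -> str:
    """Simplified per-record checks, reconstructed from the errors in this article.

    Returns "write" for a normal row or "delete" for a tombstone the sink can apply.
    """
    if pk_mode == "record_key" and key is None:
        raise SinkError(f"pk.mode={pk_mode} requires records with a non-null key")
    if value is None:
        if delete_enabled and pk_mode == "record_key":
            return "delete"  # tombstone + key: DELETE ... WHERE <key columns> = ...
        raise SinkError(f"delete.enabled={str(delete_enabled).lower()}, pk.mode={pk_mode}: "
                        "requires records with a non-null Struct value")
    return "write"


tombstone = ({"id": 11001}, None)
try:
    check_record(False, "none", *tombstone)           # initial sink configuration
except SinkError as err:
    print("fails:", err)
print(check_record(True, "record_key", *tombstone))  # after the fix: delete
```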
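Problem 2: unique constraint violated?
ORA-00001: unique constraint (TEST.SYS_C007736) violated
- Both the source and target tables have primary keys, and every row on the target was replicated from the source, so why is a unique constraint violated?
- The cause is not discussed here; the fix is to add "insert.mode": "upsert" on top of the Problem 1 changes.
- insert.mode defaults to insert.
- With that, the INSERT, UPDATE and DELETE operations above are all synchronized correctly.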
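One likely cause of the ORA-00001 above is that with insert.mode=insert the JDBC sink writes every record as a plain INSERT, including the record produced by the UPDATE, so a second record for an existing key collides with the primary key. In upsert mode the sink merges on the key instead (for Oracle it generates a MERGE statement). The hypothetical in-memory Table class below sketches the difference; it is an illustration, not the connector's code.

```python
class UniqueViolation(Exception):
    """Stands in for ORA-00001 in this sketch."""


class Table:
    """Hypothetical in-memory target table with a primary key."""

    def __init__(self, pk):
        self.pk = pk
        self.rows = {}

    def _key(self, row):
        return tuple(row[c] for c in self.pk)

    def insert(self, row):
        """insert.mode=insert: every record becomes a plain INSERT."""
        key = self._key(row)
        if key in self.rows:
            raise UniqueViolation(f"unique constraint violated for key {key}")
        self.rows[key] = dict(row)

    def upsert(self, row):
        """insert.mode=upsert: insert a new key or overwrite the existing row."""
        self.rows[self._key(row)] = dict(row)


target = Table(pk=["id"])
target.insert({"id": 11001, "quantity": 1})      # the INSERT
try:
    target.insert({"id": 11001, "quantity": 2})  # the UPDATE, in insert mode
except UniqueViolation as err:
    print(err)
target.upsert({"id": 11001, "quantity": 2})      # the UPDATE, in upsert mode
print(target.rows)  # {(11001,): {'id': 11001, 'quantity': 2}}
```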
Tables without a primary key
- For a table without a primary key, the messages written to Kafka have a null key by default.
insert into inventory.orders2 values (11001,now(),1003,1,102);
- The INSERT is synchronized normally, but the UPDATE and DELETE records are skipped. With a PostgreSQL source, this is where configuring the table's REPLICA IDENTITY becomes important.
update inventory.orders2 set quantity=2 where id=11001;
tutorial-connect-1 | 2022-04-28 13:41:53,005 WARN postgres|test_dml|streaming no new values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from update message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153312826,db=test_dml,sequence=["152831320","152831376"],schema=inventory,table=orders2,txId=833,lsn=152831376}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
delete from inventory.orders2 where id = 11001;
tutorial-connect-1 | 2022-04-28 13:43:12,205 WARN postgres|test_dml|streaming no old values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from delete message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153391920,db=test_dml,sequence=["152831512","152831800"],schema=inventory,table=orders2,txId=834,lsn=152831800}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
REPLICA IDENTITY
- Once REPLICA IDENTITY is configured for the table, UPDATE and DELETE operations on the table without a primary key reach Kafka normally.
alter table only inventory.orders2 replica identity full;
- update
- delete
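Consuming on the target side
- Reinitialize the environment: delete the source connector, delete the topics, re-register the connector, and keep the table in its original state, without a primary key.
- On top of the initial sink configuration shown at the beginning of this article, add "delete.enabled": "true", "pk.mode": "record_key", "insert.mode": "upsert":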
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=true' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
- On top of the initial sink configuration, add "delete.enabled": "false", "pk.mode": "record_key", "insert.mode": "upsert":
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
- On top of the initial sink configuration, add "delete.enabled": "false", "pk.mode": "none", "insert.mode": "upsert":
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Write to table '"orders2"' in UPSERT mode requires key field names to be known, check the primary key configuration
- On top of the initial sink configuration, add "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert":
# INSERT and UPDATE both work, but DELETE is still not supported
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_value'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=9,timestamp=1651160261336)
with a null value and null value schema.
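Issues with tables without a primary key
- Issue 1: the key of a table without a primary key is null, so "delete.enabled": "true" with "pk.mode": "record_key" cannot be used.
- Changing to "pk.mode": "record_value" and adding "pk.fields": "id", so that the key is taken from a field of the value while "delete.enabled": "true" is kept, is rejected when the connector is registered: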
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):
Deletes are only supported for pk.mode record_key
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
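- "delete.enabled": "true" can only be combined with "pk.mode": "record_key"; to synchronize DELETE operations, the key must be non-null.
- Issue 2: "insert.mode": "upsert" needs primary-key fields, so a table without a primary key has to be configured with "pk.mode": "record_value", "pk.fields": "id".
- Issue 3: "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" still does not support DELETE; to synchronize DELETE operations, the key must be non-null.
- The message.key.columns parameter, covered in the next section, solves all of these issues.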
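The three issues can be condensed into one decision function. This hypothetical Python sketch is derived only from the errors shown above (it is not the connector's validation logic); it assumes tombstones are forwarded (drop.tombstones=false) and that pk.fields is set whenever pk.mode is record_value, and reports which failures each configuration tried in this article would run into.

```python
def sink_problems(delete_enabled: bool, pk_mode: str, insert_mode: str, keyed: bool) -> list:
    """Failures a JDBC sink configuration runs into, per the errors shown above.

    keyed: whether the topic's records have a non-null key.
    """
    problems = []
    if delete_enabled and pk_mode != "record_key":
        problems.append("config rejected: deletes need pk.mode=record_key")
    if insert_mode == "upsert" and pk_mode == "none":
        problems.append("upsert needs key field names")
    if pk_mode == "record_key" and not keyed:
        problems.append("null key")
    if not delete_enabled:
        problems.append("tombstone has null value")
    return problems


# (delete.enabled, pk.mode, insert.mode, keyed): orders2 attempts, then orders3
tried = [
    (True, "record_key", "upsert", False),
    (False, "record_key", "upsert", False),
    (False, "none", "upsert", False),
    (False, "record_value", "upsert", False),
    (True, "record_value", "upsert", False),
    (True, "record_key", "upsert", True),  # key built from message.key.columns
]
for cfg in tried:
    print(cfg, sink_problems(*cfg) or "ok")
```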
message.key.columns
[root@docker tutorial]# cat register-postgres-key.json
{
"name": "inventory-connector-key",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test_dml",
"database.server.name": "test_dml",
"snapshot.mode": "always",
"table.include.list": "inventory.orders3",
"slot.name": "test_dml_slot",
"message.key.columns": "inventory.orders3:id,product_id"
}
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
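The message.key.columns value used above has the form <schema>.<table>:<column>,<column>, and entries for several tables are separated by semicolons. Below is a hypothetical Python parser that turns such a value into a table-to-columns mapping; it treats table and column names as literal strings, which is enough for the value in this article.

```python
def parse_key_columns(spec: str) -> dict:
    """Parse a message.key.columns value into {table: [key columns]}.

    Entries look like "<schema>.<table>:<col>,<col>" and are separated by ';'.
    Names are treated as literal strings in this sketch.
    """
    mapping = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        table, sep, cols = entry.partition(":")
        columns = [c.strip() for c in cols.split(",") if c.strip()]
        if not sep or not columns:
            raise ValueError(f"entry without key columns: {entry!r}")
        mapping[table.strip()] = columns
    return mapping


print(parse_key_columns("inventory.orders3:id,product_id"))
# {'inventory.orders3': ['id', 'product_id']}
```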
[root@docker tutorial]# cat oracle-testdml-sink.json
{
"name": "oracle-testdml-sink3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "test",
"connection.password": "test",
"tasks.max": "1",
"topics": "test_dml.inventory.orders3",
"table.name.format": "orders3",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"delete.enabled": "true",
"pk.mode": "record_key",
"insert.mode": "upsert"
}
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink3
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
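With pk.mode=record_key and no pk.fields, the JDBC sink uses every field of the record key as a primary-key column, and here the key struct is built from message.key.columns (id, product_id). Under that assumption a tombstone maps to a DELETE keyed on both columns; the hypothetical delete_statement function below sketches the parameterized statement, though the exact SQL the connector generates for Oracle may differ.

```python
def delete_statement(table: str, key: dict):
    """Parameterized DELETE for a tombstone whose key struct is `key` (sketch).

    Every key field is used as a key column; identifiers are left unquoted,
    matching quote.sql.identifiers=never.
    """
    if not key:
        raise ValueError("a tombstone without key fields cannot be turned into a DELETE")
    columns = list(key)
    where = " AND ".join(f"{col} = ?" for col in columns)
    return f"DELETE FROM {table} WHERE {where}", [key[col] for col in columns]


# Key produced by message.key.columns=inventory.orders3:id,product_id
sql, params = delete_statement("orders3", {"id": 11001, "product_id": 102})
print(sql)     # DELETE FROM orders3 WHERE id = ? AND product_id = ?
print(params)  # [11001, 102]
```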
insert into inventory.orders3 values (11001,now(),1003,1,102);
update inventory.orders3 set quantity=2 where id=11001;
delete from inventory.orders3 where id = 11001;
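[Copyright notice] This article is original content by a Motianlun (modb.pro) user. When reposting, you must credit the source (Motianlun), the article link, the author and other basic information; otherwise the author and Motianlun reserve the right to pursue liability. If you find content on Motianlun suspected of plagiarism or infringement, please report it by email to contact@modb.pro with relevant evidence; once verified, Motianlun will remove the content immediately.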