2

在docker环境上使用kafka connect jdbc将变更数据从kafka应用到oracle 19c pdb -m6米乐安卓版下载

原创 张玉龙 2022-04-22
1848

image.png

实验环境

  • kafka 中的数据来自于捕获的 postgresql 14.2 的变更数据,参考文章:
  • 准备 kafka connect jdbc connector(连接器),本实验使用的版本是 10.4.1,下载地址:
  • 准备 oracle jdbc 驱动,kafka connect jdbc connector 里面包含了 oracle jdbc 驱动(ojdbc8-19.7.0.0.jar),如果想使用新版的驱动,也可以自行下载,下载地址:

启动 oracle 19c 数据库,创建一个测试用户

参考文章:使用docker装一个oracle 19c的单机测试环境

# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba sql> create user inventory identified by inventory; sql> grant connect,resource,create view to inventory; sql> grant unlimited tablespace to inventory;

配置 jdbc sink connector

上传驱动和jdbc连接器

将下载的 oracle jdbc 驱动和 kafka connect jdbc connector(连接器) 上传服务器并复制到 connect 容器中

[root@docker ~]# ls -lrt -rw-r--r--. 1 root root 4458107 apr 17 22:12 ojdbc8-19.14.0.0.jar -rw-r--r--. 1 root root 20208429 apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip # 上传 oracle jdbc 驱动,如果使用 kafka connect jdbc connector 自带的驱动可以忽略此处 docker cp ojdbc8-19.14.0.0.jar connect:/kafka/libs # 上传 kafka connect jdbc connector unzip confluentinc-kafka-connect-jdbc-10.4.1.zip chown -r 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1 docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect # 重启 kafka connect 连接器 docker restart connect

查看现有连接器信息

  • 安装 jq,用于格式化 json 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
  • 查看当前存在哪些连接器
curl -s localhost:8083/connectors/ | jq

image.png

  • 查看连接器的具体信息
curl -s localhost:8083/connectors/snapshot-mode-initial | jq

image.png

配置连接目标端 oracle 19c pdb 的连接器

  • 查看下目标端 oracle 容器的ip地址
[root@docker ~]# docker inspect ora19c |grep ipaddress "secondaryipaddresses": null, "ipaddress": "172.17.0.2", "ipaddress": "172.17.0.2",
  • 编辑一个 json 文件,配置连接器信息
    jdbc sink connector configuration properties:
[root@docker ~]# vi oracle-jdbc-sink.json { "name": "oracle-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt", "connection.user": "inventory", "connection.password": "inventory", "tasks.max": "1", "topics": "initial.inventory.orders", "table.name.format": "orders", "dialect.name": "oracledatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } }
  • 向 kafka 连接器注册 jdbc sink connector
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-jdbc-sink.json

查看已注册的连机器信息

# 当前已注册的连接器 curl -s localhost:8083/connectors/ | jq # 连接器的具体信息 curl -s localhost:8083/connectors/oracle-jdbc-sink | jq
  • 查看连接器的运行状态
curl -s localhost:8083/connectors/oracle-jdbc-sink/status | jq

image.png

遇到第一个问题,但是这个问题是由方法解决的,后面再说吧

  • 向 kafka 连接器注册 jdbc sink connector 之后,连接器会自动连接到 oracle pdb 上建表插入数据,但是自动建的表名上带有双引号,强制将表名转为了小写。
  • 查看 oracle pdb 表中的数据

验证目标端的数据

image.png

oracle 端模拟业务

  • insert
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id ------- ------------ ----------- ---------- ------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 (4 rows) postgres=# insert into inventory.orders values (11001,now(),1003,1,102); insert 0 1

image.png

  • update
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id ------- ------------ ----------- ---------- ------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 11001 | 2022-04-22 | 1003 | 1 | 102 (5 rows) postgres=# update inventory.orders set quantity=2 where id=11001; update 1
  • delete
postgres=# select * from inventory.orders; id | order_date | purchaser | quantity | product_id ------- ------------ ----------- ---------- ------------ 10001 | 2016-01-16 | 1001 | 1 | 102 10002 | 2016-01-17 | 1002 | 2 | 105 10003 | 2016-02-19 | 1002 | 2 | 106 10004 | 2016-02-21 | 1003 | 1 | 107 11001 | 2022-04-22 | 1003 | 2 | 102 (5 rows) postgres=# delete from inventory.orders where id = 11001; delete 1

遇到第二个问题,不同步 delete 操作

  • 源端 postgresql 执行 delete 操作,发现目标端 oracle 没有同步执行 delete 操作,查看 kafka connect 日志发现这是有参数限制的呀
    image.png
    tasks 也失败了

  • 更新 sink 连接器,配置 delete.enabled 和 pk.mode

[root@docker ~]# vi oracle-jdbc-sink_update.json { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt", "connection.user": "inventory", "connection.password": "inventory", "tasks.max": "1", "topics": "initial.inventory.orders", "table.name.format": "orders", "dialect.name": "oracledatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "true", "pk.mode": "record_key" } curl -i -x put -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/oracle-jdbc-sink/config -d @oracle-jdbc-sink_update.json

tasks 恢复正常
image.png
delete 操作也同步到目标端
image.png

查看消费者组

image.png

最后修改时间:2022-04-27 16:18:06
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
1人已赞赏
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

网站地图