在docker环境上使用kafka connect jdbc将变更数据从kafka应用到postgresql -m6米乐安卓版下载

原创 张玉龙 2022-04-18
1566

实验环境

  • kafka 中的数据来自于捕获的 oracle 19c pdb 的变更数据,参考文章:在docker环境上使用debezium捕获oracle 19c pdb中的变更数据到kafka
  • 准备 kafka connect jdbc connector(连接器),本实验使用的版本是 10.4.1,下载地址:
  • 准备 postgresql jdbc 驱动,kafka connect jdbc connector里面包含了 postgresql jdbc 驱动(postgresql-42.3.3.jar),如果想使用新版的驱动,也可以自行下载,下载地址:
  • 本文参考postgresql大神的文章:,感谢分享。

启动 postgresql 并创建测试库

postgresql 的 docker 仓库:

# 创建一个数据持久化目录 mkdir -p /docker_data/postgres chmod -r a rwx /docker_data/postgres/ # 后台运行14.2版本的 postgresql 数据库 docker run -d --name postgres \ -p 5432:5432 \ -e postgres_password=postgres \ -e pgdata=/var/lib/postgresql/data/pgdata \ -v /docker_data/postgres:/var/lib/postgresql/data \ postgres:14.2 # 运行 psql 容器 [root@docker ~]# alias psql="docker run -it --rm --name psql postgres:14.2 psql -h 192.168.0.40 -u postgres -p 5432" [root@docker ~]# psql password for user postgres: psql (14.2 (debian 14.2-1.pgdg110 1)) type "help" for help. postgres=# create database scott; create database postgres=# \l list of databases name | owner | encoding | collate | ctype | access privileges ----------- ---------- ---------- ------------ ------------ ----------------------- postgres | postgres | utf8 | en_us.utf8 | en_us.utf8 | scott | postgres | utf8 | en_us.utf8 | en_us.utf8 | template0 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres | | | | | postgres=ctc/postgres template1 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres | | | | | postgres=ctc/postgres (4 rows)

配置 jdbc sink connector

上传驱动和jdbc连接器

将下载的 postgresql jdbc 驱动和 kafka connect jdbc connector(连接器) 上传服务器并复制到 connect 容器中

[root@docker ~]# ls -lrt -rw-r--r--. 1 root root 1006732 apr 17 22:12 postgresql-42.2.25.jar -rw-r--r--. 1 root root 20208429 apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip # 上传 postgresql jdbc 驱动,如果使用 kafka connect jdbc connector 自带的驱动可以忽略此处 docker cp postgresql-42.2.25.jar connect:/kafka/libs # 上传 kafka connect jdbc connector unzip confluentinc-kafka-connect-jdbc-10.4.1.zip chown -r 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1 docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect # 重启 kafka connect 连接器 docker restart connect

查看现有连接器信息

  • 安装 jq,用于格式化 json 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
  • 查看当前存在哪些连接器
curl localhost:8083/connectors/ | jq
  • 查看连接器的具体信息
curl localhost:8083/connectors/oracle-scott-connector | jq

image.png

配置连接目标端 postgresql 的连接器

  • 查看下目标端 postgresql 容器的ip地址
[root@docker ~]# docker inspect postgres |grep ipaddress "secondaryipaddresses": null, "ipaddress": "172.17.0.7", "ipaddress": "172.17.0.7",
  • 编辑一个 json 文件,配置连接器信息
    jdbc sink connector configuration properties:
[root@docker ~]# vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslmode=require", "tasks.max": "1", "topics": "oracle19c.scott.dept", "table.name.format": "dept", "dialect.name": "postgresqldatabasedialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } }
  • 向 kafka 连接器注册 jdbc sink connector
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

查看已注册的连机器信息

# 当前已注册的连接器 curl localhost:8083/connectors/ | jq # 连接器的具体信息 curl localhost:8083/connectors/pgsql-scott-jdbc-sink | jq
  • 查看连接器的运行状态
curl localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq

image.png

一个排错过程(可选择性跳过)

  • 向 kafka 连接器注册 jdbc sink connector 之后,连接器会自动连接到 postgresql 上建表插入数据
  • 查看连接器的运行状态发现有问题
    image.png
  • 查看 kafka 连接器日志发现建表语句有问题
  • 这才了解为啥要在连接器的配置信息中加 “table.name.format”: “dept”
    image.png
  • postgresql大神的文章: 提供了两种更改连接器配置信息的方法,我直接使用暴力的方法
# 1.删除连接器 curl -v -x delete 192.168.0.40:8083/connectors/pgsql-scott-jdbc-sink # 2.修改json文件 # 3.重新添加连接器 curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

验证目标端的数据

image.png

oracle 端模拟业务

  • insert
[root@docker ~]# sqlplus scott/scott@192.168.0.40:1521/pdbtt sql> insert into dept values (70,'bbbb','bb'); sql> commit;

image.png

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

网站地图