实验环境
- kafka 中的数据来自于捕获的 oracle 19c pdb 的变更数据,参考文章:在docker环境上使用debezium捕获oracle 19c pdb中的变更数据到kafka
- 准备 kafka connect jdbc connector(连接器),本实验使用的版本是 10.4.1,下载地址:
- 准备 postgresql jdbc 驱动,kafka connect jdbc connector里面包含了 postgresql jdbc 驱动(postgresql-42.3.3.jar),如果想使用新版的驱动,也可以自行下载,下载地址:
- 本文参考postgresql大神的文章:,感谢分享。
启动 postgresql 并创建测试库
postgresql 的 docker 仓库:
# 创建一个数据持久化目录
mkdir -p /docker_data/postgres
chmod -r a rwx /docker_data/postgres/
# 后台运行14.2版本的 postgresql 数据库
docker run -d --name postgres \
-p 5432:5432 \
-e postgres_password=postgres \
-e pgdata=/var/lib/postgresql/data/pgdata \
-v /docker_data/postgres:/var/lib/postgresql/data \
postgres:14.2
# 运行 psql 容器
[root@docker ~]# alias psql="docker run -it --rm --name psql postgres:14.2 psql -h 192.168.0.40 -u postgres -p 5432"
[root@docker ~]# psql
password for user postgres:
psql (14.2 (debian 14.2-1.pgdg110 1))
type "help" for help.
postgres=# create database scott;
create database
postgres=# \l
list of databases
name | owner | encoding | collate | ctype | access privileges
----------- ---------- ---------- ------------ ------------ -----------------------
postgres | postgres | utf8 | en_us.utf8 | en_us.utf8 |
scott | postgres | utf8 | en_us.utf8 | en_us.utf8 |
template0 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres
| | | | | postgres=ctc/postgres
template1 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres
| | | | | postgres=ctc/postgres
(4 rows)
配置 jdbc sink connector
上传驱动和jdbc连接器
将下载的 postgresql jdbc 驱动和 kafka connect jdbc connector(连接器) 上传服务器并复制到 connect 容器中
[root@docker ~]# ls -lrt
-rw-r--r--. 1 root root 1006732 apr 17 22:12 postgresql-42.2.25.jar
-rw-r--r--. 1 root root 20208429 apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip
# 上传 postgresql jdbc 驱动,如果使用 kafka connect jdbc connector 自带的驱动可以忽略此处
docker cp postgresql-42.2.25.jar connect:/kafka/libs
# 上传 kafka connect jdbc connector
unzip confluentinc-kafka-connect-jdbc-10.4.1.zip
chown -r 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1
docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect
# 重启 kafka connect 连接器
docker restart connect
查看现有连接器信息
- 安装 jq,用于格式化 json 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
- 查看当前存在哪些连接器
curl localhost:8083/connectors/ | jq
- 查看连接器的具体信息
curl localhost:8083/connectors/oracle-scott-connector | jq
配置连接目标端 postgresql 的连接器
- 查看下目标端 postgresql 容器的ip地址
[root@docker ~]# docker inspect postgres |grep ipaddress
"secondaryipaddresses": null,
"ipaddress": "172.17.0.7",
"ipaddress": "172.17.0.7",
- 编辑一个 json 文件,配置连接器信息
jdbc sink connector configuration properties:
[root@docker ~]# vi pgsql-scott-jdbc-sink.json
{
"name": "pgsql-scott-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslmode=require",
"tasks.max": "1",
"topics": "oracle19c.scott.dept",
"table.name.format": "dept",
"dialect.name": "postgresqldatabasedialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
- 向 kafka 连接器注册 jdbc sink connector
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
查看已注册的连机器信息
# 当前已注册的连接器
curl localhost:8083/connectors/ | jq
# 连接器的具体信息
curl localhost:8083/connectors/pgsql-scott-jdbc-sink | jq
- 查看连接器的运行状态
curl localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq
一个排错过程(可选择性跳过)
- 向 kafka 连接器注册 jdbc sink connector 之后,连接器会自动连接到 postgresql 上建表插入数据
- 查看连接器的运行状态发现有问题
- 查看 kafka 连接器日志发现建表语句有问题
- 这才了解为啥要在连接器的配置信息中加 “table.name.format”: “dept”
- postgresql大神的文章: 提供了两种更改连接器配置信息的方法,我直接使用暴力的方法
# 1.删除连接器
curl -v -x delete 192.168.0.40:8083/connectors/pgsql-scott-jdbc-sink
# 2.修改json文件
# 3.重新添加连接器
curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
验证目标端的数据
oracle 端模拟业务
- insert
[root@docker ~]# sqlplus scott/scott@192.168.0.40:1521/pdbtt
sql> insert into dept values (70,'bbbb','bb');
sql> commit;
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。