实验环境
- debezium 版本 1.9 (2022-04-05)
- debezium tested versions
- postgresql 版本是单机的 14.2
- 本测试参考文档:
- 基于 debezium 的变更数据捕获的架构:
启动 zookeeper
# 后台运行
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper
# 实时查看 zookeeper 的日志信息
docker logs -f -t --tail 10 zookeeper
启动 kafka
# 后台运行
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka
# 实时查看 kafka 的日志信息
docker logs -f -t --tail 10 kafka
启动 postgresql 14.2,使用 debezium 提供的示例镜像,里面自带了一个测试的 schemas inventory
# 创建一个数据持久化目录
mkdir -p /docker_data/postgres
chmod -R a+rwx /docker_data/postgres/
# 后台运行 14.2 版本的 postgresql 数据库
docker run -d --name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-e PGDATA=/var/lib/pgdata \
-v /docker_data/postgres:/var/lib/pgdata \
quay.io/debezium/example-postgres
# 运行 psql 容器
[root@docker ~]# alias psql='docker run -it --rm --name psql debezium/example-postgres psql -h 192.168.0.40 -U postgres -p 5432'
[root@docker ~]# psql
password for user postgres:
psql (14.2 (debian 14.2-1.pgdg110 1))
type "help" for help.
postgres=# select version();
version
-----------------------------------------------------------------------------------------------------------------------------
postgresql 14.2 (debian 14.2-1.pgdg110 1) on x86_64-pc-linux-gnu, compiled by gcc (debian 10.2.1-6) 10.2.1 20210110, 64-bit
(1 row)
postgres=# \l
list of databases
name | owner | encoding | collate | ctype | access privileges
----------- ---------- ---------- ------------ ------------ -----------------------
postgres | postgres | utf8 | en_us.utf8 | en_us.utf8 |
template0 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres
| | | | | postgres=ctc/postgres
template1 | postgres | utf8 | en_us.utf8 | en_us.utf8 | =c/postgres
| | | | | postgres=ctc/postgres
(3 rows)
postgres=# \dn
list of schemas
name | owner
----------- ----------
inventory | postgres
public | postgres
(2 rows)
postgres=# \dt inventory.*
list of relations
schema | name | type | owner
----------- ------------------ ------- ----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
postgres=# select schemaname,relname,n_live_tup from pg_stat_user_tables;
schemaname | relname | n_live_tup
------------ ------------------ ------------
inventory | customers | 4
inventory | products_on_hand | 9
inventory | orders | 4
inventory | products | 9
inventory | spatial_ref_sys | 8500
inventory | geom | 3
(6 rows)
看看这个 debezium 提供的 postgresql 镜像中都做了哪些配置
- pg_hba.conf
[root@docker ~]# cd /docker_data/postgres/
[root@docker postgres]# cat pg_hba.conf
host all all all scram-sha-256
host replication postgres 0.0.0.0/0 trust
- postgresql.conf
listen_addresses = '*'
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
#wal_keep_segments = 4
#wal_sender_timeout = 60s
max_replication_slots = 4
启动 kafka connect
# 后台运行
docker run -d --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link postgres:postgres \
quay.io/debezium/connect
# 实时查看 kafka connect 的日志信息
docker logs -f -t --tail 10 connect
debezium postgresql connector
- 准备 debezium postgresql connector 配置文件
将配置文件创建在 docker 宿主机上即可,connect 容器开放了 rest api 来管理 debezium 的连接器
[root@docker ~]# vi pgsql-inventory-connector.json
{
"name": "pgsql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "pgsql",
"slot.name": "inventory_slot",
"table.include.list": "inventory.orders,inventory.products",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered",
"plugin.name": "pgoutput"
}
}
- 向 kafka 连接器注册 debezium postgresql connector
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-inventory-connector.json
http/1.1 201 created
date: fri, 22 apr 2022 00:08:47 gmt
location: http://192.168.0.40:8083/connectors/pgsql-inventory-connector
content-type: application/json
content-length: 551
server: jetty(9.4.43.v20210629)
{"name":"pgsql-inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","database.server.name":"pgsql","slot.name":"inventory_slot","table.include.list":"inventory.orders,inventory.products","publication.name":"dbz_inventory_connector","publication.autocreate.mode":"filtered","plugin.name":"pgoutput","name":"pgsql-inventory-connector"},"tasks":[],"type":"source"}
使用 kafka-ui 核对捕获到的数据
kafka-ui:open-source web gui for apache kafka management:
docker run -p 8811:8080 \
-e KAFKA_CLUSTERS_0_NAME=oracle-scott-connector \
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.0.40:9092 \
-d provectuslabs/kafka-ui:latest
网页登录:http://192.168.0.40:8811/
模拟业务
- insert
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
insert 0 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 1 | 102
(5 rows)
- update
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 1 | 102
(5 rows)
postgres=# update inventory.orders set quantity=2 where id=11001;
update 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 2 | 102
(5 rows)
- delete
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 2 | 102
(5 rows)
postgres=# delete from inventory.orders where id = 11001;
delete 1
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
------- ------------ ----------- ---------- ------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)
一个问题,“schema.include.list” 捕获的表不全
当连接器属性配置 "schema.include.list": "inventory" 时,正常来说会捕获 schema inventory 里面的所有表,但是测试发现少捕获一张 spatial_ref_sys 表,没整明白啥情况。
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
监控 postgresql 的复制槽
postgres=# select * from pg_replication_slots;
最后修改时间:2022-04-27 16:17:38
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。