
Capturing change data from PostgreSQL 14.2 into Kafka with Debezium in a Docker environment

Original article by 张玉龙 (Zhang Yulong), 2022-04-22


Test environment

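  • Debezium version 1.9 (2022-04-05)
  • Debezium tested versions:
    [Figure: Debezium tested versions]
  • PostgreSQL: a single-instance 14.2
  • Reference documentation for this test:
  • Architecture of Debezium-based change data capture:
    [Figure: Debezium CDC architecture]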

Start ZooKeeper

# Run in the background
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper
# Follow the ZooKeeper logs
docker logs -f -t --tail 10 zookeeper

Start Kafka

# Run in the background
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka
# Follow the Kafka logs
docker logs -f -t --tail 10 kafka

Start PostgreSQL 14.2 using the example image provided by Debezium, which comes with a test schema, inventory

# Create a directory for persistent data
mkdir -p /docker_data/postgres
chmod -R a+rwx /docker_data/postgres/
# Run PostgreSQL 14.2 in the background
docker run -d --name postgres \
  -p 5432:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -e PGDATA=/var/lib/pgdata \
  -v /docker_data/postgres:/var/lib/pgdata \
  quay.io/debezium/example-postgres
# Run psql from a throwaway container
[root@docker ~]# alias psql='docker run -it --rm --name psql debezium/example-postgres psql -h 192.168.0.40 -U postgres -p 5432'
[root@docker ~]# psql
Password for user postgres:
psql (14.2 (Debian 14.2-1.pgdg110+1))
Type "help" for help.

postgres=# select version();
                                                           version
-----------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 14.2 (Debian 14.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit
(1 row)

postgres=# \l
                                  List of databases
   Name    |  Owner   | Encoding |  Collate   |   Ctype    |   Access privileges
-----------+----------+----------+------------+------------+-----------------------
 postgres  | postgres | UTF8     | en_US.utf8 | en_US.utf8 |
 template0 | postgres | UTF8     | en_US.utf8 | en_US.utf8 | =c/postgres          +
           |          |          |            |            | postgres=CTc/postgres
 template1 | postgres | UTF8     | en_US.utf8 | en_US.utf8 | =c/postgres          +
           |          |          |            |            | postgres=CTc/postgres
(3 rows)

postgres=# \dn
  List of schemas
   Name    |  Owner
-----------+----------
 inventory | postgres
 public    | postgres
(2 rows)

postgres=# \dt inventory.*
               List of relations
  Schema   |       Name       | Type  |  Owner
-----------+------------------+-------+----------
 inventory | customers        | table | postgres
 inventory | geom             | table | postgres
 inventory | orders           | table | postgres
 inventory | products         | table | postgres
 inventory | products_on_hand | table | postgres
 inventory | spatial_ref_sys  | table | postgres
(6 rows)

postgres=# select schemaname,relname,n_live_tup from pg_stat_user_tables;
 schemaname |     relname      | n_live_tup
------------+------------------+------------
 inventory  | customers        |          4
 inventory  | products_on_hand |          9
 inventory  | orders           |          4
 inventory  | products         |          9
 inventory  | spatial_ref_sys  |       8500
 inventory  | geom             |          3
(6 rows)

Let's look at what configuration this Debezium-provided PostgreSQL image includes

  • pg_hba.conf
[root@docker ~]# cd /docker_data/postgres/
[root@docker postgres]# cat pg_hba.conf
host all all all scram-sha-256
host replication postgres 0.0.0.0/0 trust


  • postgresql.conf
listen_addresses = '*'
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
#wal_keep_segments = 4
#wal_sender_timeout = 60s
max_replication_slots = 4

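The postgresql.conf settings above can also be checked before the connector is started. Debezium needs `wal_level = logical`, at least one WAL sender and at least one free replication slot. The following is a minimal Python sketch that is not part of the original test. `logical_decoding_problems` is a hypothetical helper. Parsing is deliberately simplified: it does not follow `include` directives and treats every `#` as a comment start, even inside quotes. Settings missing from the file are assumed to take the PostgreSQL 10+ built-in defaults (`wal_level = replica`, `max_wal_senders = 10`, `max_replication_slots = 10`).

```python
def logical_decoding_problems(conf_text: str) -> list:
    """Return the postgresql.conf settings that would block logical decoding (sketch)."""
    settings = {}
    for raw in conf_text.splitlines():
        # Simplification: everything after '#' is a comment, so lines such as
        # '#wal_keep_segments = 4' are ignored entirely.
        line = raw.split("#", 1)[0].strip()
        if "=" in line:
            name, value = line.split("=", 1)
            settings[name.strip().lower()] = value.strip().strip("'")
    problems = []
    # Assumed default when unset: 'replica' (PostgreSQL 10+).
    if settings.get("wal_level", "replica") != "logical":
        problems.append("wal_level must be 'logical'")
    # Assumed default when unset: 10 for both (PostgreSQL 10+).
    for key in ("max_wal_senders", "max_replication_slots"):
        if int(settings.get(key, "10")) < 1:
            problems.append(f"{key} must be at least 1")
    return problems


conf = """\
listen_addresses = '*'
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
#wal_keep_segments = 4
#wal_sender_timeout = 60s
max_replication_slots = 4
"""
print(logical_decoding_problems(conf))  # []
```

An empty list means the image's configuration meets these three requirements. A file containing only `wal_level = replica` would instead report the `wal_level` problem.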

Start Kafka Connect

# Run in the background
docker run -d --name connect \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link postgres:postgres \
  quay.io/debezium/connect
# Follow the Kafka Connect logs
docker logs -f -t --tail 10 connect
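Kafka Connect takes a while to start before its REST API on port 8083 answers. Its root endpoint, `GET /`, returns a small JSON document with the worker's version. Below is a minimal Python sketch, using the standard library only, that polls this endpoint before the connector is registered. `wait_for_connect` and `fetch_json` are hypothetical helpers, and the retry count and delay are arbitrary choices. The `fetch` parameter exists so the polling logic can be exercised without a live server.

```python
import json
import time
import urllib.request


def fetch_json(url: str, timeout: float = 5.0):
    """GET `url` and decode the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def wait_for_connect(base_url: str, attempts: int = 30, delay: float = 2.0, fetch=fetch_json) -> dict:
    """Poll the Kafka Connect REST root until it answers, then return its JSON body."""
    last_error = None
    for _ in range(attempts):
        try:
            return fetch(base_url.rstrip("/") + "/")
        except OSError as exc:  # URLError, HTTPError and connection errors are OSError subclasses
            last_error = exc
            time.sleep(delay)
    raise TimeoutError(f"Kafka Connect at {base_url} did not respond: {last_error}")
```

Once the container is up, a call such as `wait_for_connect("http://192.168.0.40:8083")["version"]` should return the worker's Kafka version string.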

Debezium PostgreSQL connector

  • Prepare the Debezium PostgreSQL connector configuration file
    Simply create the file on the Docker host; the connect container exposes a REST API for managing Debezium connectors.
[root@docker ~]# vi pgsql-inventory-connector.json
{
  "name": "pgsql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "pgsql",
    "slot.name": "inventory_slot",
    "table.include.list": "inventory.orders,inventory.products",
    "publication.name": "dbz_inventory_connector",
    "publication.autocreate.mode": "filtered",
    "plugin.name": "pgoutput"
  }
}
  • Register the Debezium PostgreSQL connector with Kafka Connect
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-inventory-connector.json
HTTP/1.1 201 Created
Date: Fri, 22 Apr 2022 00:08:47 GMT
Location: http://192.168.0.40:8083/connectors/pgsql-inventory-connector
Content-Type: application/json
Content-Length: 551
Server: Jetty(9.4.43.v20210629)

{"name":"pgsql-inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","database.server.name":"pgsql","slot.name":"inventory_slot","table.include.list":"inventory.orders,inventory.products","publication.name":"dbz_inventory_connector","publication.autocreate.mode":"filtered","plugin.name":"pgoutput","name":"pgsql-inventory-connector"},"tasks":[],"type":"source"}

Verify the captured data with kafka-ui

kafka-ui: Open-Source Web GUI for Apache Kafka Management:

docker run -p 8811:8080 \
  -e KAFKA_CLUSTERS_0_NAME=oracle-scott-connector \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.0.40:9092 \
  -d provectuslabs/kafka-ui:latest

Open in a browser: http://192.168.0.40:8811/

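In kafka-ui the change events should appear under one topic per captured table. The topics configured for the connect container (my_connect_configs, my_connect_offsets, my_connect_statuses) appear alongside them. In Debezium 1.x a table's topic name is `<database.server.name>.<schema>.<table>`, so this connector should write to `pgsql.inventory.orders` and `pgsql.inventory.products`. Below is a small Python sketch that derives those names from the connector definition. `expected_topics` is a hypothetical helper. It assumes each `table.include.list` entry is a literal `schema.table` name rather than a regular expression, which holds for this configuration.

```python
import json

# Trimmed to the two settings of pgsql-inventory-connector.json that determine topic names.
CONNECTOR_JSON = """{
  "name": "pgsql-inventory-connector",
  "config": {
    "database.server.name": "pgsql",
    "table.include.list": "inventory.orders,inventory.products"
  }
}"""


def expected_topics(connector: dict) -> list:
    """Topic names used by Debezium 1.x: <database.server.name>.<schema>.<table>."""
    cfg = connector["config"]
    prefix = cfg["database.server.name"]
    tables = [t.strip() for t in cfg["table.include.list"].split(",") if t.strip()]
    return [f"{prefix}.{t}" for t in tables]


print(expected_topics(json.loads(CONNECTOR_JSON)))
# ['pgsql.inventory.orders', 'pgsql.inventory.products']
```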

Simulate business operations

  • insert
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
(4 rows)

postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
INSERT 0 1
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
 11001 | 2022-04-22 |      1003 |        1 |        102
(5 rows)


  • update
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
 11001 | 2022-04-22 |      1003 |        1 |        102
(5 rows)

postgres=# update inventory.orders set quantity=2 where id=11001;
UPDATE 1
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
 11001 | 2022-04-22 |      1003 |        2 |        102
(5 rows)


  • delete
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
 11001 | 2022-04-22 |      1003 |        2 |        102
(5 rows)

postgres=# delete from inventory.orders where id = 11001;
DELETE 1
postgres=# select * from inventory.orders;
  id   | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
 10001 | 2016-01-16 |      1001 |        1 |        102
 10002 | 2016-01-17 |      1002 |        2 |        105
 10003 | 2016-02-19 |      1002 |        2 |        106
 10004 | 2016-02-21 |      1003 |        1 |        107
(4 rows)


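Each row change above reaches Kafka as a Debezium change event. Its payload carries `op` (`c` insert, `u` update, `d` delete, `r` snapshot read), `before` and `after`. The Python sketch below decodes such record values for illustration only, under these assumptions:

- The JSON converter has schemas enabled, so each value is wrapped as `{"schema": ..., "payload": ...}`. This is assumed here to be the converter in use.
- `order_date` is a DATE column, which Debezium sends as days since 1970-01-01.
- The sample events are hand-written and trimmed, not captured from this test.

With PostgreSQL's default `REPLICA IDENTITY DEFAULT`, `before` is typically null for updates and holds only the primary key for deletes. Running `ALTER TABLE inventory.orders REPLICA IDENTITY FULL;` makes the full old row available. After a delete, Debezium by default also writes a tombstone, a record with a null value, so that log compaction can drop the key.

```python
import json
from datetime import date, timedelta
from typing import Optional

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def envelope_of(value: Optional[bytes]) -> Optional[dict]:
    """Return the Debezium envelope, or None for a tombstone (null record value)."""
    if value is None:
        return None
    msg = json.loads(value)
    return msg.get("payload", msg)  # unwrap {"schema": ..., "payload": ...} if present


def epoch_days_to_date(days: int) -> date:
    """io.debezium.time.Date: number of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


def describe(value: Optional[bytes]) -> str:
    """One-line summary of a record value from a topic such as pgsql.inventory.orders."""
    env = envelope_of(value)
    if env is None:
        return "tombstone"
    op, before, after = env["op"], env.get("before"), env.get("after")
    if op == "d":
        return f"delete id={before['id']}"
    if op == "u" and before is not None:  # only populated with REPLICA IDENTITY FULL
        changed = {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}
        return f"update id={after['id']} changed={changed}"
    row = dict(after)
    if isinstance(row.get("order_date"), int):
        row["order_date"] = epoch_days_to_date(row["order_date"]).isoformat()
    return f"{OPS.get(op, op)} {row}"


# Hand-written samples mirroring the INSERT, UPDATE (as if REPLICA IDENTITY FULL) and DELETE above.
row = {"id": 11001, "order_date": 19104, "purchaser": 1003, "quantity": 1, "product_id": 102}
events = [
    {"schema": {}, "payload": {"op": "c", "before": None, "after": row}},
    {"schema": {}, "payload": {"op": "u", "before": row, "after": dict(row, quantity=2)}},
    {"schema": {}, "payload": {"op": "d", "before": {"id": 11001}, "after": None}},
    None,  # the tombstone that follows the delete
]
for event in events:
    print(describe(None if event is None else json.dumps(event).encode()))
# insert {'id': 11001, 'order_date': '2022-04-22', 'purchaser': 1003, 'quantity': 1, 'product_id': 102}
# update id=11001 changed={'quantity': (1, 2)}
# delete id=11001
# tombstone
```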

A problem: “schema.include.list” does not capture all tables

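When the connector property "schema.include.list" is set to "inventory", every table in schema inventory should normally be captured. In testing, however, one table, spatial_ref_sys, was not captured, and I have not figured out why.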

{
  "name": "snapshot-mode-initial",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "initial",
    "slot.name": "initial_slot",
    "schema.include.list": "inventory",
    "snapshot.mode": "initial",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_inventory_connector",
    "publication.autocreate.mode": "filtered"
  }
}

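For reference, Debezium's documentation describes `schema.include.list` as a comma-separated list of regular expressions. Each expression is matched against the whole schema name, anchored rather than as a substring. The Python sketch below models only that matching rule, using `re.fullmatch`. The helper name is hypothetical, and the sketch ignores commas that appear inside patterns. It shows that the pattern `inventory` does match the schema of `spatial_ref_sys`, so the include filter alone probably does not explain the missing table. The sketch does not identify the actual cause.

```python
import re


def schema_included(schema: str, include_list: str) -> bool:
    """True if `schema` fully matches one of the comma-separated regular expressions."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, schema) for p in patterns)


tables = ["inventory.customers", "inventory.geom", "inventory.orders",
          "inventory.products", "inventory.products_on_hand", "inventory.spatial_ref_sys"]
for table in tables:
    schema = table.split(".", 1)[0]
    print(table, schema_included(schema, "inventory"))  # True for every table, spatial_ref_sys included

print(schema_included("inventory", "inv"))  # False: patterns are anchored, not substring matches
```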

Monitoring PostgreSQL replication slots

postgres=# select * from pg_replication_slots;

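When watching a slot, a useful number is how far its consumer lags behind the server. For a logical slot, `confirmed_flush_lsn` in `pg_replication_slots` is the position the connector has confirmed. The gap between it and `pg_current_wal_lsn()` is WAL the server must keep. By default Debezium does not drop its slot when the connector stops (`slot.drop.on.stop` is false), so a stalled or removed connector makes this gap, and the retained WAL on disk, keep growing. In SQL the gap is `pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)`. The minimal Python sketch below reproduces the same arithmetic on LSN strings, where an LSN `X/Y` is the high and low hexadecimal halves of a 64-bit WAL byte position. The helper names and sample LSNs are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/16B3748' into a 64-bit WAL byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Same arithmetic as pg_wal_lsn_diff(current_lsn, confirmed_flush_lsn)."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)


print(lag_bytes("0/1A2B3C4D", "0/1A2B0000"))  # 15437
print(lag_bytes("1/0", "0/FFFFFFFF"))         # 1: the low half rolls over into the high half
```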

Last modified: 2022-04-27 16:17:38
[Copyright notice] This article is original content by a 墨天轮 (modb.pro) user. Reprints must credit the source (墨天轮), the article link, the author and other basic information.
