- I discovered that Debezium ships a docker-compose based test environment that deploys automatically. It is very handy: adapt it to your own environment and you can test many scenarios.
- Download address:
- Tutorial:
Download and unzip the archive, and you can see the test examples provided under tutorial
- The test environments Debezium provides mainly target MySQL. This article tests against PostgreSQL; with a few changes to the corresponding MySQL examples, they can be used for PostgreSQL testing.
[root@docker ~]# unzip debezium-examples-main.zip
[root@docker ~]# cd debezium-examples-main/tutorial
[root@docker tutorial]# ls -lrt
total 768
-rw-r--r--. 1 root root 602664 Apr 19 22:42 vitess-sharding-setup.png
drwxr-xr-x. 2 root root 30 Apr 19 22:42 secrets
-rw-r--r--. 1 root root 521 Apr 19 22:42 register-vitess.json
-rw-r--r--. 1 root root 538 Apr 19 22:42 register-sqlserver.json
-rw-r--r--. 1 root root 448 Apr 19 22:42 register-postgres.json
-rw-r--r--. 1 root root 582 Apr 19 22:42 register-oracle-logminer.json
-rw-r--r--. 1 root root 568 Apr 19 22:42 register-mysql.json
-rw-r--r--. 1 root root 637 Apr 19 22:42 register-mysql-ext-secrets.json
-rw-r--r--. 1 root root 860 Apr 19 22:42 register-mysql-avro.json
-rw-r--r--. 1 root root 878 Apr 19 22:42 register-mysql-apicurio.json
-rw-r--r--. 1 root root 1172 Apr 19 22:42 register-mysql-apicurio-converter-json.json
-rw-r--r--. 1 root root 1166 Apr 19 22:42 register-mysql-apicurio-converter-avro.json
-rw-r--r--. 1 root root 437 Apr 19 22:42 register-mongodb.json
-rw-r--r--. 1 root root 576 Apr 19 22:42 register-db2.json
-rw-r--r--. 1 root root 22923 Apr 19 22:42 README.md
-rw-r--r--. 1 root root 1955 Apr 19 22:42 docker-compose-zookeeperless-kafka.yaml
-rw-r--r--. 1 root root 1616 Apr 19 22:42 docker-compose-zookeeperless-kafka-combined.yaml
-rw-r--r--. 1 root root 885 Apr 19 22:42 docker-compose-vitess.yaml
-rw-r--r--. 1 root root 1119 Apr 19 22:42 docker-compose-sqlserver.yaml
-rw-r--r--. 1 root root 1082 Apr 19 22:42 docker-compose-postgres.yaml
-rw-r--r--. 1 root root 927 Apr 19 22:42 docker-compose-oracle.yaml
-rw-r--r--. 1 root root 887 Apr 19 22:42 docker-compose-mysql.yaml
-rw-r--r--. 1 root root 1068 Apr 19 22:42 docker-compose-mysql-ext-secrets.yml
-rw-r--r--. 1 root root 1671 Apr 19 22:42 docker-compose-mysql-avro-worker.yaml
-rw-r--r--. 1 root root 1391 Apr 19 22:42 docker-compose-mysql-avro-connector.yaml
-rw-r--r--. 1 root root 1036 Apr 19 22:42 docker-compose-mysql-apicurio.yaml
-rw-r--r--. 1 root root 43764 Apr 19 22:42 docker-compose-mysql-apicurio.png
-rw-r--r--. 1 root root 1094 Apr 19 22:42 docker-compose-mongodb.yaml
-rw-r--r--. 1 root root 1098 Apr 19 22:42 docker-compose-db2.yaml
-rw-r--r--. 1 root root 930 Apr 19 22:42 docker-compose-cassandra.yaml
drwxr-xr-x. 3 root root 36 Apr 19 22:42 debezium-with-oracle-jdbc
drwxr-xr-x. 3 root root 74 Apr 19 22:42 debezium-vitess-init
drwxr-xr-x. 2 root root 27 Apr 19 22:42 debezium-sqlserver-init
drwxr-xr-x. 4 root root 41 Apr 19 22:42 debezium-db2-init
drwxr-xr-x. 2 root root 141 Apr 19 22:42 debezium-cassandra-init
drwxr-xr-x. 2 root root 26 Apr 19 22:42 db2data
Install docker-compose
- Docker Compose is a tool for running multi-container applications on Docker.
- Docker Compose V2 is a major upgrade of Docker Compose, completely rewritten in Golang; V1 was written in Python.
- Address on GitHub:
- Download address of the docker-compose binary used in this example:
# The downloaded binary runs directly once it is executable; rename the file for convenience
[root@docker ~]# chmod +x docker-compose-linux-x86_64
[root@docker ~]# mv docker-compose-linux-x86_64 docker-compose
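- A quick sanity check that the binary runs (the exact version string depends on the build you downloaded):
# Print the version to confirm the binary works
/root/docker-compose version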
Basic docker-compose management
- Reference article:
# Help
/root/docker-compose -h
# Create and start all containers in the project
/root/docker-compose -f xxx.yaml up
# Stop and remove all containers in the project
/root/docker-compose -f xxx.yaml down
# Restart a single service (container) in the project; the following example restarts the connect container
/root/docker-compose -f xxx.yaml restart connect
# List all containers in the project
/root/docker-compose -f xxx.yaml ps
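- Two more standard subcommands that come in handy in the walkthroughs below:
# Follow the logs of a single service, e.g. connect
/root/docker-compose -f xxx.yaml logs -f connect
# Open a shell inside a running service container
/root/docker-compose -f xxx.yaml exec connect bash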
Avro can be configured in three ways: first in the Kafka Connect worker, second on the Debezium connector, and third via the Apicurio registry.
Approach 1: configure in the Kafka Connect worker
Edit the docker-compose configuration file
- tutorial only ships the MySQL docker-compose-mysql-avro-worker.yaml; model a PostgreSQL docker-compose configuration file on it
- A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-worker.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
      - zookeeper
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8811:8080
    links:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=test
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
Start docker-compose
- Starting docker-compose brings up zookeeper, kafka, postgres, schema-registry, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml up
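- up here runs in the foreground and streams all container logs; to keep the terminal free, the stack can also be started detached and then checked (standard flags):
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml up -d
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml ps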
Register the PostgreSQL connector
- Use the register-postgres.json that ships with the Debezium tutorial
# cd /root/debezium-examples-main/tutorial
# cat register-postgres.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.postgresconnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
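- Whether the connector actually started can be verified through the Kafka Connect REST API (the connector name matches register-postgres.json above):
# List registered connectors, then check the state of this one
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/inventory-connector/status | jq .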
Log in to the Kafka web UI to check the topics
- http://192.168.0.40:8811/
- You can see the topics automatically created for each table on the source side
- You can see the topic automatically created for schemas, _schemas
- You can see that the key and value of every message are binary
Configure the network
- The source side of this experiment is PostgreSQL and the target side is an Oracle 19c PDB. Debezium provides a PostgreSQL docker image, but no Oracle image.
- For installing Oracle on Docker, see: setting up a single-instance Oracle 19c test environment with Docker
- An environment deployed with docker-compose creates a default network, named after the lowercased directory containing the docker-compose.yml plus "_default"; here that is tutorial_default.
- Oracle installed on Docker uses the Docker default network, so its network is isolated from the environment deployed by docker-compose.
- To let the connect container deployed by docker-compose reach Oracle, attach the Docker default network to the connect container.
# First use docker ps to find the container id of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
# docker inspect tutorial-connect-1 | grep IPAddress
            "SecondaryIPAddresses": null,
            "IPAddress": "172.17.0.3",
                    "IPAddress": "172.17.0.3",
                    "IPAddress": "172.26.0.3",
Register a consumer connector
- The consumer connector uses Kafka Connect JDBC and consumes into the Oracle 19c PDB
- The connect container provided by Debezium does not contain Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml restart connect
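- After the restart, whether the JDBC sink plugin was picked up can be verified via the Connect REST API:
# The list should now include io.confluent.connect.jdbc.JdbcSinkConnector
curl -s http://localhost:8083/connector-plugins | jq '.[].class'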
- Edit the consumer connector and register it with Kafka Connect
[root@docker ~]# cat oracle-jdbc-sink.json
{
"name": "oracle-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:oracle:thin:@//10.16.0.1:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink.json
- Check the consumed data
SQL> desc inventory.orders;
SQL> select * from inventory.orders;
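- To watch a change flow end to end, insert a row on the PostgreSQL side and re-run the query in Oracle (a sketch; the container name tutorial-postgres-1 and the orders columns follow Debezium's example inventory database, where purchaser 1001 and product 102 exist in the sample data):
# Insert an order into the source table
docker exec -it tutorial-postgres-1 psql -U postgres -c "INSERT INTO inventory.orders (order_date, purchaser, quantity, product_id) VALUES ('2022-04-25', 1001, 2, 102);"
# Then re-query on the Oracle side: SQL> select * from inventory.orders;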
Stop and remove the containers to clean up the test environment
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-worker.yaml down
Approach 2: configure on the Debezium connector
Edit the docker-compose configuration file
- tutorial only ships the MySQL docker-compose-mysql-avro-connector.yaml; model a PostgreSQL docker-compose configuration file on it
- A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-connector.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
      - zookeeper
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8811:8080
    links:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=test
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
Start docker-compose
- Starting docker-compose brings up zookeeper, kafka, postgres, schema-registry, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml up
Register the PostgreSQL connector
- tutorial only ships the MySQL register-mysql-avro.json; model a PostgreSQL configuration file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.postgresconnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.avroconverter",
"value.converter": "io.confluent.connect.avro.avroconverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-avro.json
Check the customers schema
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
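- To see every subject the registry currently holds (standard Confluent Schema Registry API):
curl -X GET http://localhost:8081/subjects | jq .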
- The schema registry also ships a console consumer that can read Avro messages:
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--topic dbserver1.inventory.customers
Configure the network
- See the previous approach for a detailed explanation
# First use docker ps to find the container id of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
Register a consumer connector
- The consumer connector uses Kafka Connect JDBC and consumes into the Oracle 19c PDB
- The connect container provided by Debezium does not contain Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml restart connect
- Edit the consumer connector and register it with Kafka Connect
[root@docker tutorial]# cat oracle-jdbc-sink-avro.json
{
"name": "oracle-jdbc-sink-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.avroconverter",
"value.converter": "io.confluent.connect.avro.avroconverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-avro.json
- Check the consumed data
SQL> desc inventory.orders;
SQL> select * from inventory.orders;
Stop and remove the containers to clean up the test environment
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-connector.yaml down
Approach 3: use the Apicurio registry
Apicurio Registry is an open-source API and schema registry that can, among other things, store the schemas of Kafka records. It provides
- its own native Avro converter and Protobuf serializer
- a JSON converter that externalizes its schema to the registry
- a compatibility layer with other schema registries such as IBM's or Confluent's; it can be used with the Confluent Avro converter.
Edit the docker-compose configuration file
- tutorial only ships the MySQL docker-compose-mysql-apicurio.yaml; model a PostgreSQL docker-compose configuration file on it
- A Kafka web management tool is also added
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-apicurio.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  apicurio:
    image: apicurio/apicurio-registry-mem:2.0.0.Final
    ports:
      - 8080:8080
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
      - apicurio
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - ENABLE_APICURIO_CONVERTERS=true
  kafkaui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8811:8080
    links:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=test
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
Start docker-compose
- Starting docker-compose brings up zookeeper, kafka, postgres, apicurio, connect, and a Kafka web management container in turn
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml up
Register the PostgreSQL connector (Apicurio - JSON format)
- tutorial only ships the MySQL register-mysql-apicurio-converter-json.json; model a PostgreSQL configuration file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-json.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.postgresconnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.extjsonconverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.extjsonconverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-json.json
Register the PostgreSQL connector (Apicurio - Avro format)
- tutorial only ships the MySQL register-mysql-apicurio-converter-avro.json; model a PostgreSQL configuration file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.postgresconnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.avroconverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.avroconverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-avro.json
Register the PostgreSQL connector (Confluent - Avro format)
- tutorial only ships the MySQL register-mysql-apicurio.json; model a PostgreSQL configuration file on it
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.postgresconnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.avroconverter",
"value.converter": "io.confluent.connect.avro.avroconverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio.json
Check the customers schema
# Apicurio - JSON format and Avro format
curl -X GET http://localhost:8080/apis/registry/v2/groups/default/artifacts/dbserver1.inventory.customers-value | jq .
# Confluent - Avro format
curl -X GET http://localhost:8080/apis/ccompat/v6/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
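- The Apicurio v2 API can also enumerate everything registered so far:
# List all artifacts in the default group
curl -s http://localhost:8080/apis/registry/v2/groups/default/artifacts | jq .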
- The messages can also be read with a console consumer:
# Apicurio - JSON format
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
When you look at the data messages, you will notice that they contain only the payload and no schema section, because the schema has been externalized into the registry.
Check the topics
- Apicurio - JSON
- Apicurio - Avro
- Confluent - Avro
Configure the network
- See the previous approach for a detailed explanation
# First use docker ps to find the container id of the tutorial-connect-1 container (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
Register a consumer connector
- The consumer connector uses Kafka Connect JDBC and consumes into the Oracle 19c PDB
- The connect container provided by Debezium does not contain Kafka Connect JDBC; download and upload it yourself, then restart the connect container
# Upload Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# Restart the connect service with docker-compose
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml restart connect
- Edit the consumer connector and register it with Kafka Connect (Apicurio - JSON format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-json.json
{
"name": "oracle-jdbc-sink-apicurio-json",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.extjsonconverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.extjsonconverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-json.json
- The consumer side did not work; it failed with: tolerance exceeded in error handler
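- The underlying cause shows up in the failed task's stack trace and in the connect worker log (standard diagnostics; the connector name matches the registration above):
curl -s http://localhost:8083/connectors/oracle-jdbc-sink-apicurio-json/status | jq .
docker logs --tail 100 tutorial-connect-1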
- Edit the consumer connector and register it with Kafka Connect (Apicurio - Avro format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro.json
{
"name": "oracle-jdbc-sink-apicurio-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.avroconverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.avroconverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro.json
- Edit the consumer connector and register it with Kafka Connect (Confluent - Avro format)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro2.json
{
"name": "oracle-jdbc-sink-apicurio-avro2",
"config": {
"connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "orders",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.avroconverter",
"value.converter": "io.confluent.connect.avro.avroconverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro2.json
Stop and remove the containers to clean up the test environment
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-apicurio.yaml down