实验环境
- debezium 版本 1.9 (2022-04-05)
- debezium tested versions
- oracle 版本是单机的 19.3
- 本测试参考文档:
- 基于 debezium 的变更数据捕获的架构:
启动 zookeeper
# 后台运行
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
# 实时查看 zookeeper 的日志信息
docker logs -f -t --tail 10 zookeeper
启动 kafka
# 后台运行
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
# 实时查看 kafka 的日志信息
docker logs -f -t --tail 10 kafka
启动 oracle 19c 数据库
- 参考文章:使用docker装一个oracle 19c的单机测试环境
- 19c 数据库里面创建 scott 测试用户和数据。
[oracle@ora11g ~]$ sqlplus scott/scott@192.168.0.40:1521/pdbtt
sql> set line 100 pages 100
sql> select * from scott.emp;
empno ename job mgr hiredate sal comm deptno
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
7369 smith clerk 7902 17-dec-80 800 20
7499 allen salesman 7698 20-feb-81 1600 300 30
7521 ward salesman 7698 22-feb-81 1250 500 30
7566 jones manager 7839 02-apr-81 2975 20
7654 martin salesman 7698 28-sep-81 1250 1400 30
7698 blake manager 7839 01-may-81 2850 30
7782 clark manager 7839 09-jun-81 2450 10
7788 scott analyst 7566 19-apr-87 3000 20
7839 king president 17-nov-81 5000 10
7844 turner salesman 7698 08-sep-81 1500 0 30
7876 adams clerk 7788 23-may-87 1100 20
7900 james clerk 7698 03-dec-81 950 30
7902 ford analyst 7566 03-dec-81 3000 20
7934 miller clerk 7782 23-jan-82 1300 10
14 rows selected.
配置 oracle 19c 数据库
- 确保数据库启动归档模式,使用cdb的服务登录
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
sql> archive log list;
database log mode archive mode
automatic archival enabled
archive destination /opt/oracle/oradata/orcl/archive_logs
oldest online log sequence 5
next log sequence to archive 7
current log sequence 7
- 启用最小补充日志,使用cdb的服务登录
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
-- 启用最小补充日志
sql> alter database add supplemental log data;
-- 切换到 pdb 中,为表启用补充日志
sql> alter session set container=pdbtt;
alter table scott.dept add supplemental log data (all) columns;
alter table scott.emp add supplemental log data (all) columns;
alter table scott.bonus add supplemental log data (all) columns;
alter table scott.salgrade add supplemental log data (all) columns;
- 在 cdb 和 pdb 中创建 logminer 用户使用的表空间
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
create tablespace logminer_tbs datafile '/opt/oracle/oradata/orcl/logminer_tbs.dbf' size 25m reuse autoextend on maxsize unlimited;
-- 切换到 pdb 中,创建表空间
sql> alter session set container=pdbtt;
create tablespace logminer_tbs datafile '/opt/oracle/oradata/orcl/pdbtt/logminer_tbs.dbf' size 25m reuse autoextend on maxsize unlimited;
- 在 cdb 中创建 logminer 用户并授予相关权限
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
create user c##dbzuser identified by dbz default tablespace logminer_tbs quota unlimited on logminer_tbs container=all;
grant create session to c##dbzuser container=all;
grant set container to c##dbzuser container=all;
grant select on v_$database to c##dbzuser container=all;
grant flashback any table to c##dbzuser container=all;
grant select any table to c##dbzuser container=all;
grant select_catalog_role to c##dbzuser container=all;
grant execute_catalog_role to c##dbzuser container=all;
grant select any transaction to c##dbzuser container=all;
grant logmining to c##dbzuser container=all;
grant create table to c##dbzuser container=all;
grant lock any table to c##dbzuser container=all;
grant create sequence to c##dbzuser container=all;
grant execute on dbms_logmnr to c##dbzuser container=all;
grant execute on dbms_logmnr_d to c##dbzuser container=all;
grant select on v_$log to c##dbzuser container=all;
grant select on v_$log_history to c##dbzuser container=all;
grant select on v_$logmnr_logs to c##dbzuser container=all;
grant select on v_$logmnr_contents to c##dbzuser container=all;
grant select on v_$logmnr_parameters to c##dbzuser container=all;
grant select on v_$logfile to c##dbzuser container=all;
grant select on v_$archived_log to c##dbzuser container=all;
grant select on v_$archive_dest_status to c##dbzuser container=all;
grant select on v_$transaction to c##dbzuser container=all;
测试数据库层面 logminer
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba
sql> select member from v$logfile;
member
--------------------------------------------------------------------------------
/opt/oracle/oradata/orcl/redo03.log
/opt/oracle/oradata/orcl/redo02.log
/opt/oracle/oradata/orcl/redo01.log
sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo01.log',dbms_logmnr.new);
sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo02.log',dbms_logmnr.addfile);
sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo03.log',dbms_logmnr.addfile);
sql> execute dbms_logmnr.start_logmnr(options=>dbms_logmnr.dict_from_online_catalog dbms_logmnr.committed_data_only);
sql> select sql_redo,sql_undo from v$logmnr_contents where table_name like '�pt%' and operation='insert';
sql_redo
----------------------------------------------------------------------------------------------------
sql_undo
---------------------------------------------------------------------------------------------------------------
insert into "scott"."dept"("deptno","dname","loc") values ('10','accounting','new york');
delete from "scott"."dept" where "deptno" = '10' and "dname" = 'accounting' and "loc" = 'new york' and rowid = 'aaar1daamaaaacdaaa';
insert into "scott"."dept"("deptno","dname","loc") values ('20','research','dallas');
delete from "scott"."dept" where "deptno" = '20' and "dname" = 'research' and "loc" = 'dallas' and rowid = 'aaar1daamaaaacdaab';
insert into "scott"."dept"("deptno","dname","loc") values ('30','sales','chicago');
delete from "scott"."dept" where "deptno" = '30' and "dname" = 'sales' and "loc" = 'chicago' and rowid = 'aaar1daamaaaacdaac';
insert into "scott"."dept"("deptno","dname","loc") values ('40','operations','boston');
delete from "scott"."dept" where "deptno" = '40' and "dname" = 'operations' and "loc" = 'boston' and rowid = 'aaar1daamaaaacdaad';
-- 下面语句为结束语句
sql> execute dbms_logmnr.end_logmnr;
启动 kafka connect
# 后台运行
docker run -d --name connect \
-p 8083:8083 \
-e group_id=1 \
-e config_storage_topic=my_connect_configs \
-e offset_storage_topic=my_connect_offsets \
-e status_storage_topic=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
quay.io/debezium/connect:1.9
# 实时查看 kafka connect 的日志信息
docker logs -f -t --tail 10 connect
debezium oracle connector
- 下载 ojdbc8.jar 连接驱动
- 将驱动上传到 connect 容器中,重启 connect 容器
[root@docker ~]# mv ojdbc8-19.3.0.0.jar ojdbc8.jar
[root@docker ~]# docker cp ojdbc8.jar connect:/kafka/libs
[root@docker ~]# docker restart connect
- 准备 debezium oracle connector 配置文件
将配置文件创建在 docker 宿主机上即可,connect 容器开放了 rest api 来管理 debezium 的连接器
database.hostname 需要使用容器内的ip地址,不然加不上
[root@docker ~]# vi oracle-scott-connector.json
{
"name": "oracle-scott-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.oracleconnector",
"database.hostname" : "172.17.0.3",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "orcl",
"database.pdb.name" : "pdbtt",
"database.server.name" : "oracle19c",
"tasks.max" : "1",
"schema.include.list": "scott",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
- 向 kafka 连接器注册 debezium oracle connector
[root@docker ~]# curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json
http/1.1 201 created
date: sat, 16 apr 2022 17:03:12 gmt
location: http://192.168.0.40:8083/connectors/oracle-scott-connector
content-type: application/json
content-length: 534
server: jetty(9.4.43.v20210629)
{"name":"oracle-scott-connector","config":{"connector.class":"io.debezium.connector.oracle.oracleconnector","database.hostname":"172.17.0.3","database.port":"1521","database.user":"c##dbzuser","database.password":"dbz","database.dbname":"orcl","database.pdb.name":"pdbtt","database.server.name":"oracle19c","tasks.max":"1","schema.include.list":"scott","database.history.kafka.bootstrap.servers":"192.168.0.40:9092","database.history.kafka.topic":"schema-changes.inventory","name":"oracle-scott-connector"},"tasks":[],"type":"source"}
核对捕获到的数据
- 进入到connect容器内部 执行
[root@docker ~]# docker exec -it connect bash
[kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --list --bootstrap-server kafka:9092
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
oracle19c
oracle19c.scott.dept
oracle19c.scott.emp
oracle19c.scott.salgrade
schema-changes.inventory
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic schema-changes.inventory --from-beginning
[kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic oracle19c.scott.dept --from-beginning
-
ddl
-
dml
模拟业务
- insert
使用 kafka-ui 查看 kafka 里的消息
kafka-ui:open-source web gui for apache kafka management:
docker run -p 8811:8080 \ -e kafka_clusters_0_name=oracle-scott-connector \ -e kafka_clusters_0_bootstrapservers=192.168.0.40:9092 \ -d provectuslabs/kafka-ui:latest
网页登录:http://192.168.0.40:8811/
最后修改时间:2022-04-18 09:29:42
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。