5

在docker环境上使用debezium捕获oracle 19c pdb中的变更数据到kafka -m6米乐安卓版下载

原创 张玉龙 2022-04-17
2574

实验环境

  • debezium 版本 1.9 (2022-04-05)
  • debezium tested versions
    image.png
  • oracle 版本是单机的 19.3
  • 本测试参考文档:
  • 基于 debezium 的变更数据捕获的架构:
    image.png

启动 zookeeper

# 后台运行 docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9 # 实时查看 zookeeper 的日志信息 docker logs -f -t --tail 10 zookeeper

启动 kafka

# 后台运行 docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9 # 实时查看 kafka 的日志信息 docker logs -f -t --tail 10 kafka

启动 oracle 19c 数据库

[oracle@ora11g ~]$ sqlplus scott/scott@192.168.0.40:1521/pdbtt sql> set line 100 pages 100 sql> select * from scott.emp; empno ename job mgr hiredate sal comm deptno ---------- ---------- --------- ---------- --------- ---------- ---------- ---------- 7369 smith clerk 7902 17-dec-80 800 20 7499 allen salesman 7698 20-feb-81 1600 300 30 7521 ward salesman 7698 22-feb-81 1250 500 30 7566 jones manager 7839 02-apr-81 2975 20 7654 martin salesman 7698 28-sep-81 1250 1400 30 7698 blake manager 7839 01-may-81 2850 30 7782 clark manager 7839 09-jun-81 2450 10 7788 scott analyst 7566 19-apr-87 3000 20 7839 king president 17-nov-81 5000 10 7844 turner salesman 7698 08-sep-81 1500 0 30 7876 adams clerk 7788 23-may-87 1100 20 7900 james clerk 7698 03-dec-81 950 30 7902 ford analyst 7566 03-dec-81 3000 20 7934 miller clerk 7782 23-jan-82 1300 10 14 rows selected.

配置 oracle 19c 数据库

  • 确保数据库启动归档模式,使用cdb的服务登录
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba sql> archive log list; database log mode archive mode automatic archival enabled archive destination /opt/oracle/oradata/orcl/archive_logs oldest online log sequence 5 next log sequence to archive 7 current log sequence 7
  • 启用最小补充日志,使用cdb的服务登录
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba -- 启用最小补充日志 sql> alter database add supplemental log data; -- 切换到 pdb 中,为表启用补充日志 sql> alter session set container=pdbtt; alter table scott.dept add supplemental log data (all) columns; alter table scott.emp add supplemental log data (all) columns; alter table scott.bonus add supplemental log data (all) columns; alter table scott.salgrade add supplemental log data (all) columns;
  • 在 cdb 和 pdb 中创建 logminer 用户使用的表空间
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba create tablespace logminer_tbs datafile '/opt/oracle/oradata/orcl/logminer_tbs.dbf' size 25m reuse autoextend on maxsize unlimited; -- 切换到 pdb 中,创建表空间 sql> alter session set container=pdbtt; create tablespace logminer_tbs datafile '/opt/oracle/oradata/orcl/pdbtt/logminer_tbs.dbf' size 25m reuse autoextend on maxsize unlimited;
  • 在 cdb 中创建 logminer 用户并授予相关权限
[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba create user c##dbzuser identified by dbz default tablespace logminer_tbs quota unlimited on logminer_tbs container=all; grant create session to c##dbzuser container=all; grant set container to c##dbzuser container=all; grant select on v_$database to c##dbzuser container=all; grant flashback any table to c##dbzuser container=all; grant select any table to c##dbzuser container=all; grant select_catalog_role to c##dbzuser container=all; grant execute_catalog_role to c##dbzuser container=all; grant select any transaction to c##dbzuser container=all; grant logmining to c##dbzuser container=all; grant create table to c##dbzuser container=all; grant lock any table to c##dbzuser container=all; grant create sequence to c##dbzuser container=all; grant execute on dbms_logmnr to c##dbzuser container=all; grant execute on dbms_logmnr_d to c##dbzuser container=all; grant select on v_$log to c##dbzuser container=all; grant select on v_$log_history to c##dbzuser container=all; grant select on v_$logmnr_logs to c##dbzuser container=all; grant select on v_$logmnr_contents to c##dbzuser container=all; grant select on v_$logmnr_parameters to c##dbzuser container=all; grant select on v_$logfile to c##dbzuser container=all; grant select on v_$archived_log to c##dbzuser container=all; grant select on v_$archive_dest_status to c##dbzuser container=all; grant select on v_$transaction to c##dbzuser container=all;

测试数据库层面 logminer

[root@docker ~]# sqlplus sys/oracle@192.168.0.40:1521/orcl as sysdba sql> select member from v$logfile; member -------------------------------------------------------------------------------- /opt/oracle/oradata/orcl/redo03.log /opt/oracle/oradata/orcl/redo02.log /opt/oracle/oradata/orcl/redo01.log sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo01.log',dbms_logmnr.new); sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo02.log',dbms_logmnr.addfile); sql> execute dbms_logmnr.add_logfile('/opt/oracle/oradata/orcl/redo03.log',dbms_logmnr.addfile); sql> execute dbms_logmnr.start_logmnr(options=>dbms_logmnr.dict_from_online_catalog dbms_logmnr.committed_data_only); sql> select sql_redo,sql_undo from v$logmnr_contents where table_name like '�pt%' and operation='insert'; sql_redo ---------------------------------------------------------------------------------------------------- sql_undo --------------------------------------------------------------------------------------------------------------- insert into "scott"."dept"("deptno","dname","loc") values ('10','accounting','new york'); delete from "scott"."dept" where "deptno" = '10' and "dname" = 'accounting' and "loc" = 'new york' and rowid = 'aaar1daamaaaacdaaa'; insert into "scott"."dept"("deptno","dname","loc") values ('20','research','dallas'); delete from "scott"."dept" where "deptno" = '20' and "dname" = 'research' and "loc" = 'dallas' and rowid = 'aaar1daamaaaacdaab'; insert into "scott"."dept"("deptno","dname","loc") values ('30','sales','chicago'); delete from "scott"."dept" where "deptno" = '30' and "dname" = 'sales' and "loc" = 'chicago' and rowid = 'aaar1daamaaaacdaac'; insert into "scott"."dept"("deptno","dname","loc") values ('40','operations','boston'); delete from "scott"."dept" where "deptno" = '40' and "dname" = 'operations' and "loc" = 'boston' and rowid = 'aaar1daamaaaacdaad'; -- 下面语句为结束语句 sql> execute dbms_logmnr.end_logmnr;

启动 kafka connect

# 后台运行 docker run -d --name connect \ -p 8083:8083 \ -e group_id=1 \ -e config_storage_topic=my_connect_configs \ -e offset_storage_topic=my_connect_offsets \ -e status_storage_topic=my_connect_statuses \ --link zookeeper:zookeeper \ --link kafka:kafka \ quay.io/debezium/connect:1.9 # 实时查看 kafka connect 的日志信息 docker logs -f -t --tail 10 connect

debezium oracle connector

  • 下载 ojdbc8.jar 连接驱动
  • 将驱动上传到 connect 容器中,重启 connect 容器
[root@docker ~]# mv ojdbc8-19.3.0.0.jar ojdbc8.jar [root@docker ~]# docker cp ojdbc8.jar connect:/kafka/libs [root@docker ~]# docker restart connect
  • 准备 debezium oracle connector 配置文件
    将配置文件创建在 docker 宿主机上即可,connect 容器开放了 rest api 来管理 debezium 的连接器
    database.hostname 需要使用容器内的ip地址,不然加不上
[root@docker ~]# vi oracle-scott-connector.json { "name": "oracle-scott-connector", "config": { "connector.class" : "io.debezium.connector.oracle.oracleconnector", "database.hostname" : "172.17.0.3", "database.port" : "1521", "database.user" : "c##dbzuser", "database.password" : "dbz", "database.dbname" : "orcl", "database.pdb.name" : "pdbtt", "database.server.name" : "oracle19c", "tasks.max" : "1", "schema.include.list": "scott", "database.history.kafka.bootstrap.servers" : "192.168.0.40:9092", "database.history.kafka.topic": "schema-changes.inventory" } }
  • 向 kafka 连接器注册 debezium oracle connector
[root@docker ~]# curl -i -x post -h "accept:application/json" -h "content-type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json http/1.1 201 created date: sat, 16 apr 2022 17:03:12 gmt location: http://192.168.0.40:8083/connectors/oracle-scott-connector content-type: application/json content-length: 534 server: jetty(9.4.43.v20210629) {"name":"oracle-scott-connector","config":{"connector.class":"io.debezium.connector.oracle.oracleconnector","database.hostname":"172.17.0.3","database.port":"1521","database.user":"c##dbzuser","database.password":"dbz","database.dbname":"orcl","database.pdb.name":"pdbtt","database.server.name":"oracle19c","tasks.max":"1","schema.include.list":"scott","database.history.kafka.bootstrap.servers":"192.168.0.40:9092","database.history.kafka.topic":"schema-changes.inventory","name":"oracle-scott-connector"},"tasks":[],"type":"source"}

核对捕获到的数据

  • 进入到connect容器内部 执行
[root@docker ~]# docker exec -it connect bash [kafka@839c4a43b889 ~]$ bin/kafka-topics.sh --list --bootstrap-server kafka:9092 __consumer_offsets my_connect_configs my_connect_offsets my_connect_statuses oracle19c oracle19c.scott.dept oracle19c.scott.emp oracle19c.scott.salgrade schema-changes.inventory [kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic schema-changes.inventory --from-beginning [kafka@839c4a43b889 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic oracle19c.scott.dept --from-beginning
  • ddl
    image.png

  • dml

模拟业务

  • insert

    image.png

使用 kafka-ui 查看 kafka 里的消息

kafka-ui:open-source web gui for apache kafka management:

docker run -p 8811:8080 \ -e kafka_clusters_0_name=oracle-scott-connector \ -e kafka_clusters_0_bootstrapservers=192.168.0.40:9092 \ -d provectuslabs/kafka-ui:latest

网页登录:http://192.168.0.40:8811/

image.png
image.png
image.png

image.png
image.png

最后修改时间:2022-04-18 09:29:42
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【米乐app官网下载的版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

网站地图