前言
最近7-1忙着各种值班,没时间写东西,第三篇拖更了许久。今天终于抽点时间把这个系列完成了,没有太监。
kafka到postgresql
其实从kafka到postgresql很简单,我们要理解的一点是kafka既可以push也可以poll。不需要我们重复发明轮子,我们只需要写好配置文件在把它调起来就可以了。
研究了一番,发现目标端都是使用jdbc connector来实现的。那么我们也采用jdbc connector的方案。要使用jdbc需要先下载kafka connect jdbc 连接器, 它是由confluent开发的。
我们需要做两件事。
把postgresql jdbc驱动放在/kafka/libs目录下。我用的是postgresql-42.2.14.jar。下载地址:https://jdbc.postgresql.org/download.html。 下载kafka connect jdbc 连接器放入到插件目录下。下载地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
做完上述操作,我们接下来就来进行配置。今天我们要使用restapi进行配置,我们先查看我们的配置情况。
curl localhost:8083/connectors/ | jq
如图所示,我们可以看到我们oracle数据库配置的连接器testoracledb,可以进一步查看详细的配置。
curl localhost:8083/connectors/testoracledb | jq
这里可以看到它的type是source,这就代表它是源端。
那么目标端我们先配置一个jdbc-sink.json文件。内容如下:
{ "name": "jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "tasks.max": "1", "topics": "12cdb.hr.s1", "dialect.name": "postgresqldatabasedialect", "table.name.format": "s1", "connection.url": "jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=sqlite123&sslmode=require", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true" }}
然后把这个文件post进去。
curl -i -x post -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json
使用restapi post成功之后,我们可以查看现在的状态。
curl localhost:8083/connectors/jdbc-sink | jq
这里出现了一个jdbc-sink的连接器。查看jdbc-sink的状态。
curl localhost:8083/connectors/jdbc-sink | jq
下游数据库应用这里的type就必须是sink。可以查看它的工作状态。
curl localhost:8083/connectors/jdbc-sink/status | jq
这里都显示running就是正常的。
测试效果
我们在oracle里面插入记录。
查看我们的postgresql中是否存在这条记录。
配置有问题如何修改
当然可能会出现各种各样的配置问题,那么如何修改呢?
暴力的方法可以直接删除连接器,修改完json文件再添加连接器。这只适用于一开始就配置失败的情况。
1.删除连接器curl -v -x delete localhost:8083/connectors/jdbc-sink 2.修改json文件3.重新添加连接器curl -i -x post -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json
还可以选择再创建一个jdbc-sink.put的配置文件,修改你的配置,比如这里增加delete.enabled参数
{ "connector.class": "io.confluent.connect.jdbc.jdbcsinkconnector", "tasks.max": "1", "topics": "12cdb.hr.s1", "dialect.name": "postgresqldatabasedialect", "table.name.format": "s1", "connection.url": "jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=sqlite123", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.extractnewrecordstate", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "delete.enabled": "true" }
然后执行更新连接器配置操作。
curl -i -x put -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/jdbc-sink/config -d @jdbc-sink.put
更新完成后执行重启就行了。
curl -i -x post -h "accept:application/json" -h "content-type:application/json" localhost:8083/connectors/jdbc-sink/restart
后记
整个debezium替换ogg的测试文章已经完成,后续我准备在生产割接环境上进行测试。这条路是能走通的,但是肯定有一堆的坑要填埋。