前言
最近7-1忙著各種值班,沒時間寫東西,第三篇拖更了許久。今天終于抽點時間把這個系列完成了,沒有太監(jiān)。
Kafka到PostgreSQL
其實從kafka到PostgreSQL很簡單,我們要理解的一點是kafka既可以push也可以poll。不需要我們重復(fù)發(fā)明輪子,我們只需要寫好配置文件在把它調(diào)起來就可以了。
研究了一番,發(fā)現(xiàn)目標(biāo)端都是使用JDBC Connector來實現(xiàn)的。那么我們也采用JDBC Connector的方案。要使用JDBC需要先下載Kafka Connect JDBC 連接器, 它是由Confluent開發(fā)的。
我們需要做兩件事。
把PostgreSQL jdbc驅(qū)動放在/kafka/libs目錄下。我用的是postgresql-42.2.14.jar。下載地址:https://jdbc.postgresql.org/download.html。 下載Kafka Connect JDBC 連接器放入到插件目錄下。下載地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
做完上述操作,我們接下來就來進(jìn)行配置。今天我們要使用RESTAPI進(jìn)行配置,我們先查看我們的配置情況。
curl localhost:8083/connectors/ | jq

如圖所示,我們可以看到我們Oracle數(shù)據(jù)庫配置的連接器testoracledb,可以進(jìn)一步查看詳細(xì)的配置。
curl localhost:8083/connectors/testoracledb | jq

這里可以看到它的type是source,這就代表它是源端。
那么目標(biāo)端我們先配置一個jdbc-sink.json文件。內(nèi)容如下:
{ "name": "jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "12cdb.HR.S1", "dialect.name": "PostgreSqlDatabaseDialect", "table.name.format": "s1", "connection.url": "jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=Sqlite123&sslMode=require", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true" }}
然后把這個文件POST進(jìn)去。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json

使用RESTAPI POST成功之后,我們可以查看現(xiàn)在的狀態(tài)。
curl localhost:8083/connectors/jdbc-sink | jq

這里出現(xiàn)了一個jdbc-sink的連接器。查看jdbc-sink的狀態(tài)。
curl localhost:8083/connectors/jdbc-sink | jq

下游數(shù)據(jù)庫應(yīng)用這里的type就必須是sink。可以查看它的工作狀態(tài)。
curl localhost:8083/connectors/jdbc-sink/status | jq

這里都顯示RUNNING就是正常的。
測試效果
我們在Oracle里面插入記錄。

查看我們的PostgreSQL中是否存在這條記錄。

配置有問題如何修改
當(dāng)然可能會出現(xiàn)各種各樣的配置問題,那么如何修改呢?
暴力的方法可以直接刪除連接器,修改完json文件再添加連接器。這只適用于一開始就配置失敗的情況。
1.刪除連接器curl -v -X DELETE localhost:8083/connectors/jdbc-sink 2.修改json文件3.重新添加連接器curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json
還可以選擇再創(chuàng)建一個jdbc-sink.put的配置文件,修改你的配置,比如這里增加delete.enabled參數(shù)
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "12cdb.HR.S1", "dialect.name": "PostgreSqlDatabaseDialect", "table.name.format": "s1", "connection.url": "jdbc:postgresql://192.168.56.170:5432/postgres?user=postgres&password=Sqlite123", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "delete.enabled": "true" }
然后執(zhí)行更新連接器配置操作。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/jdbc-sink/config -d @jdbc-sink.put
更新完成后執(zhí)行重啟就行了。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/jdbc-sink/restart
后記
整個Debezium替換ogg的測試文章已經(jīng)完成,后續(xù)我準(zhǔn)備在生產(chǎn)割接環(huán)境上進(jìn)行測試。這條路是能走通的,但是肯定有一堆的坑要填埋。




