實驗環(huán)境
- Kafka 中的數(shù)據(jù)來自于捕獲的 Oracle 19C PDB 的變更數(shù)據(jù),參考文章:在Docker環(huán)境上使用Debezium捕獲Oracle 19C PDB中的變更數(shù)據(jù)到Kafka
- 準備 Kafka Connect JDBC Connector(連接器),本實驗使用的版本是 10.4.1,下載地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- 準備 PostgreSQL jdbc 驅(qū)動,Kafka Connect JDBC Connector里面包含了 PostgreSQL jdbc 驅(qū)動(postgresql-42.3.3.jar),如果想使用新版的驅(qū)動,也可以自行下載,下載地址:https://jdbc.postgresql.org/download.html
- 本文參考postgresql大神的文章:Debezium替換ogg的測試(三) 下游PG數(shù)據(jù)庫實時同步,感謝分享。
啟動 PostgreSQL 并創(chuàng)建測試庫
PostgreSQL 的 Docker 倉庫:https://hub.docker.com/_/postgres
# 創(chuàng)建一個數(shù)據(jù)持久化目錄
mkdir -p /docker_data/postgres
chmod -R a+rwx /docker_data/postgres/
# 后臺運行14.2版本的 PostgreSQL 數(shù)據(jù)庫
docker run -d --name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-e PGDATA=/var/lib/postgresql/data/pgdata \
-v /docker_data/postgres:/var/lib/postgresql/data \
postgres:14.2
# 運行 psql 容器
[root@docker ~]# alias psql="docker run -it --rm --name psql postgres:14.2 psql -h 192.168.0.40 -U postgres -p 5432"
[root@docker ~]# psql
Password for user postgres:
psql (14.2 (Debian 14.2-1.pgdg110+1))
Type "help" for help.
postgres=# create database scott;
CREATE DATABASE
postgres=# \l
List of databases
Name | Owner | Encoding | Collate | Ctype | Access privileges
-----------+----------+----------+------------+------------+-----------------------
postgres | postgres | UTF8 | en_US.utf8 | en_US.utf8 |
scott | postgres | UTF8 | en_US.utf8 | en_US.utf8 |
template0 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
template1 | postgres | UTF8 | en_US.utf8 | en_US.utf8 | =c/postgres +
| | | | | postgres=CTc/postgres
(4 rows)
配置 JDBC Sink Connector
上傳驅(qū)動和JDBC連接器
將下載的 postgresql JDBC 驅(qū)動和 Kafka Connect JDBC Connector(連接器) 上傳服務(wù)器并復制到 connect 容器中
[root@docker ~]# ls -lrt
-rw-r--r--. 1 root root 1006732 Apr 17 22:12 postgresql-42.2.25.jar
-rw-r--r--. 1 root root 20208429 Apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip
# 上傳 postgresql JDBC 驅(qū)動,如果使用 Kafka Connect JDBC Connector 自帶的驅(qū)動可以忽略此處
docker cp postgresql-42.2.25.jar connect:/kafka/libs
# 上傳 Kafka Connect JDBC Connector
unzip confluentinc-kafka-connect-jdbc-10.4.1.zip
chown -R 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1
docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect
# 重啟 Kafka Connect 連接器
docker restart connect
查看現(xiàn)有連接器信息
- 安裝 jq,用于格式化 JSON 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
- 查看當前存在哪些連接器
curl localhost:8083/connectors/ | jq

- 查看連接器的具體信息
curl localhost:8083/connectors/oracle-scott-connector | jq

配置連接目標端 PostgreSQL 的連接器
- 查看下目標端 PostgreSQL 容器的IP地址
[root@docker ~]# docker inspect postgres |grep IPAddress
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.7",
"IPAddress": "172.17.0.7",
- 編輯一個 JSON 文件,配置連接器信息
JDBC Sink Connector Configuration Properties: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html#sink-config-options
[root@docker ~]# vi pgsql-scott-jdbc-sink.json
{
"name": "pgsql-scott-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslMode=require",
"tasks.max": "1",
"topics": "oracle19c.SCOTT.DEPT",
"table.name.format": "dept",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
- 向 Kafka 連接器注冊 JDBC Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
查看已注冊的連機器信息
# 當前已注冊的連接器
curl localhost:8083/connectors/ | jq
# 連接器的具體信息
curl localhost:8083/connectors/pgsql-scott-jdbc-sink | jq

- 查看連接器的運行狀態(tài)
curl localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq

一個排錯過程(可選擇性跳過)
- 向 Kafka 連接器注冊 JDBC Sink Connector 之后,連接器會自動連接到 PostgreSQL 上建表插入數(shù)據(jù)
- 查看連接器的運行狀態(tài)發(fā)現(xiàn)有問題

- 查看 Kafka 連接器日志發(fā)現(xiàn)建表語句有問題

- 這才了解為啥要在連接器的配置信息中加 “table.name.format”: “dept”

- postgresql大神的文章:Debezium替換ogg的測試(三) 下游PG數(shù)據(jù)庫實時同步 提供了兩種更改連接器配置信息的方法,我直接使用暴力的方法
# 1.刪除連接器
curl -v -X DELETE 192.168.0.40:8083/connectors/pgsql-scott-jdbc-sink
# 2.修改json文件
# 3.重新添加連接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
驗證目標端的數(shù)據(jù)

Oracle 端模擬業(yè)務(wù)
- INSERT
[root@docker ~]# sqlplus scott/scott@192.168.0.40:1521/pdbtt
SQL> insert into dept values (70,'BBBB','BB');
SQL> commit;

「喜歡這篇文章,您的關(guān)注和贊賞是給作者最好的鼓勵」
關(guān)注作者
【版權(quán)聲明】本文為墨天輪用戶原創(chuàng)內(nèi)容,轉(zhuǎn)載時必須標注文章的來源(墨天輪),文章鏈接,文章作者等基本信息,否則作者和墨天輪有權(quán)追究責任。如果您發(fā)現(xiàn)墨天輪中有涉嫌抄襲或者侵權(quán)的內(nèi)容,歡迎發(fā)送郵件至:contact@modb.pro進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,墨天輪將立刻刪除相關(guān)內(nèi)容。




