
實(shí)驗(yàn)環(huán)境
- Kafka 中的數(shù)據(jù)來(lái)自于捕獲的 PostgreSQL 14.2 的變更數(shù)據(jù),參考文章:在Docker環(huán)境上使用Debezium捕獲PostgreSQL 14.2中的變更數(shù)據(jù)到Kafka
- 準(zhǔn)備 Kafka Connect JDBC Connector(連接器),本實(shí)驗(yàn)使用的版本是 10.4.1,下載地址:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- 準(zhǔn)備 Oracle jdbc 驅(qū)動(dòng),Kafka Connect JDBC Connector 里面包含了 Oracle jdbc 驅(qū)動(dòng)(ojdbc8-19.7.0.0.jar),如果想使用新版的驅(qū)動(dòng),也可以自行下載,下載地址:https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/
啟動(dòng) Oracle 19C 數(shù)據(jù)庫(kù),創(chuàng)建一個(gè)測(cè)試用戶
參考文章:使用Docker裝一個(gè)Oracle 19C的單機(jī)測(cè)試環(huán)境
# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba
SQL> create user inventory identified by inventory;
SQL> grant connect,resource,create view to inventory;
SQL> grant unlimited tablespace to inventory;
配置 JDBC Sink Connector
上傳驅(qū)動(dòng)和JDBC連接器
將下載的 Oracle JDBC 驅(qū)動(dòng)和 Kafka Connect JDBC Connector(連接器) 上傳服務(wù)器并復(fù)制到 connect 容器中
[root@docker ~]# ls -lrt
-rw-r--r--. 1 root root 4458107 Apr 17 22:12 ojdbc8-19.14.0.0.jar
-rw-r--r--. 1 root root 20208429 Apr 17 22:12 confluentinc-kafka-connect-jdbc-10.4.1.zip
# 上傳 Oracle JDBC 驅(qū)動(dòng),如果使用 Kafka Connect JDBC Connector 自帶的驅(qū)動(dòng)可以忽略此處
docker cp ojdbc8-19.14.0.0.jar connect:/kafka/libs
# 上傳 Kafka Connect JDBC Connector
unzip confluentinc-kafka-connect-jdbc-10.4.1.zip
chown -R 1001:1001 confluentinc-kafka-connect-jdbc-10.4.1
docker cp confluentinc-kafka-connect-jdbc-10.4.1 connect:/kafka/connect
# 重啟 Kafka Connect 連接器
docker restart connect
查看現(xiàn)有連接器信息
- 安裝 jq,用于格式化 JSON 格式
yum install -y wget wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum install -y jq
- 查看當(dāng)前存在哪些連接器
curl -s localhost:8083/connectors/ | jq

- 查看連接器的具體信息
curl -s localhost:8083/connectors/snapshot-mode-initial | jq

配置連接目標(biāo)端 Oracle 19C PDB 的連接器
- 查看下目標(biāo)端 Oracle 容器的IP地址
[root@docker ~]# docker inspect ora19c |grep IPAddress
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.2",
"IPAddress": "172.17.0.2",
- 編輯一個(gè) JSON 文件,配置連接器信息
JDBC Sink Connector Configuration Properties: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html#sink-config-options
[root@docker ~]# vi oracle-jdbc-sink.json
{
"name": "oracle-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "initial.inventory.orders",
"table.name.format": "orders",
"dialect.name": "OracleDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
- 向 Kafka 連接器注冊(cè) JDBC Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-jdbc-sink.json
查看已注冊(cè)的連機(jī)器信息
# 當(dāng)前已注冊(cè)的連接器
curl -s localhost:8083/connectors/ | jq
# 連接器的具體信息
curl -s localhost:8083/connectors/oracle-jdbc-sink | jq

- 查看連接器的運(yùn)行狀態(tài)
curl -s localhost:8083/connectors/oracle-jdbc-sink/status | jq

遇到第一個(gè)問(wèn)題,但是這個(gè)問(wèn)題是由方法解決的,后面再說(shuō)吧
- 向 Kafka 連接器注冊(cè) JDBC Sink Connector 之后,連接器會(huì)自動(dòng)連接到 Oracle PDB 上建表插入數(shù)據(jù),但是自動(dòng)建的表名上帶有雙引號(hào),強(qiáng)制將表名轉(zhuǎn)為了小寫(xiě)。

- 查看 Oracle PDB 表中的數(shù)據(jù)

驗(yàn)證目標(biāo)端的數(shù)據(jù)

Oracle 端模擬業(yè)務(wù)
- INSERT
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
(4 rows)
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
INSERT 0 1

- UPDATE
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 1 | 102
(5 rows)
postgres=# update inventory.orders set quantity=2 where id=11001;
UPDATE 1

- DELETE
postgres=# select * from inventory.orders;
id | order_date | purchaser | quantity | product_id
-------+------------+-----------+----------+------------
10001 | 2016-01-16 | 1001 | 1 | 102
10002 | 2016-01-17 | 1002 | 2 | 105
10003 | 2016-02-19 | 1002 | 2 | 106
10004 | 2016-02-21 | 1003 | 1 | 107
11001 | 2022-04-22 | 1003 | 2 | 102
(5 rows)
postgres=# delete from inventory.orders where id = 11001;
DELETE 1
遇到第二個(gè)問(wèn)題,不同步 DELETE 操作
-
源端 PostgreSQL 執(zhí)行 DELETE 操作,發(fā)現(xiàn)目標(biāo)端 Oracle 沒(méi)有同步執(zhí)行 DELETE 操作,查看 Kafka Connect 日志發(fā)現(xiàn)這是有參數(shù)限制的呀

tasks 也失敗了

-
更新 sink 連接器,配置 delete.enabled 和 pk.mode
[root@docker ~]# vi oracle-jdbc-sink_update.json
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "initial.inventory.orders",
"table.name.format": "orders",
"dialect.name": "OracleDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key"
}
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/oracle-jdbc-sink/config -d @oracle-jdbc-sink_update.json
tasks 恢復(fù)正常

DELETE 操作也同步到目標(biāo)端

查看消費(fèi)者組

最后修改時(shí)間:2022-04-27 16:18:06
「喜歡這篇文章,您的關(guān)注和贊賞是給作者最好的鼓勵(lì)」
關(guān)注作者
【版權(quán)聲明】本文為墨天輪用戶原創(chuàng)內(nèi)容,轉(zhuǎn)載時(shí)必須標(biāo)注文章的來(lái)源(墨天輪),文章鏈接,文章作者等基本信息,否則作者和墨天輪有權(quán)追究責(zé)任。如果您發(fā)現(xiàn)墨天輪中有涉嫌抄襲或者侵權(quán)的內(nèi)容,歡迎發(fā)送郵件至:contact@modb.pro進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),墨天輪將立刻刪除相關(guān)內(nèi)容。




