Debezium 的 Oracle 連接器在第一次啟動時,默認會執行數據庫的初始一致性快照,相當于導出全量數據再導入到Kafka。
可以通過設置連接器配置屬性 snapshot.mode (默認:initial) 的值來自定義連接器創建快照的方式。
當 snapshot.mode 設置為 (默認:initial) 時,連接器完成以下任務來創建快照:
- 確定要捕獲的表
- 獲取每個要捕獲表的 ROW SHARE MODE 鎖,以防止在創建快照期間更改表結構,Debezium 持有鎖的時間很短。
- 從數據庫的 Redo 日志中讀取當前系統更改號(SCN)位置。
- 捕獲所有相關表的表結構。
- 釋放 步驟2 中獲得的鎖。
- 在 步驟3 中讀取的 SCN 位置掃描所有相關的數據庫表(SELECT * FROM …? AS OF SCN 123),為每一行生成一個 READ 事件,然后將事件記錄寫入到 Kafka 主題(topic)。
- 在連接器偏移(offsets)中記錄快照的成功完成。
執行創建快照的進程開始后,如果進程因連接器故障、重新平衡或其他原因而中斷,連接器重啟后快照進程也會重新啟動。連接器完成初始一致性快照后,它會繼續從 步驟3 中讀取的 SCN 位置進行流式傳輸,以免丟失任何數據。如果連接器由于某種原因再次停止,則在連接器重新啟動后,它將從之前停止的位置繼續恢復數據的流式傳輸。
snapshot.mode 支持的參數配置,這個參數只在連接器在第一次啟動時起作用
| 參數值 | 描述 |
|---|---|
| initial(默認) | 連接器執行數據庫的初始一致性快照,快照完成后,連接器開始為后續數據庫更改流式傳輸事件記錄。 |
| initial_only | 連接器只執行數據庫的初始一致性快照,不允許捕獲任何后續更改的事件。 |
| schema_only | 連接器只捕獲所有相關表的表結構,不捕獲初始數據,但是會同步后續數據庫的更改記錄 |
| schema_only_recovery | 設置此選項可恢復丟失或損壞的數據庫歷史主題(database.history.kafka.topic)。 |
snapshot.mode = initial
- 向 Kafka Connect 注冊并啟動一個新的 Debezium Oracle Connector,添加選項 “snapshot.mode”: “initial”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端數據庫里的數據情況,SCOTT 用戶下有5張表,其中一張表BONUS里面沒有數據

-
Debezium Oracle Connector 第一次啟動后執行初始一致性快照,將全量數據導出轉換寫入到Kafka,可以看到 Kafka 中為每張有數據的表建了一個 Topics,Topics 的名稱格式是 <serverName.schemaName.tableName>,其中表 BONUS 沒有數據,所以沒有建立 Topics,但是在 Topics initial 中記錄的表的DDL語句,同時還建立了一個歷史Topics schema-changes.initial,表里的每條數據在 Kafka 中存儲為一條消息(Messages),以下截圖也可以看出每個 Topics 的 Messages 個數與 Oracle 中表的數據行數一致。


-
源端數據庫執行 DML 操作,自動同步到 Kafka
SQL> insert into dept values (50,'AAAA','A');
SQL> commit;

SQL> update dept set DNAME='BBBB' where DEPTNO=50;
SQL> commit;

SQL> delete from dept where DEPTNO=50;
SQL> commit;

snapshot.mode = initial_only
- 向 Kafka Connect 注冊并啟動一個新的 Debezium Oracle Connector,添加選項 “snapshot.mode”: “initial_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial-only",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "initial_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial_only"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
-
源端數據庫里的數據情況

-
Debezium Oracle Connector 第一次啟動后執行初始一致性快照

-
源端數據庫執行 DML 操作,此時不會同步一致性快照以后的變更數據
SQL> insert into dept values (60,'BBBB','B');
SQL> commit;
- 可以看到 snapshot.mode = initial 的表已經同步數據了,但是 snapshot.mode = initial_only 的表并沒有同步數據


- 連接器的狀態還是RUNNING
curl -s -X GET localhost:8083/connectors/snapshot-mode-initial-only/status | jq

- Topics my_connect_offsets 中也記錄了快照信息

snapshot.mode = schema_only
- 向 Kafka Connect 注冊并啟動一個新的 Debezium Oracle Connector,添加選項 “snapshot.mode”: “schema_only”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-schema-only",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "schema-only",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "schema_only",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.schema_only"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
- 源端數據庫里的數據情況

- Debezium Oracle Connector 第一次啟動后不會執行初始一致性快照,只將表的DDL表結構寫入到Kafka,可以看到 Kafka 的沒有存放數據的 Topics,只有一個存放表的DDL的 Topics
[kafka@4c24d79ab670 ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --list |grep schema-only schema-only


- 源端數據庫執行 DML 操作,自動同步到 Kafka
SQL> insert into dept values (70,'CCCC','C');
SQL> commit;


snapshot.mode = schema_only_recovery
當 database.history.kafka.topic 被刪除了,可以使用 snapshot.mode = schema_only_recovery 來恢復
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic schema-changes.initial

- 向 Kafka Connect 注冊并啟動一個新的 Debezium Oracle Connector,添加選項 “snapshot.mode”: “schema_only_recovery”
[root@docker ~]# cat oracle-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "initial",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"snapshot.mode": "SCHEMA_ONLY_RECOVERY",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.initial"
}
}
# 刪除連接器
curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial
# 重新注冊連接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-snapshot-mode.json
總結一點
功能可以實現,但是捕獲延遲很嚴重,變更一條記錄要好長時間才能捕獲到,不知道是配置的問題還是連接器本身的問題,就是感覺對Oracle的兼容不是太友好。




