
Debezium 的 PostgreSQL 連接器在第一次啟動時,默認會執行數據庫的初始一致性快照,相當于導出全量數據再導入到Kafka。
可以通過設置連接器配置屬性 snapshot.mode (默認:initial) 的值來自定義連接器執行快照的方式。
當 snapshot.mode 設置為 (默認:initial) 時,連接器完成以下任務來創建快照:
- 使用 SERIALIZABLE、READ ONLY、DEFERRABLE 隔離級別啟動事務,以確保此事務中的后續讀取針對數據的單個一致版本。 由于其他客戶端的后續 INSERT、UPDATE 和 DELETE 操作而對數據進行的任何更改對此事務不可見。
- 讀取數據庫事務日志中的當前位置(LSN)。
- 掃描數據庫表和模式,為每一行生成一個 READ 事件并將該事件寫入適當的特定于表的 Kafka 主題(topic)。
- 提交事務。
- 在連接器偏移(offsets)中記錄快照的成功完成。
如果連接器出現故障、重新平衡或在 步驟1 開始之后但在 步驟5 完成之前停止,則在重新啟動時連接器將開始一個新的快照。 連接器完成其初始快照后,PostgreSQL 連接器會繼續從其在 步驟2 中讀取的位置進行流式傳輸。這可確保連接器不會錯過任何更新。 如果連接器由于任何原因再次停止,則在重新啟動時,連接器會繼續從之前停止的位置流式傳輸更改。
snapshot.mode 支持的參數配置
| 參數值 | 描述 |
|---|---|
| initial(默認) | 連接器執行數據庫的初始一致性快照,快照完成后,連接器開始為后續數據庫更改流式傳輸事件記錄。 |
| always | 連接器在啟動時始終執行一致性快照。快照完成后,連接器繼續執行流式傳輸更改。此模式用于以下情況: |
| - | 已知一些 WAL 段已被刪除,不再可用。 |
| - | 集群發生故障后,主備庫發生切換。此快照模式確保連接器不會錯過在新主節點提升之后但在新主節點上重新啟動連接器之前所做的任何更改。 |
| initial_only | 連接器只執行數據庫的初始一致性快照,不允許捕獲任何后續更改的事件。 |
| never | 不執行初始一致性快照,但是會同步后續數據庫的更改記錄 |
| - | 如果 Kafka offsets topic 中存在先前存儲的 LSN,則連接器會繼續從該位置流式傳輸更改。 |
| - | 如果沒有存儲 LSN,則連接器從在服務器上創建 PostgreSQL 邏輯復制槽的時間點開始流式傳輸更改。 |
| exported | 已棄用,所有模式都是無鎖的。 |
| custom | 自定義接口 io.debezium.connector.postgresql.spi.Snapshotter |
圖示
按自己的理解,總結了一張圖,不一定準確
snapshot.mode = initial
- 向 Kafka Connect 注冊并啟動一個新的 Debezium PostgreSQL Connector,添加選項 “snapshot.mode”: “initial”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
-
源端數據庫里的數據情況,inventory schema下有6張表,其中一張表 spatial_ref_sys 不同步

-
Debezium PostgreSQL Connector 第一次啟動后執行初始一致性快照,將全量數據導出轉換寫入到Kafka,可以看到 Kafka 中為每張有數據的表建了一個 Topics,Topics 的名稱格式是 <serverName.schemaName.tableName>,其中表 spatial_ref_sys 不同步,還不知道啥原因,表里的每條數據在 Kafka 中存儲為一條消息(Messages),以下截圖也可以看出每個 Topics 的 Messages 個數與 PostgreSQL 中表的數據行數一致。

-
源端數據庫執行 DML 操作,自動同步到 Kafka
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);

postgres=# update inventory.orders set quantity=2 where id=11001;

postgres=# delete from inventory.orders where id = 11001;

snapshot.mode = always
- 更新上面創建的連接器 snapshot-mode-initial,更改選項 “snapshot.mode”: “always”,每次啟動都會執行初始化快照,快照完成后會繼續捕獲變更數據
[root@docker ~]# cat pgsql-snapshot-mode-update.json
{
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/snapshot-mode-initial/config -d @pgsql-snapshot-mode-update.json
# 需要重新啟動連接器實例和任務實例,只重啟連接器實例是不起作用的
curl -s localhost:8083/connectors/snapshot-mode-initial/status | jq
curl -s -X POST localhost:8083/connectors/snapshot-mode-initial/restart?includeTasks=true
- 更新前 Kafka 里面的數據情況

- 更新后 Kafka 里面的數據情況,很明顯新增了初始化的數據

snapshot.mode = always 恢復被刪除的 Topics
- 刪除現有的 Topics
# 先把連接器停了,不然有時刪了 Topics 還會自動創建
curl -s -X DELETE localhost:8083/connectors/snapshot-mode-initial
# 刪除 Topics
# docker exec -it connect bash
bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.customers
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.geom
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.orders
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.products
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic initial.inventory.products_on_hand


- 重洗啟動連接器 snapshot-mode-initial,包含選項 “snapshot.mode”: “always”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json

- 源端數據庫執行 DML 操作,同步正常,忽略測試過程
- 需要注意,一切正常了以后,要考慮是否將 snapshot.mode 改成默認的 initial ,否則每次啟動連接器都會執行初始化任務。
[root@docker ~]# cat pgsql-snapshot-mode-update.json
{
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial",
"slot.name": "initial_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/snapshot-mode-initial/config -d @pgsql-snapshot-mode-update.json
curl -s -X POST localhost:8083/connectors/snapshot-mode-initial/restart?includeTasks=true
# 已測試,更改 "snapshot.mode": "initial",重啟連接器和任務后數據不會重新初始化,DML 捕獲正常。
snapshot.mode = initial_only
- 向 Kafka Connect 注冊并啟動一個新的 Debezium PostgreSQL Connector,添加選項 “snapshot.mode”: “initial_only”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-initial_only",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "initial_only",
"slot.name": "initial_only_slot",
"schema.include.list": "inventory",
"snapshot.mode": "initial_only",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
- 源端數據庫里的數據情況,inventory schema下有6張表,其中一張表 spatial_ref_sys 不同步

- Debezium Oracle Connector 第一次啟動后執行初始一致性快照

- 源端數據庫執行 DML 操作,此時不會同步一致性快照以后的變更數據
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
postgres=# update inventory.orders set quantity=2 where id=11001;
postgres=# delete from inventory.orders where id = 11001;
- 可以看到 snapshot.mode = initial 的表已經同步數據了,但是 snapshot.mode = initial_only 的表并沒有同步數據

snapshot.mode = never
- 向 Kafka Connect 注冊并啟動一個新的 Debezium PostgreSQL Connector,添加選項 “snapshot.mode”: “never”
[root@docker ~]# cat pgsql-snapshot-mode.json
{
"name": "snapshot-mode-never",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "postgres",
"database.port" : "5432",
"database.user" : "postgres",
"database.password" : "postgres",
"database.dbname" : "postgres",
"database.server.name" : "never",
"slot.name": "never_slot",
"schema.include.list": "inventory",
"snapshot.mode": "never",
"plugin.name": "pgoutput",
"publication.name": "dbz_inventory_connector",
"publication.autocreate.mode": "filtered"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-snapshot-mode.json
- Debezium PostgreSQL Connector 第一次啟動后不會執行初始一致性快照,可以看到 Kafka 的沒有存放數據的 Topics

- 但是源端數據庫執行 DML 操作,Debezium PostgreSQL Connector 會自動捕獲變更數據在 Kafka 上創建 Topics 并寫入消息
postgres=# insert into inventory.orders values (11001,now(),1003,1,102);
postgres=# update inventory.orders set quantity=2 where id=11001;
postgres=# delete from inventory.orders where id = 11001;


總結一點
Debezium PostgreSQL Connector 比 Debezium Oracle Connector 要好用多了,同步延遲很小,測試還算比較順利,可以看出 Debezium 對 PostgreSQL 兼容要比 Oracle 要好很多。




