
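Question: how can DELETE operations on a table without a primary key be replicated?
- To replicate a DELETE to the target database through Kafka, a tombstone event is required, that is, a Kafka message whose Key is non-null and whose Value is null.
- For a table with a primary key, the Key of the Kafka message is built from the table's primary key columns.
- For a table without a primary key, the Key of the Kafka message is null by default, so the DELETE message is skipped.
- For a table without a primary key, the Debezium connector provides the message.key.columns parameter, which builds the Key of the Kafka message from a specified column (or combination of columns). The specified columns must never contain NULL values, much as OGG uses all columns for tables without a primary key.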
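To make the key/value rule above concrete, here is a minimal Python sketch of the two records a DELETE leaves in the topic, and of why a null Key gives the sink nothing to delete by. The dictionaries are simplified stand-ins for Kafka records, not the real Kafka Connect classes, and both helper names are hypothetical.

```python
from typing import Any, Dict, List, Optional

def delete_records(key: Optional[Dict[str, Any]], before: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Simplified model: a delete change event followed by a tombstone with the same key."""
    delete_event = {"key": key, "value": {"before": before, "after": None, "op": "d"}}
    tombstone = {"key": key, "value": None}  # tombstone: Key kept, Value null
    return [delete_event, tombstone]

def can_delete_target_row(record: Dict[str, Any]) -> bool:
    """A sink can only locate the row to delete if the tombstone carries a Key."""
    return record["value"] is None and record["key"] is not None

# Table with a primary key: the Key is the primary key column.
keyed = delete_records({"id": 11001}, {"id": 11001})
# Table without a primary key: the Key is null by default (assumed shape).
keyless = delete_records(None, {"id": 11001})

print(can_delete_target_row(keyed[1]))    # True
print(can_delete_target_row(keyless[1]))  # False
```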
Initial test environment
- Source: PostgreSQL
postgres=# create database test_dml;
postgres=# \c test_dml
test_dml=# create schema inventory;
test_dml=# CREATE TABLE inventory.orders (
id integer NOT NULL,
order_date date NOT NULL,
purchaser integer NOT NULL,
quantity integer NOT NULL,
product_id integer NOT NULL
);
test_dml=# ALTER TABLE ONLY inventory.orders ADD CONSTRAINT orders_pkey PRIMARY KEY (id);
test_dml=# insert into inventory.orders values (10001,now(),1001,1,102);
insert into inventory.orders values (10002,now(),1002,2,105);
insert into inventory.orders values (10003,now(),1003,2,106);
insert into inventory.orders values (10004,now(),1004,1,107);
- Target: Oracle 19c PDB
# sqlplus sys/oracle@192.168.0.40:1521/pdbtt as sysdba
SQL> create user test identified by test;
SQL> grant connect,resource,create view to test;
SQL> grant unlimited tablespace to test;
-- The date column conversion is an unresolved issue, to be looked into later
SQL> CREATE TABLE test.orders (
id number NOT NULL,
order_date number NOT NULL,
purchaser number NOT NULL,
quantity number NOT NULL,
product_id number NOT NULL
);
SQL> alter table test.orders add constraint orders_pkey primary key(id);
Default connector configuration
# Initial configuration of the connector that captures source data
[root@docker tutorial]# cat register-postgres-key.json
{
"name": "inventory-connector-key",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test_dml",
"database.server.name": "test_dml",
"snapshot.mode": "always",
"schema.include.list": "inventory",
"slot.name": "test_dml_slot"
}
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
# Initial configuration of the connector that writes to the target
[root@docker tutorial]# cat oracle-testdml-sink.json
{
"name": "oracle-testdml-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "test",
"connection.password": "test",
"tasks.max": "1",
"topics": "test_dml.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json
First: DML replication for a table with a primary key


insert into inventory.orders values (11001,now(),1003,1,102);

- By default, the Key of the Kafka message is built from the table's primary key columns.

- before: the old values of the row. An INSERT has no old values, while UPDATE and DELETE record them. When the source is PostgreSQL, how much of the old row is available depends on the table's REPLICA IDENTITY setting, described later.
- after: the new values of the row. For a DELETE, after is NULL.
- op: the type of the message: c = create, u = update, d = delete, r = read (applies to only snapshots), t = truncate, m = message
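As a rough illustration of how before, after and op fit together, the following Python sketch applies simplified change events to an in-memory table. It is not how the JDBC sink is implemented; the event dictionaries carry only the three fields described above, and the function name and the key_column parameter are hypothetical.

```python
def apply_event(table, event, key_column="id"):
    """Apply one simplified change event to an in-memory table (a dict keyed by id)."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read and update all carry the new row in "after"
        row = event["after"]
        table[row[key_column]] = row
    elif op == "d":
        # delete carries no "after"; the row is identified through "before"
        table.pop(event["before"][key_column], None)
    elif op == "t":
        table.clear()
    # "m" (message) events carry no row change and are ignored here
    return table

orders = {}
apply_event(orders, {"op": "c", "before": None, "after": {"id": 11001, "quantity": 1}})
apply_event(orders, {"op": "u", "before": None, "after": {"id": 11001, "quantity": 2}})
print(orders)  # {11001: {'id': 11001, 'quantity': 2}}
apply_event(orders, {"op": "d", "before": {"id": 11001}, "after": None})
print(orders)  # {}
```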
update inventory.orders set quantity=2 where id=11001;


- before carries no previous data because the table's REPLICA IDENTITY has not been configured (it is still at its default).
delete from inventory.orders where id = 11001;

- A DELETE produces two message events in Kafka. One of them has a NULL Value; this is the tombstone event, which is used to delete the row on the target and so replicate the DELETE.


- In before, every column except the primary key is 0, again because the table's REPLICA IDENTITY has not been configured.


REPLICA IDENTITY
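- A PostgreSQL-specific table-level setting that only takes effect when logical replication is used.
- It controls what information about a table's changes is written to the WAL in order to identify the row affected by an UPDATE or DELETE.
- Whenever an UPDATE or DELETE occurs, the REPLICA IDENTITY setting determines which previous column values, if any, are available for the table involved.
- Four possible settings
- DEFAULT: records the old values of the primary key columns, if any. This is the default for non-system tables.
- USING INDEX index_name: records the old values of the columns covered by the named index.
- FULL: records the old values of all columns in the row.
- NOTHING: records no old values. This is the default for system tables.
- With DEFAULT, if the table has no primary key, the connector emits no UPDATE or DELETE events for it; for such a table the connector emits only CREATE events.
- How to set it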
ALTER TABLE ONLY inventory.orders REPLICA IDENTITY FULL;
- UPDATE

- DELETE
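The four settings can be summarized in a small Python sketch that returns which old column values would be recorded for a given row. This only models the rules listed above; the function name and parameters are hypothetical, and the row reuses the sample values from this article.

```python
def old_values_logged(setting, row, pk_columns=(), index_columns=()):
    """Return the subset of the old row that each REPLICA IDENTITY setting records."""
    if setting == "FULL":
        return dict(row)
    if setting == "DEFAULT":
        return {c: row[c] for c in pk_columns}
    if setting == "USING INDEX":
        return {c: row[c] for c in index_columns}
    if setting == "NOTHING":
        return {}
    raise ValueError(f"unknown REPLICA IDENTITY setting: {setting}")

row = {"id": 11001, "purchaser": 1003, "quantity": 1, "product_id": 102}
print(old_values_logged("DEFAULT", row, pk_columns=("id",)))  # {'id': 11001}
print(old_values_logged("DEFAULT", row))                      # {} (table without a primary key)
print(old_values_logged("FULL", row) == row)                  # True
```

This is why a keyless table left at DEFAULT gives the connector no old values to work with, while FULL makes the whole previous row available.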

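Consumption on the target side
- The sink connector uses the default configuration from the <Default connector configuration> section at the beginning of this article. Three parameters matter here, all at their defaults: delete.enabled=false, pk.mode=none, insert.mode=insert
Problem 1: DELETE operations are not replicated
- Error reported in the Kafka Connect log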
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink' is configured with 'delete.enabled=false' and 'pk.mode=none'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders',partition=0,offset=7,timestamp=1651152793683)
with a null value and null value schema.
- DELETE operations cannot be replicated until "delete.enabled": "true" and "pk.mode": "record_key" are added to the sink connector
- delete.enabled defaults to false and pk.mode defaults to none
Problem 2: unique constraint violated?
ORA-00001: unique constraint (TEST.SYS_C007736) violated
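- Both the source and target tables have a primary key, and every row on the target was replicated from the source, so why is the unique constraint violated?
- The cause is not covered here, but the fix is to add "insert.mode": "upsert" on top of the Problem 1 settings
- insert.mode defaults to insert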

- The INSERT, UPDATE and DELETE operations above are now replicated correctly.
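Putting Problem 1 and Problem 2 together, the sink configuration ends up with three changed settings. The Python sketch below builds that configuration from a trimmed copy of the default one. Connection settings are left out for brevity, and with_delete_support is a hypothetical helper, not part of any connector API.

```python
import json

# Trimmed copy of the default sink configuration shown earlier (connection settings omitted).
base_sink_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "test_dml.inventory.orders",
    "table.name.format": "ORDERS",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
}

def with_delete_support(config):
    """Return a copy of the sink config with the three settings changed in this section."""
    updated = dict(config)
    updated.update({
        "delete.enabled": "true",   # default: false
        "pk.mode": "record_key",    # default: none
        "insert.mode": "upsert",    # default: insert
    })
    return updated

sink = {"name": "oracle-testdml-sink", "config": with_delete_support(base_sink_config)}
print(json.dumps(sink, indent=2))
```

The printed JSON can be saved to a file and registered with the same curl commands used for the default configuration.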
Next: DML replication for a table without a primary key


- For a table without a primary key, messages are written to Kafka with a null Key by default.
insert into inventory.orders2 values (11001,now(),1003,1,102);


- The INSERT is replicated normally, but the UPDATE and DELETE records are skipped. With PostgreSQL as the source, this is where configuring the table's REPLICA IDENTITY becomes important.
update inventory.orders2 set quantity=2 where id=11001;
tutorial-connect-1 | 2022-04-28 13:41:53,005 WARN Postgres|test_dml|streaming no new values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from update message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153312826,db=test_dml,sequence=["152831320","152831376"],schema=inventory,table=orders2,txId=833,lsn=152831376}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
delete from inventory.orders2 where id = 11001;
tutorial-connect-1 | 2022-04-28 13:43:12,205 WARN Postgres|test_dml|streaming no old values found for table '{ key : null, value : {"name" : "test_dml.inventory.orders2.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "order_date", "index" : "1", "schema" : {"name" : "io.debezium.time.Date", "type" : "INT32", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "purchaser", "index" : "2", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "quantity", "index" : "3", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}, {"name" : "product_id", "index" : "4", "schema" : {"type" : "INT32", "optional" : "false", "default" : null}}]} }'
from delete message at 'Struct{version=1.9.0.Final,connector=postgresql,name=test_dml,ts_ms=1651153391920,db=test_dml,sequence=["152831512","152831800"],schema=inventory,table=orders2,txId=834,lsn=152831800}';
skipping record [io.debezium.relational.RelationalChangeRecordEmitter]
REPLICA IDENTITY
- Once REPLICA IDENTITY is configured for the table, UPDATE and DELETE operations on the keyless table reach Kafka normally
ALTER TABLE ONLY inventory.orders2 REPLICA IDENTITY FULL;
- UPDATE

- DELETE



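Consumption on the target side
- Re-initialize the environment: delete the source connector, delete the topics, and re-register the connector, leaving the keyless table in its original state

- Sink connector: the default configuration from the <Default connector configuration> section at the beginning of this article, plus "delete.enabled": "true", "pk.mode": "record_key", "insert.mode": "upsert"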
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=true' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
- Sink connector: the default configuration from the <Default connector configuration> section at the beginning of this article, plus "delete.enabled": "false", "pk.mode": "record_key", "insert.mode": "upsert"
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_key'
and therefore requires records with a non-null key and non-null Struct or primitive key schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=0,timestamp=1651157942687)
with a null key and null key schema.
- Sink connector: the default configuration from the <Default connector configuration> section at the beginning of this article, plus "delete.enabled": "false", "pk.mode": "none", "insert.mode": "upsert"
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Write to table '"ORDERS2"' in UPSERT mode requires key field names to be known, check the primary key configuration
- Sink connector: the default configuration from the <Default connector configuration> section at the beginning of this article, plus "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert"

# INSERT and UPDATE both work, but DELETE is still not supported
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException:
Sink connector 'oracle-testdml-sink2' is configured with 'delete.enabled=false' and 'pk.mode=record_value'
and therefore requires records with a non-null Struct value and non-null Struct schema,
but found record at (topic='test_dml.inventory.orders2',partition=0,offset=9,timestamp=1651160261336)
with a null value and null value schema.
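Issues with tables without a primary key
- Issue 1: the Key of a keyless table is null, so "delete.enabled": "true" with "pk.mode": "record_key" cannot be used
- Switching to "pk.mode": "record_value" and adding "pk.fields": "id", so that the key is taken from a field in the Value, is rejected at registration while delete.enabled is still true: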
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):
Deletes are only supported for pk.mode record_key
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
- "delete.enabled": "true" can only be combined with "pk.mode": "record_key"; to replicate DELETE operations, the Key must be non-null
- Issue 2: "insert.mode": "upsert" requires a primary key; for a table without one, configure "pk.mode": "record_value" and "pk.fields": "id"
- Issue 3: "delete.enabled": "false", "pk.mode": "record_value", "pk.fields": "id", "insert.mode": "upsert" still does not support DELETE; to replicate DELETE operations, the Key must be non-null
- The message.key.columns parameter covered in the next section resolves all of these issues
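The combinations tried above can be condensed into a small checker. This Python sketch only encodes the failures observed in this article; it is not the JDBC sink's actual validation logic, and the function name and its messages are hypothetical.

```python
def check_sink_settings(delete_enabled, pk_mode, insert_mode, key_is_null):
    """Return the problems this article ran into for a given combination of sink settings."""
    problems = []
    if delete_enabled and pk_mode != "record_key":
        problems.append("delete.enabled=true only works with pk.mode=record_key")
    if insert_mode == "upsert" and pk_mode == "none":
        problems.append("insert.mode=upsert needs key fields, so pk.mode cannot be none")
    if pk_mode == "record_key" and key_is_null:
        problems.append("pk.mode=record_key needs a non-null message key")
    if not delete_enabled:
        problems.append("null-value records are rejected, so DELETE is not replicated")
    return problems

# Keyless table with a null Key: the delete-capable settings fail on the key.
print(check_sink_settings(True, "record_key", "upsert", key_is_null=True))
# ['pk.mode=record_key needs a non-null message key']

# Same settings once the message carries a Key: nothing left to fix.
print(check_sink_settings(True, "record_key", "upsert", key_is_null=False))
# []
```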
Debezium provides the message.key.columns parameter for tables without a primary key

[root@docker tutorial]# cat register-postgres-key.json
{
"name": "inventory-connector-key",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test_dml",
"database.server.name": "test_dml",
"snapshot.mode": "always",
"table.include.list": "inventory.orders3",
"slot.name": "test_dml_slot",
"message.key.columns": "inventory.orders3:id,product_id"
}
}
curl -s -X DELETE localhost:8083/connectors/inventory-connector-key
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-key.json
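The value of message.key.columns pairs a fully qualified table name with a comma-separated column list. The Python sketch below parses that format and builds a Key from a row, rejecting NULL key columns as required earlier. It is a simplification: it assumes multiple tables are separated by semicolons and treats the table name as a literal string rather than a pattern, and both function names are hypothetical.

```python
def parse_message_key_columns(value):
    """Parse 'schema.table:col1,col2;schema.table2:col3' into {table: [columns]}."""
    mapping = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        table, columns = entry.split(":", 1)
        mapping[table.strip()] = [c.strip() for c in columns.split(",")]
    return mapping

def build_key(row, columns):
    """Build the message Key from the chosen columns; none of them may be NULL."""
    key = {c: row[c] for c in columns}
    missing = [c for c, v in key.items() if v is None]
    if missing:
        raise ValueError(f"key columns must not be NULL: {missing}")
    return key

mapping = parse_message_key_columns("inventory.orders3:id,product_id")
row = {"id": 11001, "purchaser": 1003, "quantity": 1, "product_id": 102}
print(build_key(row, mapping["inventory.orders3"]))  # {'id': 11001, 'product_id': 102}
```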
[root@docker tutorial]# cat oracle-testdml-sink.json
{
"name": "oracle-testdml-sink3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "test",
"connection.password": "test",
"tasks.max": "1",
"topics": "test_dml.inventory.orders3",
"table.name.format": "ORDERS3",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"delete.enabled": "true",
"pk.mode": "record_key",
"insert.mode": "upsert"
}
}
curl -s -X DELETE localhost:8083/connectors/oracle-testdml-sink3
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-testdml-sink.json

insert into inventory.orders3 values (11001,now(),1003,1,102);


update inventory.orders3 set quantity=2 where id=11001;


delete from inventory.orders3 where id = 11001;





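[Copyright notice] This article is original content by a 墨天輪 user. Reposts must credit the source (墨天輪) and include the article link, the author, and other basic information; otherwise the author and 墨天輪 reserve the right to pursue liability. If you find suspected plagiarism or infringing content on 墨天輪, please email contact@modb.pro with supporting evidence; once verified, 墨天輪 will remove the content immediately.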




