參考文檔
https://debezium.io/documentation/reference/1.9/configuration/avro.html
背景
Debezium 連接器運行在 Kafka Connect 框架中,通過生成變更事件記錄並寫入到 Kafka 主題(topic),來捕獲和記錄數據庫中的每個行級更改。在 Debezium 連接器將變更事件記錄寫入到 Kafka 主題之前,需要先把捕獲的記錄轉換成 Kafka 可以存儲的形式。Kafka Connect 提供了一個 JSON 轉換器,可將記錄的 keys 和 values 序列化為 JSON 格式;但 JSON 轉換器默認會在每條記錄中包含消息 schema,這使得每條記錄都非常冗長。

- Key
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int8",
"optional": false,
"field": "DEPTNO"
}
],
"optional": false,
"name": "ora19c.SCOTT.DEPT.Key"
},
"payload": {
"DEPTNO": 10
}
}
- Values
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int8",
"optional": false,
"field": "DEPTNO"
},
{
"type": "string",
"optional": true,
"field": "DNAME"
},
{
"type": "string",
"optional": true,
"field": "LOC"
}
],
"optional": true,
"name": "ora19c.SCOTT.DEPT.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int8",
"optional": false,
"field": "DEPTNO"
},
{
"type": "string",
"optional": true,
"field": "DNAME"
},
{
"type": "string",
"optional": true,
"field": "LOC"
}
],
"optional": true,
"name": "ora19c.SCOTT.DEPT.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "txId"
},
{
"type": "string",
"optional": true,
"field": "scn"
},
{
"type": "string",
"optional": true,
"field": "commit_scn"
},
{
"type": "string",
"optional": true,
"field": "lcr_position"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "ora19c.SCOTT.DEPT.Envelope"
},
"payload": {
"before": null,
"after": {
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
},
"source": {
"version": "1.9.0.Final",
"connector": "oracle",
"name": "ora19c",
"ts_ms": 1650415436277,
"snapshot": "true",
"db": "PDBTT",
"sequence": null,
"schema": "SCOTT",
"table": "DEPT",
"txId": null,
"scn": "6853525",
"commit_scn": null,
"lcr_position": null
},
"op": "r",
"ts_ms": 1650415436280,
"transaction": null
}
}
以上是一條記錄在 Kafka 中的存儲形式,keys 和 values 都包含了消息 schema,使得每條消息特別冗長,這樣會導致存儲壓力和網絡傳輸壓力。關于消息 schema 的具體作用暫時還不清楚,但 Debezium 仍然建議考慮關閉記錄 schema 的屬性,涉及以下兩個參數:
key.converter.schemas.enable 和 value.converter.schemas.enable
- 測試發現,在某些場景,關閉記錄 schema 的屬性會導致消費端出現問題,建議考慮Avro轉換器,以下是一個問題案例
Kafka使用JSON序列化關閉schema后在消費端產生的一個問題
配置參數關閉 message schema 的屬性
- 在 Kafka Connect 連接器的配置文件(config/connect-distributed.properties)中配置參數值:
# 因 Kafka Connect 是一個 Docker 容器,容器內也沒有 vi 編輯器,所以可以考慮在 Docker 宿主機上對容器內的文件進行編輯
# 查看容器內的目錄在 Docker 宿主機上的掛載位置
[root@docker ~]# docker inspect connect |grep "/kafka/config" -B1
"Source": "/var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data",
"Destination": "/kafka/config",
# 編輯 connect-distributed.properties 文件,修改參數
[root@docker ~]# cd /var/lib/docker/volumes/11bb601aeeb27d02bfa1d238bbbf157dfb5b813a42a1af33b1a79ec25df0ad75/_data
[root@docker _data]# ll
total 68
-rw-r--r--. 1 1001 1001 906 Apr 19 14:51 connect-console-sink.properties
-rw-r--r--. 1 1001 1001 909 Apr 19 14:51 connect-console-source.properties
-rw-r--r--. 1 1001 1001 5608 Apr 19 14:52 connect-distributed.properties
-rw-r--r--. 1 1001 1001 883 Apr 19 14:51 connect-file-sink.properties
-rw-r--r--. 1 1001 1001 881 Apr 19 14:51 connect-file-source.properties
-rw-r--r--. 1 1001 1001 2103 Apr 19 14:51 connect-log4j.properties
-rw-r--r--. 1 1001 1001 2540 Apr 19 14:51 connect-mirror-maker.properties
-rw-r--r--. 1 1001 1001 2262 Apr 19 14:51 connect-standalone.properties
-rw-r--r--. 1 1001 1001 1221 Apr 19 14:51 consumer.properties
drwxr-xr-x. 2 1001 1001 102 Apr 19 14:51 kraft
-rw-rw-r--. 1 1001 1001 850 Apr 19 10:03 log4j.properties
-rw-r--r--. 1 1001 1001 1925 Apr 19 14:51 producer.properties
-rw-r--r--. 1 1001 1001 6849 Apr 19 14:51 server.properties
-rw-r--r--. 1 1001 1001 1032 Apr 19 14:51 tools-log4j.properties
-rw-r--r--. 1 1001 1001 1169 Apr 19 14:51 trogdor.conf
-rw-r--r--. 1 1001 1001 1205 Apr 19 14:51 zookeeper.properties
[root@docker _data]# vi connect-distributed.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
- 重啟連接器使參數生效
# docker restart connect
新建一個連接器,觀察效果
新建連接器時需要注意:參數 name、database.server.name、database.history.kafka.topic 建議保持唯一。
[root@docker ~]# cat oracle-scott-connector.json
{
"name": "oracle-scott-connector2",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"database.hostname" : "172.17.0.2",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "ORCL",
"database.pdb.name" : "PDBTT",
"database.server.name" : "ora1",
"tasks.max" : "1",
"schema.include.list": "SCOTT",
"database.history.kafka.bootstrap.servers" : "192.168.0.40:9092",
"database.history.kafka.topic": "schema-changes.inventory1"
}
}
[root@docker ~]# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @oracle-scott-connector.json

- Key
{
"DEPTNO": 10
}
- Values
{
"before": null,
"after": {
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
},
"source": {
"version": "1.9.0.Final",
"connector": "oracle",
"name": "ora1",
"ts_ms": 1650414625783,
"snapshot": "true",
"db": "PDBTT",
"sequence": null,
"schema": "SCOTT",
"table": "DEPT",
"txId": null,
"scn": "6732490",
"commit_scn": null,
"lcr_position": null
},
"op": "r",
"ts_ms": 1650414625799,
"transaction": null
}
- 相同數據,在 Kafka 中存儲大小的對比,第一張圖是沒有關閉 schema ,第二張圖是關閉了 schema


最后修改時間:2022-04-25 09:24:24
「喜歡這篇文章,您的關注和贊賞是給作者最好的鼓勵」
關注作者
【版權聲明】本文為墨天輪用戶原創內容,轉載時必須標注文章的來源(墨天輪),文章鏈接,文章作者等基本信息,否則作者和墨天輪有權追究責任。如果您發現墨天輪中有涉嫌抄襲或者侵權的內容,歡迎發送郵件至:contact@modb.pro進行舉報,并提供相關證據,一經查實,墨天輪將立刻刪除相關內容。




