
Debezium 提供的 Docker 測試環境
- 才發現 Debezium 提供了一個使用 docker-compose 進行自動部署的測試環境,非常的好用,根據自己的環境改改,可以測試很多場景。
- 下載地址:https://github.com/debezium/debezium-examples
- 使用教程:https://github.com/debezium/debezium-examples/tree/main/tutorial
下載解壓,可以看到 tutorial 提供的測試用例
- Debezium 提供的測試環境主要是針對 MySQL 的,本文測試主要針對 PostgreSQL,根據 MySQL 的相關測試修改一下,就可以用於 PostgreSQL 的測試了。
[root@docker ~]# unzip debezium-examples-main.zip
[root@docker ~]# cd debezium-examples-main/tutorial
[root@docker tutorial]# ls -lrt
total 768
-rw-r--r--. 1 root root 602664 Apr 19 22:42 vitess-sharding-setup.png
drwxr-xr-x. 2 root root 30 Apr 19 22:42 secrets
-rw-r--r--. 1 root root 521 Apr 19 22:42 register-vitess.json
-rw-r--r--. 1 root root 538 Apr 19 22:42 register-sqlserver.json
-rw-r--r--. 1 root root 448 Apr 19 22:42 register-postgres.json
-rw-r--r--. 1 root root 582 Apr 19 22:42 register-oracle-logminer.json
-rw-r--r--. 1 root root 568 Apr 19 22:42 register-mysql.json
-rw-r--r--. 1 root root 637 Apr 19 22:42 register-mysql-ext-secrets.json
-rw-r--r--. 1 root root 860 Apr 19 22:42 register-mysql-avro.json
-rw-r--r--. 1 root root 878 Apr 19 22:42 register-mysql-apicurio.json
-rw-r--r--. 1 root root 1172 Apr 19 22:42 register-mysql-apicurio-converter-json.json
-rw-r--r--. 1 root root 1166 Apr 19 22:42 register-mysql-apicurio-converter-avro.json
-rw-r--r--. 1 root root 437 Apr 19 22:42 register-mongodb.json
-rw-r--r--. 1 root root 576 Apr 19 22:42 register-db2.json
-rw-r--r--. 1 root root 22923 Apr 19 22:42 README.md
-rw-r--r--. 1 root root 1955 Apr 19 22:42 docker-compose-zookeeperless-kafka.yaml
-rw-r--r--. 1 root root 1616 Apr 19 22:42 docker-compose-zookeeperless-kafka-combined.yaml
-rw-r--r--. 1 root root 885 Apr 19 22:42 docker-compose-vitess.yaml
-rw-r--r--. 1 root root 1119 Apr 19 22:42 docker-compose-sqlserver.yaml
-rw-r--r--. 1 root root 1082 Apr 19 22:42 docker-compose-postgres.yaml
-rw-r--r--. 1 root root 927 Apr 19 22:42 docker-compose-oracle.yaml
-rw-r--r--. 1 root root 887 Apr 19 22:42 docker-compose-mysql.yaml
-rw-r--r--. 1 root root 1068 Apr 19 22:42 docker-compose-mysql-ext-secrets.yml
-rw-r--r--. 1 root root 1671 Apr 19 22:42 docker-compose-mysql-avro-worker.yaml
-rw-r--r--. 1 root root 1391 Apr 19 22:42 docker-compose-mysql-avro-connector.yaml
-rw-r--r--. 1 root root 1036 Apr 19 22:42 docker-compose-mysql-apicurio.yaml
-rw-r--r--. 1 root root 43764 Apr 19 22:42 docker-compose-mysql-apicurio.png
-rw-r--r--. 1 root root 1094 Apr 19 22:42 docker-compose-mongodb.yaml
-rw-r--r--. 1 root root 1098 Apr 19 22:42 docker-compose-db2.yaml
-rw-r--r--. 1 root root 930 Apr 19 22:42 docker-compose-cassandra.yaml
drwxr-xr-x. 3 root root 36 Apr 19 22:42 debezium-with-oracle-jdbc
drwxr-xr-x. 3 root root 74 Apr 19 22:42 debezium-vitess-init
drwxr-xr-x. 2 root root 27 Apr 19 22:42 debezium-sqlserver-init
drwxr-xr-x. 4 root root 41 Apr 19 22:42 debezium-db2-init
drwxr-xr-x. 2 root root 141 Apr 19 22:42 debezium-cassandra-init
drwxr-xr-x. 2 root root 26 Apr 19 22:42 db2data
安裝 docker-compose
- Docker Compose 是一個在 Docker 上運行多容器應用程序的工具。
- Docker Compose V2 是 Docker Compose 的主要升級版本,使用 Golang 完全重寫的,V1 是用 Python 編寫的。
- 在 Github 上的地址:https://github.com/docker/compose
- 本示例使用的 docker-compose 的下載地址:https://github.com/docker/compose/releases/download/v2.4.1/docker-compose-linux-x86_64
# 下載的二進制文件,給個可執行權限就可以直接運行,為了方便修改下文件名
[root@docker ~]# chmod +x docker-compose-linux-x86_64
[root@docker ~]# mv docker-compose-linux-x86_64 docker-compose
簡單管理 docker-compose
# 幫助
/root/docker-compose -h
# 創建并啟動項目中的所有容器
/root/docker-compose -f xxx.yaml up
# 停止并刪除項目中的所有容器
/root/docker-compose -f xxx.yaml down
# 重啟項目中的服務(單個容器),以下示例重啟connect容器
/root/docker-compose -f xxx.yaml restart connect
# 列出項目中所有的容器
/root/docker-compose -f xxx.yaml ps
測試 Avro
Avro 有三種配置方式,第一種在 Kafka Connect Worker 配置,第二種在 Debezium 連接器上配置,第三種是使用 Apicurio 注冊表
第一種配置方式:在 Kafka Connect Worker 配置
編輯 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-avro-worker.yaml,仿照這個寫一個 PostgreSQL 的 docker-compose 配置文件
- 添加了一個 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-worker.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- schema-registry
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
啟動 docker-compose
- 啟動 docker-compose,會相繼啟動 zookeeper、kafka、postgres、schema-registry、connect 和一個 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml up

注冊 PostgreSQL connector
- 使用 Debezium tutorial 中自帶的 register-postgres.json
# cd /root/debezium-examples-main/tutorial
# cat register-postgres.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
登錄 Kafka Web 查看 Topics 的情況
- http://192.168.0.40:8811/
- 可以看到自動為源端的每個表創建的 Topics
- 可以看到自動為 schemas 創建的 Topics _schemas

- 可以看到每條消息的 key 和 value 都是二進制的

配置網絡
- 本實驗的源端是 PostgreSQL,目標端是 Oracle 19C PDB,Debezium 提供了 PostgreSQL 的 Docker 鏡像,但是沒有 Oracle 的鏡像。
- 在 Docker 上安裝 Oracle 參考:使用Docker裝一個Oracle 19C的單機測試環境
- 使用 docker-compose 部署的環境會建立一個默認的網絡,名稱為 docker-compose.yml 所在目錄名稱小寫形式加上 “_default”,這里就是 tutorial_default。

- 在 Docker 上安裝 Oracle 使用的默認網絡,這樣和 docker-compose 部署的環境,網絡是相互隔離的。
- 為了讓 docker-compose 部署后的 connect 容器能與 Oracle 相連通,需要在 connect 容器上添加 Docker 的默認網絡。
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
# docker inspect tutorial-connect-1 |grep IPAddress
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.3",
"IPAddress": "172.17.0.3",
"IPAddress": "172.26.0.3",
注冊一個消費者連接器
- 消費者連接器使用的是 Kafka Connect JDBC,消費到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中沒有 Kafka Connect JDBC,需要自行下載并上傳,重啟 connect 容器
# 上傳 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重啟 connect 服務
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-worker.yaml restart connect
- 編輯消費者的連接器并注冊到 Kafka Connect
[root@docker ~]# cat oracle-jdbc-sink.json
{
"name": "oracle-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//10.16.0.1:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink.json
- 查看消費的數據
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;

停止并刪除容器,清理測試環境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-worker.yaml down
第二種配置方式:在 Debezium 連接器上配置
編輯 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-avro-connector.yaml,仿照這個寫一個 PostgreSQL 的 docker-compose 配置文件
- 添加了一個 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-avro-connector.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
ports:
- 8181:8181
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
links:
- zookeeper
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- schema-registry
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
啟動 docker-compose
- 啟動 docker-compose,會相繼啟動 zookeeper、kafka、postgres、schema-registry、connect 和一個 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml up
注冊 PostgreSQL connector
- tutorial 里面只有 MySQL 的 register-mysql-avro.json,仿照這個寫一個 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-avro.json
查看 customers schema
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
- 服務注冊表還帶有一個可以讀取 Avro 消息的控制臺消費者(console consumer):
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--topic dbserver1.inventory.customers
配置網絡
- 詳細說明見 上一種配置方式
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
注冊一個消費者連接器
- 消費者連接器使用的是 Kafka Connect JDBC,消費到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中沒有 Kafka Connect JDBC,需要自行下載并上傳,重啟 connect 容器
# 上傳 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重啟 connect 服務
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-avro-connector.yaml restart connect
- 編輯消費者的連接器并注冊到 Kafka Connect
[root@docker tutorial]# cat oracle-jdbc-sink-avro.json
{
"name": "oracle-jdbc-sink-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-avro.json
- 查看消費的數據
SQL> desc INVENTORY.ORDERS;
SQL> select * from INVENTORY.ORDERS;

停止并刪除容器,清理測試環境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-avro-connector.yaml down
第三種配置方式:使用 Apicurio 注冊表
Apicurio Registry 是一個開源 API 和 schema 注冊表,除其他外,可用于存儲 Kafka 記錄的 schema。 它提供
- 它自己的原生 Avro 轉換器和 Protobuf 序列化器
- 將其 schema 導出到注冊表的 JSON 轉換器
- 與 IBM 或 Confluent 等其他 schema 注冊表的兼容層; 它可以與 Confluent Avro 轉換器一起使用。
編輯 docker-compose 的配置文件
- tutorial 里面只有 MySQL 的 docker-compose-mysql-apicurio.yaml,仿照這個寫一個 PostgreSQL 的 docker-compose 配置文件
- 添加了一個 Kafka Web 管理工具
# cd /root/debezium-examples-main/tutorial
# cat docker-compose-postgres-apicurio.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
apicurio:
image: apicurio/apicurio-registry-mem:2.0.0.Final
ports:
- 8080:8080
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
- apicurio
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- ENABLE_APICURIO_CONVERTERS=true
kafkaui:
image: provectuslabs/kafka-ui:latest
ports:
- 8811:8080
links:
- kafka
environment:
- KAFKA_CLUSTERS_0_NAME=test
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
啟動 docker-compose
- 啟動 docker-compose,會相繼啟動 zookeeper、kafka、postgres、apicurio、connect 和一個 Kafka 的 Web 管理的容器
export DEBEZIUM_VERSION=1.9
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml up
注冊 PostgreSQL connector (Apicurio - JSON 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio-converter-json.json,仿照這個寫一個 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-json.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-json.json
注冊 PostgreSQL connector (Apicurio - Avro 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio-converter-avro.json,仿照這個寫一個 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio-converter-avro.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio-converter-avro.json
注冊 PostgreSQL connector (Confluent - Avro 格式)
- tutorial 里面只有 MySQL 的 register-mysql-apicurio.json,仿照這個寫一個 PostgreSQL 的配置文件
# cd /root/debezium-examples-main/tutorial
# cat register-postgres-apicurio.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-apicurio.json
查看 customers schema
# Apicurio - JSON 格式和 Avro 格式
curl -X GET http://localhost:8080/apis/registry/v2/groups/default/artifacts/dbserver1.inventory.customers-value | jq .
# Confluent - Avro 格式
curl -X GET http://localhost:8080/apis/ccompat/v6/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
- 服務注冊表還帶有一個可以讀取 Avro 消息的控制臺消費者(console consumer):
# Apicurio - JSON 格式
# cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
當您查看數據消息時,您會注意到它僅包含payload但不包含schema部分,因為它已外部化到注冊表中。
查看 Topics
- Apicurio - JSON


- Apicurio - Avro


- Confluent - Avro


配置網絡
- 詳細說明見 上一種配置方式
# 先使用 docker ps 查看 tutorial-connect-1 容器的 CONTAINER ID (5aedff3b90e4)
# docker network connect bridge 5aedff3b90e4
注冊一個消費者連接器
- 消費者連接器使用的是 Kafka Connect JDBC,消費到 Oracle 19C PDB 中
- Debezium 提供的 connect 容器中沒有 Kafka Connect JDBC,需要自行下載并上傳,重啟 connect 容器
# 上傳 Kafka Connect JDBC
docker cp confluentinc-kafka-connect-jdbc-10.4.1 tutorial-connect-1:/kafka/connect
# 使用 docker-compose 重啟 connect 服務
cd /root/debezium-examples-main/tutorial
/root/docker-compose -f docker-compose-postgres-apicurio.yaml restart connect
- 編輯消費者的連接器并注冊到 Kafka Connect (Apicurio - JSON 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-json.json
{
"name": "oracle-jdbc-sink-apicurio-json",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-json.json
- 消費端沒走通,報錯:Tolerance exceeded in error handler

- 編輯消費者的連接器并注冊到 Kafka Connect (Apicurio - Avro 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro.json
{
"name": "oracle-jdbc-sink-apicurio-avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro.json
- 編輯消費者的連接器并注冊到 Kafka Connect (Confluent - Avro 格式)
[root@docker tutorial]# cat oracle-jdbc-sink-apicurio-avro2.json
{
"name": "oracle-jdbc-sink-apicurio-avro2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:oracle:thin:@//172.17.0.2:1521/pdbtt",
"connection.user": "inventory",
"connection.password": "inventory",
"tasks.max": "1",
"topics": "dbserver1.inventory.orders",
"table.name.format": "ORDERS",
"quote.sql.identifiers": "never",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v6"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oracle-jdbc-sink-apicurio-avro2.json
停止并刪除容器,清理測試環境
# cd /root/debezium-examples-main/tutorial
# /root/docker-compose -f docker-compose-postgres-apicurio.yaml down
最后修改時間:2022-04-27 19:52:15
「喜歡這篇文章,您的關注和贊賞是給作者最好的鼓勵」
關注作者
【版權聲明】本文為墨天輪用戶原創內容,轉載時必須標注文章的來源(墨天輪),文章鏈接,文章作者等基本信息,否則作者和墨天輪有權追究責任。如果您發現墨天輪中有涉嫌抄襲或者侵權的內容,歡迎發送郵件至:contact@modb.pro進行舉報,并提供相關證據,一經查實,墨天輪將立刻刪除相關內容。




