MogDB Logical Decoding and pg_recvlogical
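Overview
Any discussion of logical decoding has to start with streaming replication. The most important use of streaming replication is hot standby, where the primary and standby databases are synchronized at the physical level. In real deployments, however, physical replication alone cannot meet every business requirement, which is why logical replication is provided.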
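Logical replication addresses several problems that physical replication cannot solve, for example:
- Replicating a specific database or a subset of tables
- Consolidating data from multiple database instances into a single target database
- Distributing data from one database to several different databases
- Replicating between different versions
- Synchronizing tables between databases with different names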
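The key to logical replication is logically decoding the WAL content into a specific format, such as JSON or SQL. The pg_recvlogical client tool is a typical application of logical decoding: it decodes the WAL into JSON and writes the result to a specified file or to stdout.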
Logical replication constraints
wal_level must be changed. In MogDB, wal_level accepts the following values:
- minimal
  - Advantage: WAL logging can be safely skipped for some bulk operations (including creating tables, creating indexes, cluster operations, and table copies), which makes those operations faster.
  - Drawback: the WAL contains only the basic information needed to recover from a server crash or emergency shutdown, so data cannot be restored from archived WAL.
- archive
  - This level adds the logging required for WAL archiving, so the database can be recovered from an archive.
- hot_standby
  - This level further adds the information needed to run SQL queries on a standby. It takes effect only after the database service is restarted.
  - To enable read-only queries on a standby, wal_level must be set to hot_standby on the primary, and the hot_standby parameter must be turned on for the standby.
- logical
  - This level indicates that the WAL supports logical replication.
Default: minimal
Notice:
- To enable WAL archiving and streaming replication between primary and standby, this parameter must be set to archive or hot_standby.
- If this parameter is set to minimal, archive_mode must be off, hot_standby must be off, max_wal_senders must be 0, and the deployment must be single-node; otherwise the database will fail to start.
- If this parameter is set to archive, hot_standby must be off; otherwise the database will fail to start. However, hot_standby cannot be set to off in a primary/standby environment; see the description of the hot_standby parameter for details.
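The rules in the notice above can be checked mechanically before a restart. The following is a minimal Python sketch, not a MogDB tool: the function name check_wal_level, the dict-based settings, and the "off"/"0" defaults for missing keys are all assumptions made for illustration, and only the rules stated above are encoded.

```python
def check_wal_level(settings, single_node=True):
    """Return the list of rule violations for the given settings.

    `settings` is a plain dict of parameter name -> string value
    (a hypothetical representation, not a MogDB API).
    """
    problems = []
    level = settings.get("wal_level", "")
    hot_standby = settings.get("hot_standby", "off")       # assumed default
    archive_mode = settings.get("archive_mode", "off")     # assumed default
    max_wal_senders = int(settings.get("max_wal_senders", "0"))

    if level == "minimal":
        if archive_mode != "off":
            problems.append("minimal requires archive_mode = off")
        if hot_standby != "off":
            problems.append("minimal requires hot_standby = off")
        if max_wal_senders != 0:
            problems.append("minimal requires max_wal_senders = 0")
        if not single_node:
            problems.append("minimal requires a single-node deployment")
    elif level == "archive":
        if hot_standby != "off":
            problems.append("archive requires hot_standby = off")
    return problems


print(check_wal_level({"wal_level": "minimal", "max_wal_senders": "4"}))
```

An empty list means none of the listed rules is violated; it does not guarantee that the server will start.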
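Note:
- This differs slightly from wal_level in PostgreSQL, which has only three levels: minimal, replica, and logical.
- The default wal_level in PostgreSQL is replica, which writes enough data to support WAL archiving and replication, including running read-only queries on a standby server. minimal removes all logging except the information required to recover from a crash or immediate shutdown. Finally, logical adds the information needed to support logical decoding. Each level includes the information logged at all lower levels. This parameter can only be set at server start.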
postgresql.conf configuration
wal_level = logical # minimal, archive, hot_standby or logical
# (change requires restart)
max_wal_senders = 10 # max number of walsender processes
# (change requires restart)
wal_keep_segments = 16 # in logfile segments, 16MB each; 0 disables
max_replication_slots = 8 # max number of replication slots
Note: MogDB must be restarted after the configuration is changed.
Create a replication user
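Before restarting, it can help to confirm that the file really contains the settings logical decoding needs. The sketch below is an illustration only: parse_conf and logical_decoding_ready are hypothetical helper names, the parser ignores quoting rules (a "#" inside a quoted value would be cut off), and the check covers only the parameters shown above.

```python
def parse_conf(text):
    """Parse 'name = value  # comment' lines into a dict (simplified parser)."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()   # drop trailing comments
        if not line or "=" not in line:
            continue
        name, value = line.split("=", 1)
        settings[name.strip()] = value.strip().strip("'")
    return settings


def logical_decoding_ready(settings):
    """True if wal_level is logical and senders/slots are both non-zero."""
    return (
        settings.get("wal_level") == "logical"
        and int(settings.get("max_wal_senders", "0")) > 0
        and int(settings.get("max_replication_slots", "0")) > 0
    )


SAMPLE = """
wal_level = logical            # minimal, archive, hot_standby or logical
max_wal_senders = 10           # max number of walsender processes
wal_keep_segments = 16         # in logfile segments, 16MB each; 0 disables
max_replication_slots = 8      # max number of replication slots
"""

conf = parse_conf(SAMPLE)
print(conf["wal_level"], logical_decoding_ready(conf))
```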
MogDB=#CREATE ROLE pub_sub_user WITH SYSADMIN REPLICATION LOGIN PASSWORD 'pub_sub@123';
NOTICE: The encrypted password contains MD5 ciphertext, which is not secure.
CREATE ROLE
MogDB=#
Note: the user needs both the SYSADMIN and REPLICATION privileges.
pg_hba.conf configuration
# replication privilege.
#local replication omm trust
#host replication omm 127.0.0.1/32 trust
#host replication omm ::1/128 trust
host all all 0.0.0.0/0 md5
host replication all 0.0.0.0/0 md5
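Scenario limitations
- DDL decoding is not currently supported; only DML (INSERT, UPDATE, DELETE, TRUNCATE) can be decoded;
- TEMPORARY and UNLOGGED tables are not replicated;
- A table must have a primary key or a unique constraint, otherwise operations such as UPDATE and DELETE cannot be replicated;
- Sequences are not replicated;
- Large objects are not replicated;
- Newly created tables are not added to the subscription automatically; the subscription must be refreshed on the subscriber side;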
How pg_recvlogical works
pg_recvlogical can be used as a tool for observing exactly what changes in the WAL.
Overall architecture

As the architecture figure shows:
- pg_recvlogical connects to the MogDB server through libpq
- It creates a logical replication slot with CREATE_REPLICATION_SLOT
- It runs START_REPLICATION, looping over the WAL stream and decoding it
- It writes the decoded data to a file or prints it to stdout
- pg_logical_slot_get_changes and pg_logical_slot_peek_changes can be used to inspect the changes
- DROP_REPLICATION_SLOT destroys the replication slot
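The create / start / drop lifecycle above maps onto three pg_recvlogical invocations. As a rough sketch, the hypothetical helper recvlogical_cmd below only builds the argument lists, using the --create, --start, --drop, -S, -d, -P and -f options from the tool's own help text; it does not run anything, and the slot and database names are examples.

```python
def recvlogical_cmd(action, slot, dbname, plugin=None, outfile=None):
    """Build a pg_recvlogical argument list for one action: create, start or drop."""
    if action not in ("create", "start", "drop"):
        raise ValueError("unknown action: " + action)
    args = ["pg_recvlogical", "--" + action, "-S", slot, "-d", dbname]
    if plugin is not None:
        args += ["-P", plugin]          # output plugin, e.g. wal2json
    if action == "start":
        # "-f -" streams the decoded changes to stdout
        args += ["-f", outfile if outfile is not None else "-"]
    return args


for action in ("create", "start", "drop"):
    print(" ".join(recvlogical_cmd(action, "test_slot", "postgres")))
```

Each list can be handed to subprocess.run as-is, which avoids shell quoting issues.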
pg_recvlogical main flow

- Set the plugin
  - pg_recvlogical uses mppdb_decoding by default:
    plugin = pg_strdup("mppdb_decoding");
  - A different plugin can be selected with the --plugin command-line option, for example: --plugin=wal2json

With an output/decoding plugin, pg_recvlogical can emit WAL changes in a specific format (text, JSON).
- Command-line processing
pg_recvlogical receives logical change stream.
Usage:
pg_recvlogical [OPTION]...
Options:
-f, --file=FILE receive log into this file. - for stdout
-n, --no-loop do not loop on connection lost
-v, --verbose output verbose messages
-V, --version output version information, then exit
-?, --help show this help, then exit
Connection options:
-d, --dbname=DBNAME database to connect to
-h, --host=HOSTNAME database server host or socket directory
-p, --port=PORT database server port number
-U, --username=NAME connect as specified database user
-w, --no-password never prompt for password
-W, --password force password prompt (should happen automatically)
Replication options:
-F --fsync-interval=INTERVAL
frequency of syncs to the output file (in seconds, defaults to 10)
-o, --option=NAME[=VALUE]
Specify option NAME with optional value VAL, to be passed
to the output plugin
-P, --plugin=PLUGIN use output plugin PLUGIN (defaults to mppdb_decoding)
-s, --status-interval=INTERVAL
time between status packets sent to server (in seconds, defaults to 10)
-S, --slot=SLOT use existing replication slot SLOT instead of starting a new one
-I, --startpos=PTR Where in an existing slot should the streaming start
-r, --raw parallel decoding output raw results without converting to text format
Action to be performed:
--create create a new replication slot (for the slotname see --slot)
--start start streaming in a replication slot (for the slotname see --slot)
--drop drop the replication slot (for the slotname see --slot)
- CREATE_REPLICATION_SLOT
- DROP_REPLICATION_SLOT
- START_REPLICATION
The three commands above are parsed by the lexer and grammar in src/backend/replication/repl_scanner.l and src/gausskernel/storage/replication/repl_gram.y

/* CREATE_REPLICATION_SLOT SLOT slot [%X/%X] */
create_replication_slot:
    /* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL [init_slot_lsn] */
    K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL RECPTR
        {
            CreateReplicationSlotCmd *cmd;
            cmd = makeNode(CreateReplicationSlotCmd);
            cmd->kind = REPLICATION_KIND_PHYSICAL;
            cmd->slotname = $2;
            cmd->init_slot_lsn = $4;
            $$ = (Node *) cmd;
        }
    /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
    | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
        {
            CreateReplicationSlotCmd *cmd;
            cmd = makeNode(CreateReplicationSlotCmd);
            cmd->kind = REPLICATION_KIND_LOGICAL;
            cmd->slotname = $2;
            cmd->plugin = $4;
            $$ = (Node *) cmd;
        }
    ;
/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
    K_DROP_REPLICATION_SLOT IDENT
        {
            DropReplicationSlotCmd *cmd;
            cmd = makeNode(DropReplicationSlotCmd);
            cmd->slotname = $2;
            $$ = (Node *) cmd;
        }
    ;
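To make the grammar rules above easier to follow, here is a small Python sketch that accepts the same three command shapes and produces comparable command objects. It is an illustration rather than the server's parser: it splits on whitespace instead of using the real lexer, does not validate the LSN format, and the class and function names (CreateSlot, DropSlot, parse_replication_command) are invented for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CreateSlot:
    slotname: str
    kind: str                            # "PHYSICAL" or "LOGICAL"
    init_slot_lsn: Optional[str] = None  # only set for PHYSICAL
    plugin: Optional[str] = None         # only set for LOGICAL


@dataclass
class DropSlot:
    slotname: str


def parse_replication_command(text):
    """Map a command string to a CreateSlot or DropSlot, or raise ValueError."""
    tokens = text.split()
    if not tokens:
        raise ValueError("empty command")
    keyword = tokens[0].upper()
    if keyword == "CREATE_REPLICATION_SLOT" and len(tokens) == 4:
        slot, kind, arg = tokens[1], tokens[2].upper(), tokens[3]
        if kind == "PHYSICAL":
            return CreateSlot(slot, kind, init_slot_lsn=arg)
        if kind == "LOGICAL":
            return CreateSlot(slot, kind, plugin=arg)
    elif keyword == "DROP_REPLICATION_SLOT" and len(tokens) == 2:
        return DropSlot(tokens[1])
    raise ValueError("syntax error: " + text)


print(parse_replication_command("CREATE_REPLICATION_SLOT test_slot LOGICAL mppdb_decoding"))
print(parse_replication_command("DROP_REPLICATION_SLOT test_slot"))
```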
- Load the plugin
Every output plugin must implement the _PG_output_plugin_init function and the required callback functions.

The set of callbacks:
/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks {
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeAbortCB abort_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
} OutputPluginCallbacks;
The main callbacks are (using mppdb_decoding as the example):
- pg_decode_begin_txn -- begin a transaction
- pg_decode_change -- insert, update, delete, truncate
- pg_decode_commit_txn -- commit a transaction
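The order in which these callbacks fire (begin, then one change call per row, then commit) can be pictured with a toy model. The Python sketch below is only an analogy: the real callbacks are C functions with different signatures, and TextPlugin, replay, and the line format are hypothetical.

```python
class TextPlugin:
    """Hypothetical plugin that collects one output line per callback."""

    def __init__(self):
        self.out = []

    def begin_txn(self, xid):
        self.out.append("BEGIN %d" % xid)

    def change(self, xid, change):
        self.out.append("%s %s" % (change["op_type"], change["table_name"]))

    def commit_txn(self, xid):
        self.out.append("COMMIT %d" % xid)


def replay(plugin, xid, changes):
    """Invoke callbacks in the order begin -> change* -> commit for one transaction."""
    plugin.begin_txn(xid)
    for change in changes:
        plugin.change(xid, change)
    plugin.commit_txn(xid)
    return plugin.out


lines = replay(TextPlugin(), 63062,
               [{"op_type": "INSERT", "table_name": "public.pub_sub"}])
print(lines)
```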
Demo
Create a test table
CREATE TABLE pub_sub
(
id SERIAL primary key ,
name character varying
);
Create a replication slot
pg_recvlogical --create -S test_slot -d postgres

Start pg_recvlogical
pg_recvlogical --start -S test_slot -d postgres -f -
Note: "-f -" sends the output to stdout (the screen).

Note: pg_recvlogical keeps polling the replication slot for updates every 5 seconds.
Insert data
MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#insert into pub_sub (name) values('test pg_recvlogical 01');
INSERT 0 1
MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
0/48C60D8 | 63062 | BEGIN 63062
0/48C60D8 | 63062 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["23"
,"'test pg_recvlogical 01'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48C62B8 | 63062 | COMMIT 63062 (at 2022-09-01 11:14:24.539429+00) CSN 47645
(3 rows)
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#insert into pub_sub (name) values('test pg_recvlogical 02');
INSERT 0 1
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
ERROR: replication slot "test_slot" is already active
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
0/48C6548 | 63063 | BEGIN 63063
0/48C6548 | 63063 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24"
,"'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48C6728 | 63063 | COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
(3 rows)
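The location column in the results above is a WAL position (LSN) printed as two hexadecimal numbers separated by a slash. Assuming MogDB follows the PostgreSQL convention, where the first number is the high 32 bits and the second the low 32 bits, the sketch below converts an LSN to an integer and measures the distance in bytes between two positions; lsn_to_int and lsn_diff are illustrative names.

```python
def lsn_to_int(lsn):
    """Convert 'HI/LO' (two hexadecimal halves) into a 64-bit integer position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def lsn_diff(a, b):
    """Number of bytes between two LSNs (a - b)."""
    return lsn_to_int(a) - lsn_to_int(b)


# Distance between the BEGIN and COMMIT records of transaction 63062
print(lsn_diff("0/48C62B8", "0/48C60D8"))  # prints 480
```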

The pg_recvlogical output is as follows:
BEGIN 63063
{"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24","'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
pg_recvlogical uses the mppdb_decoding plugin by default, and mppdb_decoding emits JSON, which is why each change appears as a JSON object.
Formatted JSON:
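One way to get a readable view of a change record is to load it with Python's json module and print it with indentation. The record string below is copied from the pg_recvlogical output shown earlier; pairing columns_name with columns_val into a dict is an extra convenience step added for this example.

```python
import json

record = ('{"table_name":"public.pub_sub","op_type":"INSERT",'
          '"columns_name":["i","name"],'
          '"columns_type":["integer","character varying"],'
          '"columns_val":["24","\'test pg_recvlogical 02\'"],'
          '"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}')

change = json.loads(record)
print(json.dumps(change, indent=2))   # pretty-printed record

# Pair each column name with its value for easier access
row = dict(zip(change["columns_name"], change["columns_val"]))
print(row)
```

Note that the values arrive as strings, and text values keep their SQL single quotes.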

Drop the replication slot
pg_recvlogical --drop -S test_slot -d postgres

Switching plugins
- wal2json
$ pg_recvlogical --create -S test_slot -d postgres --plugin=wal2json
$ pg_recvlogical --start -S test_slot -d postgres -f -
wal2json does not print BEGIN and COMMIT.

Warning: the following error is reported intermittently; it looks like a bug.

- pgoutput
$ pg_recvlogical --create -S test_slot -d postgres --plugin=pgoutput
$ pg_recvlogical --start -S test_slot -d postgres -f -
After switching to pgoutput, a version error is reported: FATAL: client sent proto_version=0 but we only support protocol 1 or higher

SQL-based demo
MogDB=#SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'mppdb_decoding');
slotname | xlog_position
-----------------+---------------
regression_slot | 0/48D3488
(1 row)
MogDB=#select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn | dummy_standby
-----------------+----------------+-----------+--------+----------+--------+------+--------------+-------------+---------------
user_sub | pgoutput | logical | 15016 | postgres | t | | 63071 | 0/48D3438 | f
wal2json | wal2json | logical | 15016 | postgres | f | | 61212 | 0/40EBA88 | f
test_slot | mppdb_decoding | logical | 15016 | postgres | f | | 61212 | 0/48C9A90 | f
regression_slot | mppdb_decoding | logical | 15016 | postgres | f | | 63071 | 0/48D3408 | f
(4 rows)
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#CREATE TABLE data(id serial primary key, data text);
NOTICE: CREATE TABLE will create implicit sequence "data_id_seq" for serial column "data.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "data_pkey" for table "data"
CREATE TABLE
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
-----------+-------+-----------------------------------------------------------
0/48C9D30 | 63070 | BEGIN 63070
0/48D32E8 | 63070 | COMMIT 63070 (at 2022-09-01 11:51:40.602993+00) CSN 47650
(2 rows)
MogDB=#BEGIN;
BEGIN
MogDB=#INSERT INTO data(data) VALUES('1');
INSERT 0 1
MogDB=#INSERT INTO data(data) VALUES('2');
INSERT 0 1
MogDB=#COMMIT;
COMMIT
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------
0/48E0CD0 | 63073 | BEGIN 63073
0/48E0CD0 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["1","'1'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48E1090 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["2","'2'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48E11E0 | 63073 | COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653
(4 rows)
MogDB=#SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
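Rows returned by pg_logical_slot_get_changes can be regrouped into transactions on the client side. The sketch below assumes the rows have already been fetched as (location, xid, data) tuples by some database driver (not shown), and that every row other than BEGIN and COMMIT carries a JSON document, as mppdb_decoding produces above. group_transactions is a hypothetical helper, and the sample JSON is abbreviated from the session output.

```python
import json


def group_transactions(rows):
    """Group (location, xid, data) rows into {xid: [decoded change dicts]}.

    BEGIN/COMMIT rows delimit a transaction; every other row is parsed as JSON.
    """
    txns = {}
    for _location, xid, data in rows:
        if data.startswith("BEGIN"):
            txns[xid] = []
        elif data.startswith("COMMIT"):
            continue
        else:
            txns.setdefault(xid, []).append(json.loads(data))
    return txns


rows = [
    ("0/48E0CD0", 63073, "BEGIN 63073"),
    ("0/48E0CD0", 63073, '{"table_name":"public.data","op_type":"INSERT",'
                         '"columns_name":["id","data"],"columns_val":["1","\'1\'"]}'),
    ("0/48E1090", 63073, '{"table_name":"public.data","op_type":"INSERT",'
                         '"columns_name":["id","data"],"columns_val":["2","\'2\'"]}'),
    ("0/48E11E0", 63073, "COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653"),
]

txns = group_transactions(rows)
print(len(txns[63073]))  # prints 2
```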
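Summary
Logical replication and decoding are more flexible than physical replication. Decoding plugins can be developed to match specific business needs, and the mechanism can even serve as an ETL tool. Overall, logical decoding is a very user-friendly interface. This article analyzed the principles and part of the code behind MogDB logical replication and logical decoding, and demonstrated the decoding process with both pg_recvlogical and SQL. Hopefully it helps readers understand logical decoding in MogDB.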




