即將推出的 PostgreSQL 15 引入了由富士通 OSS 團隊與 PostgreSQL 開源社區合作添加的一項新功能,該功能允許在使用發布/訂閱的邏輯復制中支持兩階段提交。讓我們來看看如何使用它。

此功能支持創建允許對兩階段事務的復制進行解碼的發布/訂閱。我們還修改了邏輯解碼插件 pgoutput 以支持所有必需的兩階段回調 。
啟用兩階段提交時,準備好的事務在 PREPARE TRANSACTION 時發送給訂閱者,訂閱者也將其作為兩階段事務處理。
一、背景
PostgreSQL 14 已經添加了框架和解碼器端基礎設施,以允許在 PREPARE TRANSACTION 時解碼兩階段提交。PostgreSQL 14 還修改了 test_decoding 插件以使用這個框架。
但是,使用 PUBLICATION/SUBSCRIPTION 進行邏輯復制的客戶端無法直接訪問該功能。這意味著在 PostgreSQL 14 中,準備好的事務在解碼 PREPARE TRANSACTION 時不會發送給訂閱者,而是僅在解碼相應的 COMMIT PREPARED 時發送給訂閱者。
例如,PostgreSQL 14 的行為如下:
1發布方
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE PUBLICATION pub FOR TABLE test;
2訂閱方
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=localhost' PUBLICATION pub;
NOTICE: created replication slot "sub" on publisher
CREATE SUBSCRIPTION
3 發布方
postgres=# BEGIN;
BEGIN
postgres=*# INSERT INTO test VALUES (7,'aa');
INSERT 0 1
postgres=*# PREPARE TRANSACTION 't1';
PREPARE TRANSACTION
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+-------------------------------+-------+----------
790 | t1 | 2022-03-14 06:59:49.341013-04 | ajin | postgres
(1 row)
4 訂閱方
postgres=# SELECT * FORM pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+-----+----------+-------+----------
(0 rows)
請注意,準備好的事務不會在訂閱者上復制。
二、特征
1.概述
新的 SUBSCRIPTION 選項two_phase指定是否為此 SUBSCRIPTION 啟用兩階段提交。默認值為false。
CREATE SUBSCRIPTION sub
CONNECTION 'conninfo'
PUBLICATION pub
WITH (two_phase = on);
啟用兩階段提交時,準備好的事務在 PREPARE TRANSACTION 時發送給訂閱者,訂閱者也將其作為兩階段事務處理。否則,準備好的事務只有在提交時才會發送給訂閱者,并立即處理。(我的測試結果和這句話有點不符:從我的測試結果來看,沒用兩階段提交,也會把兩階段提交的狀態文件傳輸到訂閱節點,只不過在發布節點COMMIT?PREPARE的時候,訂閱節點的不能正常把變更寫到庫里)
2.克服PREPARE 并發
兩階段事務在 PREPARE TRANSACTION 重放,然后分別在 COMMIT PREPARED 和 ROLLBACK PREPARED 提交或回滾。
當 tablesync worker仍在忙于執行初始復制時,準備好的事務可能會到達應用worker。在這種情況下,apply worker 啟動一個新事務,但隨后會跳過所有后續更改(例如,insert),假設正在運行的 tablesync worker 正在處理它們。同時,tablesync worker 可能根本看不到準備好的事務(因為它在 tablesync worker 開始應用更改的一致點之前)。
現在,tablesync worker 退出,沒有對準備好的事務做任何事情。稍后,當apply worker執行 COMMIT PREPARED 時,它會得到一個空的PREPARE錯誤(事務是空的,因為Apply worker之前跳過了插入)。
為了避免這種復雜性,兩階段提交的實現要求復制已成功完成初始表同步階段。這意味著即使為訂閱啟用了two_phase,內部兩階段狀態也會暫時掛起,直到所有表初始化完成。請參閱以下三態部分。

通過這些步驟,我們有:
1在啟用two_phase的情況下創建訂閱。
2最初,訂閱處于tablesync階段 - 為每個表啟動 tablesync worker。
3每個 tablesync worker 為發布者上的每個表創建一個 tablesync 槽。
4兩階段狀態設置為掛起(通過在 pg_subscription 目錄中設置列subtwophasestate - 稍后會詳細介紹)。
之后,進入應用worker階段。

在Apply worker階段,我們有:
1 tablesync worker將其 tablesync 插槽放在發布者上并死亡。
2Apply worker接管。
3Apply worker在發布者上創建訂閱復制槽。
4兩相狀態設置為啟用。
3.三態啟用
在上圖中,兩階段狀態的改變是通過設置 pg_subscription 的新列subtwophasestate來完成的,它表示兩階段模式的狀態。

即使用戶指定他們想要使用two_phase = on的訂閱,在內部它也會以pending的三態開始,并且僅在所有 tablesync 初始化完成后才啟用- 也就是說,當所有 tablesync worker都達到其就緒狀態時. 換句話說,pending只是訂閱啟動時的一個過渡狀態。
在兩階段正確可用(啟用三態)之前,訂閱的行為就像two_phase = off一樣。當apply worker 檢測到所有tablesyncs 已經準備好(當三態處于pending時)它會重啟apply worker 進程。
當(重新啟動的)應用worker發現所有 tablesync worker都已準備好進行兩階段三態掛起時,它調用 wal_startstreaming 以正確 啟用發布者以進行兩階段提交并將三態值掛起更新為已啟用。
如果用戶需要知道三態值,他們可以從 pg_subscription 目錄中獲取它。例如:
postgres=# SELECT subtwophasestate FROM pg_subscription;
subtwophasestate
------------------
e
4.ALTER SUBSCRIPTION 限制
ALTER SUBSCRIPTION 無法更改two_phase選項。
此限制是為了規避準備好的事務和相應的 COMMIT PREPARED 跨越two_phase選項的啟用或禁用的情況。在這種情況下,解碼器將無法決定事務是否需要完全解碼,或者只發送 COMMIT PREPARED。
5.訂閱者的全局 ID (GID)
在訂閱者上復制的準備好的事務將與在發布者上指定的 GID 不同。如果有多個訂閱者在發布者上應用特定的準備事務,并且所有訂閱者都使用與發布者相同的 GID,那么當第二個事務嘗試使用相同的 GID 進行準備時,這將失敗。
為了避免這種沖突,訂閱者上的應用worker根據訂閱者 ID 和發布者上的事務 ID 替換生成的唯一 GID:pg_gid_
示例:pg_gid_24576_790
6.回調 API
對于此功能,實現了以下 pgoutput 函數,以便可以分配兩階段提交所需的回調。有關這些回調的詳細信息,請參閱我之前的博客文章PostgreSQL 14 中兩階段提交的邏輯解碼。
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
cb->prepare_cb = pgoutput_prepare_txn;
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
三、例子
1發布方
- 創建表和發布。
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE PUBLICATION pub FOR TABLE test;
2訂閱方
- 創建同一個表,并創建一個啟用 two_phase 模式的訂閱。
- 檢查subtwophasestate列以驗證它是否啟用了two_phase(如果值為e,則開啟了)。
postgres=# CREATE TABLE test (col1 INT, col2 TEXT, PRIMARY KEY(col1));
CREATE TABLE
postgres=# CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres host=localhost' PUBLICATION pub WITH (two_phase = on);
NOTICE: created replication slot "sub" on publisher
CREATE SUBSCRIPTION
postgres=# SELECT subtwophasestate FROM pg_subscription;
subtwophasestate
------------------
e
(1 row)
3發布方
- 開始交易。
- 插入一些數據。
- 準備事務并檢查 GID。
postgres=# BEGIN;
BEGIN
postgres=*# INSERT INTO test VALUES (7,'aa');
INSERT 0 1
postgres=*# PREPARE TRANSACTION 't1';
PREPARE TRANSACTION
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+-------------------------------+-------+----------
790 | t1 | 2022-03-14 06:59:49.341013-04 | ajin | postgres
(1 row)
4訂閱方
- 檢查訂戶端并查看生成的準備好的事務 GID 也在那里復制。
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+------------------+-------------------------------+-------+----------
877 | pg_gid_24576_790 | 2022-03-14 06:59:49.350815-04 | ajin | postgres
(1 row)
5發布方
- 提交準備好的事務。
- 觀察準備好的事務 GID 現在已經消失(它已提交)。
- 選擇插入的數據。
postgres=# COMMIT PREPARED 't1';
COMMIT PREPARED
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+----------+-------+----------
(0 rows)
postgres=# SELECT * FROM test;
a | b
---+----
7 | aa
(1 row)
6訂閱方
- 訂閱方生成的 GID 也消失了(已提交)。
- 選擇顯示已復制發布的數據。
postgres=# SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
------------+-----+----------+-------+----------
(0 rows)
postgres=# SELECT * from test;
a | b
---+----
7 | aa
(1 row)
四、未來展望
PostgreSQL 15 現在提供了支持兩階段提交的分布式數據庫的底層框架。對于在分布式數據庫中工作的兩階段事務,備用數據庫需要通知主數據庫有關失敗的 PREPARE 并啟動回滾。這種類型的備用反饋機制目前在 PostgreSQL 中不存在,并且是未來改進的候選者。
原文鏈接
參考鏈接
https://github.com/postgres/postgres/commit/a8fd13cab0ba815e9925dc9676e6309f699b5f72
https://github.com/postgres/postgres/commit/63cf61cdeb7b0450dcf3b2f719c553177bac85a2




