実行中のトランザクションの論理レプリケーション – PostgreSQL 14でコミットされた機能の紹介:技術者Blog
PostgreSQLインサイド

Amit Kapila

FUJITSU Limited
Software Products Business Unit Data Management Division
Senior Director

背景

Amit
論理レプリケーションはPostgreSQL 10で導入されて以降、バージョンが上がるごとに改良されています。論理レプリケーションは、クラスタ全体のデータをコピーする物理レプリケーションとは異なり、データを選択してレプリケーションする方法です。これを利用して、マルチマスターや双方向のレプリケーションソリューションを構築できます。物理レプリケーションとの主な違いの1つは、コミット時にのみトランザクションをレプリケート(複製)することです。このため、大規模なトランザクションでは、トランザクションが完了するまでデータの転送を待つ必要があり、適用遅延が発生します。PostgreSQL 14では、実行中の大規模トランザクションをストリームする仕組みを導入します。これにより、特に早い段階でのフィルタリングによって大規模トランザクションでレプリケーションの性能が2倍か、それ以上に向上しました。性能検証の結果については、hackersで報告されています。これにより、適用遅延が大幅に軽減されます。

必要な拡張機能

Amit
この機能を実現するために最初に必要だったことは、WALの内容のストリーミングをいつ開始するのかを決めることでした。もしそのような機能を実現するとしたら、WALからトランザクションの変更を別々に取り出したときに、個別にストリーミングすればよいのではないかと考えることもできます。しかし、そうしてしまうとネットワークにより多くのデータを送信することになってしまいます。なぜなら、アプリケーション側が変更のあったトランザクションを認識できるように、変更ごとに追加のトランザクション情報を送信する必要があるからです。この問題に対処するために、PostgreSQL 13では、新しいGUCパラメーターlogical_decoding_work_memを導入しました。これにより、ディスクに書き込まれるか、または、サブスクライバーにストリームされる前に、ロジカルデコーディングで使用されるメモリー最大量をユーザーが指定できるようになります。

段階的にデコーディングをする際に次の課題となるのが、トップレベルのトランザクションとサブトランザクションのトランザクションID(XID)を関連付けることで生じる遅延になります。ロジカルデコーディング中に、すべての変更がその(サブ)トランザクションとともに累積されます。ここで、出力プラグインまたはストリームへの変更を他のノードに送信する際に、それぞれのトップレベルのトランザクションとサブトランザクション間の関連を見出すことが要求されるトランザクション内で生じたすべての変更を結びつける必要があります。PostgreSQL 14より前では、この関連付けはXLOG_XACT_ASSIGNMENT WALというレコードで行われていました。これは、通常、64回のサブトランザクションの後か、またはコミット時に記録されていました。なぜなら、WAL内でこのような関連付けを取得するのはこの2回だけだからです。PostgreSQL 14では、この関連付けが生じたときに、各サブトランザクションの最初のWALレコードの一部として、すぐに割り当て情報をWALに書き込みます。これは、オーバーヘッドを最小限に抑えるために、wal_level=logicalの場合にのみ行われます。

さらに、段階的なデコーディングに必要なもう1つのことは、各コマンド終了時の無効化処理でした。無効化の基本的な考え方は、例えばrelation cacheのように、キャッシュを最新にして、次のコマンドが最新のスキーマを使用できるようにすることです。これは、WALを段階的に正しくデコードするために必要なことで、デコード中にそうしたキャッシュからのrelationの属性値を使用するためです。このため、wal_level=logicalの場合、コマンド終了時に無効化をWALに書き込み、デコードでこの情報を使用できるようにします。この無効化はデコードされ、トップトランザクションに蓄積され、リプレイ中に実行されます。これにより、コミットレコードの一部として無効化をデコードする必要がなくなります。

APIの拡張

Amit
前の段落では、段階的なデコーディングを可能にするためにPostgreSQLサーバーの基盤で必要とされる拡張機能について説明しました。次のステップは、大規模な実行中のトランザクションをストリームするために、PostgreSQLのコア範囲外における論理レプリケーション用のAPI(ストリームメソッド)を提供することでした。これを可能にするために、出力プラグインAPIに7つのメソッドを追加しました。それらは、「stream_start_cb、stream_stop_cb、stream_abort_cb、stream_commit_cb、stream_change_cb」のコールバックと2つのオプションコールバック「stream_message_cbとstream_truncate_cb」です。これらのAPIについての詳細はPostgreSQL文書を参照してください。

実行中のトランザクションをストリーミングする場合、変更(およびメッセージ)はstream_start_cbコールバックとstream_stop_cbコールバックによって区切られたブロックでストリーミングされます。すべてのデコードされた変更が送信されると、stream_commit_cbコールバックを使用してトランザクションをコミットできます。またはstream_abort_cbコールバックを使用して中止することもできます。

拡張されたAPIを使った場合のストリーミングトランザクションのデコード内容を確認すると、次の例のようになります。

/* Change logical_decoding_work_mem to 64kB in the session */
postgres=# show logical_decoding_work_mem;
 logical_decoding_work_mem
---------------------------
 64kB
(1 row)
postgres=# CREATE TABLE stream_test(data text);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ?column?
----------
 init
(1 row)
postgres=# INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 500) g(i);
INSERT 0 500
postgres=# SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
                       data
--------------------------------------------------
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 committing streamed transaction TXN 741
(505 rows)

実際のコールバック関数の呼び出しシーケンスは、サーバーの操作によってより複雑になることが想定されます。複数のストリームされたトランザクションのブロックがあったり、トランザクションのいくつかがアボートされたものがあったり、などです。

ストリーミングは、(実行中の全トランザクションのための)WALからデコードされた変更の総量がlogical_decoding_work_memの値に定義された制限値を超えたときに起動されることに注意してください。その時点で、(デコードされた変更に現在使用されているメモリー量で測定された)最大のトップレベルトランザクションが選択され、ストリーミングされます。しかし、メモリーの閾値を超えているにもかかわらず、完全なタプルをデコードしていないために、ストリーミングが有効になっていてもディスクに書き込まなければならない場合があります。例えば、TOASTテーブルへのINSERTをデコードしたが、メインテーブルへのINSERTをデコードしていない場合や、Speculative Insertionのデコードをしても対応する確認レコードをデコードしていない場合などがあります。しかし、完全なタプルを取得するとすぐに、順番に並べられた変更を含むトランザクションをストリーミングします。

実行中のトランザクションのストリーミング中に、出力プラグイン(またはWALレコードのデコード)がカタログ(システムとユーザー定義の両方)を参照するときに、並列アボートによってエラーが発生することがあります。例を挙げて説明しますと、(xmin:500、xmax:0)を持つカタログタプルが1つあるとします。ここで、XID:501がカタログタプルを更新すると、その後で2つのタプル(xmin:500、xmax:501)と(xmin:501、xmax:0)ができます。ここで、501がアボートされ、他のトランザクション、例えばXID:502が同じカタログタプルを更新すると、最初のタプルは(xmin:500、xmax:502)に変更されます。問題は、XID:501で挿入/更新されたタプルをデコードしようとするとき、そのタプルがその時点のスナップショットには不可視であるXID:502によって削除されているタプルであると考えられるため、カタログタプルの(xmin:500、xmax:502)が可視であると見えることになることです。そして、そのカタログタプルによってデコードするときに誤った結果やクラッシュを引き起こす可能性があることです。したがって、実行中のトランザクションのストリーミングを可能にするには、並列アボートを検出する必要があります。並列アボートを検出するために、カタログスキャン中にXIDのステータスをチェックし、それが中断された場合は特定のエラーを報告して、現在のトランザクションのストリーミングを停止し、そのようなエラーですでにストリーミングされている変更を破棄できるようにします。アボートされた(サブ)トランザクションに対する変更の一部はすでにストリーミングされている可能性がありますが、アボートをデコードする際に、アボートメッセージをストリームしてサブスクライバー内の変更点を切り捨てるため、それは問題ありません。

使い方

Amit
組み込み論理レプリケーションに、実行中のトランザクションのストリームミングのサポートを追加するには、主に次の4つのことを行う必要があります。

  • (a)論理レプリケーション・プロトコルを拡張して、実行中のトランザクションを識別し、情報(サブトランザクションのXIDなど)を追加できるようにします。プロトコルの詳細はPostgreSQL文書を参照してください。
  • (b)拡張されたレプリケーション・プロトコルを活用して、新しいストリームAPIコールバックを実装するように出力プラグイン(pgoutput)を変更します。
  • (c)レプリケーションApply Workerを変更して、データをディスクに書き込んでからコミット時に再実行することで、ストリーム化された実行中のトランザクションを適切に処理します。
  • (d)サブスクリプションの作成中に、ストリーミング用の新しいオプションを提供します。

次の例では、組み込み論理レプリケーションを使用してストリーミングを設定する方法を示します。

Publisher Node:
Set logical_decoding_work_mem = '64kB';
# 初期データを使用してパブリケーションを設定
CREATE TABLE test_tab (a int primary key, b varchar);
INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar');
CREATE PUBLICATION tap_pub FOR TABLE test_tab;

Subscriber Node:
# streaming = onオプションを指定してサブスクリプションを設定
CREATE TABLE test_tab (a int primary key, b varchar);
CREATE SUBSCRIPTION tap_sub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION tap_pub WITH (streaming = on);

Publisher Node:
# パブリッシャーノードに対応するレプリケーションスロットが作成されているかを確認
postgres=# SELECT slot_name, plugin, slot_type FROM pg_replication_slots;
 slot_name |  plugin  | slot_type
-----------+----------+-----------
 tap_sub   | pgoutput | logical
(1 row)
# ストリームされたデータがまだないことを確認
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           0 |            0 |            0
(1 row)
# logical_decoding_work_mem (64kB)制限を超えるのに十分な行の挿入、更新および削除
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
# ストリーミングが行われたことを確認
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           1 |           22 |      1444410
(1 row)

Subscriber Node:
# ストリーミングされたデータはまだ表示されない
postgres=# SELECT * FROM test_tab;
 a |  b
---+-----
 1 | foo
 2 | bar
(2 rows)

Publisher Node:
# 大規模トランザクションのコミット
postgres=# Commit;

Subscriber Node:
# データはサブスクライバー上で表示される必要あり
postgres=# SELECT count(*) FROM test_tab;
 count
-------
  3334
(1 row)

謝辞

Amit
この機能は2017年に提案され、さまざまなコミット0bead9af48, c55040ccd0, 45fdc9738b, 7259736a6e, 464824323eの1つとして2020年にコミットされました。この機能を実現するにはさまざまな基盤が必要であったため、完成までに長い時間を要しました。最初にこの機能を提案してくれたTomas Vondra氏、そして私と一緒に残りの部分を完成させてくれたDilip Kumar氏に感謝致します。また、Neha Sharma氏、Mahendra Singh Thalor氏、Ajin Cherian氏、Kuntal Ghosh氏など、プロジェクト全体を通してレビューやさまざまなテストに協力してくれた人達にも感謝致します。さらに、この機能の重要な問題に対する解決策を提案してくれたAndres Freund氏をはじめとするコミュニティーメンバーにも感謝致します。最後になりましたが、私や他のメンバーがこの機能に取り組むことを奨励してくれたEnterpriseDB Corporationと富士通の経営陣に感謝致します。

2021年10月8日公開

オンデマンド(動画)セミナー

    • PostgreSQLに関連するセミナー動画を公開中。いつでもセミナーをご覧いただけます。
      • 【事例解説】運送業務改革をもたらす次世代の運送業界向けDXプラットフォームの構築
      • ハイブリッドクラウドに最適なOSSベースのデータベースご紹介

本コンテンツに関するお問い合わせ

お電話でのお問い合わせ

Webでのお問い合わせ

当社はセキュリティ保護の観点からSSL技術を使用しております。

ページの先頭へ