Avoid repeated decoding of prepared transactions after a restart.
authorAmit Kapila
Mon, 1 Mar 2021 03:41:18 +0000 (09:11 +0530)
committerAmit Kapila
Mon, 1 Mar 2021 03:41:18 +0000 (09:11 +0530)
In commit a271a1b50e, we allowed decoding at prepare time and the prepare
was decoded again if there is a restart after decoding it. It was done
that way because we can't distinguish between the cases where we have not
decoded the prepare because it was prior to consistent snapshot or we have
decoded it earlier but restarted. To distinguish between these two cases,
we have introduced an initial_consistent_point at the slot level which is
an LSN at which we found a consistent point at the time of slot creation.
This is also the point where we have exported a snapshot for the initial
copy. So, prepared transactions prior to this point are sent along with
commit prepared.

This commit bumps SNAPBUILD_VERSION because of change in SnapBuild. It
will break existing slots which is fine in a major release.

Author: Ajin Cherian, based on idea by Andres Freund
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com

contrib/test_decoding/expected/twophase.out
contrib/test_decoding/expected/twophase_stream.out
doc/src/sgml/logicaldecoding.sgml
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/reorderbuffer.h
src/include/replication/slot.h
src/include/replication/snapbuild.h

index afa35669795c70600d99c8e5a5fda72a65c7f050..8a1d06d706d3aea6fe1d7027fbb7e239b2ff6c14 100644 (file)
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#1';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared#3';
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
@@ -158,14 +151,10 @@ RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
index 3acc4acd3651d54b65df9e5916fb3f67bdad3ad3..d54e640b409de37f0471d7c37dcad55931e07332 100644 (file)
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
index 85c55d641257959fb94c674609a0faeee2bf2116..f1f13d81d569806bf14cd61099cd66085db7830e 100644 (file)
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
     lsn    | xid |                    data                    
 -----------+-----+--------------------------------------------
- 0/1689DC0 | 529 | BEGIN 529
- 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
- 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
  0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
 (4 row)
 
@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
       gid field, which is part of the
       txn parameter, can be used in this callback to
       check if the plugin has already received this PREPARE
-      in which case it can skip the remaining changes of the transaction.
-      This can only happen if the user restarts the decoding after receiving
-      the PREPARE for a transaction but before receiving
-      the COMMIT PREPARED, say because of some error.
+      in which case it can either error out or skip the remaining changes of 
+      the transaction.
       
        typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                                     ReorderBufferTXN *txn);
index 657cb4af1e3bb80e8cfec6b4bc83ef4aff6fce98..5f596135b150f6dfe47a1f6d691f9052200bdcf5 100644 (file)
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    if (two_phase)
    {
        ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+                                   SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
                                    commit_time, origin_id, origin_lsn,
                                    parsed->twophase_gid, true);
    }
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    {
        ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
                                    abort_time, origin_id, origin_lsn,
+                                   InvalidXLogRecPtr,
                                    parsed->twophase_gid, false);
    }
    else
index baeb45ff43cdd758554c1e3365b8fd4093a4d92e..3f6d723d09637a5f7b115cb4a99504c11297d26e 100644 (file)
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
    ctx->reorder = ReorderBufferAllocate();
    ctx->snapshot_builder =
        AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-                               need_full_snapshot);
+                               need_full_snapshot, slot->data.initial_consistent_point);
 
    ctx->reorder->private_data = ctx;
 
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
    SpinLockAcquire(&slot->mutex);
    slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+   slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
    SpinLockRelease(&slot->mutex);
 }
 
index c3b963211e8572e1acfd9cc9bc2f83ef0aa6e270..91600ac56671b2644708f58cffea7d45d8c80cf1 100644 (file)
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
                            XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                           XLogRecPtr initial_consistent_point,
                            TimestampTz commit_time, RepOriginId origin_id,
                            XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
    /*
     * It is possible that this transaction is not decoded at prepare time
     * either because by that time we didn't have a consistent snapshot or it
-    * was decoded earlier but we have restarted. We can't distinguish between
-    * those two cases so we send the prepare in both the cases and let
-    * downstream decide whether to process or skip it. We don't need to
-    * decode the xact for aborts if it is not done already.
+    * was decoded earlier but we have restarted. We only need to send the
+    * prepare if it was not decoded earlier. We don't need to decode the xact
+    * for aborts if it is not done already.
     */
-   if (!rbtxn_prepared(txn) && is_commit)
+   if ((txn->final_lsn < initial_consistent_point) && is_commit)
    {
        txn->txn_flags |= RBTXN_PREPARE;
 
index e11788795f1b57811be2188173c69c3e40298f50..ed3acadab7b46d4b585ae7e40989a15a2a6896ab 100644 (file)
@@ -164,6 +164,17 @@ struct SnapBuild
     */
    XLogRecPtr  start_decoding_at;
 
+   /*
+    * LSN at which we found a consistent point at the time of slot creation.
+    * This is also the point where we have exported a snapshot for the
+    * initial copy.
+    *
+    * The prepared transactions that are not covered by the initial snapshot
+    * need to be sent later along with commit prepared and they must be
+    * before this point.
+    */
+   XLogRecPtr  initial_consistent_point;
+
    /*
     * Don't start decoding WAL until the "xl_running_xacts" information
     * indicates there are no running xids with an xid smaller than this.
@@ -269,7 +280,8 @@ SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
                        TransactionId xmin_horizon,
                        XLogRecPtr start_lsn,
-                       bool need_full_snapshot)
+                       bool need_full_snapshot,
+                       XLogRecPtr initial_consistent_point)
 {
    MemoryContext context;
    MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
    builder->initial_xmin_horizon = xmin_horizon;
    builder->start_decoding_at = start_lsn;
    builder->building_full_snapshot = need_full_snapshot;
+   builder->initial_consistent_point = initial_consistent_point;
 
    MemoryContextSwitchTo(oldcontext);
 
@@ -356,6 +369,15 @@ SnapBuildCurrentState(SnapBuild *builder)
    return builder->state;
 }
 
+/*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+   return builder->initial_consistent_point;
+}
+
 /*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
    offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
index bab31bf7af71d9f474ae17e0a9688fe6a1a119c5..565a961d6ab242d52fc77563243ab86776fef6d7 100644 (file)
@@ -643,6 +643,7 @@ void        ReorderBufferCommit(ReorderBuffer *, TransactionId,
                                TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void       ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
                                        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                                       XLogRecPtr initial_consistent_point,
                                        TimestampTz commit_time,
                                        RepOriginId origin_id, XLogRecPtr origin_lsn,
                                        char *gid, bool is_commit);
index 38a9a0b3fc4405fc371a18ec131fcfa435a2ab79..5c3fde20c6981b621a742abc48f33c64c2d52075 100644 (file)
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
     */
    XLogRecPtr  confirmed_flush;
 
+   /*
+    * LSN at which we found a consistent point at the time of slot creation.
+    * This is also the point where we have exported a snapshot for the
+    * initial copy.
+    */
+   XLogRecPtr  initial_consistent_point;
+
    /* plugin name */
    NameData    plugin;
 } ReplicationSlotPersistentData;
index d9f187a58ec2d6d660069e4985bbf0ed40109ab6..fbabce6764d31e54b26f15af099f21e002e38340 100644 (file)
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
                                          TransactionId xmin_horizon, XLogRecPtr start_lsn,
-                                         bool need_full_snapshot);
+                                         bool need_full_snapshot,
+                                         XLogRecPtr initial_consistent_point);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
                                            TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
                               TransactionId xid, int nsubxacts,