Track replication origin progress for rollbacks.
authorAmit Kapila
Mon, 8 Mar 2021 02:24:03 +0000 (07:54 +0530)
committerAmit Kapila
Mon, 8 Mar 2021 02:24:03 +0000 (07:54 +0530)
Commit 1eb6d6527a allowed to track replica origin replay progress for 2PC
but it was not complete. It misses to properly track the progress for
rollback prepared especially it missed updating the code for recovery.
Additionally, we need to allow tracking it on subscriber nodes where
wal_level might not be logical.

It is required to track decoding of 2PC which is committed in PG14
(a271a1b50e) and also nobody complained about this till now so not
backpatching it.

Author: Amit Kapila
Reviewed-by: Michael Paquier and Ajin Cherian
Discussion: https://postgr.es/m/CAA4eK1L-kHmMnSdrRW6UhRbCjR7cgh04c+6psY15qzT6ktcd+g@mail.gmail.com

src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c

index 80d2d20d6ccda94b741c0bec19b7f029fca005e2..6023e7c16fb73155fc3edec78b25b7ea02cfcc55 100644 (file)
@@ -2276,6 +2276,14 @@ RecordTransactionAbortPrepared(TransactionId xid,
                               const char *gid)
 {
    XLogRecPtr  recptr;
+   bool        replorigin;
+
+   /*
+    * Are we using the replication origins feature?  Or, in other words, are
+    * we replaying remote actions?
+    */
+   replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+                 replorigin_session_origin != DoNotReplicateId);
 
    /*
     * Catch the scenario where we aborted partway through
@@ -2298,6 +2306,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
                                MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
                                xid, gid);
 
+   if (replorigin)
+       /* Move LSNs forward for this replication origin */
+       replorigin_session_advance(replorigin_session_origin_lsn,
+                                  XactLastRecEnd);
+
    /* Always flush, since we're about to remove the 2PC state file */
    XLogFlush(recptr);
 
index 4e6a3df6b874252261a4ee44f9c5e964f3f1f746..c83aa16f2ce74b293f67ad318dfb78ea47b4fc15 100644 (file)
@@ -5714,10 +5714,12 @@ XactLogAbortRecord(TimestampTz abort_time,
        xl_dbinfo.tsId = MyDatabaseTableSpace;
    }
 
-   /* dump transaction origin information only for abort prepared */
+   /*
+    * Dump transaction origin information only for abort prepared. We need
+    * this during recovery to update the replication origin progress.
+    */
    if ((replorigin_session_origin != InvalidRepOriginId) &&
-       TransactionIdIsValid(twophase_xid) &&
-       XLogLogicalInfoActive())
+       TransactionIdIsValid(twophase_xid))
    {
        xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
@@ -5923,7 +5925,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
  * because subtransaction commit is never WAL logged.
  */
 static void
-xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
+               XLogRecPtr lsn, RepOriginId origin_id)
 {
    TransactionId max_xid;
 
@@ -5972,6 +5975,13 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
            StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
    }
 
+   if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+   {
+       /* recover apply progress */
+       replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+                          false /* backward */ , false /* WAL */ );
+   }
+
    /* Make sure files supposed to be dropped are dropped */
    DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
@@ -6013,7 +6023,8 @@ xact_redo(XLogReaderState *record)
        xl_xact_parsed_abort parsed;
 
        ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
-       xact_redo_abort(&parsed, XLogRecGetXid(record));
+       xact_redo_abort(&parsed, XLogRecGetXid(record),
+                       record->EndRecPtr, XLogRecGetOrigin(record));
    }
    else if (info == XLOG_XACT_ABORT_PREPARED)
    {
@@ -6021,7 +6032,8 @@ xact_redo(XLogReaderState *record)
        xl_xact_parsed_abort parsed;
 
        ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
-       xact_redo_abort(&parsed, parsed.twophase_xid);
+       xact_redo_abort(&parsed, parsed.twophase_xid,
+                       record->EndRecPtr, XLogRecGetOrigin(record));
 
        /* Delete TwoPhaseState gxact entry and/or 2PC file. */
        LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);