const char *gid)
{
XLogRecPtr recptr;
+ bool replorigin;
+
+ /*
+ * Are we using the replication origins feature? Or, in other words, are
+ * we replaying remote actions?
+ */
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
/*
* Catch the scenario where we aborted partway through
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
xid, gid);
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ XactLastRecEnd);
+
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
xl_dbinfo.tsId = MyDatabaseTableSpace;
}
- /* dump transaction origin information only for abort prepared */
+ /*
+ * Dump transaction origin information only for abort prepared. We need
+ * this during recovery to update the replication origin progress.
+ */
if ((replorigin_session_origin != InvalidRepOriginId) &&
- TransactionIdIsValid(twophase_xid) &&
- XLogLogicalInfoActive())
+ TransactionIdIsValid(twophase_xid))
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
+ XLogRecPtr lsn, RepOriginId origin_id)
{
TransactionId max_xid;
StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
/* Make sure files supposed to be dropped are dropped */
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
}
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
- xact_redo_abort(&parsed, XLogRecGetXid(record));
+ xact_redo_abort(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
- xact_redo_abort(&parsed, parsed.twophase_xid);
+ xact_redo_abort(&parsed, parsed.twophase_xid,
+ record->EndRecPtr, XLogRecGetOrigin(record));
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);