Rearrange logrep worker's snapshot handling some more.
authorTom Lane
Thu, 10 Jun 2021 16:27:27 +0000 (12:27 -0400)
committerTom Lane
Thu, 10 Jun 2021 16:27:27 +0000 (12:27 -0400)
It turns out that worker.c's code path for TRUNCATE was also
careless about establishing a snapshot while executing user-defined
code, allowing the checks added by commit 84f5c2908 to fail when
a trigger is fired in that context.

We could just wrap Push/PopActiveSnapshot around the truncate call,
but it seems better to establish a policy of holding a snapshot
throughout execution of a replication step.  To help with that and
possible future requirements, replace the previous ensure_transaction
calls with pairs of begin/end_replication_step calls.

Per report from Mark Dilger.  Back-patch to v11, like the previous
changes.

Discussion: https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com

src/backend/replication/logical/worker.c

index 454a5cbfbab6a86512a46211dc5ca5a87cbba189..eb013c46088210dd9ae33245c0d196cf5660bba5 100644 (file)
@@ -169,30 +169,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 }
 
 /*
- * Make sure that we started local transaction.
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
  *
- * Also switches to ApplyMessageContext as necessary.
+ * Start a transaction, if this is the first step (else we keep using the
+ * existing transaction).
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
  */
-static bool
-ensure_transaction(void)
+static void
+begin_replication_step(void)
 {
-   if (IsTransactionState())
-   {
-       SetCurrentStatementStartTimestamp();
-
-       if (CurrentMemoryContext != ApplyMessageContext)
-           MemoryContextSwitchTo(ApplyMessageContext);
+   SetCurrentStatementStartTimestamp();
 
-       return false;
+   if (!IsTransactionState())
+   {
+       StartTransactionCommand();
+       maybe_reread_subscription();
    }
 
-   SetCurrentStatementStartTimestamp();
-   StartTransactionCommand();
-
-   maybe_reread_subscription();
+   PushActiveSnapshot(GetTransactionSnapshot());
 
    MemoryContextSwitchTo(ApplyMessageContext);
-   return true;
+}
+
+/*
+ * Finish up one step of a replication transaction.
+ * Callers of begin_replication_step() must also call this.
+ *
+ * We don't close out the transaction here, but we should increment
+ * the command counter to make the effects of this step visible.
+ */
+static void
+end_replication_step(void)
+{
+   PopActiveSnapshot();
+
+   CommandCounterIncrement();
 }
 
 
@@ -210,13 +221,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
    ResultRelInfo *resultRelInfo;
    RangeTblEntry *rte;
 
-   /*
-    * Input functions may need an active snapshot, as may AFTER triggers
-    * invoked during finish_edata.  For safety, ensure an active snapshot
-    * exists throughout all our usage of the executor.
-    */
-   PushActiveSnapshot(GetTransactionSnapshot());
-
    edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
    edata->targetRel = rel;
 
@@ -277,8 +281,6 @@ finish_edata(ApplyExecutionData *edata)
    ExecResetTupleTable(estate->es_tupleTable, false);
    FreeExecutorState(estate);
    pfree(edata);
-
-   PopActiveSnapshot();
 }
 
 /*
@@ -673,7 +675,7 @@ apply_handle_insert(StringInfo s)
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
 
-   ensure_transaction();
+   begin_replication_step();
 
    relid = logicalrep_read_insert(s, &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -684,6 +686,7 @@ apply_handle_insert(StringInfo s)
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
+       end_replication_step();
        return;
    }
 
@@ -712,7 +715,7 @@ apply_handle_insert(StringInfo s)
 
    logicalrep_rel_close(rel, NoLock);
 
-   CommandCounterIncrement();
+   end_replication_step();
 }
 
 /* Workhorse for apply_handle_insert() */
@@ -781,7 +784,7 @@ apply_handle_update(StringInfo s)
    RangeTblEntry *target_rte;
    MemoryContext oldctx;
 
-   ensure_transaction();
+   begin_replication_step();
 
    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
                                   &newtup);
@@ -793,6 +796,7 @@ apply_handle_update(StringInfo s)
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
+       end_replication_step();
        return;
    }
 
@@ -849,7 +853,7 @@ apply_handle_update(StringInfo s)
 
    logicalrep_rel_close(rel, NoLock);
 
-   CommandCounterIncrement();
+   end_replication_step();
 }
 
 /* Workhorse for apply_handle_update() */
@@ -925,7 +929,7 @@ apply_handle_delete(StringInfo s)
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
 
-   ensure_transaction();
+   begin_replication_step();
 
    relid = logicalrep_read_delete(s, &oldtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -936,6 +940,7 @@ apply_handle_delete(StringInfo s)
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
+       end_replication_step();
        return;
    }
 
@@ -966,7 +971,7 @@ apply_handle_delete(StringInfo s)
 
    logicalrep_rel_close(rel, NoLock);
 
-   CommandCounterIncrement();
+   end_replication_step();
 }
 
 /* Workhorse for apply_handle_delete() */
@@ -1291,7 +1296,7 @@ apply_handle_truncate(StringInfo s)
    ListCell   *lc;
    LOCKMODE    lockmode = AccessExclusiveLock;
 
-   ensure_transaction();
+   begin_replication_step();
 
    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
 
@@ -1379,7 +1384,7 @@ apply_handle_truncate(StringInfo s)
        table_close(rel, NoLock);
    }
 
-   CommandCounterIncrement();
+   end_replication_step();
 }