Restore the portal-level snapshot after procedure COMMIT/ROLLBACK.
authorTom Lane
Fri, 21 May 2021 18:03:53 +0000 (14:03 -0400)
committerTom Lane
Fri, 21 May 2021 18:03:53 +0000 (14:03 -0400)
COMMIT/ROLLBACK necessarily destroys all snapshots within the session.
The original implementation of intra-procedure transactions just
cavalierly did that, ignoring the fact that this left us executing in
a rather different environment than normal.  In particular, it turns
out that handling of toasted datums depends rather critically on there
being an outer ActiveSnapshot: otherwise, when SPI or the core
executor pop whatever snapshot they used and return, it's unsafe to
dereference any toasted datums that may appear in the query result.
It's possible to demonstrate "no known snapshots" and "missing chunk
number N for toast value" errors as a result of this oversight.

Historically this outer snapshot has been held by the Portal code,
and that seems like a good plan to preserve.  So add infrastructure
to pquery.c to allow re-establishing the Portal-owned snapshot if it's
not there anymore, and add enough bookkeeping support that we can tell
whether it is or not.

We can't, however, just re-establish the Portal snapshot as part of
COMMIT/ROLLBACK.  As in normal transaction start, acquiring the first
snapshot should wait until after SET and LOCK commands.  Hence, teach
spi.c about doing this at the right time.  (Note that this patch
doesn't fix the problem for any PLs that try to run intra-procedure
transactions without using SPI to execute SQL commands.)

This makes SPI's no_snapshots parameter rather a misnomer, so in HEAD,
rename that to allow_nonatomic.

replication/logical/worker.c also needs some fixes, because it wasn't
careful to hold a snapshot open around AFTER trigger execution.
That code doesn't use a Portal, which I suspect someday we're gonna
have to fix.  But for now, just rearrange the order of operations.
This includes back-patching the recent addition of finish_estate()
to centralize the cleanup logic there.

This also back-patches commit 2ecfeda3e into v13, to improve the
test coverage for worker.c (it was that test that exposed that
worker.c's snapshot management is wrong).

Per bug #15990 from Andreas Wicht.  Back-patch to v11 where
intra-procedure COMMIT was added.

Discussion: https://postgr.es/m/15990-eee2ac466b11293d@postgresql.org

12 files changed:
src/backend/commands/functioncmds.c
src/backend/executor/spi.c
src/backend/replication/logical/worker.c
src/backend/tcop/pquery.c
src/backend/utils/mmgr/portalmem.c
src/include/executor/spi_priv.h
src/include/tcop/pquery.h
src/include/utils/portal.h
src/pl/plpgsql/src/pl_exec.c
src/test/isolation/expected/plpgsql-toast.out
src/test/isolation/specs/plpgsql-toast.spec
src/test/subscription/t/013_partition.pl

index bb918388842e7542ca3e0fb010e5d04587650b31..4f1f4fb1ef94e63abcec282b4ea84cb82dda04f4 100644 (file)
@@ -62,6 +62,7 @@
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
 #include "pgstat.h"
+#include "tcop/pquery.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -2252,6 +2253,20 @@ ExecuteCallStmt(CallStmt *stmt, ParamListInfo params, bool atomic, DestReceiver
        if (fcinfo->isnull)
            elog(ERROR, "procedure returned null record");
 
+       /*
+        * Ensure there's an active snapshot whilst we execute whatever's
+        * involved here.  Note that this is *not* sufficient to make the
+        * world safe for TOAST pointers to be included in the returned data:
+        * the referenced data could have gone away while we didn't hold a
+        * snapshot.  Hence, it's incumbent on PLs that can do COMMIT/ROLLBACK
+        * to not return TOAST pointers, unless those pointers were fetched
+        * after the last COMMIT/ROLLBACK in the procedure.
+        *
+        * XXX that is a really nasty, hard-to-test requirement.  Is there a
+        * way to remove it?
+        */
+       EnsurePortalSnapshotExists();
+
        td = DatumGetHeapTupleHeader(retval);
        tupType = HeapTupleHeaderGetTypeId(td);
        tupTypmod = HeapTupleHeaderGetTypMod(td);
index b1081688211c7c760c2d4798ed5d1778856c130e..97b1a635273e3cb7c9f54afbd95ae2e7510c6b36 100644 (file)
@@ -251,12 +251,8 @@ _SPI_commit(bool chain)
    /* Start the actual commit */
    _SPI_current->internal_xact = true;
 
-   /*
-    * Before committing, pop all active snapshots to avoid error about
-    * "snapshot %p still active".
-    */
-   while (ActiveSnapshotSet())
-       PopActiveSnapshot();
+   /* Release snapshots associated with portals */
+   ForgetPortalSnapshots();
 
    if (chain)
        SaveTransactionCharacteristics();
@@ -313,6 +309,9 @@ _SPI_rollback(bool chain)
    /* Start the actual rollback */
    _SPI_current->internal_xact = true;
 
+   /* Release snapshots associated with portals */
+   ForgetPortalSnapshots();
+
    if (chain)
        SaveTransactionCharacteristics();
 
@@ -2100,6 +2099,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
    uint64      my_processed = 0;
    SPITupleTable *my_tuptable = NULL;
    int         res = 0;
+   bool        allow_nonatomic = plan->no_snapshots;   /* legacy API name */
    bool        pushed_active_snap = false;
    ErrorContextCallback spierrcontext;
    CachedPlan *cplan = NULL;
@@ -2132,11 +2132,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
     * In the first two cases, we can just push the snap onto the stack once
     * for the whole plan list.
     *
-    * But if the plan has no_snapshots set to true, then don't manage
-    * snapshots at all.  The caller should then take care of that.
+    * Note that snapshot != InvalidSnapshot implies an atomic execution
+    * context.
     */
-   if (snapshot != InvalidSnapshot && !plan->no_snapshots)
+   if (snapshot != InvalidSnapshot)
    {
+       Assert(!allow_nonatomic);
        if (read_only)
        {
            PushActiveSnapshot(snapshot);
@@ -2211,15 +2212,39 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        stmt_list = cplan->stmt_list;
 
        /*
-        * In the default non-read-only case, get a new snapshot, replacing
-        * any that we pushed in a previous cycle.
+        * If we weren't given a specific snapshot to use, and the statement
+        * list requires a snapshot, set that up.
         */
-       if (snapshot == InvalidSnapshot && !read_only && !plan->no_snapshots)
+       if (snapshot == InvalidSnapshot &&
+           (list_length(stmt_list) > 1 ||
+            (list_length(stmt_list) == 1 &&
+             PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt,
+                                                       stmt_list)))))
        {
-           if (pushed_active_snap)
-               PopActiveSnapshot();
-           PushActiveSnapshot(GetTransactionSnapshot());
-           pushed_active_snap = true;
+           /*
+            * First, ensure there's a Portal-level snapshot.  This back-fills
+            * the snapshot stack in case the previous operation was a COMMIT
+            * or ROLLBACK inside a procedure or DO block.  (We can't put back
+            * the Portal snapshot any sooner, or we'd break cases like doing
+            * SET or LOCK just after COMMIT.)  It's enough to check once per
+            * statement list, since COMMIT/ROLLBACK/CALL/DO can't appear
+            * within a multi-statement list.
+            */
+           EnsurePortalSnapshotExists();
+
+           /*
+            * In the default non-read-only case, get a new per-statement-list
+            * snapshot, replacing any that we pushed in a previous cycle.
+            * Skip it when doing non-atomic execution, though (we rely
+            * entirely on the Portal snapshot in that case).
+            */
+           if (!read_only && !allow_nonatomic)
+           {
+               if (pushed_active_snap)
+                   PopActiveSnapshot();
+               PushActiveSnapshot(GetTransactionSnapshot());
+               pushed_active_snap = true;
+           }
        }
 
        foreach(lc2, stmt_list)
@@ -2231,6 +2256,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
            _SPI_current->processed = 0;
            _SPI_current->tuptable = NULL;
 
+           /* Check for unsupported cases. */
            if (stmt->utilityStmt)
            {
                if (IsA(stmt->utilityStmt, CopyStmt))
@@ -2259,9 +2285,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 
            /*
             * If not read-only mode, advance the command counter before each
-            * command and update the snapshot.
+            * command and update the snapshot.  (But skip it if the snapshot
+            * isn't under our control.)
             */
-           if (!read_only && !plan->no_snapshots)
+           if (!read_only && pushed_active_snap)
            {
                CommandCounterIncrement();
                UpdateActiveSnapshotCommandId();
@@ -2295,13 +2322,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                QueryCompletion qc;
 
                /*
-                * If the SPI context is atomic, or we are asked to manage
-                * snapshots, then we are in an atomic execution context.
-                * Conversely, to propagate a nonatomic execution context, the
-                * caller must be in a nonatomic SPI context and manage
-                * snapshots itself.
+                * If the SPI context is atomic, or we were not told to allow
+                * nonatomic operations, tell ProcessUtility this is an atomic
+                * execution context.
                 */
-               if (_SPI_current->atomic || !plan->no_snapshots)
+               if (_SPI_current->atomic || !allow_nonatomic)
                    context = PROCESS_UTILITY_QUERY;
                else
                    context = PROCESS_UTILITY_QUERY_NONATOMIC;
index e25ad67223553f59d6665aa606370e3c10a265b1..caaa59c7bc78be8cc2d8dff4b8b4d7187d5e7a44 100644 (file)
@@ -199,6 +199,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
    ResultRelInfo *resultRelInfo;
    RangeTblEntry *rte;
 
+   /*
+    * Input functions may need an active snapshot, as may AFTER triggers
+    * invoked during finish_estate.  For safety, ensure an active snapshot
+    * exists throughout all our usage of the executor.
+    */
+   PushActiveSnapshot(GetTransactionSnapshot());
+
    estate = CreateExecutorState();
 
    rte = makeNode(RangeTblEntry);
@@ -223,6 +230,22 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
    return estate;
 }
 
+/*
+ * Finish any operations related to the executor state created by
+ * create_estate_for_relation().
+ */
+static void
+finish_estate(EState *estate)
+{
+   /* Handle any queued AFTER triggers. */
+   AfterTriggerEndQuery(estate);
+
+   /* Cleanup. */
+   ExecResetTupleTable(estate->es_tupleTable, false);
+   FreeExecutorState(estate);
+   PopActiveSnapshot();
+}
+
 /*
  * Executes default values for columns for which we can't map to remote
  * relation columns.
@@ -634,9 +657,6 @@ apply_handle_insert(StringInfo s)
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);
 
-   /* Input functions may need an active snapshot, so get one */
-   PushActiveSnapshot(GetTransactionSnapshot());
-
    /* Process and store remote tuple in the slot */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, newtup.values);
@@ -651,13 +671,7 @@ apply_handle_insert(StringInfo s)
        apply_handle_insert_internal(estate->es_result_relation_info, estate,
                                     remoteslot);
 
-   PopActiveSnapshot();
-
-   /* Handle queued AFTER triggers. */
-   AfterTriggerEndQuery(estate);
-
-   ExecResetTupleTable(estate->es_tupleTable, false);
-   FreeExecutorState(estate);
+   finish_estate(estate);
 
    logicalrep_rel_close(rel, NoLock);
 
@@ -778,8 +792,6 @@ apply_handle_update(StringInfo s)
    /* Also populate extraUpdatedCols, in case we have generated columns */
    fill_extraUpdatedCols(target_rte, rel->localrel);
 
-   PushActiveSnapshot(GetTransactionSnapshot());
-
    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel,
@@ -794,13 +806,7 @@ apply_handle_update(StringInfo s)
        apply_handle_update_internal(estate->es_result_relation_info, estate,
                                     remoteslot, &newtup, rel);
 
-   PopActiveSnapshot();
-
-   /* Handle queued AFTER triggers. */
-   AfterTriggerEndQuery(estate);
-
-   ExecResetTupleTable(estate->es_tupleTable, false);
-   FreeExecutorState(estate);
+   finish_estate(estate);
 
    logicalrep_rel_close(rel, NoLock);
 
@@ -902,8 +908,6 @@ apply_handle_delete(StringInfo s)
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);
 
-   PushActiveSnapshot(GetTransactionSnapshot());
-
    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, oldtup.values);
@@ -917,13 +921,7 @@ apply_handle_delete(StringInfo s)
        apply_handle_delete_internal(estate->es_result_relation_info, estate,
                                     remoteslot, &rel->remoterel);
 
-   PopActiveSnapshot();
-
-   /* Handle queued AFTER triggers. */
-   AfterTriggerEndQuery(estate);
-
-   ExecResetTupleTable(estate->es_tupleTable, false);
-   FreeExecutorState(estate);
+   finish_estate(estate);
 
    logicalrep_rel_close(rel, NoLock);
 
@@ -1248,7 +1246,7 @@ apply_handle_truncate(StringInfo s)
    List       *relids = NIL;
    List       *relids_logged = NIL;
    ListCell   *lc;
-   LOCKMODE    lockmode = AccessExclusiveLock;
+   LOCKMODE    lockmode = AccessExclusiveLock;
 
    ensure_transaction();
 
index fa80e0e635b9cee5406284a7f7661d1e172221d0..a88a054fb4d0f7468b00c051d5eacd0fc556c413 100644 (file)
@@ -476,6 +476,13 @@ PortalStart(Portal portal, ParamListInfo params,
                else
                    PushActiveSnapshot(GetTransactionSnapshot());
 
+               /*
+                * We could remember the snapshot in portal->portalSnapshot,
+                * but presently there seems no need to, as this code path
+                * cannot be used for non-atomic execution.  Hence there can't
+                * be any commit/abort that might destroy the snapshot.
+                */
+
                /*
                 * Create QueryDesc in portal's context; for the moment, set
                 * the destination to DestNone.
@@ -1114,45 +1121,26 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
                 bool isTopLevel, bool setHoldSnapshot,
                 DestReceiver *dest, QueryCompletion *qc)
 {
-   Node       *utilityStmt = pstmt->utilityStmt;
-   Snapshot    snapshot;
-
    /*
-    * Set snapshot if utility stmt needs one.  Most reliable way to do this
-    * seems to be to enumerate those that do not need one; this is a short
-    * list.  Transaction control, LOCK, and SET must *not* set a snapshot
-    * since they need to be executable at the start of a transaction-snapshot
-    * mode transaction without freezing a snapshot.  By extension we allow
-    * SHOW not to set a snapshot.  The other stmts listed are just efficiency
-    * hacks.  Beware of listing anything that can modify the database --- if,
-    * say, it has to update an index with expressions that invoke
-    * user-defined functions, then it had better have a snapshot.
+    * Set snapshot if utility stmt needs one.
     */
-   if (!(IsA(utilityStmt, TransactionStmt) ||
-         IsA(utilityStmt, LockStmt) ||
-         IsA(utilityStmt, VariableSetStmt) ||
-         IsA(utilityStmt, VariableShowStmt) ||
-         IsA(utilityStmt, ConstraintsSetStmt) ||
-   /* efficiency hacks from here down */
-         IsA(utilityStmt, FetchStmt) ||
-         IsA(utilityStmt, ListenStmt) ||
-         IsA(utilityStmt, NotifyStmt) ||
-         IsA(utilityStmt, UnlistenStmt) ||
-         IsA(utilityStmt, CheckPointStmt)))
+   if (PlannedStmtRequiresSnapshot(pstmt))
    {
-       snapshot = GetTransactionSnapshot();
+       Snapshot    snapshot = GetTransactionSnapshot();
+
        /* If told to, register the snapshot we're using and save in portal */
        if (setHoldSnapshot)
        {
            snapshot = RegisterSnapshot(snapshot);
            portal->holdSnapshot = snapshot;
        }
+       /* In any case, make the snapshot active and remember it in portal */
        PushActiveSnapshot(snapshot);
        /* PushActiveSnapshot might have copied the snapshot */
-       snapshot = GetActiveSnapshot();
+       portal->portalSnapshot = GetActiveSnapshot();
    }
    else
-       snapshot = NULL;
+       portal->portalSnapshot = NULL;
 
    ProcessUtility(pstmt,
                   portal->sourceText,
@@ -1166,13 +1154,17 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
    MemoryContextSwitchTo(portal->portalContext);
 
    /*
-    * Some utility commands may pop the ActiveSnapshot stack from under us,
-    * so be careful to only pop the stack if our snapshot is still at the
-    * top.
+    * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
+    * under us, so don't complain if it's now empty.  Otherwise, our snapshot
+    * should be the top one; pop it.  Note that this could be a different
+    * snapshot from the one we made above; see EnsurePortalSnapshotExists.
     */
-   if (snapshot != NULL && ActiveSnapshotSet() &&
-       snapshot == GetActiveSnapshot())
+   if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
+   {
+       Assert(portal->portalSnapshot == GetActiveSnapshot());
        PopActiveSnapshot();
+   }
+   portal->portalSnapshot = NULL;
 }
 
 /*
@@ -1254,6 +1246,12 @@ PortalRunMulti(Portal portal,
                 * from what holdSnapshot has.)
                 */
                PushCopiedSnapshot(snapshot);
+
+               /*
+                * As for PORTAL_ONE_SELECT portals, it does not seem
+                * necessary to maintain portal->portalSnapshot here.
+                */
+
                active_snapshot_set = true;
            }
            else
@@ -1690,3 +1688,78 @@ DoPortalRewind(Portal portal)
    portal->atEnd = false;
    portal->portalPos = 0;
 }
+
+/*
+ * PlannedStmtRequiresSnapshot - what it says on the tin
+ */
+bool
+PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
+{
+   Node       *utilityStmt = pstmt->utilityStmt;
+
+   /* If it's not a utility statement, it definitely needs a snapshot */
+   if (utilityStmt == NULL)
+       return true;
+
+   /*
+    * Most utility statements need a snapshot, and the default presumption
+    * about new ones should be that they do too.  Hence, enumerate those that
+    * do not need one.
+    *
+    * Transaction control, LOCK, and SET must *not* set a snapshot, since
+    * they need to be executable at the start of a transaction-snapshot-mode
+    * transaction without freezing a snapshot.  By extension we allow SHOW
+    * not to set a snapshot.  The other stmts listed are just efficiency
+    * hacks.  Beware of listing anything that can modify the database --- if,
+    * say, it has to update an index with expressions that invoke
+    * user-defined functions, then it had better have a snapshot.
+    */
+   if (IsA(utilityStmt, TransactionStmt) ||
+       IsA(utilityStmt, LockStmt) ||
+       IsA(utilityStmt, VariableSetStmt) ||
+       IsA(utilityStmt, VariableShowStmt) ||
+       IsA(utilityStmt, ConstraintsSetStmt) ||
+   /* efficiency hacks from here down */
+       IsA(utilityStmt, FetchStmt) ||
+       IsA(utilityStmt, ListenStmt) ||
+       IsA(utilityStmt, NotifyStmt) ||
+       IsA(utilityStmt, UnlistenStmt) ||
+       IsA(utilityStmt, CheckPointStmt))
+       return false;
+
+   return true;
+}
+
+/*
+ * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
+ *
+ * Generally, we will have an active snapshot whenever we are executing
+ * inside a Portal, unless the Portal's query is one of the utility
+ * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
+ * However, procedures and DO blocks can commit or abort the transaction,
+ * and thereby destroy all snapshots.  This function can be called to
+ * re-establish the Portal-level snapshot when none exists.
+ */
+void
+EnsurePortalSnapshotExists(void)
+{
+   Portal      portal;
+
+   /*
+    * Nothing to do if a snapshot is set.  (We take it on faith that the
+    * outermost active snapshot belongs to some Portal; or if there is no
+    * Portal, it's somebody else's responsibility to manage things.)
+    */
+   if (ActiveSnapshotSet())
+       return;
+
+   /* Otherwise, we'd better have an active Portal */
+   portal = ActivePortal;
+   Assert(portal != NULL);
+   Assert(portal->portalSnapshot == NULL);
+
+   /* Create a new snapshot and make it active */
+   PushActiveSnapshot(GetTransactionSnapshot());
+   /* PushActiveSnapshot might have copied the snapshot */
+   portal->portalSnapshot = GetActiveSnapshot();
+}
index 7072ce48a3ec88dcdee176f78160b96fb4c126de..a34abbd80c45296c6c3fc48b6294bb9a5157e122 100644 (file)
@@ -502,6 +502,9 @@ PortalDrop(Portal portal, bool isTopCommit)
        portal->cleanup = NULL;
    }
 
+   /* There shouldn't be an active snapshot anymore, except after error */
+   Assert(portal->portalSnapshot == NULL || !isTopCommit);
+
    /*
     * Remove portal from hash table.  Because we do this here, we will not
     * come back to try to remove the portal again if there's any error in the
@@ -709,6 +712,8 @@ PreCommit_Portals(bool isPrepare)
                portal->holdSnapshot = NULL;
            }
            portal->resowner = NULL;
+           /* Clear portalSnapshot too, for cleanliness */
+           portal->portalSnapshot = NULL;
            continue;
        }
 
@@ -1278,3 +1283,54 @@ HoldPinnedPortals(void)
        }
    }
 }
+
+/*
+ * Drop the outer active snapshots for all portals, so that no snapshots
+ * remain active.
+ *
+ * Like HoldPinnedPortals, this must be called when initiating a COMMIT or
+ * ROLLBACK inside a procedure.  This has to be separate from that since it
+ * should not be run until we're done with steps that are likely to fail.
+ *
+ * It's tempting to fold this into PreCommit_Portals, but to do so, we'd
+ * need to clean up snapshot management in VACUUM and perhaps other places.
+ */
+void
+ForgetPortalSnapshots(void)
+{
+   HASH_SEQ_STATUS status;
+   PortalHashEnt *hentry;
+   int         numPortalSnaps = 0;
+   int         numActiveSnaps = 0;
+
+   /* First, scan PortalHashTable and clear portalSnapshot fields */
+   hash_seq_init(&status, PortalHashTable);
+
+   while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
+   {
+       Portal      portal = hentry->portal;
+
+       if (portal->portalSnapshot != NULL)
+       {
+           portal->portalSnapshot = NULL;
+           numPortalSnaps++;
+       }
+       /* portal->holdSnapshot will be cleaned up in PreCommit_Portals */
+   }
+
+   /*
+    * Now, pop all the active snapshots, which should be just those that were
+    * portal snapshots.  Ideally we'd drive this directly off the portal
+    * scan, but there's no good way to visit the portals in the correct
+    * order.  So just cross-check after the fact.
+    */
+   while (ActiveSnapshotSet())
+   {
+       PopActiveSnapshot();
+       numActiveSnaps++;
+   }
+
+   if (numPortalSnaps != numActiveSnaps)
+       elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)",
+            numPortalSnaps, numActiveSnaps);
+}
index 6220928bd3401dd8c9a5b393633d4b0b3f0678ab..5e6575d2eb72852dbb34cebb367741e13b52ea6f 100644 (file)
@@ -92,7 +92,7 @@ typedef struct _SPI_plan
    int         magic;          /* should equal _SPI_PLAN_MAGIC */
    bool        saved;          /* saved or unsaved plan? */
    bool        oneshot;        /* one-shot plan? */
-   bool        no_snapshots;   /* let the caller handle the snapshots */
+   bool        no_snapshots;   /* allow nonatomic CALL/DO execution */
    List       *plancache_list; /* one CachedPlanSource per parsetree */
    MemoryContext plancxt;      /* Context containing _SPI_plan and data */
    int         cursor_options; /* Cursor options used for planning */
index 437642cc72c6828569013c60fd55f64c7153151c..1385a007ab1ea0fde06cac42212123b10970129d 100644 (file)
@@ -17,6 +17,8 @@
 #include "nodes/parsenodes.h"
 #include "utils/portal.h"
 
+struct PlannedStmt;                /* avoid including plannodes.h here */
+
 
 extern PGDLLIMPORT Portal ActivePortal;
 
@@ -42,4 +44,8 @@ extern uint64 PortalRunFetch(Portal portal,
                             long count,
                             DestReceiver *dest);
 
+extern bool PlannedStmtRequiresSnapshot(struct PlannedStmt *pstmt);
+
+extern void EnsurePortalSnapshotExists(void);
+
 #endif                         /* PQUERY_H */
index d41ff2efdad9e021029acd845f149d311146928e..cb40bfa761aeadf133e07291d60a1a62e695e4f4 100644 (file)
@@ -194,6 +194,14 @@ typedef struct PortalData
    /* Presentation data, primarily used by the pg_cursors system view */
    TimestampTz creation_time;  /* time at which this portal was defined */
    bool        visible;        /* include this portal in pg_cursors? */
+
+   /*
+    * Outermost ActiveSnapshot for execution of the portal's queries.  For
+    * all but a few utility commands, we require such a snapshot to exist.
+    * This ensures that TOAST references in query results can be detoasted,
+    * and helps to reduce thrashing of the process's exposed xmin.
+    */
+   Snapshot    portalSnapshot; /* active snapshot, or NULL if none */
 }          PortalData;
 
 /*
@@ -237,5 +245,6 @@ extern void PortalCreateHoldStore(Portal portal);
 extern void PortalHashTableDeleteAll(void);
 extern bool ThereAreNoReadyPortals(void);
 extern void HoldPinnedPortals(void);
+extern void ForgetPortalSnapshots(void);
 
 #endif                         /* PORTAL_H */
index abb18268cb1e3bd7c6b1169c442bf2b1204ae1dd..b5d20c09be96917c21addea7e9ffe94cd31c858f 100644 (file)
@@ -2146,7 +2146,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
    PLpgSQL_variable *volatile cur_target = stmt->target;
    volatile LocalTransactionId before_lxid;
    LocalTransactionId after_lxid;
-   volatile bool pushed_active_snap = false;
    volatile int rc;
 
    /*
@@ -2184,9 +2183,8 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
            Assert(!expr->expr_simple_expr);
 
            /*
-            * The procedure call could end transactions, which would upset
-            * the snapshot management in SPI_execute*, so don't let it do it.
-            * Instead, we set the snapshots ourselves below.
+            * Tell SPI to allow non-atomic execution.  (The field name is a
+            * legacy choice.)
             */
            plan->no_snapshots = true;
 
@@ -2328,16 +2326,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 
        before_lxid = MyProc->lxid;
 
-       /*
-        * Set snapshot only for non-read-only procedures, similar to SPI
-        * behavior.
-        */
-       if (!estate->readonly_func)
-       {
-           PushActiveSnapshot(GetTransactionSnapshot());
-           pushed_active_snap = true;
-       }
-
        rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI,
                                             estate->readonly_func, 0);
    }
@@ -2372,17 +2360,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 
    after_lxid = MyProc->lxid;
 
-   if (before_lxid == after_lxid)
-   {
-       /*
-        * If we are still in the same transaction after the call, pop the
-        * snapshot that we might have pushed.  (If it's a new transaction,
-        * then all the snapshots are gone already.)
-        */
-       if (pushed_active_snap)
-           PopActiveSnapshot();
-   }
-   else
+   if (before_lxid != after_lxid)
    {
        /*
         * If we are in a new transaction after the call, we need to build new
@@ -4946,6 +4924,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
  *
  * We just parse and execute the statement normally, but we have to do it
  * without setting a snapshot, for things like SET TRANSACTION.
+ * XXX spi.c now handles this correctly, so we no longer need a special case.
  */
 static int
 exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
@@ -4954,10 +4933,7 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
    int         rc;
 
    if (expr->plan == NULL)
-   {
        exec_prepare_plan(estate, expr, 0, true);
-       expr->plan->no_snapshots = true;
-   }
 
    rc = SPI_execute_plan(expr->plan, NULL, NULL, estate->readonly_func, 0);
 
index 4f216b94b62281d921fe30e3b44a10d647ac5bba..213bddad4fde12fd94868906df93681b2ef0c599 100644 (file)
@@ -235,3 +235,30 @@ s1: NOTICE:  length(r) = 6002
 s1: NOTICE:  length(r) = 9002
 s1: NOTICE:  length(r) = 12002
 step assign6: <... completed>
+
+starting permutation: fetch-after-commit
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+s1: NOTICE:  length(t) = 6000
+s1: NOTICE:  length(t) = 9000
+s1: NOTICE:  length(t) = 12000
+step fetch-after-commit: 
+do $$
+  declare
+    r record;
+    t text;
+  begin
+    insert into test1 values (2, repeat('bar', 3000));
+    insert into test1 values (3, repeat('baz', 4000));
+    for r in select test1.a from test1 loop
+      commit;
+      select b into t from test1 where a = r.a;
+      raise notice 'length(t) = %', length(t);
+    end loop;
+  end;
+$$;
+
index d360f8fccbf9fd113ee64c64941c87d51d1a8cae..fb40588d4f01e6480c42b902d18e0f128b1ab523 100644 (file)
@@ -131,6 +131,26 @@ do $$
 $$;
 }
 
+# Check that the results of a query can be detoasted just after committing
+# (there's no interaction with VACUUM here)
+step "fetch-after-commit"
+{
+do $$
+  declare
+    r record;
+    t text;
+  begin
+    insert into test1 values (2, repeat('bar', 3000));
+    insert into test1 values (3, repeat('baz', 4000));
+    for r in select test1.a from test1 loop
+      commit;
+      select b into t from test1 where a = r.a;
+      raise notice 'length(t) = %', length(t);
+    end loop;
+  end;
+$$;
+}
+
 session "s2"
 setup
 {
@@ -155,3 +175,4 @@ permutation "lock" "assign3" "vacuum" "unlock"
 permutation "lock" "assign4" "vacuum" "unlock"
 permutation "lock" "assign5" "vacuum" "unlock"
 permutation "lock" "assign6" "vacuum" "unlock"
+permutation "fetch-after-commit"
index a04c03a7e249ae45ba8592f56bd55dc244644208..4b7d637c70d9242d141839130ffd5d013973f5d3 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 51;
+use Test::More tests => 54;
 
 # setup
 
@@ -67,6 +67,40 @@ $node_subscriber1->safe_psql('postgres',
    "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
 );
 
+# Add set of AFTER replica triggers for testing that they are fired
+# correctly.  This uses a table that records details of all trigger
+# activities.  Triggers are marked as enabled for a subset of the
+# partition tree.
+$node_subscriber1->safe_psql(
+   'postgres', qq{
+CREATE TABLE sub1_trigger_activity (tgtab text, tgop text,
+  tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+  IF (TG_OP = 'INSERT') THEN
+    INSERT INTO public.sub1_trigger_activity
+      SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+  ELSIF (TG_OP = 'UPDATE') THEN
+    INSERT INTO public.sub1_trigger_activity
+      SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+  END IF;
+  RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub1_tab1_log_op_trigger
+  AFTER INSERT OR UPDATE ON tab1
+  FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_log_op_trigger
+  AFTER INSERT OR UPDATE ON tab1_2
+  FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_2_log_op_trigger
+  AFTER INSERT OR UPDATE ON tab1_2_2
+  FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger;
+});
+
 # subscriber 2
 #
 # This does not use partitioning.  The tables match the leaf tables on
@@ -87,6 +121,34 @@ $node_subscriber2->safe_psql('postgres',
    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"
 );
 
+# Add set of AFTER replica triggers for testing that they are fired
+# correctly, using the same method as the first subscriber.
+$node_subscriber2->safe_psql(
+   'postgres', qq{
+CREATE TABLE sub2_trigger_activity (tgtab text,
+  tgop text, tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+  IF (TG_OP = 'INSERT') THEN
+    INSERT INTO public.sub2_trigger_activity
+      SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+  ELSIF (TG_OP = 'UPDATE') THEN
+    INSERT INTO public.sub2_trigger_activity
+      SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+  END IF;
+  RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub2_tab1_log_op_trigger
+  AFTER INSERT OR UPDATE ON tab1
+  FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger;
+CREATE TRIGGER sub2_tab1_2_log_op_trigger
+  AFTER INSERT OR UPDATE ON tab1_2
+  FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
+});
+
 # Wait for initial sync of all subscriptions
 my $synced_query =
   "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
@@ -130,6 +192,14 @@ $result = $node_subscriber2->safe_psql('postgres',
    "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
 is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated');
 
+# The AFTER trigger of tab1_2 should have recorded one INSERT.
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result,
+   qq(tab1_2|INSERT|AFTER|ROW||5),
+   'check replica insert after trigger applied on subscriber');
+
 $result = $node_subscriber2->safe_psql('postgres',
    "SELECT c, a FROM tab1_def ORDER BY 1, 2");
 is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated');
@@ -161,6 +231,15 @@ $result = $node_subscriber1->safe_psql('postgres',
    "SELECT a FROM tab1_2_2 ORDER BY 1");
 is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly');
 
+# The AFTER trigger should have recorded the UPDATEs of tab1_2_2.
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6
+tab1_2_2|UPDATE|AFTER|ROW|4|6
+tab1_2_2|UPDATE|AFTER|ROW|6|4),
+   'check replica update after trigger applied on subscriber');
+
 $result = $node_subscriber2->safe_psql('postgres',
    "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
 is( $result, qq(sub2_tab1_1|2
@@ -170,6 +249,16 @@ $result = $node_subscriber2->safe_psql('postgres',
    "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
 is($result, qq(sub2_tab1_2|6), 'tab1_2 updated');
 
+# The AFTER trigger should have recorded the updates of tab1_2.
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2|INSERT|AFTER|ROW||5
+tab1_2|UPDATE|AFTER|ROW|4|6
+tab1_2|UPDATE|AFTER|ROW|5|6
+tab1_2|UPDATE|AFTER|ROW|6|4),
+   'check replica update after trigger applied on subscriber');
+
 $result = $node_subscriber2->safe_psql('postgres',
    "SELECT c, a FROM tab1_def ORDER BY 1");
 is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');