#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "pgstat.h"
+#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
if (fcinfo->isnull)
elog(ERROR, "procedure returned null record");
+ /*
+ * Ensure there's an active snapshot whilst we execute whatever's
+ * involved here. Note that this is *not* sufficient to make the
+ * world safe for TOAST pointers to be included in the returned data:
+ * the referenced data could have gone away while we didn't hold a
+ * snapshot. Hence, it's incumbent on PLs that can do COMMIT/ROLLBACK
+ * to not return TOAST pointers, unless those pointers were fetched
+ * after the last COMMIT/ROLLBACK in the procedure.
+ *
+ * XXX that is a really nasty, hard-to-test requirement. Is there a
+ * way to remove it?
+ */
+ EnsurePortalSnapshotExists();
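+ /*
+ * For instance, a PL that fetches an out-of-line TOAST datum, COMMITs,
+ * and then tries to return that datum is handing back a pointer whose
+ * underlying data a concurrent VACUUM may already have reclaimed; the
+ * snapshot established just above cannot protect against that.
+ */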
+
td = DatumGetHeapTupleHeader(retval);
tupType = HeapTupleHeaderGetTypeId(td);
tupTypmod = HeapTupleHeaderGetTypMod(td);
/* Start the actual commit */
_SPI_current->internal_xact = true;
- /*
- * Before committing, pop all active snapshots to avoid error about
- * "snapshot %p still active".
- */
- while (ActiveSnapshotSet())
- PopActiveSnapshot();
+ /* Release snapshots associated with portals */
+ ForgetPortalSnapshots();
if (chain)
SaveTransactionCharacteristics();
/* Start the actual rollback */
_SPI_current->internal_xact = true;
+ /* Release snapshots associated with portals */
+ ForgetPortalSnapshots();
+
if (chain)
SaveTransactionCharacteristics();
uint64 my_processed = 0;
SPITupleTable *my_tuptable = NULL;
int res = 0;
+ bool allow_nonatomic = plan->no_snapshots; /* legacy API name */
bool pushed_active_snap = false;
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
* In the first two cases, we can just push the snap onto the stack once
* for the whole plan list.
*
- * But if the plan has no_snapshots set to true, then don't manage
- * snapshots at all. The caller should then take care of that.
+ * Note that snapshot != InvalidSnapshot implies an atomic execution
+ * context.
*/
- if (snapshot != InvalidSnapshot && !plan->no_snapshots)
+ if (snapshot != InvalidSnapshot)
{
+ Assert(!allow_nonatomic);
if (read_only)
{
PushActiveSnapshot(snapshot);
stmt_list = cplan->stmt_list;
/*
- * In the default non-read-only case, get a new snapshot, replacing
- * any that we pushed in a previous cycle.
+ * If we weren't given a specific snapshot to use, and the statement
+ * list requires a snapshot, set that up.
*/
- if (snapshot == InvalidSnapshot && !read_only && !plan->no_snapshots)
+ if (snapshot == InvalidSnapshot &&
+ (list_length(stmt_list) > 1 ||
+ (list_length(stmt_list) == 1 &&
+ PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt,
+ stmt_list)))))
{
- if (pushed_active_snap)
- PopActiveSnapshot();
- PushActiveSnapshot(GetTransactionSnapshot());
- pushed_active_snap = true;
+ /*
+ * First, ensure there's a Portal-level snapshot. This back-fills
+ * the snapshot stack in case the previous operation was a COMMIT
+ * or ROLLBACK inside a procedure or DO block. (We can't put back
+ * the Portal snapshot any sooner, or we'd break cases like doing
+ * SET or LOCK just after COMMIT.) It's enough to check once per
+ * statement list, since COMMIT/ROLLBACK/CALL/DO can't appear
+ * within a multi-statement list.
+ */
+ EnsurePortalSnapshotExists();
+
+ /*
+ * In the default non-read-only case, get a new per-statement-list
+ * snapshot, replacing any that we pushed in a previous cycle.
+ * Skip it when doing non-atomic execution, though (we rely
+ * entirely on the Portal snapshot in that case).
+ */
+ if (!read_only && !allow_nonatomic)
+ {
+ if (pushed_active_snap)
+ PopActiveSnapshot();
+ PushActiveSnapshot(GetTransactionSnapshot());
+ pushed_active_snap = true;
+ }
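+
+ /*
+ * Either way, an active snapshot now exists: either the fresh one
+ * pushed just above, or (for read-only or non-atomic execution) the
+ * one guaranteed by EnsurePortalSnapshotExists().
+ */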
}
foreach(lc2, stmt_list)
_SPI_current->processed = 0;
_SPI_current->tuptable = NULL;
+ /* Check for unsupported cases. */
if (stmt->utilityStmt)
{
if (IsA(stmt->utilityStmt, CopyStmt))
/*
* If not read-only mode, advance the command counter before each
- * command and update the snapshot.
+ * command and update the snapshot. (But skip it if the snapshot
+ * isn't under our control.)
*/
- if (!read_only && !plan->no_snapshots)
+ if (!read_only && pushed_active_snap)
{
CommandCounterIncrement();
UpdateActiveSnapshotCommandId();
QueryCompletion qc;
/*
- * If the SPI context is atomic, or we are asked to manage
- * snapshots, then we are in an atomic execution context.
- * Conversely, to propagate a nonatomic execution context, the
- * caller must be in a nonatomic SPI context and manage
- * snapshots itself.
+ * If the SPI context is atomic, or we were not told to allow
+ * nonatomic operations, tell ProcessUtility this is an atomic
+ * execution context.
*/
- if (_SPI_current->atomic || !plan->no_snapshots)
+ if (_SPI_current->atomic || !allow_nonatomic)
context = PROCESS_UTILITY_QUERY;
else
context = PROCESS_UTILITY_QUERY_NONATOMIC;
ResultRelInfo *resultRelInfo;
RangeTblEntry *rte;
+ /*
+ * Input functions may need an active snapshot, as may AFTER triggers
+ * invoked during finish_estate. For safety, ensure an active snapshot
+ * exists throughout all our usage of the executor.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
return estate;
}
+/*
+ * Finish any operations related to the executor state created by
+ * create_estate_for_relation().
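+ *
+ * This pops the active snapshot pushed by create_estate_for_relation(),
+ * after firing any AFTER triggers that were queued while it was active.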
+ */
+static void
+finish_estate(EState *estate)
+{
+ /* Handle any queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ /* Cleanup. */
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+ PopActiveSnapshot();
+}
+
/*
* Executes default values for columns for which we can't map to remote
* relation columns.
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
- /* Input functions may need an active snapshot, so get one */
- PushActiveSnapshot(GetTransactionSnapshot());
-
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
apply_handle_insert_internal(estate->es_result_relation_info, estate,
remoteslot);
- PopActiveSnapshot();
-
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
/* Also populate extraUpdatedCols, in case we have generated columns */
fill_extraUpdatedCols(target_rte, rel->localrel);
- PushActiveSnapshot(GetTransactionSnapshot());
-
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel,
apply_handle_update_internal(estate->es_result_relation_info, estate,
remoteslot, &newtup, rel);
- PopActiveSnapshot();
-
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
- PushActiveSnapshot(GetTransactionSnapshot());
-
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, oldtup.values);
apply_handle_delete_internal(estate->es_result_relation_info, estate,
remoteslot, &rel->remoterel);
- PopActiveSnapshot();
-
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
+ finish_estate(estate);
logicalrep_rel_close(rel, NoLock);
List *relids = NIL;
List *relids_logged = NIL;
ListCell *lc;
- LOCKMODE lockmode = AccessExclusiveLock;
+ LOCKMODE lockmode = AccessExclusiveLock;
ensure_transaction();
else
PushActiveSnapshot(GetTransactionSnapshot());
+ /*
+ * We could remember the snapshot in portal->portalSnapshot,
+ * but presently there seems no need to, as this code path
+ * cannot be used for non-atomic execution. Hence there can't
+ * be any commit/abort that might destroy the snapshot.
+ */
+
/*
* Create QueryDesc in portal's context; for the moment, set
* the destination to DestNone.
bool isTopLevel, bool setHoldSnapshot,
DestReceiver *dest, QueryCompletion *qc)
{
- Node *utilityStmt = pstmt->utilityStmt;
- Snapshot snapshot;
-
/*
- * Set snapshot if utility stmt needs one. Most reliable way to do this
- * seems to be to enumerate those that do not need one; this is a short
- * list. Transaction control, LOCK, and SET must *not* set a snapshot
- * since they need to be executable at the start of a transaction-snapshot
- * mode transaction without freezing a snapshot. By extension we allow
- * SHOW not to set a snapshot. The other stmts listed are just efficiency
- * hacks. Beware of listing anything that can modify the database --- if,
- * say, it has to update an index with expressions that invoke
- * user-defined functions, then it had better have a snapshot.
+ * Set snapshot if utility stmt needs one.
*/
- if (!(IsA(utilityStmt, TransactionStmt) ||
- IsA(utilityStmt, LockStmt) ||
- IsA(utilityStmt, VariableSetStmt) ||
- IsA(utilityStmt, VariableShowStmt) ||
- IsA(utilityStmt, ConstraintsSetStmt) ||
- /* efficiency hacks from here down */
- IsA(utilityStmt, FetchStmt) ||
- IsA(utilityStmt, ListenStmt) ||
- IsA(utilityStmt, NotifyStmt) ||
- IsA(utilityStmt, UnlistenStmt) ||
- IsA(utilityStmt, CheckPointStmt)))
+ if (PlannedStmtRequiresSnapshot(pstmt))
{
- snapshot = GetTransactionSnapshot();
+ Snapshot snapshot = GetTransactionSnapshot();
+
/* If told to, register the snapshot we're using and save in portal */
if (setHoldSnapshot)
{
snapshot = RegisterSnapshot(snapshot);
portal->holdSnapshot = snapshot;
}
+ /* In any case, make the snapshot active and remember it in portal */
PushActiveSnapshot(snapshot);
/* PushActiveSnapshot might have copied the snapshot */
- snapshot = GetActiveSnapshot();
+ portal->portalSnapshot = GetActiveSnapshot();
}
else
- snapshot = NULL;
+ portal->portalSnapshot = NULL;
ProcessUtility(pstmt,
portal->sourceText,
MemoryContextSwitchTo(portal->portalContext);
/*
- * Some utility commands may pop the ActiveSnapshot stack from under us,
- * so be careful to only pop the stack if our snapshot is still at the
- * top.
+ * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
+ * under us, so don't complain if it's now empty. Otherwise, our snapshot
+ * should be the top one; pop it. Note that this could be a different
+ * snapshot from the one we made above; see EnsurePortalSnapshotExists.
*/
- if (snapshot != NULL && ActiveSnapshotSet() &&
- snapshot == GetActiveSnapshot())
+ if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
+ {
+ Assert(portal->portalSnapshot == GetActiveSnapshot());
PopActiveSnapshot();
+ }
+ portal->portalSnapshot = NULL;
}
/*
* from what holdSnapshot has.)
*/
PushCopiedSnapshot(snapshot);
+
+ /*
+ * As for PORTAL_ONE_SELECT portals, it does not seem
+ * necessary to maintain portal->portalSnapshot here.
+ */
+
active_snapshot_set = true;
}
else
portal->atEnd = false;
portal->portalPos = 0;
}
+
+/*
+ * PlannedStmtRequiresSnapshot - does this statement need an active snapshot?
+ */
+bool
+PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
+{
+ Node *utilityStmt = pstmt->utilityStmt;
+
+ /* If it's not a utility statement, it definitely needs a snapshot */
+ if (utilityStmt == NULL)
+ return true;
+
+ /*
+ * Most utility statements need a snapshot, and the default presumption
+ * about new ones should be that they do too. Hence, enumerate those that
+ * do not need one.
+ *
+ * Transaction control, LOCK, and SET must *not* set a snapshot, since
+ * they need to be executable at the start of a transaction-snapshot-mode
+ * transaction without freezing a snapshot. By extension we allow SHOW
+ * not to set a snapshot. The other stmts listed are just efficiency
+ * hacks. Beware of listing anything that can modify the database --- if,
+ * say, it has to update an index with expressions that invoke
+ * user-defined functions, then it had better have a snapshot.
+ */
+ if (IsA(utilityStmt, TransactionStmt) ||
+ IsA(utilityStmt, LockStmt) ||
+ IsA(utilityStmt, VariableSetStmt) ||
+ IsA(utilityStmt, VariableShowStmt) ||
+ IsA(utilityStmt, ConstraintsSetStmt) ||
+ /* efficiency hacks from here down */
+ IsA(utilityStmt, FetchStmt) ||
+ IsA(utilityStmt, ListenStmt) ||
+ IsA(utilityStmt, NotifyStmt) ||
+ IsA(utilityStmt, UnlistenStmt) ||
+ IsA(utilityStmt, CheckPointStmt))
+ return false;
+
+ return true;
+}
+
+/*
+ * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
+ *
+ * Generally, we will have an active snapshot whenever we are executing
+ * inside a Portal, unless the Portal's query is one of the utility
+ * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
+ * However, procedures and DO blocks can commit or abort the transaction,
+ * and thereby destroy all snapshots. This function can be called to
+ * re-establish the Portal-level snapshot when none exists.
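+ *
+ * Callers include SPI's plan-execution path (before it sets up
+ * per-statement-list snapshots) and CALL's handling of a procedure's
+ * output tuple, both of which can run just after an intra-procedure
+ * COMMIT or ROLLBACK.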
+ */
+void
+EnsurePortalSnapshotExists(void)
+{
+ Portal portal;
+
+ /*
+ * Nothing to do if a snapshot is set. (We take it on faith that the
+ * outermost active snapshot belongs to some Portal; or if there is no
+ * Portal, it's somebody else's responsibility to manage things.)
+ */
+ if (ActiveSnapshotSet())
+ return;
+
+ /* Otherwise, we'd better have an active Portal */
+ portal = ActivePortal;
+ Assert(portal != NULL);
+ Assert(portal->portalSnapshot == NULL);
+
+ /* Create a new snapshot and make it active */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ /* PushActiveSnapshot might have copied the snapshot */
+ portal->portalSnapshot = GetActiveSnapshot();
+}
portal->cleanup = NULL;
}
+ /* There shouldn't be an active snapshot anymore, except after error */
+ Assert(portal->portalSnapshot == NULL || !isTopCommit);
+
/*
* Remove portal from hash table. Because we do this here, we will not
* come back to try to remove the portal again if there's any error in the
portal->holdSnapshot = NULL;
}
portal->resowner = NULL;
+ /* Clear portalSnapshot too, for cleanliness */
+ portal->portalSnapshot = NULL;
continue;
}
}
}
}
+
+/*
+ * Drop the outer active snapshots for all portals, so that no snapshots
+ * remain active.
+ *
+ * Like HoldPinnedPortals, this must be called when initiating a COMMIT or
+ * ROLLBACK inside a procedure. It has to be kept separate from
+ * HoldPinnedPortals, though, since it should not be run until we're done
+ * with steps that are likely to fail.
+ *
+ * It's tempting to fold this into PreCommit_Portals, but to do so, we'd
+ * need to clean up snapshot management in VACUUM and perhaps other places.
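+ *
+ * SPI's COMMIT and ROLLBACK paths call this just after marking the start
+ * of the internal transaction operation.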
+ */
+void
+ForgetPortalSnapshots(void)
+{
+ HASH_SEQ_STATUS status;
+ PortalHashEnt *hentry;
+ int numPortalSnaps = 0;
+ int numActiveSnaps = 0;
+
+ /* First, scan PortalHashTable and clear portalSnapshot fields */
+ hash_seq_init(&status, PortalHashTable);
+
+ while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
+ {
+ Portal portal = hentry->portal;
+
+ if (portal->portalSnapshot != NULL)
+ {
+ portal->portalSnapshot = NULL;
+ numPortalSnaps++;
+ }
+ /* portal->holdSnapshot will be cleaned up in PreCommit_Portals */
+ }
+
+ /*
+ * Now, pop all the active snapshots, which should be just those that were
+ * portal snapshots. Ideally we'd drive this directly off the portal
+ * scan, but there's no good way to visit the portals in the correct
+ * order. So just cross-check after the fact.
+ */
+ while (ActiveSnapshotSet())
+ {
+ PopActiveSnapshot();
+ numActiveSnaps++;
+ }
+
+ if (numPortalSnaps != numActiveSnaps)
+ elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)",
+ numPortalSnaps, numActiveSnaps);
+}
int magic; /* should equal _SPI_PLAN_MAGIC */
bool saved; /* saved or unsaved plan? */
bool oneshot; /* one-shot plan? */
- bool no_snapshots; /* let the caller handle the snapshots */
+ bool no_snapshots; /* allow nonatomic CALL/DO execution */
List *plancache_list; /* one CachedPlanSource per parsetree */
MemoryContext plancxt; /* Context containing _SPI_plan and data */
int cursor_options; /* Cursor options used for planning */
#include "nodes/parsenodes.h"
#include "utils/portal.h"
+struct PlannedStmt; /* avoid including plannodes.h here */
+
extern PGDLLIMPORT Portal ActivePortal;
long count,
DestReceiver *dest);
+extern bool PlannedStmtRequiresSnapshot(struct PlannedStmt *pstmt);
+
+extern void EnsurePortalSnapshotExists(void);
+
#endif /* PQUERY_H */
/* Presentation data, primarily used by the pg_cursors system view */
TimestampTz creation_time; /* time at which this portal was defined */
bool visible; /* include this portal in pg_cursors? */
+
+ /*
+ * Outermost ActiveSnapshot for execution of the portal's queries. For
+ * all but a few utility commands, we require such a snapshot to exist.
+ * This ensures that TOAST references in query results can be detoasted,
+ * and helps to reduce thrashing of the process's exposed xmin.
+ */
+ Snapshot portalSnapshot; /* active snapshot, or NULL if none */
} PortalData;
/*
extern void PortalHashTableDeleteAll(void);
extern bool ThereAreNoReadyPortals(void);
extern void HoldPinnedPortals(void);
+extern void ForgetPortalSnapshots(void);
#endif /* PORTAL_H */
PLpgSQL_variable *volatile cur_target = stmt->target;
volatile LocalTransactionId before_lxid;
LocalTransactionId after_lxid;
- volatile bool pushed_active_snap = false;
volatile int rc;
/*
Assert(!expr->expr_simple_expr);
/*
- * The procedure call could end transactions, which would upset
- * the snapshot management in SPI_execute*, so don't let it do it.
- * Instead, we set the snapshots ourselves below.
+ * Tell SPI to allow non-atomic execution. (The field name is a
+ * legacy choice.)
*/
plan->no_snapshots = true;
before_lxid = MyProc->lxid;
- /*
- * Set snapshot only for non-read-only procedures, similar to SPI
- * behavior.
- */
- if (!estate->readonly_func)
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- pushed_active_snap = true;
- }
-
rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI,
estate->readonly_func, 0);
}
after_lxid = MyProc->lxid;
- if (before_lxid == after_lxid)
- {
- /*
- * If we are still in the same transaction after the call, pop the
- * snapshot that we might have pushed. (If it's a new transaction,
- * then all the snapshots are gone already.)
- */
- if (pushed_active_snap)
- PopActiveSnapshot();
- }
- else
+ if (before_lxid != after_lxid)
{
/*
* If we are in a new transaction after the call, we need to build new
*
* We just parse and execute the statement normally, but we have to do it
* without setting a snapshot, for things like SET TRANSACTION.
+ * XXX spi.c now handles this correctly, so we no longer need a special case.
*/
static int
exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
int rc;
if (expr->plan == NULL)
- {
exec_prepare_plan(estate, expr, 0, true);
- expr->plan->no_snapshots = true;
- }
rc = SPI_execute_plan(expr->plan, NULL, NULL, estate->readonly_func, 0);
s1: NOTICE: length(r) = 9002
s1: NOTICE: length(r) = 12002
step assign6: <... completed>
+
+starting permutation: fetch-after-commit
+pg_advisory_unlock_all
+
+
+pg_advisory_unlock_all
+
+
+s1: NOTICE: length(t) = 6000
+s1: NOTICE: length(t) = 9000
+s1: NOTICE: length(t) = 12000
+step fetch-after-commit:
+do $$
+ declare
+ r record;
+ t text;
+ begin
+ insert into test1 values (2, repeat('bar', 3000));
+ insert into test1 values (3, repeat('baz', 4000));
+ for r in select test1.a from test1 loop
+ commit;
+ select b into t from test1 where a = r.a;
+ raise notice 'length(t) = %', length(t);
+ end loop;
+ end;
+$$;
+
$$;
}
+# Check that the results of a query can be detoasted just after committing
+# (there's no interaction with VACUUM here)
+step "fetch-after-commit"
+{
+do $$
+ declare
+ r record;
+ t text;
+ begin
+ insert into test1 values (2, repeat('bar', 3000));
+ insert into test1 values (3, repeat('baz', 4000));
+ for r in select test1.a from test1 loop
+ commit;
+ select b into t from test1 where a = r.a;
+ raise notice 'length(t) = %', length(t);
+ end loop;
+ end;
+$$;
+}
+
session "s2"
setup
{
permutation "lock" "assign4" "vacuum" "unlock"
permutation "lock" "assign5" "vacuum" "unlock"
permutation "lock" "assign6" "vacuum" "unlock"
+permutation "fetch-after-commit"
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 51;
+use Test::More tests => 54;
# setup
"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
);
+# Add a set of AFTER replica triggers for testing that they are fired
+# correctly. This uses a table that records details of all trigger
+# activities. Triggers are marked as enabled for a subset of the
+# partition tree.
+$node_subscriber1->safe_psql(
+ 'postgres', qq{
+CREATE TABLE sub1_trigger_activity (tgtab text, tgop text,
+ tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+ IF (TG_OP = 'INSERT') THEN
+ INSERT INTO public.sub1_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+ ELSIF (TG_OP = 'UPDATE') THEN
+ INSERT INTO public.sub1_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+ END IF;
+ RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub1_tab1_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2_2
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger;
+});
+
# subscriber 2
#
# This does not use partitioning. The tables match the leaf tables on
"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"
);
+# Add a set of AFTER replica triggers for testing that they are fired
+# correctly, using the same method as the first subscriber.
+$node_subscriber2->safe_psql(
+ 'postgres', qq{
+CREATE TABLE sub2_trigger_activity (tgtab text,
+ tgop text, tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+ IF (TG_OP = 'INSERT') THEN
+ INSERT INTO public.sub2_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+ ELSIF (TG_OP = 'UPDATE') THEN
+ INSERT INTO public.sub2_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+ END IF;
+ RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub2_tab1_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1
+ FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger;
+CREATE TRIGGER sub2_tab1_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2
+ FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
+});
+
# Wait for initial sync of all subscriptions
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
"SELECT c, a FROM tab1_2 ORDER BY 1, 2");
is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated');
+# The AFTER trigger of tab1_2 should have recorded one INSERT.
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result,
+ qq(tab1_2|INSERT|AFTER|ROW||5),
+ 'check replica insert after trigger applied on subscriber');
+
$result = $node_subscriber2->safe_psql('postgres',
"SELECT c, a FROM tab1_def ORDER BY 1, 2");
is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated');
"SELECT a FROM tab1_2_2 ORDER BY 1");
is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly');
+# The AFTER trigger should have recorded the UPDATEs of tab1_2_2.
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6
+tab1_2_2|UPDATE|AFTER|ROW|4|6
+tab1_2_2|UPDATE|AFTER|ROW|6|4),
+ 'check replica update after trigger applied on subscriber');
+
$result = $node_subscriber2->safe_psql('postgres',
"SELECT c, a FROM tab1_1 ORDER BY 1, 2");
is( $result, qq(sub2_tab1_1|2
"SELECT c, a FROM tab1_2 ORDER BY 1, 2");
is($result, qq(sub2_tab1_2|6), 'tab1_2 updated');
+# The AFTER trigger should have recorded the updates of tab1_2.
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2|INSERT|AFTER|ROW||5
+tab1_2|UPDATE|AFTER|ROW|4|6
+tab1_2|UPDATE|AFTER|ROW|5|6
+tab1_2|UPDATE|AFTER|ROW|6|4),
+ 'check replica update after trigger applied on subscriber');
+
$result = $node_subscriber2->safe_psql('postgres',
"SELECT c, a FROM tab1_def ORDER BY 1");
is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');