Fix handling of pending inserts in nodeModifyTable.c.
authorEtsuro Fujita
Fri, 25 Nov 2022 08:45:03 +0000 (17:45 +0900)
committerEtsuro Fujita
Fri, 25 Nov 2022 08:45:03 +0000 (17:45 +0900)
Commit b663a4136, which allowed FDWs to INSERT rows in bulk, added to
nodeModifyTable.c code to flush pending inserts to the foreign-table
result relation(s) before completing processing of the ModifyTable node,
but the code failed to take into account the case where the INSERT query
has modifying CTEs, leading to incorrect results.

Also, that commit failed to flush pending inserts before firing BEFORE
ROW triggers so that rows are visible to such triggers.

In that commit we scanned through EState's
es_tuple_routing_result_relations or es_opened_result_relations list to
find the foreign-table result relations to which pending inserts are
flushed, but that would be inefficient in some cases.  So to fix, 1) add
a List member to EState to record the insert-pending result relations,
and 2) modify nodeModifyTable.c so that it adds the foreign-table result
relation to the list in ExecInsert() if appropriate, and flushes pending
inserts properly using the list where needed.

While here, fix a copy-and-pasteo in a comment in ExecBatchInsert(),
which was added by that commit.

Back-patch to v14 where that commit appeared.

Discussion: https://postgr.es/m/CAPmGK16qutyCmyJJzgQOhfBq%3DNoGDqTB6O0QBZTihrbqre%2BoxA%40mail.gmail.com

contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/sql/postgres_fdw.sql
src/backend/executor/execMain.c
src/backend/executor/execPartition.c
src/backend/executor/execUtils.c
src/backend/executor/nodeModifyTable.c
src/include/nodes/execnodes.h

index d4c8c5692c603b50509a44a43f6a69f46f4d7bd7..ee231cedcf424663a2763976b95b124a8593306e 100644 (file)
@@ -10168,7 +10168,130 @@ SELECT * FROM batch_table ORDER BY x;
  50 | test50 | test50
 (50 rows)
 
+-- Clean up
+DROP TABLE batch_table;
+DROP TABLE batch_table_p0;
+DROP TABLE batch_table_p1;
 ALTER SERVER loopback OPTIONS (DROP batch_size);
+-- Test that pending inserts are handled properly when needed
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable (a text, b int)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable (a text, b int);
+CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
+$$
+begin
+   raise notice '%: there are % rows in ftable',
+       TG_NAME, (SELECT count(*) FROM ftable);
+   if TG_OP = 'DELETE' then
+       return OLD;
+   else
+       return NEW;
+   end if;
+end;
+$$;
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT OR UPDATE OR DELETE ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+WITH t AS (
+   INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE:  ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE:  ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+  a  | b  
+-----+----
+ AAA | 42
+ BBB | 42
+(2 rows)
+
+SELECT * FROM ftable;
+  a  | b  
+-----+----
+ AAA | 42
+ BBB | 42
+(2 rows)
+
+DELETE FROM ftable;
+WITH t AS (
+   UPDATE ltable SET b = b + 100 RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE:  ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE:  ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+  a  |  b  
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+SELECT * FROM ftable;
+  a  |  b  
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+DELETE FROM ftable;
+WITH t AS (
+   DELETE FROM ltable RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE:  ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE:  ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+ a | b 
+---+---
+(0 rows)
+
+SELECT * FROM ftable;
+  a  |  b  
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+DELETE FROM ftable;
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable
+   PARTITION OF parent
+   FOR VALUES IN ('AAA')
+   SERVER loopback
+   OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable
+   PARTITION OF parent
+   FOR VALUES IN ('BBB');
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
+NOTICE:  ftable_rowcount_trigger: there are 1 rows in ftable
+NOTICE:  ftable_rowcount_trigger: there are 2 rows in ftable
+SELECT tableoid::regclass, * FROM parent;
+ tableoid |  a  | b  
+----------+-----+----
+ ftable   | AAA | 42
+ ftable   | AAA | 42
+ ltable   | BBB | 42
+ ltable   | BBB | 42
+(4 rows)
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+DROP TABLE parent;
+DROP FUNCTION ftable_rowcount_trigf;
 -- ===================================================================
 -- test asynchronous execution
 -- ===================================================================
index 94a7d367d12c2f995b0a6b8e723204bdad4bfc8c..258506b01a453237b3d83fa4aa9b4bcc06693af1 100644 (file)
@@ -3205,8 +3205,94 @@ INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1,
 SELECT COUNT(*) FROM batch_table;
 SELECT * FROM batch_table ORDER BY x;
 
+-- Clean up
+DROP TABLE batch_table;
+DROP TABLE batch_table_p0;
+DROP TABLE batch_table_p1;
+
 ALTER SERVER loopback OPTIONS (DROP batch_size);
 
+-- Test that pending inserts are handled properly when needed
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable (a text, b int)
+   SERVER loopback
+   OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable (a text, b int);
+CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
+$$
+begin
+   raise notice '%: there are % rows in ftable',
+       TG_NAME, (SELECT count(*) FROM ftable);
+   if TG_OP = 'DELETE' then
+       return OLD;
+   else
+       return NEW;
+   end if;
+end;
+$$;
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT OR UPDATE OR DELETE ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+
+WITH t AS (
+   INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+WITH t AS (
+   UPDATE ltable SET b = b + 100 RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+WITH t AS (
+   DELETE FROM ltable RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+
+CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable
+   PARTITION OF parent
+   FOR VALUES IN ('AAA')
+   SERVER loopback
+   OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable
+   PARTITION OF parent
+   FOR VALUES IN ('BBB');
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+
+INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
+
+SELECT tableoid::regclass, * FROM parent;
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+DROP TABLE parent;
+DROP FUNCTION ftable_rowcount_trigf;
+
 -- ===================================================================
 -- test asynchronous execution
 -- ===================================================================
index b3ce4bae53078a2914ad54f7fad5061ff4271d76..83d21d612b143f1bfe044a2fb10db93699f49a71 100644 (file)
@@ -1257,6 +1257,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
    resultRelInfo->ri_ChildToRootMap = NULL;
    resultRelInfo->ri_ChildToRootMapValid = false;
    resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+   resultRelInfo->ri_ModifyTableState = NULL;
 }
 
 /*
index 606c920b06805942a37e3616872ff44db0446e4c..216da08d0cf811bcf0b5ce20a2ac7218b1b3129d 100644 (file)
@@ -934,6 +934,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 
    Assert(partRelInfo->ri_BatchSize >= 1);
 
+   /*
+    * If doing batch insert, setup back-link so we can easily find the
+    * mtstate again.
+    */
+   if (partRelInfo->ri_BatchSize > 1)
+       partRelInfo->ri_ModifyTableState = mtstate;
+
    partRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
    /*
index ad11392b99de1927f65712e307224e742e081ea8..64a8c2e59312a8efdbc136cc27bbbfeb06177cd8 100644 (file)
@@ -127,6 +127,7 @@ CreateExecutorState(void)
    estate->es_result_relations = NULL;
    estate->es_opened_result_relations = NIL;
    estate->es_tuple_routing_result_relations = NIL;
+   estate->es_insert_pending_result_relations = NIL;
    estate->es_trig_target_relations = NIL;
 
    estate->es_param_list_info = NULL;
index 37ba4755cbc45e6b24a607ef12fa1ad916152bdb..ee0f04220408e0f0b7defdc370944ba94bac3e11 100644 (file)
@@ -67,6 +67,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate,
                            int numSlots,
                            EState *estate,
                            bool canSetTag);
+static void ExecPendingInserts(EState *estate);
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
                                 ResultRelInfo *resultRelInfo,
                                 ItemPointer conflictTid,
@@ -645,6 +646,10 @@ ExecInsert(ModifyTableState *mtstate,
    if (resultRelInfo->ri_TrigDesc &&
        resultRelInfo->ri_TrigDesc->trig_insert_before_row)
    {
+       /* Flush any pending inserts, so rows are visible to the triggers */
+       if (estate->es_insert_pending_result_relations != NIL)
+           ExecPendingInserts(estate);
+
        if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
            return NULL;        /* "do nothing" */
    }
@@ -678,6 +683,8 @@ ExecInsert(ModifyTableState *mtstate,
         */
        if (resultRelInfo->ri_BatchSize > 1)
        {
+           bool        flushed = false;
+
            /*
             * When we've reached the desired batch size, perform the
             * insertion.
@@ -690,6 +697,7 @@ ExecInsert(ModifyTableState *mtstate,
                                resultRelInfo->ri_NumSlots,
                                estate, canSetTag);
                resultRelInfo->ri_NumSlots = 0;
+               flushed = true;
            }
 
            oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -732,6 +740,24 @@ ExecInsert(ModifyTableState *mtstate,
            ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
                         planSlot);
 
+           /*
+            * If these are the first tuples stored in the buffers, add the
+            * target rel to the es_insert_pending_result_relations list,
+            * except in the case where flushing was done above, in which case
+            * the target rel would already have been added to the list, so no
+            * need to do this.
+            */
+           if (resultRelInfo->ri_NumSlots == 0 && !flushed)
+           {
+               Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
+                                       resultRelInfo));
+               estate->es_insert_pending_result_relations =
+                   lappend(estate->es_insert_pending_result_relations,
+                           resultRelInfo);
+           }
+           Assert(list_member_ptr(estate->es_insert_pending_result_relations,
+                                  resultRelInfo));
+
            resultRelInfo->ri_NumSlots++;
 
            MemoryContextSwitchTo(oldContext);
@@ -1034,9 +1060,8 @@ ExecBatchInsert(ModifyTableState *mtstate,
        slot = rslots[i];
 
        /*
-        * AFTER ROW Triggers or RETURNING expressions might reference the
-        * tableoid column, so (re-)initialize tts_tableOid before evaluating
-        * them.
+        * AFTER ROW Triggers might reference the tableoid column, so
+        * (re-)initialize tts_tableOid before evaluating them.
         */
        slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 
@@ -1107,6 +1132,10 @@ ExecDelete(ModifyTableState *mtstate,
    {
        bool        dodelete;
 
+       /* Flush any pending inserts, so rows are visible to the triggers */
+       if (estate->es_insert_pending_result_relations != NIL)
+           ExecPendingInserts(estate);
+
        dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
                                        tupleid, oldtuple, epqreturnslot);
 
@@ -1410,6 +1439,32 @@ ldelete:;
    return NULL;
 }
 
+/*
+ * ExecPendingInserts -- flushes all pending inserts to the foreign tables
+ */
+static void
+ExecPendingInserts(EState *estate)
+{
+   ListCell   *lc;
+
+   foreach(lc, estate->es_insert_pending_result_relations)
+   {
+       ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(lc);
+       ModifyTableState *mtstate = resultRelInfo->ri_ModifyTableState;
+
+       Assert(mtstate);
+       ExecBatchInsert(mtstate, resultRelInfo,
+                       resultRelInfo->ri_Slots,
+                       resultRelInfo->ri_PlanSlots,
+                       resultRelInfo->ri_NumSlots,
+                       estate, mtstate->canSetTag);
+       resultRelInfo->ri_NumSlots = 0;
+   }
+
+   list_free(estate->es_insert_pending_result_relations);
+   estate->es_insert_pending_result_relations = NIL;
+}
+
 /*
  * ExecCrossPartitionUpdate --- Move an updated tuple to another partition.
  *
@@ -1634,6 +1689,10 @@ ExecUpdate(ModifyTableState *mtstate,
    if (resultRelInfo->ri_TrigDesc &&
        resultRelInfo->ri_TrigDesc->trig_update_before_row)
    {
+       /* Flush any pending inserts, so rows are visible to the triggers */
+       if (estate->es_insert_pending_result_relations != NIL)
+           ExecPendingInserts(estate);
+
        if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
                                  tupleid, oldtuple, slot))
            return NULL;        /* "do nothing" */
@@ -2361,9 +2420,6 @@ ExecModifyTable(PlanState *pstate)
    ItemPointerData tuple_ctid;
    HeapTupleData oldtupdata;
    HeapTuple   oldtuple;
-   PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
-   List       *relinfos = NIL;
-   ListCell   *lc;
 
    CHECK_FOR_INTERRUPTS();
 
@@ -2620,21 +2676,8 @@ ExecModifyTable(PlanState *pstate)
    /*
     * Insert remaining tuples for batch insert.
     */
-   if (proute)
-       relinfos = estate->es_tuple_routing_result_relations;
-   else
-       relinfos = estate->es_opened_result_relations;
-
-   foreach(lc, relinfos)
-   {
-       resultRelInfo = lfirst(lc);
-       if (resultRelInfo->ri_NumSlots > 0)
-           ExecBatchInsert(node, resultRelInfo,
-                           resultRelInfo->ri_Slots,
-                           resultRelInfo->ri_PlanSlots,
-                           resultRelInfo->ri_NumSlots,
-                           estate, node->canSetTag);
-   }
+   if (estate->es_insert_pending_result_relations != NIL)
+       ExecPendingInserts(estate);
 
    /*
     * We're done, but fire AFTER STATEMENT triggers before exiting.
@@ -3140,6 +3183,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
        }
        else
            resultRelInfo->ri_BatchSize = 1;
+
+       /*
+        * If doing batch insert, setup back-link so we can easily find the
+        * mtstate again.
+        */
+       if (resultRelInfo->ri_BatchSize > 1)
+           resultRelInfo->ri_ModifyTableState = mtstate;
    }
 
    /*
index 3dfac5bd5feab543f86f9b966ed31a8055a09639..71d0cf44dd55a2751580e512a9b72a6eb6166387 100644 (file)
@@ -524,6 +524,9 @@ typedef struct ResultRelInfo
 
    /* for use by copyfrom.c when performing multi-inserts */
    struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+
+   /* for use by nodeModifyTable.c when performing batch-inserts */
+   struct ModifyTableState *ri_ModifyTableState;
 } ResultRelInfo;
 
 /* ----------------
@@ -645,6 +648,12 @@ typedef struct EState
    int         es_jit_flags;
    struct JitContext *es_jit;
    struct JitInstrumentation *es_jit_worker_instr;
+
+   /*
+    * The following list contains ResultRelInfos for foreign tables on which
+    * batch-inserts are to be executed.
+    */
+   List       *es_insert_pending_result_relations;
 } EState;