Have logical replication subscriber fire column triggers
authorPeter Eisentraut
Mon, 6 Jan 2020 07:21:14 +0000 (08:21 +0100)
committerPeter Eisentraut
Mon, 6 Jan 2020 10:38:33 +0000 (11:38 +0100)
The logical replication apply worker did not fire per-column update
triggers because the updatedCols bitmap in the RTE was not populated.
This fixes that.

Reviewed-by: Euler Taveira
Discussion: https://www.postgresql.org/message-id/flat/21673e2d-597c-6afe-637e-e8b10425b240%402ndquadrant.com

src/backend/replication/logical/worker.c
src/test/subscription/t/003_constraints.pl

index 671260883858b5dd48ab3f8b7ed6f288096c7997..7e3f3f0b0c05483809cb372d816e023b608b92b9 100644 (file)
@@ -27,6 +27,7 @@
 #include "pgstat.h"
 #include "funcapi.h"
 
+#include "access/sysattr.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 
@@ -703,6 +704,8 @@ apply_handle_update(StringInfo s)
    bool        has_oldtup;
    TupleTableSlot *localslot;
    TupleTableSlot *remoteslot;
+   RangeTblEntry *target_rte;
+   int         i;
    bool        found;
    MemoryContext oldctx;
 
@@ -732,6 +735,21 @@ apply_handle_update(StringInfo s)
    ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 
+   /*
+    * Populate updatedCols so that per-column triggers can fire.  This could
+    * include more columns than were actually changed on the publisher
+    * because the logical replication protocol doesn't contain that
+    * information.  But it would for example exclude columns that only exist
+    * on the subscriber, since we are not touching those.
+    */
+   target_rte = list_nth(estate->es_range_table, 0);
+   for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
+   {
+       if (newtup.changed[i])
+           target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
+                                                    i + 1 - FirstLowInvalidHeapAttributeNumber);
+   }
+
    PushActiveSnapshot(GetTransactionSnapshot());
    ExecOpenIndices(estate->es_result_relation_info, false);
 
index 06863aef84a5356ed0d261a5bbe20723c9804479..65528edf537fcdeba6bb21299b919689f6407876 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 6;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -88,6 +88,8 @@ BEGIN
         ELSE
             RETURN NULL;
         END IF;
+    ELSIF (TG_OP = 'UPDATE') THEN
+        RETURN NULL;
     ELSE
         RAISE WARNING 'Unknown action';
         RETURN NULL;
@@ -95,7 +97,7 @@ BEGIN
 END;
 \$\$ LANGUAGE plpgsql;
 CREATE TRIGGER filter_basic_dml_trg
-    BEFORE INSERT ON tab_fk_ref
+    BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
     FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
 ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
 });
@@ -107,10 +109,34 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->poll_query_until('postgres', $caughtup_query)
   or die "Timed out while waiting for subscriber to catch up";
 
-# The row should be skipped on subscriber
+# The trigger should cause the insert to be skipped on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber');
+
+# Update data
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# The trigger should cause the update to be skipped on subscriber
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
-is($result, qq(2|1|2), 'check replica trigger applied on subscriber');
+is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber');
+
+# Update on a column not specified in the trigger, but it will trigger
+# anyway because logical replication ships all columns in an update.
+$node_publisher->safe_psql('postgres',
+   "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check column trigger applied on even for other column');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');