Fix double publish of child table's data.
authorAmit Kapila
Thu, 9 Dec 2021 03:30:35 +0000 (09:00 +0530)
committerAmit Kapila
Thu, 9 Dec 2021 03:30:35 +0000 (09:00 +0530)
We publish the child table's data twice for a publication that has both
child and parent tables and is published with publish_via_partition_root
as true. This happens because subscribers will initiate synchronization
using both parent and child tables, since it gets both as separate tables
in the initial table list.

Ensure that pg_publication_tables returns only parent tables in such
cases.

Author: Hou Zhijie
Reviewed-by: Greg Nancarrow, Amit Langote, Vignesh C, Amit Kapila
Backpatch-through: 13
Discussion: https://postgr.es/m/OS0PR01MB57167F45D481F78CDC5986F794B99@OS0PR01MB5716.jpnprd01.prod.outlook.com

src/backend/catalog/pg_publication.c
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql
src/test/subscription/t/013_partition.pl

index 5d7e1b184dcbd8141aa0245e9685bcc77d348654..95e37f380971cb5ae55cf7ee2ee49f2dc90ea6ad 100644 (file)
@@ -105,6 +105,45 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
        relid >= FirstNormalObjectId;
 }
 
+/*
+ * Filter out the partitions whose parent tables were also specified in
+ * the publication.
+ */
+static List *
+filter_partitions(List *relids)
+{
+   List       *result = NIL;
+   ListCell   *lc;
+   ListCell   *lc2;
+
+   foreach(lc, relids)
+   {
+       bool        skip = false;
+       List       *ancestors = NIL;
+       Oid         relid = lfirst_oid(lc);
+
+       if (get_rel_relispartition(relid))
+           ancestors = get_partition_ancestors(relid);
+
+       foreach(lc2, ancestors)
+       {
+           Oid         ancestor = lfirst_oid(lc2);
+
+           /* Check if the parent table exists in the published table list. */
+           if (list_member_oid(relids, ancestor))
+           {
+               skip = true;
+               break;
+           }
+       }
+
+       if (!skip)
+           result = lappend_oid(result, relid);
+   }
+
+   return result;
+}
+
 /*
  * Another variant of this, taking a Relation.
  */
@@ -557,10 +596,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
        if (publication->alltables)
            tables = GetAllTablesPublicationRelations(publication->pubviaroot);
        else
+       {
            tables = GetPublicationRelations(publication->oid,
                                             publication->pubviaroot ?
                                             PUBLICATION_PART_ROOT :
                                             PUBLICATION_PART_LEAF);
+
+           /*
+            * If the publication publishes partition changes via their
+            * respective root partitioned tables, we must exclude partitions
+            * in favor of including the root partitioned tables. Otherwise,
+            * the function could return both the child and parent tables
+            * which could cause data of the child table to be
+            * double-published on the subscriber side.
+            */
+           if (publication->pubviaroot)
+               tables = filter_partitions(tables);
+       }
        funcctx->user_fctx = (void *) tables;
 
        MemoryContextSwitchTo(oldcontext);
index f27859373d153312ef58def7c21d097ab7d67885..a2aca234ef8c8dd0be0b72d484bf0402ea7b0688 100644 (file)
@@ -163,6 +163,15 @@ HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
 ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 -- works again, because update is no longer replicated
 UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
+      pubname      | schemaname |   tablename    
+-------------------+------------+----------------
+ testpub_forparted | public     | testpub_parted
+(1 row)
+
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- Test cache invalidation FOR ALL TABLES publication
index e5745d575b018e73c7974c5e5fbfa0e338c2588d..4f2445ad1175aa49bc9160312e3ee34412ab0502 100644 (file)
@@ -97,6 +97,10 @@ UPDATE testpub_parted2 SET a = 2;
 ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 -- works again, because update is no longer replicated
 UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
index 7784fcfd9054fede0c7dd8591c08dbf19e48f269..6d77001469b7d78a0d5e847177e875c3293d943b 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 62;
+use Test::More tests => 63;
 
 # setup
 
@@ -409,11 +409,16 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
 # Note: tab3_1's parent is not in the publication, in which case its
-# changes are published using own identity.
+# changes are published using own identity. For tab2, even though both parent
+# and child tables are present but changes will be replicated via the parent's
+# identity and only once.
 $node_publisher->safe_psql('postgres',
-   "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)"
+   "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
 );
 
+# prepare data for the initial sync
+$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
+
 # subscriber 1
 $node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_subscriber1->safe_psql('postgres',
@@ -465,12 +470,17 @@ $node_subscriber1->poll_query_until('postgres', $synced_query)
 $node_subscriber2->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";
 
+# check that data is synced correctly
+$result = $node_subscriber1->safe_psql('postgres',
+   "SELECT c, a FROM tab2");
+is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');
+
 # insert
 $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
 $node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
 $node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
 $node_publisher->safe_psql('postgres',
-   "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+   "INSERT INTO tab2 VALUES (0), (3), (5)");
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab3 VALUES (1), (0), (3), (5)");