relid >= FirstNormalObjectId;
}
+/*
+ * Filter out the partitions whose parent tables were also specified in
+ * the publication.
+ */
+static List *
+filter_partitions(List *relids)
+{
+ List *result = NIL;
+ ListCell *lc;
+ ListCell *lc2;
+
+ foreach(lc, relids)
+ {
+ bool skip = false;
+ List *ancestors = NIL;
+ Oid relid = lfirst_oid(lc);
+
+ if (get_rel_relispartition(relid))
+ ancestors = get_partition_ancestors(relid);
+
+ foreach(lc2, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc2);
+
+ /* Check if the parent table exists in the published table list. */
+ if (list_member_oid(relids, ancestor))
+ {
+ skip = true;
+ break;
+ }
+ }
+
+ if (!skip)
+ result = lappend_oid(result, relid);
+ }
+
+ return result;
+}
+
/*
* Another variant of this, taking a Relation.
*/
if (publication->alltables)
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
else
+ {
tables = GetPublicationRelations(publication->oid,
publication->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
+
+ /*
+ * If the publication publishes partition changes via their
+ * respective root partitioned tables, we must exclude partitions
+ * in favor of including the root partitioned tables. Otherwise,
+ * the function could return both the child and parent tables
+ * which could cause data of the child table to be
+ * double-published on the subscriber side.
+ */
+ if (publication->pubviaroot)
+ tables = filter_partitions(tables);
+ }
funcctx->user_fctx = (void *) tables;
MemoryContextSwitchTo(oldcontext);
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
+ pubname | schemaname | tablename
+-------------------+------------+----------------
+ testpub_forparted | public | testpub_parted
+(1 row)
+
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 62;
+use Test::More tests => 63;
# setup
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
# Note: tab3_1's parent is not in the publication, in which case its
-# changes are published using own identity.
+# changes are published using own identity. For tab2, even though both parent
+# and child tables are present but changes will be replicated via the parent's
+# identity and only once.
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)"
+ "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
);
+# prepare data for the initial sync
+$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
+
# subscriber 1
$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
$node_subscriber1->safe_psql('postgres',
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
+# check that data is synced correctly
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2");
+is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');
+
# insert
$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
$node_publisher->safe_psql('postgres',
- "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+ "INSERT INTO tab2 VALUES (0), (3), (5)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab3 VALUES (1), (0), (3), (5)");