Back-Patch "Add wait_for_subscription_sync for TAP tests."
authorAmit Kapila
Fri, 12 Aug 2022 05:58:54 +0000 (11:28 +0530)
committerAmit Kapila
Fri, 12 Aug 2022 05:58:54 +0000 (11:28 +0530)
This was originally done in commit 0c20dd33db for 16 only, to eliminate
duplicate code and as an infrastructure that makes it easier to write
future tests. However, it has been suggested that it would be good to
back-patch this testing infrastructure to aid future tests in
back-branches.

Backpatch to all supported versions.

Author: Masahiko Sawada
Reviewed by: Amit Kapila, Shi yu
Discussion: https://postgr.es/m/CAD21AoC-fvAkaKHa4t1urupwL8xbAcWRePeETvshvy80f6WV1A@mail.gmail.com
Discussion: https://postgr.es/m/[email protected]

26 files changed:
src/test/perl/PostgreSQL/Test/Cluster.pm
src/test/subscription/t/001_rep_changes.pl
src/test/subscription/t/002_types.pl
src/test/subscription/t/004_sync.pl
src/test/subscription/t/005_encoding.pl
src/test/subscription/t/006_rewrite.pl
src/test/subscription/t/007_ddl.pl
src/test/subscription/t/008_diff_schema.pl
src/test/subscription/t/010_truncate.pl
src/test/subscription/t/011_generated.pl
src/test/subscription/t/013_partition.pl
src/test/subscription/t/014_binary.pl
src/test/subscription/t/015_stream.pl
src/test/subscription/t/016_stream_subxact.pl
src/test/subscription/t/017_stream_ddl.pl
src/test/subscription/t/018_stream_subxact_abort.pl
src/test/subscription/t/019_stream_subxact_ddl_abort.pl
src/test/subscription/t/021_twophase.pl
src/test/subscription/t/023_twophase_stream.pl
src/test/subscription/t/024_add_drop_pub.pl
src/test/subscription/t/025_rep_changes_for_schema.pl
src/test/subscription/t/027_nosuperuser.pl
src/test/subscription/t/028_row_filter.pl
src/test/subscription/t/029_on_error.pl
src/test/subscription/t/031_column_list.pl
src/test/subscription/t/100_bugs.pl

index c8c7bc5045a18ad41f9c64f0b6c6529f299711dc..27fa607da41d108bfc121d49eee72628a77f1b19 100644 (file)
@@ -2648,6 +2648,50 @@ sub wait_for_slot_catchup
 
 =pod
 
+=item $node->wait_for_subscription_sync(publisher, subname, dbname)
+
+Wait for all tables in pg_subscription_rel to complete the initial
+synchronization (i.e to be either in 'syncdone' or 'ready' state).
+
+If the publisher node is given, additionally, check if the subscriber has
+caught up to what has been committed on the primary. This is useful to
+ensure that the initial data synchronization has been completed after
+creating a new subscription.
+
+If there is no active replication connection from this peer, wait until
+poll_query_until timeout.
+
+This is not a test. It die()s on failure.
+
+=cut
+
+sub wait_for_subscription_sync
+{
+   my ($self, $publisher, $subname, $dbname) = @_;
+   my $name = $self->name;
+
+   $dbname = defined($dbname) ? $dbname : 'postgres';
+
+   # Wait for all tables to finish initial sync.
+   print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
+   my $query =
+       qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');];
+   $self->poll_query_until($dbname, $query)
+     or croak "timed out waiting for subscriber to synchronize data";
+
+   # Then, wait for the replication to catchup if required.
+   if (defined($publisher))
+   {
+       croak 'subscription name must be specified' unless defined($subname);
+       $publisher->wait_for_catchup($subname);
+   }
+
+   print "done\n";
+   return;
+}
+
+=pod
+
 =item $node->wait_for_log(regexp, offset)
 
 Waits for the contents of the server log file, starting at the given offset, to
index f53b3b7db0c5f60fe16c455a5489a6169c0fc92b..c5b5be419cc2732ab56d2c5575a9621b565cfb85 100644 (file)
@@ -102,13 +102,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
@@ -237,13 +232,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
 );
 
-$node_publisher->wait_for_catchup('tap_sub_temp1');
-
-# Also wait for initial table sync to finish
-$synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1');
 
 # Subscriber table will have no rows initially
 $result =
index 3f1f00f7c8d9d338d86406c156fccaba355eadf0..d6c6f4932720e943163f1d30cc7d13a70141b817 100644 (file)
@@ -114,13 +114,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
 # Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Insert initial test data
 $node_publisher->safe_psql(
index cf61fc1e0f605e8573728802b6f57b5d33708848..fd4bf7bacd1af4072ff77f676ebcf03e315f35f8 100644 (file)
@@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
@@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
 $node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
 
 # wait for sync to finish this time
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # check that all data is synced
 $result =
@@ -107,8 +101,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # and wait for data sync to finish again
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # check that all data is synced
 $result =
@@ -133,8 +126,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
 
 # wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*) FROM tab_rep_next");
index 38a74a897f805b4cf8451903ad93c9d72f166dc5..3ee0522460b78fb081b9d70c6be94aa241122b5f 100644 (file)
@@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
 
-$node_publisher->wait_for_catchup('mysub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
 
 $node_publisher->safe_psql('postgres',
    q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
index c924ff35f7174457a8fc7eb9b54b39d0972da6a4..fdcb3f811ce00520ea9254e0dd2099ceea9fe508 100644 (file)
@@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
 
-$node_publisher->wait_for_catchup('mysub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
 
 $node_publisher->safe_psql('postgres',
    q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
index 01df54229c4b1f446b80fceea1a35932e94d5110..8882addc18f85e9c85ed1f4e9ce6c65e6dbd3b79 100644 (file)
@@ -49,13 +49,8 @@ ok( $stderr =~
      m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
    "Create subscription throws warning for non-existent publication");
 
-$node_publisher->wait_for_catchup('mysub1');
-
-# Also wait for initial table sync to finish.
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish.
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
 
 # Specifying non-existent publication along with add publication.
 ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
index 67b4026afa407f10e68890aced4559a9cc09a597..b4d44a200bb6a6f8ccc83a178b39a9859213acc4 100644 (file)
@@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 my $result =
   $node_subscriber->safe_psql('postgres',
@@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
 $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
 
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # Add replica identity column.  (The serial is not necessary, but it's
 # a convenient way to get a default on the new column so that rows
index d51924943136696b0db27e69b3fc621dd133feb0..a6fe82a71f2e2049480f54162b852d6f26468dcc 100644 (file)
@@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # insert data to truncate
 
@@ -211,8 +208,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # wait for initial data sync
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 # insert data to truncate
 
index e991a0803217384a437573cb68887015dc15c90d..3d96f6f30f825c1bf6c1b184a3a0efe4015db815 100644 (file)
@@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
 is( $result, qq(1|22
index 0dfbbabc3b0365a532925725920b9f225d14876a..8b33e4e7ae15afc10f84d49cb280f8342b149116 100644 (file)
@@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
 });
 
 # Wait for initial sync of all subscriptions
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber1->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
 
 # Tests for replication using leaf partition identity and schema
 
@@ -490,10 +486,8 @@ $node_subscriber2->safe_psql('postgres',
    "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
 
 # Wait for initial sync of all subscriptions
-$node_subscriber1->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
 
 # check that data is synced correctly
 $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
@@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres',
 
 # make sure the subscription on the second subscriber is synced, before
 # continuing
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
 
 # Insert a change into the leaf partition, should be replicated through
 # the partition root (thanks to the FOR ALL TABLES partition).
@@ -824,8 +817,7 @@ $node_subscriber2->safe_psql(
 $node_subscriber2->safe_psql('postgres',
    "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
 
-$node_subscriber2->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
 
 # Make partition map cache
 $node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
index a1f03e7adc2ad24199659667e22eeec837e9bea4..8d8b35721fcb88a714186bb8379f936094001a47 100644 (file)
@@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres',
      . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
 
 # Ensure nodes are in sync with each other
-$node_publisher->wait_for_catchup('tsub');
-$node_subscriber->poll_query_until('postgres',
-   "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
-) or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
 
 # Insert some content and make sure it's replicated across
 $node_publisher->safe_psql(
index 6561b189de8ba552ce1d81cae9fd6e2ef014d643..cbaa327e441d96f574efbae1bf28c104dd8ffc3e 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
index f27f1694f291dc27cfeef7ea2f911f4f985c06f9..bc0a9cd0531a4b9c56ff9fda79a5c22d1022a0ea 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
index 0bce63b7167c2b918c08175cfecd456b45156acd..866f1512e47b4f1462f1134b8df3460a145b87d9 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
index 7155442e76cb234bb23f74df409cb508cc5ec800..551f16df6ddbc9d108559fe3facfae1b895c3bef 100644 (file)
@@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
index dbd0fca4d1e7a991de437f3512d42b31d8a76453..4d7da82b7a8021582b43318a099d8682793913b0 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 my $result =
   $node_subscriber->safe_psql('postgres',
index c3e9857f7ce88100d4716d2859f6ad17d4371e90..caa90897ec0f26525e9304ecdd725e13e50a7634 100644 (file)
@@ -53,14 +53,8 @@ $node_subscriber->safe_psql(
    PUBLICATION tap_pub
    WITH (two_phase = on)");
 
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Also wait for two-phase to be enabled
 my $twophase_query =
@@ -331,12 +325,8 @@ $node_subscriber->safe_psql(
    PUBLICATION tap_pub_copy
    WITH (two_phase=on, copy_data=false);");
 
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname_copy);
-
-# Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
 
 # Also wait for two-phase to be enabled
 $node_subscriber->poll_query_until('postgres', $twophase_query)
index d8475d25a497bac2c6928dd49ecc3eb1d47b8ca7..9b454106bdf3f20a6e77815785017a2ba4967eff 100644 (file)
@@ -55,14 +55,8 @@ $node_subscriber->safe_psql(
    PUBLICATION tap_pub
    WITH (streaming = on, two_phase = on)");
 
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Also wait for two-phase to be enabled
 my $twophase_query =
index 246f8c923724cfbbacc21e6ac47a619f4265228c..eaf47e66f1acb9f06df2292fbbf915ce7bf9a83b 100644 (file)
@@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres',
 );
 
 # Wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_1 is copied to subscriber
 my $result = $node_subscriber->safe_psql('postgres',
@@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
 
 # Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_drop_refresh was copied to subscriber
 $result = $node_subscriber->safe_psql('postgres',
@@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
 
 # Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 # Check the initial data of tab_1 was copied to subscriber again
 $result = $node_subscriber->safe_psql('postgres',
index 5ce275cf72545f6996f0717473a50c0f91eef0a9..627c63b529a70173798642a6e923828ae7350fb5 100644 (file)
@@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
 );
 
-$node_publisher->wait_for_catchup('tap_sub_schema');
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema');
 
 # Check the schema table data is synced up
 my $result = $node_subscriber->safe_psql('postgres',
@@ -123,8 +118,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
 
@@ -158,8 +152,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
@@ -183,8 +176,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
 
 # Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
index e8f3e4bba11582da7d20c9ede7580ef9308279c2..f9f6117dfb49a5e7f6c8b7c11f2000d5ec6acf2c 100644 (file)
@@ -153,13 +153,8 @@ SET SESSION AUTHORIZATION regress_admin;
 CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
 ));
 
-$node_publisher->wait_for_catchup('admin_sub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
 
 # Verify that "regress_admin" can replicate into the tables
 #
index b1fb2d7cae4126b7f428be634b704d6ed4775d84..f5f8a670920ff4329d1d8f51473401a723398b92 100644 (file)
@@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my $appname           = 'tap_sub';
 
@@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
 );
 
-$node_publisher->wait_for_catchup($appname);
 # wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # The subscription of the FOR ALL TABLES publication means there should be no
 # filtering on the tablesync COPY, so all expect all 5 will be present.
@@ -133,10 +128,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
 );
 
-$node_publisher->wait_for_catchup($appname);
 # wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # The subscription of the ALL TABLES IN SCHEMA publication means there should be
 # no filtering on the tablesync COPY, so expect all 5 will be present.
@@ -397,11 +390,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
 
-$node_publisher->wait_for_catchup($appname);
-
 # wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
 
 # Check expected replicated rows for tab_rowfilter_1
 # tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
@@ -622,8 +612,7 @@ $node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
 
 # wait for table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
index 303e8ec3fc2461a5a824fd3d9cd541f827eb2033..05daa77c58a3c0593d7ea0e05914e62bd71768ff 100644 (file)
@@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
 $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
 
 # Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
-   "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
 
 # Confirm that we have finished the table sync.
 my $result =
index 9fa6e0b35ffb923a54a832c0ce9ef08cce358fc4..b6644556cf4931187abacf7015fd47cbcb4dff90 100644 (file)
@@ -22,18 +22,6 @@ $node_subscriber->start;
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my $offset            = 0;
 
-sub wait_for_subscription_sync
-{
-   my ($node) = @_;
-
-   # Also wait for initial table sync to finish
-   my $synced_query =
-     "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
-   $node->poll_query_until('postgres', $synced_query)
-     or die "Timed out while waiting for subscriber to synchronize data";
-}
-
 # setup tables on both nodes
 
 # tab1: simple 1:1 replication
@@ -160,7 +148,7 @@ $node_subscriber->safe_psql(
    CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 # tab1: only (a,b) is replicated
 $result =
@@ -333,7 +321,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for the tablesync to complete, add a bit more data and then check
 # the results of the replication
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -385,9 +373,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
 ));
 
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->wait_for_catchup('sub1');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');
 
 # insert data and make sure the columns in column list get fully replicated
 $node_publisher->safe_psql(
@@ -428,7 +414,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -465,7 +451,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -504,7 +490,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -621,7 +607,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -687,7 +673,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -758,7 +744,7 @@ $node_subscriber->safe_psql(
    CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -795,7 +781,7 @@ $node_subscriber->safe_psql(
    TRUNCATE test_part_c;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -855,7 +841,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -898,7 +884,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -938,7 +924,7 @@ $node_subscriber->safe_psql(
    CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -985,7 +971,7 @@ $node_subscriber->safe_psql(
    CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -1034,7 +1020,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -1058,7 +1044,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -1102,7 +1088,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
@@ -1150,7 +1136,7 @@ $node_subscriber->safe_psql(
    ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
 ));
 
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
 
 $node_publisher->safe_psql(
    'postgres', qq(
index 11ba47371596c939842aad4e5328dbe31d2b1b5f..6247aa773014ae796560074a54008824dfbab32a 100644 (file)
@@ -144,12 +144,7 @@ $node_twoways->safe_psql('d2',
 # We cannot rely solely on wait_for_catchup() here; it isn't sufficient
 # when tablesync workers might still be running. So in addition to that,
 # verify that tables are synced.
-# XXX maybe this should be integrated in wait_for_catchup() itself.
-$node_twoways->wait_for_catchup('testsub');
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_twoways->poll_query_until('d2', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
 
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
    $rows * 2, "2x$rows rows in t");
@@ -278,11 +273,8 @@ $node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
 is( $node_subscriber->safe_psql(
        'postgres', "SELECT * FROM tab_replidentity_index"),