Raise a WARNING for missing publications.
authorAmit Kapila
Thu, 31 Mar 2022 02:54:19 +0000 (08:24 +0530)
committerAmit Kapila
Thu, 31 Mar 2022 02:55:50 +0000 (08:25 +0530)
When we create or alter a subscription to add publications raise a warning
for non-existent publications. We don't want to give an error here because
it is possible that users can later create the missing publications.

Author: Vignesh C
Reviewed-by: Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm0f4YujGW+q-Di0CbZpnQKFFrXntikaQQKuEmGG0=Zw=Q@mail.gmail.com

doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/commands/subscriptioncmds.c
src/test/subscription/t/007_ddl.pl

index 3e46bbdb0460466607924373f9922cfc9d99def5..fe13ab9a2defbcdec59c283a73242973d450a304 100644 (file)
@@ -114,7 +114,9 @@ ALTER SUBSCRIPTION name RENAME TO <
       replaces the entire list of publications with a new list,
       ADD adds additional publications to the list of
       publications, and DROP removes the publications from
-      the list of publications.  See 
+      the list of publications.  We allow non-existent publications to be
+      specified in ADD and SET variants
+      so that users can add those later.  See 
       for more information.  By default, this command will also act like
       REFRESH PUBLICATION.
      
index b701752fc9b2131293546ad7c8f6ff30aaecba52..ebf7db57c58d309806a8c110e7c1f52a9995ea7b 100644 (file)
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION subscription_name
    copied data that would be incompatible with subsequent filtering.
   
 
+  
+   We allow non-existent publications to be specified so that users can add
+   those later. This means
+   pg_subscription
+   can have non-existent publications.
+  
+
  
 
  
index abebffdf3bb582c5d0ede44d9889f7db36016a07..85dacbe93d69ae1e94bc61118f6d08436e6d138f 100644 (file)
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
    }
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+   ListCell   *lc;
+   bool        first = true;
+
+   Assert(list_length(publications) > 0);
+
+   foreach(lc, publications)
+   {
+       char       *pubname = strVal(lfirst(lc));
+
+       if (first)
+           first = false;
+       else
+           appendStringInfoString(dest, ", ");
+
+       if (quote_literal)
+           appendStringInfoString(dest, quote_literal_cstr(pubname));
+       else
+       {
+           appendStringInfoChar(dest, '"');
+           appendStringInfoString(dest, pubname);
+           appendStringInfoChar(dest, '"');
+       }
+   }
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+   WalRcvExecResult *res;
+   StringInfo  cmd;
+   TupleTableSlot *slot;
+   List       *publicationsCopy = NIL;
+   Oid         tableRow[1] = {TEXTOID};
+
+   cmd = makeStringInfo();
+   appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+                          " pg_catalog.pg_publication t WHERE\n"
+                          " t.pubname IN (");
+   get_publications_str(publications, cmd, true);
+   appendStringInfoChar(cmd, ')');
+
+   res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+   pfree(cmd->data);
+   pfree(cmd);
+
+   if (res->status != WALRCV_OK_TUPLES)
+       ereport(ERROR,
+               errmsg_plural("could not receive publication from the publisher: %s",
+                             "could not receive list of publications from the publisher: %s",
+                             list_length(publications),
+                             res->err));
+
+   publicationsCopy = list_copy(publications);
+
+   /* Process publication(s). */
+   slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+   while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+   {
+       char       *pubname;
+       bool        isnull;
+
+       pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+       Assert(!isnull);
+
+       /* Delete the publication present in publisher from the list. */
+       publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+       ExecClearTuple(slot);
+   }
+
+   ExecDropSingleTupleTableSlot(slot);
+
+   walrcv_clear_result(res);
+
+   if (list_length(publicationsCopy))
+   {
+       /* Prepare the list of non-existent publication(s) for error message. */
+       StringInfo  pubnames = makeStringInfo();
+
+       get_publications_str(publicationsCopy, pubnames, false);
+       ereport(WARNING,
+               errcode(ERRCODE_UNDEFINED_OBJECT),
+               errmsg_plural("publication %s does not exist in the publisher",
+                             "publications %s do not exist in the publisher",
+                             list_length(publicationsCopy),
+                             pubnames->data));
+   }
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
        PG_TRY();
        {
+           check_publications(wrconn, publications);
+
            /*
             * Set sync state based on if we were asked to do data copy or
             * not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+                         List *validate_publications)
 {
    char       *err;
    List       *pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
    PG_TRY();
    {
+       if (validate_publications)
+           check_publications(wrconn, validate_publications);
+
        /* Get the list of relations from publisher. */
        pubrel_names = fetch_table_list(wrconn, sub->publications);
        pubrel_names = list_concat(pubrel_names,
@@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                    /* Make sure refresh sees the new list of publications. */
                    sub->publications = stmt->publication;
 
-                   AlterSubscription_refresh(sub, opts.copy_data);
+                   AlterSubscription_refresh(sub, opts.copy_data,
+                                             stmt->publication);
                }
 
                break;
@@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                /* Refresh if user asked us to. */
                if (opts.refresh)
                {
+                   /* We only need to validate user specified publications. */
+                   List       *validate_publications = (isadd) ? stmt->publication : NULL;
+
                    if (!sub->enabled)
                        ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                    /* Refresh the new list of publications. */
                    sub->publications = publist;
 
-                   AlterSubscription_refresh(sub, opts.copy_data);
+                   AlterSubscription_refresh(sub, opts.copy_data,
+                                             validate_publications);
                }
 
                break;
@@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
                PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-               AlterSubscription_refresh(sub, opts.copy_data);
+               AlterSubscription_refresh(sub, opts.copy_data, NULL);
 
                break;
            }
@@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
    StringInfoData cmd;
    TupleTableSlot *slot;
    Oid         tableRow[2] = {TEXTOID, TEXTOID};
-   ListCell   *lc;
-   bool        first;
    List       *tablelist = NIL;
 
-   Assert(list_length(publications) > 0);
-
    initStringInfo(&cmd);
    appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
                           "  FROM pg_catalog.pg_publication_tables t\n"
                           " WHERE t.pubname IN (");
-   first = true;
-   foreach(lc, publications)
-   {
-       char       *pubname = strVal(lfirst(lc));
-
-       if (first)
-           first = false;
-       else
-           appendStringInfoString(&cmd, ", ");
-
-       appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-   }
+   get_publications_str(publications, &cmd, true);
    appendStringInfoChar(&cmd, ')');
 
    res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
index 1144b005f6df75d17468e260a37a4e3d8528f4a4..39c32eda44d82bdf879d0c2353f207286374423b 100644 (file)
@@ -41,6 +41,43 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# One of the specified publications exists.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+   "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
+);
+ok( $stderr =~
+     m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+   "Create subscription throws warning for non-existent publication");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Specifying non-existent publication along with add publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+   'postgres',
+   "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
+);
+ok( $stderr =~
+     m/WARNING:  publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/,
+   "Alter subscription add publication throws warning for non-existent publications");
+
+# Specifying non-existent publication along with set publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+   "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+     m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+   "Alter subscription set publication throws warning for non-existent publication");
+
 $node_subscriber->stop;
 $node_publisher->stop;