Add option to modify sync commit per subscription
authorPeter Eisentraut
Fri, 14 Apr 2017 17:58:46 +0000 (13:58 -0400)
committerPeter Eisentraut
Fri, 14 Apr 2017 17:58:46 +0000 (13:58 -0400)
This also changes default behaviour of subscription workers to
synchronous_commit = off.

Author: Petr Jelinek 

13 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/pg_subscription.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql

index 5883673448caf64a029ddf3610804e2fe503269d..5254bb3025a9238f2f7d20b517c3fd779f479ed0 100644 (file)
       If true, the subscription is enabled and should be replicating.
      
 
+     
+      subsynccommit
+      text
+      
+      
+       Contains the value of the synchronous_commit
+       setting for the subscription workers.
+      
+     
+
      
       subconninfo
       text
index 640fac0a1590351bcdbd5d92ea5d8dfccb2c2b3b..f71ee38b40cfaff06f69a2a4f62fdc1e2f069b73 100644 (file)
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION name WITH ( 
 where suboption can be:
 
     SLOT NAME = slot_name
+    | SYNCHRONOUS_COMMIT = synchronous_commit
 
 ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] { REFRESH WITH ( puboption [, ... ] ) | NOREFRESH }
 ALTER SUBSCRIPTION name REFRESH PUBLICATION WITH ( puboption [, ... ] )
@@ -91,6 +92,7 @@ ALTER SUBSCRIPTION name DISABLE
    
     CONNECTION 'conninfo'
     SLOT NAME = slot_name
+    SYNCHRONOUS_COMMIT = synchronous_commit
     
      
       These clauses alter properties originally set by
index 3410d6fc8c2fd481a02b5f097cf3077b95ca1730..3c51012df8b998673f78171a92e32f233f2b35ce 100644 (file)
@@ -32,6 +32,7 @@ CREATE SUBSCRIPTION subscription_name
     | CREATE SLOT | NOCREATE SLOT
     | SLOT NAME = slot_name
     | COPY DATA | NOCOPY DATA
+    | SYNCHRONOUS_COMMIT = synchronous_commit
     | NOCONNECT
 
  
@@ -147,6 +148,36 @@ CREATE SUBSCRIPTION subscription_name
     
    
 
+   
+    SYNCHRONOUS_COMMIT = synchronous_commit
+    
+     
+      The value of this parameter overrides the
+       setting.  The default value is
+      off.
+     
+
+     
+      It is safe to use off for logical replication: If the
+      subscriber loses transactions because of missing synchronization, the
+      data will be resent from the publisher.
+     
+
+     
+      A different setting might be appropriate when doing synchronous logical
+      replication.  The logical replication workers report the positions of
+      writes and flushes to the publisher, and when using synchronous
+      replication, the publisher will wait for the actual flush.  This means
+      that setting SYNCHRONOUS_COMMIT for the subscriber
+      to off when the subscription is used for synchronous
+      replication might increase the latency for COMMIT on
+      the publisher.  In this scenario, it can be advantageous to set
+      SYNCHRONOUS_COMMIT to local or
+      higher.
+     
+    
+   
+
    
     NOCONNECT
     
index 7e38b1a31cdf4c30158850e46ef2105188aa3c18..a18385055ef8fdf202b2981fbfffb19a226f4fe5 100644 (file)
@@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok)
    Assert(!isnull);
    sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
 
+   /* Get synccommit */
+   datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+                           tup,
+                           Anum_pg_subscription_subsynccommit,
+                           &isnull);
+   Assert(!isnull);
+   sub->synccommit = TextDatumGetCString(datum);
+
    /* Get publications */
    datum = SysCacheGetAttr(SUBSCRIPTIONOID,
                            tup,
index 7b8b11cb81f14af3fa0e758e345256e9c66c0d84..519c6846e35a1724aae35231c11d1bbe4c3b0b7b 100644 (file)
@@ -44,6 +44,7 @@
 #include "storage/lmgr.h"
 
 #include "utils/builtins.h"
+#include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void
 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
                           bool *enabled, bool *create_slot, char **slot_name,
-                          bool *copy_data)
+                          bool *copy_data, char **synchronous_commit)
 {
    ListCell   *lc;
    bool        connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
        *slot_name = NULL;
    if (copy_data)
        *copy_data = true;
+   if (synchronous_commit)
+       *synchronous_commit = NULL;
 
    /* Parse options */
    foreach (lc, options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
            copy_data_given = true;
            *copy_data = !defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
+                synchronous_commit)
+       {
+           if (*synchronous_commit)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+
+           *synchronous_commit = defGetString(defel);
+
+           /* Test if the given value is valid for synchronous_commit GUC. */
+           (void) set_config_option("synchronous_commit", *synchronous_commit,
+                                    PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
+                                    false, 0, false);
+       }
        else
            elog(ERROR, "unrecognized option: %s", defel->defname);
    }
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    bool        enabled_given;
    bool        enabled;
    bool        copy_data;
+   char       *synchronous_commit;
    char       *conninfo;
    char       *slotname;
    char        originname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     * Connection and publication should not be specified here.
     */
    parse_subscription_options(stmt->options, &connect, &enabled_given,
-                              &enabled, &create_slot, &slotname, ©_data);
+                              &enabled, &create_slot, &slotname, ©_data,
+                              &synchronous_commit);
 
    /*
     * Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
    if (slotname == NULL)
        slotname = stmt->subname;
+   /* The default for synchronous_commit of subscriptions is off. */
+   if (synchronous_commit == NULL)
+       synchronous_commit = "off";
 
    conninfo = stmt->conninfo;
    publications = stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        CStringGetTextDatum(conninfo);
    values[Anum_pg_subscription_subslotname - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(slotname));
+   values[Anum_pg_subscription_subsynccommit - 1] =
+       CStringGetTextDatum(synchronous_commit);
    values[Anum_pg_subscription_subpublications - 1] =
         publicationListToArray(publications);
 
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
        case ALTER_SUBSCRIPTION_OPTIONS:
            {
                char *slot_name;
+               char       *synchronous_commit;
 
                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, &slot_name, NULL);
+                                          NULL, &slot_name, NULL,
+                                          &synchronous_commit);
 
-               values[Anum_pg_subscription_subslotname - 1] =
-                   DirectFunctionCall1(namein, CStringGetDatum(slot_name));
-               replaces[Anum_pg_subscription_subslotname - 1] = true;
+               if (slot_name)
+               {
+                   values[Anum_pg_subscription_subslotname - 1] =
+                       DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+                   replaces[Anum_pg_subscription_subslotname - 1] = true;
+               }
+               if (synchronous_commit)
+               {
+                   values[Anum_pg_subscription_subsynccommit - 1] =
+                       CStringGetTextDatum(synchronous_commit);
+                   replaces[Anum_pg_subscription_subsynccommit - 1] = true;
+               }
 
                update_tuple = true;
                break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
                parse_subscription_options(stmt->options, NULL,
                                           &enabled_given, &enabled, NULL,
-                                          NULL, NULL);
+                                          NULL, NULL, NULL);
                Assert(enabled_given);
 
                values[Anum_pg_subscription_subenabled - 1] =
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                Subscription   *sub = GetSubscription(subid, false);
 
                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, NULL, ©_data);
+                                          NULL, NULL, ©_data, NULL);
 
                values[Anum_pg_subscription_subpublications - 1] =
                     publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                Subscription   *sub = GetSubscription(subid, false);
 
                parse_subscription_options(stmt->options, NULL, NULL, NULL,
-                                          NULL, NULL, ©_data);
+                                          NULL, NULL, ©_data, NULL);
 
                AlterSubscription_refresh(sub, copy_data);
 
index 7ba239c02c19777a1728a23c536915be5e2f05ab..2d663f6308f627fb2c3f92f8f579bead120a3bba 100644 (file)
@@ -129,17 +129,13 @@ get_subscription_list(void)
         */
        oldcxt = MemoryContextSwitchTo(resultcxt);
 
-       sub = (Subscription *) palloc(sizeof(Subscription));
+       sub = (Subscription *) palloc0(sizeof(Subscription));
        sub->oid = HeapTupleGetOid(tup);
        sub->dbid = subform->subdbid;
        sub->owner = subform->subowner;
        sub->enabled = subform->subenabled;
        sub->name = pstrdup(NameStr(subform->subname));
-
        /* We don't fill fields we are not interested in. */
-       sub->conninfo = NULL;
-       sub->slotname = NULL;
-       sub->publications = NIL;
 
        res = lappend(res, sub);
        MemoryContextSwitchTo(oldcxt);
index 3313448e7b9a408d8902e20c1938dea94ed0db16..29b6c6a168943121fd2ae5b5e93a01c199376ac7 100644 (file)
@@ -1416,6 +1416,10 @@ reread_subscription(void)
 
    MemoryContextSwitchTo(oldctx);
 
+   /* Change synchronous commit according to the user's wishes */
+   SetConfigOption("synchronous_commit", MySubscription->synccommit,
+                   PGC_BACKEND, PGC_S_OVERRIDE);
+
    if (started_tx)
        CommitTransactionCommand();
 
@@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg)
    MySubscriptionValid = true;
    MemoryContextSwitchTo(oldctx);
 
+   /* Setup synchronous commit according to the user's wishes */
+   SetConfigOption("synchronous_commit", MySubscription->synccommit,
+                   PGC_BACKEND, PGC_S_OVERRIDE);
+
    if (!MySubscription->enabled)
    {
        ereport(LOG,
index 102935446271e49b0a4e19ecbcc2e202aa53590f..3eccfa626bf311850897cdb1c231298aaa5b8485 100644 (file)
@@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout)
    int         i_rolname;
    int         i_subconninfo;
    int         i_subslotname;
+   int         i_subsynccommit;
    int         i_subpublications;
    int         i,
                ntups;
@@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout)
    appendPQExpBuffer(query,
                      "SELECT s.tableoid, s.oid, s.subname,"
                      "(%s s.subowner) AS rolname, "
-                     " s.subconninfo, s.subslotname, s.subpublications "
+                     " s.subconninfo, s.subslotname, s.subsynccommit, "
+                     " s.subpublications "
                      "FROM pg_catalog.pg_subscription s "
                      "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
                      "                   WHERE datname = current_database())",
@@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout)
    i_rolname = PQfnumber(res, "rolname");
    i_subconninfo = PQfnumber(res, "subconninfo");
    i_subslotname = PQfnumber(res, "subslotname");
+   i_subsynccommit = PQfnumber(res, "subsynccommit");
    i_subpublications = PQfnumber(res, "subpublications");
 
    subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout)
        subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
        subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
        subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
+       subinfo[i].subsynccommit =
+           pg_strdup(PQgetvalue(res, i, i_subsynccommit));
        subinfo[i].subpublications =
            pg_strdup(PQgetvalue(res, i, i_subpublications));
 
@@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
 
    appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data);
    appendStringLiteralAH(query, subinfo->subslotname, fout);
+
+   if (strcmp(subinfo->subsynccommit, "off") != 0)
+       appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit));
+
    appendPQExpBufferStr(query, ");\n");
 
    appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name));
index ba85392f1183daef2681bcc1927c933eeaf7ad4e..471cfce92a95259254c1f993e0fd17eaa86c9ae0 100644 (file)
@@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo
    char       *rolname;
    char       *subconninfo;
    char       *subslotname;
+   char       *subsynccommit;
    char       *subpublications;
 } SubscriptionInfo;
 
index 2494d046b25c323bd7d72ee2597c1d2dfcb151e4..59121b8d1b06eaf5d1e55858e005ed2ae5ccd038 100644 (file)
@@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose)
    PQExpBufferData buf;
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
-   static const bool translate_columns[] = {false, false, false, false, false};
+   static const bool translate_columns[] = {false, false, false, false,
+       false, false};
 
    if (pset.sversion < 100000)
    {
@@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose)
    if (verbose)
    {
        appendPQExpBuffer(&buf,
+                         ",  subsynccommit AS \"%s\"\n"
                          ",  subconninfo AS \"%s\"\n",
+                         gettext_noop("Synchronous commit"),
                          gettext_noop("Conninfo"));
    }
 
index 0811880a8f37c772aa6832ec433c8bfba3856e96..fae542b6129e2d98533d16dd77351d2131dfb912 100644 (file)
@@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
 #ifdef CATALOG_VARLEN          /* variable-length fields start here */
    text        subconninfo;    /* Connection string to the publisher */
    NameData    subslotname;    /* Slot name on publisher */
-
+   text        subsynccommit;  /* Synchronous commit setting for worker */
    text        subpublications[1]; /* List of publications subscribed to */
 #endif
 } FormData_pg_subscription;
@@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
  *     compiler constants for pg_subscription
  * ----------------
  */
-#define Natts_pg_subscription                  7
+#define Natts_pg_subscription                  8
 #define Anum_pg_subscription_subdbid           1
 #define Anum_pg_subscription_subname           2
 #define Anum_pg_subscription_subowner          3
 #define Anum_pg_subscription_subenabled            4
 #define Anum_pg_subscription_subconninfo       5
 #define Anum_pg_subscription_subslotname       6
-#define Anum_pg_subscription_subpublications   7
+#define Anum_pg_subscription_subsynccommit     7
+#define Anum_pg_subscription_subpublications   8
 
 
 typedef struct Subscription
@@ -73,6 +74,7 @@ typedef struct Subscription
    bool    enabled;        /* Indicates if the subscription is enabled */
    char   *conninfo;       /* Connection string to the publisher */
    char   *slotname;       /* Name of the replication slot */
+   char   *synccommit;     /* Synchronous commit setting for worker */
    List   *publications;   /* List of publication names to subscribe to */
 } Subscription;
 
index 8760d5970aa5473f58d0fc449bca15bf903b515d..47531edd1b51e8b662adefa3cb8b5fe09407a78f 100644 (file)
@@ -46,10 +46,10 @@ CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WI
 ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 \dRs+
-                               List of subscriptions
-  Name   |           Owner           | Enabled | Publication |      Conninfo       
----------+---------------------------+---------+-------------+---------------------
- testsub | regress_subscription_user | f       | {testpub}   | dbname=doesnotexist
+                                         List of subscriptions
+  Name   |           Owner           | Enabled | Publication | Synchronous commit |      Conninfo       
+---------+---------------------------+---------+-------------+--------------------+---------------------
+ testsub | regress_subscription_user | f       | {testpub}   | off                | dbname=doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH;
@@ -59,10 +59,10 @@ ALTER SUBSCRIPTION testsub WITH (SLOT NAME = 'newname');
 ALTER SUBSCRIPTION doesnotexist CONNECTION 'dbname=doesnotexist2';
 ERROR:  subscription "doesnotexist" does not exist
 \dRs+
-                                   List of subscriptions
-  Name   |           Owner           | Enabled |     Publication     |       Conninfo       
----------+---------------------------+---------+---------------------+----------------------
- testsub | regress_subscription_user | f       | {testpub2,testpub3} | dbname=doesnotexist2
+                                              List of subscriptions
+  Name   |           Owner           | Enabled |     Publication     | Synchronous commit |       Conninfo       
+---------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub | regress_subscription_user | f       | {testpub2,testpub3} | off                | dbname=doesnotexist2
 (1 row)
 
 BEGIN;
@@ -89,11 +89,15 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
 ERROR:  must be owner of subscription testsub
 RESET ROLE;
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
-\dRs
-                          List of subscriptions
-    Name     |           Owner           | Enabled |     Publication     
--------------+---------------------------+---------+---------------------
- testsub_foo | regress_subscription_user | f       | {testpub2,testpub3}
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
+ERROR:  invalid value for parameter "synchronous_commit": "foobar"
+HINT:  Available values: local, remote_write, remote_apply, on, off.
+\dRs+
+                                                List of subscriptions
+    Name     |           Owner           | Enabled |     Publication     | Synchronous commit |       Conninfo       
+-------------+---------------------------+---------+---------------------+--------------------+----------------------
+ testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | local              | dbname=doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
index 7bdc2b3503c398ea5aeaeee82a1158cc9731a9e7..1b30d150cea15f5d3f04975695c87de73488c26f 100644 (file)
@@ -66,8 +66,10 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy;
 RESET ROLE;
 
 ALTER SUBSCRIPTION testsub RENAME TO testsub_foo;
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = local);
+ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = foobar);
 
-\dRs
+\dRs+
 
 -- rename back to keep the rest simple
 ALTER SUBSCRIPTION testsub_foo RENAME TO testsub;