Rename the logical replication global "wrconn"
authorAlvaro Herrera
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
committerAlvaro Herrera
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
The worker.c global wrconn is only meant to be used by logical apply/
tablesync workers, but there are other variables with the same name. To
reduce future confusion rename the global from "wrconn" to
"LogRepWorkerWalRcvConn".

While this is just cosmetic, it seems better to backpatch it all the way
back to 10 where this code appeared, to avoid future backpatching
issues.

Author: Peter Smith 
Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com

src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index ff985b9b24ca1bf12f8e4d185adb3ec8708d3ff7..0a34d269859a1d68f7d1fde1736642a1e93ee61a 100644 (file)
@@ -704,8 +704,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
    /* Disconnect gracefully from the remote side. */
-   if (wrconn)
-       walrcv_disconnect(wrconn);
+   if (LogRepWorkerWalRcvConn)
+       walrcv_disconnect(LogRepWorkerWalRcvConn);
 
    logicalrep_worker_detach();
 
index c27d97058955a9c6023afb71b8017648cd740ea3..a3989d40dd1ff55d27ed00c8a1adb5198a246c08 100644 (file)
@@ -288,7 +288,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                   MyLogicalRepWorker->relstate,
                                   MyLogicalRepWorker->relstate_lsn);
 
-       walrcv_endstreaming(wrconn, &tli);
+       walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
        finish_sync_worker();
    }
    else
@@ -584,7 +584,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
        for (;;)
        {
            /* Try read the data. */
-           len = walrcv_receive(wrconn, &buf, &fd);
+           len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
            CHECK_FOR_INTERRUPTS();
 
@@ -657,7 +657,8 @@ fetch_remote_table_info(char *nspname, char *relname,
                     "   AND c.relname = %s",
                     quote_literal_cstr(nspname),
                     quote_literal_cstr(relname));
-   res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                     lengthof(tableRow), tableRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -695,9 +696,11 @@ fetch_remote_table_info(char *nspname, char *relname,
                     "   AND a.attrelid = %u"
                     " ORDER BY a.attnum",
                     lrel->remoteid,
-                    (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+                    (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+                     "AND a.attgenerated = ''" : ""),
                     lrel->remoteid);
-   res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                     lengthof(attrRow), attrRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -784,7 +787,7 @@ copy_table(Relation rel)
        appendStringInfo(&cmd, " FROM %s) TO STDOUT",
                         quote_qualified_identifier(lrel.nspname, lrel.relname));
    }
-   res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    pfree(cmd.data);
    if (res->status != WALRCV_OK_COPY_OUT)
        ereport(ERROR,
@@ -851,8 +854,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     * application_name, so that it is different from the main apply worker,
     * so that synchronous replication can distinguish them.
     */
-   wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-   if (wrconn == NULL)
+   LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                           slotname, &err);
+   if (LogRepWorkerWalRcvConn == NULL)
        ereport(ERROR,
                (errmsg("could not connect to the publisher: %s", err)));
 
@@ -897,7 +901,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 * inside the transaction so that we can use the snapshot made
                 * by the slot to get existing data.
                 */
-               res = walrcv_exec(wrconn,
+               res = walrcv_exec(LogRepWorkerWalRcvConn,
                                  "BEGIN READ ONLY ISOLATION LEVEL "
                                  "REPEATABLE READ", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
@@ -914,14 +918,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 * that is consistent with the lsn used by the slot to start
                 * decoding.
                 */
-               walrcv_create_slot(wrconn, slotname, true,
+               walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
                                   CRS_USE_SNAPSHOT, origin_startpos);
 
                PushActiveSnapshot(GetTransactionSnapshot());
                copy_table(rel);
                PopActiveSnapshot();
 
-               res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+               res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
                    ereport(ERROR,
                            (errmsg("table copy could not finish transaction on publisher"),
index f6c0c28672ef1a039f0fd030220f925262739ad0..ff887ea437a422cbe9be608e5741889b3504990d 100644 (file)
@@ -100,7 +100,7 @@ typedef struct SlotErrCallbackArg
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool       MySubscriptionValid = false;
@@ -1517,7 +1517,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
        MemoryContextSwitchTo(ApplyMessageContext);
 
-       len = walrcv_receive(wrconn, &buf, &fd);
+       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
        if (len != 0)
        {
@@ -1597,7 +1597,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                    MemoryContextReset(ApplyMessageContext);
                }
 
-               len = walrcv_receive(wrconn, &buf, &fd);
+               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
            }
        }
 
@@ -1627,7 +1627,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        {
            TimeLineID  tli;
 
-           walrcv_endstreaming(wrconn, &tli);
+           walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
            break;
        }
 
@@ -1790,7 +1790,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
         (uint32) (flushpos >> 32), (uint32) flushpos
        );
 
-   walrcv_send(wrconn, reply_message->data, reply_message->len);
+   walrcv_send(LogRepWorkerWalRcvConn,
+               reply_message->data, reply_message->len);
 
    if (recvpos > last_recvpos)
        last_recvpos = recvpos;
@@ -2088,9 +2089,9 @@ ApplyWorkerMain(Datum main_arg)
        origin_startpos = replorigin_session_get_progress(false);
        CommitTransactionCommand();
 
-       wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
-                               &err);
-       if (wrconn == NULL)
+       LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                               MySubscription->name, &err);
+       if (LogRepWorkerWalRcvConn == NULL)
            ereport(ERROR,
                    (errmsg("could not connect to the publisher: %s", err)));
 
@@ -2098,7 +2099,7 @@ ApplyWorkerMain(Datum main_arg)
         * We don't really use the output identify_system for anything but it
         * does some initializations on the upstream so let's still call it.
         */
-       (void) walrcv_identify_system(wrconn, &startpointTLI);
+       (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    }
 
    /*
@@ -2117,7 +2118,7 @@ ApplyWorkerMain(Datum main_arg)
    options.proto.logical.publication_names = MySubscription->publications;
 
    /* Start normal logical streaming replication. */
-   walrcv_startstreaming(wrconn, &options);
+   walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
    /* Run the main loop. */
    LogicalRepApplyLoop(origin_startpos);
index 8ed7e45056c9805ede8845ea113ddbd0663780bb..58abc3eadcde97ebe8b3b09e0ed8009dd3c5d4d7 100644 (file)
@@ -60,7 +60,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;