Clean up latch related code.
authorAndres Freund
Tue, 6 Jun 2017 23:13:00 +0000 (16:13 -0700)
committerAndres Freund
Tue, 6 Jun 2017 23:13:00 +0000 (16:13 -0700)
The larger part of this patch replaces usages of MyProc->procLatch
with MyLatch.  The latter works even early during backend startup,
where MyProc->procLatch doesn't yet.  While the affected code
shouldn't run in cases where it's not initialized, it might get copied
into places where it might.  Using MyLatch is simpler and a bit faster
to boot, so there's little point to stick with the previous coding.

While doing so I noticed some weaknesses around newly introduced uses
of latches that could lead to missed events, and an omitted
CHECK_FOR_INTERRUPTS() call in worker_spi.

As all the actual bugs are in v10 code, there doesn't seem to be
sufficient reason to backpatch this.

Author: Andres Freund
Discussion:
    https://postgr.es/m/20170606195321[email protected]
    https://postgr.es/m/20170606210405[email protected]
Backpatch: -

src/backend/access/transam/parallel.c
src/backend/libpq/pqmq.c
src/backend/postmaster/bgworker.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/storage/lmgr/condition_variable.c
src/test/modules/worker_spi/worker_spi.c

index cb22174270677ce2339bfcfef15d013ffbacf3ce..16a0bee61037b719c2be28f3d750ddf89a9b392a 100644 (file)
@@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
        if (!anyone_alive)
            break;
 
-       WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1,
+       WaitLatch(MyLatch, WL_LATCH_SET, -1,
                  WAIT_EVENT_PARALLEL_FINISH);
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
    }
 
    if (pcxt->toc != NULL)
index 96939327c38a2cee4f4373baaf6c6054f59d5bfd..8fbc03819d96702be8727ddd51e4370c18874a66 100644 (file)
@@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
        if (result != SHM_MQ_WOULD_BLOCK)
            break;
 
-       WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0,
+       WaitLatch(MyLatch, WL_LATCH_SET, 0,
                  WAIT_EVENT_MQ_PUT_MESSAGE);
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }
 
index c3454276bfa406dcc2a47d9737bccd1c643e625e..712d700481db1e69f4a55f39837d6224a5bef0fe 100644 (file)
@@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
        if (status == BGWH_STOPPED)
            break;
 
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
                       WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
            break;
        }
 
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
    }
 
    return status;
index 726d1b5bd81f812649ab7836fc2329eb5993532a..89c34b822521b266eb6757587c32d876e7f30ba3 100644 (file)
@@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
                   ? WL_SOCKET_READABLE
                   : WL_SOCKET_WRITEABLE);
 
-       rc = WaitLatchOrSocket(&MyProc->procLatch,
+       rc = WaitLatchOrSocket(MyLatch,
                               WL_POSTMASTER_DEATH |
                               WL_LATCH_SET | io_flag,
                               PQsocket(conn->streamConn),
@@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
        /* Interrupted? */
        if (rc & WL_LATCH_SET)
        {
-           ResetLatch(&MyProc->procLatch);
+           ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
 
@@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
             * the signal arrives in the middle of establishment of
             * replication connection.
             */
-           ResetLatch(&MyProc->procLatch);
-           rc = WaitLatchOrSocket(&MyProc->procLatch,
+           rc = WaitLatchOrSocket(MyLatch,
                                   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
                                   WL_LATCH_SET,
                                   PQsocket(streamConn),
                                   0,
                                   WAIT_EVENT_LIBPQWALRECEIVER);
+
+           /* Emergency bailout? */
            if (rc & WL_POSTMASTER_DEATH)
                exit(1);
 
-           /* interrupted */
+           /* Interrupted? */
            if (rc & WL_LATCH_SET)
            {
+               ResetLatch(MyLatch);
                CHECK_FOR_INTERRUPTS();
-               continue;
            }
            if (PQconsumeInput(streamConn) == 0)
                return NULL;    /* trouble */
index 5aaf24bfe4fa0550acf7a080dedd780b7747a30f..5a3274b2c2392509bba5ec4c7f29a866bdc00498 100644 (file)
@@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
+       /* emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(MyLatch);
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           CHECK_FOR_INTERRUPTS();
+       }
    }
 
    return;
@@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
        LWLockRelease(LogicalRepWorkerLock);
 
-       CHECK_FOR_INTERRUPTS();
-
        /* Wait for signal. */
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
@@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(&MyProc->procLatch);
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           CHECK_FOR_INTERRUPTS();
+       }
 
        /* Check worker status. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
        CHECK_FOR_INTERRUPTS();
 
        /* Wait for more work. */
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(&MyProc->procLatch);
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           CHECK_FOR_INTERRUPTS();
+       }
    }
 }
 
@@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg)
        }
 
        /* Wait for more work. */
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       wait_time,
                       WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           CHECK_FOR_INTERRUPTS();
+       }
+
        if (got_SIGHUP)
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
-
-       ResetLatch(&MyProc->procLatch);
    }
 
    LogicalRepCtx->launcher_pid = 0;
index ed66602935ffdacd2b099430166fc8376edd799c..6e55d2d606950a1738bd17c180c29fff188d6406 100644 (file)
@@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
        if (!worker)
            return false;
 
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
    }
 
    return false;
@@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state)
        if (MyLogicalRepWorker->relstate == expected_state)
            return true;
 
-       rc = WaitLatch(&MyProc->procLatch,
+       rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
    }
 
    return false;
@@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
        /*
         * Wait for more data or latch.
         */
-       rc = WaitLatchOrSocket(&MyProc->procLatch,
+       rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_POSTMASTER_DEATH,
                               fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
@@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
    }
 
    return bytesread;
index 51a64487cd9ac934a6a2306a09667a200aee1d5f..999d627c87205629c3c7e3ac784dd7abe22a5f5d 100644 (file)
@@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        /*
         * Wait for more data or latch.
         */
-       rc = WaitLatchOrSocket(&MyProc->procLatch,
+       rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_POSTMASTER_DEATH,
                               fd, NAPTIME_PER_CYCLE,
@@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           CHECK_FOR_INTERRUPTS();
+       }
+
        if (got_SIGHUP)
        {
            got_SIGHUP = false;
@@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
            send_feedback(last_received, requestReply, requestReply);
        }
-
-       ResetLatch(&MyProc->procLatch);
    }
 }
 
index 5afb21121b6d08c442e00eb494a19bb571ca710c..b4b7d28dd5dbbff1265e152dea43cfd7603e8bcd 100644 (file)
@@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
    {
        cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
        AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
-                         &MyProc->procLatch, NULL);
+                         MyLatch, NULL);
    }
 
    /*
     * Reset my latch before adding myself to the queue and before entering
     * the caller's predicate loop.
     */
-   ResetLatch(&MyProc->procLatch);
+   ResetLatch(MyLatch);
 
    /* Add myself to the wait queue. */
    SpinLockAcquire(&cv->mutex);
@@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
        WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
 
        /* Reset latch before testing whether we can return. */
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
 
        /*
         * If this process has been taken out of the wait list, then we know
index 9abfc714a997d7af26db101062152221033a0aa1..553baf0045413d783768d70a66c9f83abb38cdbb 100644 (file)
@@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg)
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);
 
+       CHECK_FOR_INTERRUPTS();
+
        /*
         * In case of a SIGHUP, just reload the configuration.
         */