Use 64-bit atomics for xlblocks array elements.
authorJeff Davis
Wed, 20 Dec 2023 01:35:42 +0000 (17:35 -0800)
committerJeff Davis
Wed, 20 Dec 2023 01:35:42 +0000 (17:35 -0800)
In preparation for reading the contents of WAL buffers without a
lock. Also, avoids the previously-needed comment in GetXLogBuffer()
explaining why it's safe from torn reads.

Author: Bharath Rupireddy
Discussion: https://postgr.es/m/CALj2ACVfFMfqD5oLzZSQQZWfXiJqd-NdX0_317veP6FuB31QWA@mail.gmail.com
Reviewed-by: Andres Freund
src/backend/access/transam/xlog.c

index 01e0484584b4becb91ac6ea8cf3bad0ac672cb6e..1c77daa611a1d4974a8e3d9af935f0f54e9223d7 100644 (file)
@@ -501,7 +501,7 @@ typedef struct XLogCtlData
     * WALBufMappingLock.
     */
    char       *pages;          /* buffers for unwritten XLOG pages */
-   XLogRecPtr *xlblocks;       /* 1st byte ptr-s + XLOG_BLCKSZ */
+   pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
    int         XLogCacheBlck;  /* highest allocated xlog buffer index */
 
    /*
@@ -1636,20 +1636,16 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
     * out to disk and evicted, and the caller is responsible for making sure
     * that doesn't happen.
     *
-    * However, we don't hold a lock while we read the value. If someone has
-    * just initialized the page, it's possible that we get a "torn read" of
-    * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
-    * that case we will see a bogus value. That's ok, we'll grab the mapping
-    * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
-    * the page we're looking for. But it means that when we do this unlocked
-    * read, we might see a value that appears to be ahead of the page we're
-    * looking for. Don't PANIC on that, until we've verified the value while
-    * holding the lock.
+    * We don't hold a lock while we read the value. If someone is just about
+    * to initialize or has just initialized the page, it's possible that we
+    * get InvalidXLogRecPtr. That's ok, we'll grab the mapping lock (in
+    * AdvanceXLInsertBuffer) and retry if we see anything other than the page
+    * we're looking for.
     */
    expectedEndPtr = ptr;
    expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
 
-   endptr = XLogCtl->xlblocks[idx];
+   endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
    if (expectedEndPtr != endptr)
    {
        XLogRecPtr  initializedUpto;
@@ -1680,7 +1676,7 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
        WALInsertLockUpdateInsertingAt(initializedUpto);
 
        AdvanceXLInsertBuffer(ptr, tli, false);
-       endptr = XLogCtl->xlblocks[idx];
+       endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
 
        if (expectedEndPtr != endptr)
            elog(PANIC, "could not find WAL buffer for %X/%X",
@@ -1867,7 +1863,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         * be zero if the buffer hasn't been used yet).  Fall through if it's
         * already written out.
         */
-       OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
+       OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
        if (LogwrtResult.Write < OldPageRqstPtr)
        {
            /*
@@ -1989,8 +1985,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         */
        pg_write_barrier();
 
-       *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
-
+       pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
        XLogCtl->InitializedUpTo = NewPageEndPtr;
 
        npages++;
@@ -2208,7 +2203,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
         * if we're passed a bogus WriteRqst.Write that is past the end of the
         * last page that's been initialized by AdvanceXLInsertBuffer.
         */
-       XLogRecPtr  EndPtr = XLogCtl->xlblocks[curridx];
+       XLogRecPtr  EndPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[curridx]);
 
        if (LogwrtResult.Write >= EndPtr)
            elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
@@ -4632,7 +4627,7 @@ XLOGShmemSize(void)
    /* WAL insertion locks, plus alignment */
    size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
    /* xlblocks array */
-   size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
+   size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers));
    /* extra alignment padding for XLOG I/O buffers */
    size = add_size(size, Max(XLOG_BLCKSZ, PG_IO_ALIGN_SIZE));
    /* and the buffers themselves */
@@ -4710,10 +4705,13 @@ XLOGShmemInit(void)
     * needed here.
     */
    allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
-   XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
-   memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
-   allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
+   XLogCtl->xlblocks = (pg_atomic_uint64 *) allocptr;
+   allocptr += sizeof(pg_atomic_uint64) * XLOGbuffers;
 
+   for (i = 0; i < XLOGbuffers; i++)
+   {
+       pg_atomic_init_u64(&XLogCtl->xlblocks[i], InvalidXLogRecPtr);
+   }
 
    /* WAL insertion locks. Ensure they're aligned to the full padded size */
    allocptr += sizeof(WALInsertLockPadded) -
@@ -5750,7 +5748,7 @@ StartupXLOG(void)
        memcpy(page, endOfRecoveryInfo->lastPage, len);
        memset(page + len, 0, XLOG_BLCKSZ - len);
 
-       XLogCtl->xlblocks[firstIdx] = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+       pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
        XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
    }
    else