Seems I was too optimistic in supposing that sinval's maxMsgNum could be
authorTom Lane
Fri, 20 Jun 2008 00:24:53 +0000 (00:24 +0000)
committerTom Lane
Fri, 20 Jun 2008 00:24:53 +0000 (00:24 +0000)
read and written without a lock.  The value itself is atomic, sure, but on
processors with weak memory ordering it's possible for a reader to see the
value change before it sees the associated message written into the buffer
array.  Fix by introducing a spinlock that's used just to read and write
maxMsgNum.  (We could do this with less overhead if we recognized a concept
of "memory access barrier"; is it worth introducing such a thing?  At the
moment probably not --- I can't measure any clear slowdown from adding the
spinlock, so this solution is probably fine.)  Per buildfarm results.

src/backend/storage/ipc/sinvaladt.c

index 0befc4a93419a526d857219c7a80dc60a38e3436..e414e4a507105aef95fdde4e830b915b60490aa9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,6 +23,7 @@
 #include "storage/proc.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
+#include "storage/spin.h"
 
 
 /*
  * can operate in parallel with one or more readers, because the writer
  * has no need to touch anyone's ProcState, except in the infrequent cases
  * when SICleanupQueue is needed.  The only point of overlap is that
- * the writer might change maxMsgNum while readers are looking at it.
- * This should be okay: we are assuming that fetching or storing an int
- * is atomic, an assumption also made elsewhere in Postgres.  However
- * readers mustn't assume that maxMsgNum isn't changing under them.
+ * the writer wants to change maxMsgNum while readers need to read it.
+ * We deal with that by having a spinlock that readers must take for just
+ * long enough to read maxMsgNum, while writers take it for just long enough
+ * to write maxMsgNum.  (The exact rule is that you need the spinlock to
+ * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * spinlock to write maxMsgNum unless you are holding both locks.)
+ *
+ * Note: since maxMsgNum is an int and hence presumably atomically readable/
+ * writable, the spinlock might seem unnecessary.  The reason it is needed
+ * is to provide a memory barrier: we need to be sure that messages written
+ * to the array are actually there before maxMsgNum is increased, and that
+ * readers will see that data after fetching maxMsgNum.  Multiprocessors
+ * that have weak memory-ordering guarantees can fail without the memory
+ * barrier instructions that are included in the spinlock sequences.
  */
 
 
@@ -153,6 +164,8 @@ typedef struct SISeg
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */
 
+   slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
+
    /*
     * Circular buffer holding shared-inval messages
     */
@@ -209,12 +222,13 @@ CreateSharedInvalidationState(void)
    if (found)
        return;
 
-   /* Clear message counters, save size of procState array */
+   /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
+   SpinLockInit(&shmInvalBuffer->msgnumLock);
 
    /* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -365,6 +379,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
    {
        int     nthistime = Min(n, WRITE_QUANTUM);
        int     numMsgs;
+       int     max;
 
        n -= nthistime;
 
@@ -388,10 +403,21 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
        /*
         * Insert new message(s) into proper slot of circular buffer
         */
+       max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
-           segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
-           segP->maxMsgNum++;
+           segP->buffer[max % MAXNUMMESSAGES] = *data++;
+           max++;
+       }
+
+       /* Update current value of maxMsgNum using spinlock */
+       {
+           /* use volatile pointer to prevent code rearrangement */
+           volatile SISeg *vsegP = segP;
+
+           SpinLockAcquire(&vsegP->msgnumLock);
+           vsegP->maxMsgNum = max;
+           SpinLockRelease(&vsegP->msgnumLock);
        }
 
        LWLockRelease(SInvalWriteLock);
@@ -431,6 +457,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
    SISeg      *segP;
    ProcState  *stateP;
+   int         max;
    int         n;
    
    LWLockAcquire(SInvalReadLock, LW_SHARED);
@@ -438,6 +465,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];
 
+   /* Fetch current value of maxMsgNum using spinlock */
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile SISeg *vsegP = segP;
+
+       SpinLockAcquire(&vsegP->msgnumLock);
+       max = vsegP->maxMsgNum;
+       SpinLockRelease(&vsegP->msgnumLock);
+   }
+
    if (stateP->resetState)
    {
        /*
@@ -445,7 +482,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
-       stateP->nextMsgNum = segP->maxMsgNum;
+       stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
@@ -459,13 +496,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
-    *
-    * Note: depending on the compiler, we might read maxMsgNum only once
-    * here, or each time through the loop.  It doesn't matter (as long as
-    * each fetch is atomic).
     */
    n = 0;
-   while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+   while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
@@ -474,7 +507,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
    /*
     * Reset our "signaled" flag whenever we have caught up completely.
     */
-   if (stateP->nextMsgNum >= segP->maxMsgNum)
+   if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
 
    LWLockRelease(SInvalReadLock);