Fix code so that we recover cleanly if there are no free semaphores
authorTom Lane
Tue, 4 Sep 2001 21:42:17 +0000 (21:42 +0000)
committerTom Lane
Tue, 4 Sep 2001 21:42:17 +0000 (21:42 +0000)
available in freeSemMap.  As noted by Tatsuo, this is now a likely
scenario for detecting MaxBackends-exceeded; if MaxBackends is a multiple
of PROC_NSEMS_PER_SET then we will fail here and not in sinval.c.  The
cleanup path did not work correctly before, anyway.

src/backend/storage/lmgr/proc.c

index f491bc16f70b5918ef1bd3558c991208c45d807a..71f85cbe58ff56eb22fa18e48a7a9c09c9575849 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.105 2001/09/04 02:26:57 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.106 2001/09/04 21:42:17 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -186,6 +186,17 @@ InitProcess(void)
    unsigned long location,
                myOffset;
 
+   /*
+    * ProcStructLock protects the freelist of PROC entries and the map
+    * of free semaphores.  Note that when we acquire it here, we do not
+    * have a PROC entry and so the ownership of the spinlock is not
+    * recorded anywhere; even if it was, until we register ProcKill as
+    * an on_shmem_exit callback, there is no exit hook that will cause
+    * owned spinlocks to be released.  Upshot: during the first part of
+    * this routine, be careful to release the lock manually before any
+    * elog(), else you'll have a stuck spinlock to add to your woes.
+    */
+
    SpinAcquire(ProcStructLock);
 
    /* attach to the ProcGlobal structure */
@@ -194,13 +205,14 @@ InitProcess(void)
    if (!found)
    {
        /* this should not happen. InitProcGlobal() is called before this. */
+       SpinRelease(ProcStructLock);
        elog(STOP, "InitProcess: Proc Header uninitialized");
    }
 
    if (MyProc != NULL)
    {
        SpinRelease(ProcStructLock);
-       elog(ERROR, "ProcInit: you already exist");
+       elog(ERROR, "InitProcess: you already exist");
    }
 
    /* try to get a proc struct from the free list first */
@@ -214,14 +226,12 @@ InitProcess(void)
    }
    else
    {
-
        /*
         * have to allocate one.  We can't use the normal shmem index
         * table mechanism because the proc structure is stored by PID
         * instead of by a global name (need to look it up by PID when we
         * cleanup dead processes).
         */
-
        MyProc = (PROC *) ShmemAlloc(sizeof(PROC));
        if (!MyProc)
        {
@@ -231,44 +241,32 @@ InitProcess(void)
    }
 
    /*
-    * zero out the spin lock counts and set the sLocks field for
-    * ProcStructLock to 1 as we have acquired this spinlock above but
-    * didn't record it since we didn't have MyProc until now.
-    */
-   MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
-   MyProc->sLocks[ProcStructLock] = 1;
-
-   /*
-    * Set up a wait-semaphore for the proc.
+    * Initialize all fields of MyProc.
     */
-   if (IsUnderPostmaster)
-   {
-       ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
-
-       /*
-        * we might be reusing a semaphore that belongs to a dead backend.
-        * So be careful and reinitialize its value here.
-        */
-       ZeroProcSemaphore(MyProc);
-   }
-   else
-   {
-       MyProc->sem.semId = -1;
-       MyProc->sem.semNum = -1;
-   }
-
    SHMQueueElemInit(&(MyProc->links));
+   MyProc->sem.semId = -1;     /* no wait-semaphore acquired yet */
+   MyProc->sem.semNum = -1;
    MyProc->errType = STATUS_OK;
-   MyProc->pid = MyProcPid;
-   MyProc->databaseId = MyDatabaseId;
    MyProc->xid = InvalidTransactionId;
    MyProc->xmin = InvalidTransactionId;
+   MyProc->logRec.xrecoff = 0;
    MyProc->waitLock = NULL;
    MyProc->waitHolder = NULL;
+   MyProc->pid = MyProcPid;
+   MyProc->databaseId = MyDatabaseId;
    SHMQueueInit(&(MyProc->procHolders));
+   /*
+    * Zero out the spin lock counts and set the sLocks field for
+    * ProcStructLock to 1 as we have acquired this spinlock above but
+    * didn't record it since we didn't have MyProc until now.
+    */
+   MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
+   MyProc->sLocks[ProcStructLock] = 1;
 
    /*
-    * Release the lock.
+    * Release the lock while accessing shmem index; we still haven't
+    * installed ProcKill and so we don't want to hold lock if there's
+    * an error.
     */
    SpinRelease(ProcStructLock);
 
@@ -283,10 +281,28 @@ InitProcess(void)
        elog(STOP, "InitProcess: ShmemPID table broken");
 
    /*
-    * Arrange to clean up at backend exit.
+    * Arrange to clean up at backend exit.  Once we do this, owned
+    * spinlocks will be released on exit, and so we can be a lot less
+    * tense about errors.
     */
    on_shmem_exit(ProcKill, 0);
 
+   /*
+    * Set up a wait-semaphore for the proc.  (Do this last so that we
+    * can rely on ProcKill to clean up if it fails.)
+    */
+   if (IsUnderPostmaster)
+   {
+       SpinAcquire(ProcStructLock);
+       ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
+       SpinRelease(ProcStructLock);
+       /*
+        * We might be reusing a semaphore that belongs to a dead backend.
+        * So be careful and reinitialize its value here.
+        */
+       ZeroProcSemaphore(MyProc);
+   }
+
    /*
     * Now that we have a PROC, we could try to acquire locks, so
     * initialize the deadlock checker.
@@ -425,8 +441,9 @@ ProcKill(void)
 
    SpinAcquire(ProcStructLock);
 
-   /* Free up my wait semaphore */
-   ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum);
+   /* Free up my wait semaphore, if I got one */
+   if (MyProc->sem.semId >= 0)
+       ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum);
 
    /* Add PROC struct to freelist so space can be recycled in future */
    MyProc->links.next = ProcGlobal->freeProcs;
@@ -993,10 +1010,7 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
        {
            if ((freeSemMap[i] & mask) == 0)
            {
-
-               /*
-                * a free semaphore found. Mark it as allocated.
-                */
+               /* A free semaphore found. Mark it as allocated. */
                freeSemMap[i] |= mask;
 
                *semId = procSemIds[i];
@@ -1007,8 +1021,13 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
        }
    }
 
-   /* if we reach here, all the semaphores are in use. */
-   elog(ERROR, "ProcGetNewSemIdAndNum: cannot allocate a free semaphore");
+   /*
+    * If we reach here, all the semaphores are in use.  This is one of the
+    * possible places to detect "too many backends", so give the standard
+    * error message.  (Whether we detect it here or in sinval.c depends on
+    * whether MaxBackends is a multiple of PROC_NSEMS_PER_SET.)
+    */
+   elog(FATAL, "Sorry, too many clients already");
 }
 
 /*