Hi, Bruce!
authorBruce Momjian
Thu, 13 May 1999 15:55:45 +0000 (15:55 +0000)
committerBruce Momjian
Thu, 13 May 1999 15:55:45 +0000 (15:55 +0000)
These are my last changes to lmgr fixing deadlock handling.
Please apply them to cvs...

Vadim

src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index 6c3437837df504be3774d5adcd8cf86c3e006d04..09e85e5e1341b0f867f641346ae2e54820ec7615 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.51 1999/05/10 00:45:43 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.52 1999/05/13 15:55:44 momjian Exp $
  *
  * NOTES
  *   Outside modules can create a lock table and acquire/release
@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define LOCK_PRINT_AUX(where,lock,type) \
    TPRINTF(TRACE_ALL, \
-        "%s: lock(%x) tbl(%d) rel(%u) db(%d) obj(%u) mask(%x) " \
-        "hold(%d,%d,%d,%d,%d)=%d " \
-        "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+        "%s: lock(%x) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " \
+        "hold(%d,%d,%d,%d,%d,%d,%d)=%d " \
+        "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
         where, \
         MAKE_OFFSET(lock), \
         lock->tag.lockmethod, \
@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
         lock->holders[3], \
         lock->holders[4], \
         lock->holders[5], \
+        lock->holders[6], \
+        lock->holders[7], \
         lock->nHolding, \
         lock->activeHolders[1], \
         lock->activeHolders[2], \
         lock->activeHolders[3], \
         lock->activeHolders[4], \
         lock->activeHolders[5], \
+        lock->activeHolders[6], \
+        lock->activeHolders[7], \
         lock->nActive, \
         lock->waitProcs.size, \
         lock_types[type])
@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define XID_PRINT_AUX(where,xidentP) \
    TPRINTF(TRACE_ALL, \
-        "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
-        "hold(%d,%d,%d,%d,%d)=%d", \
+        "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u) " \
+        "hold(%d,%d,%d,%d,%d,%d,%d)=%d", \
         where, \
         MAKE_OFFSET(xidentP), \
         xidentP->tag.lock, \
@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
         xidentP->holders[3], \
         xidentP->holders[4], \
         xidentP->holders[5], \
+        xidentP->holders[6], \
+        xidentP->holders[7], \
         xidentP->nHolding)
 
 #else                          /* !LOCK_MGR_DEBUG */
@@ -1561,19 +1567,26 @@ LockingDisabled()
  * We have already locked the master lock before being called.
  */
 bool
-DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
+DeadLockCheck(void *proc, LOCK *findlock)
 {
-   int                     done;
    XIDLookupEnt           *xidLook = NULL;
    XIDLookupEnt           *tmp = NULL;
+   PROC                   *thisProc = (PROC*) proc,
+                          *waitProc;
+   SHM_QUEUE              *lockQueue = &(thisProc->lockQueue);
    SHMEM_OFFSET            end = MAKE_OFFSET(lockQueue);
    LOCK                   *lock;
+   PROC_QUEUE             *waitQueue;
+   int                     i,
+                           j;
+   bool                    first_run = (thisProc == MyProc),
+                           done;
 
    static PROC            *checked_procs[MAXBACKENDS];
    static int              nprocs;
 
    /* initialize at start of recursion */
-   if (skip_check)
+   if (first_run)
    {
        checked_procs[0] = MyProc;
        nprocs = 1;
@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 
        LOCK_PRINT("DeadLockCheck", lock, 0);
 
+       if (lock->tag.relId == 0)   /* user' lock */
+           goto nxtl;
+
        /*
-        * This is our only check to see if we found the lock we want.
-        *
-        * The lock we are waiting for is already in MyProc->lockQueue so we
-        * need to skip it here.  We are trying to find it in someone
-        * else's lockQueue.   bjm
+        * waitLock is always in lockQueue of waiting proc,
+        * if !first_run then upper caller will handle waitProcs
+        * queue of waitLock.
         */
-       if (lock == findlock && !skip_check)
-           return true;
+       if (thisProc->waitLock == lock && !first_run)
+           goto nxtl;
 
+       /*
+        * If we found proc holding findlock and sleeping on some my 
+        * other lock then we have to check does it block me or
+        * another waiters.
+        */
+       if (lock == findlock && !first_run)
        {
-           PROC_QUEUE *waitQueue = &(lock->waitProcs);
-           PROC       *proc;
-           int         i;
-           int         j;
+           LOCKMETHODCTL  *lockctl = 
+                           LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+           int             lm;
 
-           proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-           for (i = 0; i < waitQueue->size; i++)
+           Assert(xidLook->nHolding > 0);
+           for (lm = 1; lm <= lockctl->numLockModes; lm++)
            {
+               if (xidLook->holders[lm] > 0 && 
+                   lockctl->conflictTab[lm] & findlock->waitMask)
+                   return true;
+           }
+           /*
+            * Else - get the next lock from thisProc's lockQueue
+            */
+           goto nxtl;  
+       }
+
+       waitQueue = &(lock->waitProcs);
+       waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+
+       for (i = 0; i < waitQueue->size; i++)
+       {
+           if (waitProc == thisProc)
+           {
+               Assert(waitProc->waitLock == lock);
+               Assert(waitProc == MyProc);
+               waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+               continue;
+           }
+           if (lock == findlock)   /* first_run also true */
+           {
+               LOCKMETHODCTL  *lockctl = 
+                       LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+
                /*
-                * If I hold some locks on findlock and another proc 
-                * waits on it holding locks too - check if we are
-                * waiting one another.
+                * If me blocked by his holdlock...
                 */
-               if (proc != MyProc &&
-                   lock == findlock && /* skip_check also true */
-                   MyProc->holdLock)
+               if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
                {
-                   LOCKMETHODCTL  *lockctl = 
-                           LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-
-                   Assert(skip_check);
-                   if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
-                       lockctl->conflictTab[proc->token] & MyProc->holdLock)
+                   /* and he blocked by me -> deadlock */
+                   if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
                        return true;
+                   /* we shouldn't look at lockQueue of our blockers */
+                   waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                   continue;
                }
-
                /*
-                * No sense in looking at the wait queue of the lock we
-                * are looking for. If lock == findlock, and I got here,
-                * skip_check must be true too.
+                * If he isn't blocked by me and we request non-conflicting 
+                * lock modes - no deadlock here because of he isn't
+                * blocked by me in any sence (explicitle or implicitly). 
+                * Note that we don't do like test if !first_run
+                * (when thisProc is holder and non-waiter on lock) and so
+                * we call DeadLockCheck below for every waitProc in
+                * thisProc->lockQueue, even for waitProc-s un-blocked
+                * by thisProc. Should we? This could save us some time...
                 */
-               if (lock != findlock)
+               if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) && 
+                   !(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
                {
-                   for (j = 0; j < nprocs; j++)
-                       if (checked_procs[j] == proc)
-                           break;
-                   if (j >= nprocs && lock != findlock)
-                   {
-                       Assert(nprocs < MAXBACKENDS);
-                       checked_procs[nprocs++] = proc;
+                   waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                   continue;
+               }
+           }
 
+           /*
+            * Look in lockQueue of this waitProc, if didn't do this before.
+            */
+           for (j = 0; j < nprocs; j++)
+           {
+               if (checked_procs[j] == waitProc)
+                   break;
+           }
+           if (j >= nprocs)
+           {
+               Assert(nprocs < MAXBACKENDS);
+               checked_procs[nprocs++] = waitProc;
+
+               if (DeadLockCheck(waitProc, findlock))
+               {
+                   LOCKMETHODCTL  *lockctl = 
+                           LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+                   int             holdLock;
+
+                   /*
+                    * Ok, but is waitProc waiting for me (thisProc) ?
+                    */
+                   if (thisProc->waitLock == lock)
+                   {
+                       Assert(first_run);
+                       holdLock = thisProc->holdLock;
+                   }
+                   else    /* should we cache holdLock ? */
+                   {
+                       int lm, tmpMask = 2;
+
+                       Assert(xidLook->nHolding > 0);
+                       for (holdLock = 0, lm = 1; 
+                               lm <= lockctl->numLockModes; 
+                               lm++, tmpMask <<= 1)
+                       {
+                           if (xidLook->holders[lm] > 0)
+                               holdLock |= tmpMask;
+                       }
+                       Assert(holdLock != 0);
+                   }
+                   if (lockctl->conflictTab[waitProc->token] & holdLock)
+                   {
                        /*
-                        * For non-MyProc entries, we are looking only
-                        * waiters, not necessarily people who already
-                        * hold locks and are waiting. Now we check for
-                        * cases where we have two or more tables in a
-                        * deadlock.  We do this by continuing to search
-                        * for someone holding a lock       bjm
+                        * Last attempt to avoid deadlock - try to wakeup
+                        * myself.
                         */
-                       if (DeadLockCheck(&(proc->lockQueue), findlock, false))
-                           return true;
+                       if (first_run)
+                       {
+                           if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+                                                    MyProc->waitLock,
+                                                    MyProc->token,
+                                                    MyProc->xid,
+                                                    NULL) == STATUS_OK)
+                           {
+                               GrantLock(MyProc->waitLock, MyProc->token);
+                               (MyProc->waitLock->waitProcs.size)--;
+                               ProcWakeup(MyProc, NO_ERROR);
+                               return false;
+                           }
+                       }
+                       return true;
+                   }
+                   /*
+                    * Hell! Is he blocked by any (other) holder ?
+                    */
+                   if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+                                            lock,
+                                            waitProc->token,
+                                            waitProc->xid,
+                                            NULL) != STATUS_OK)
+                   {
+                       /*
+                        * Blocked by others - no deadlock...
+                        */
+#ifdef DEADLOCK_DEBUG
+                       LOCK_PRINT("DeadLockCheck: blocked by others", 
+                                       lock, waitProc->token);
+#endif
+                       waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                       continue;
                    }
+                   /*
+                    * Well - wakeup this guy! This is the case of
+                    * implicit blocking: thisProc blocked someone who blocked
+                    * waitProc by the fact that he (someone) is already
+                    * waiting for lock (we do this for anti-starving).
+                    */
+                   GrantLock(lock, waitProc->token);
+                   waitQueue->size--;
+                   waitProc = ProcWakeup(waitProc, NO_ERROR);
+                   continue;
                }
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
            }
+           waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
        }
 
+nxtl:;
        if (done)
            break;
        SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
index b80d32e1b44e727c3b4cbb9568936c884781f283..4eb97b8a6a86ff3f3b3b1c36ed5b84b15b7daff4 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *     This is so that we can support more backends. (system-wide semaphore
  *     sets run out pretty fast.)                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  */
 #include 
 #include 
@@ -78,7 +78,6 @@
 #include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
-static PROC *ProcWakeup(PROC *proc, int errType);
 static void ProcFreeAllSemaphores(void);
 
 #define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
@@ -640,7 +639,7 @@ rt:;
  *  remove the process from the wait queue and set its links invalid.
  *  RETURN: the next process in the wait queue.
  */
-static PROC *
+PROC *
 ProcWakeup(PROC *proc, int errType)
 {
    PROC       *retProc;
@@ -806,10 +805,10 @@ HandleDeadLock(int sig)
    DumpAllLocks();
 #endif
 
-   if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+   MyProc->errType = STATUS_NOT_FOUND;
+   if (!DeadLockCheck(MyProc, MyProc->waitLock))
    {
        UnlockLockTable();
-       MyProc->errType = STATUS_NOT_FOUND;
        return;
    }
 
index 387f164247cacbad8b8457c646d7390f2e76c38c..ab084eb94fdb8db5d6e383a5a70cb6317016595a 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
+ * $Id: lock.h,v 1.26 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -249,8 +249,7 @@ extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int LockShmemSize(int maxBackends);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
-             bool skip_check);
+extern bool DeadLockCheck(void *proc, LOCK *findlock);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
index 53b677858fd59c4d88bef2fdcc7c742addf42219..3dcf1e281b5c6772baa09a9a7a67d23eef1aa244 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
+ * $Id: proc.h,v 1.22 1999/05/13 15:55:45 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,6 +119,7 @@ extern bool ProcRemove(int pid);
 extern void ProcQueueInit(PROC_QUEUE *queue);
 extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, 
                    LOCK *lock);
+extern PROC *ProcWakeup(PROC *proc, int errType);
 extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
               LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);