Fix LMGR for MVCC.
authorVadim B. Mikheev
Fri, 7 May 1999 01:23:11 +0000 (01:23 +0000)
committerVadim B. Mikheev
Fri, 7 May 1999 01:23:11 +0000 (01:23 +0000)
Get rid of Extend lock mode.

src/backend/access/heap/hio.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lmgr.h
src/include/storage/lock.h
src/include/storage/proc.h

index 3b4fa4af0bb08daccde6296e0bdd12b1d0b6d058..f991b206d4426f147e4bac3242c6f0aa61dfa2b9 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Id: hio.c,v 1.18 1999/05/01 15:04:46 vadim Exp $
+ *   $Id: hio.c,v 1.19 1999/05/07 01:22:53 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -111,14 +111,13 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
    Item        item;
 
    /*
-    * Actually, we lock _relation_ here, not page, but I believe
-    * that locking page is faster... Obviously, we could get rid
-    * of ExtendLock mode at all and use ExclusiveLock mode on
-    * page 0, as long as we use page-level locking for indices only,
-    * but we are in 6.5-beta currently...  - vadim 05/01/99
+    * Lock relation for extention. We can use LockPage here as long as 
+    * in all other places we use page-level locking for indices only.
+    * Alternatevely, we could define pseudo-table as we do for
+    * transactions with XactLockTable.
     */
    if (!relation->rd_myxactonly)
-       LockPage(relation, 0, ExtendLock);
+       LockPage(relation, 0, ExclusiveLock);
 
    /*
     * XXX This does an lseek - VERY expensive - but at the moment it is
@@ -166,7 +165,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
    }
 
    if (!relation->rd_myxactonly)
-       UnlockPage(relation, 0, ExtendLock);
+       UnlockPage(relation, 0, ExclusiveLock);
 
    offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data,
                         tuple->t_len, InvalidOffsetNumber, LP_USED);
index 7d34499b01704b7e1c9858a489782b0a136b97fc..7eb51207c7e5a69ca4e1e6214b6dbbb4d1bc77ce 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.22 1999/02/13 23:18:24 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.23 1999/05/07 01:23:02 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,9 +79,6 @@ static MASK LockConflicts[] = {
    (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) | 
    (1 << AccessShareLock),
 
-/* ExtendLock */
-   (1 << ExtendLock)
-
 };
 
 static int LockPrios[] = {
@@ -92,8 +89,7 @@ static int    LockPrios[] = {
    4,
    5,
    6,
-   7,
-   1
+   7
 };
 
 LOCKMETHOD LockTableId = (LOCKMETHOD) NULL;
index c39138ff129fe88371a5bf8d2ef79ac0de3fbf45..c1996fe0713a9f0cfec276ee269f5ed95792fcd4 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.49 1999/04/30 17:03:04 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $
  *
  * NOTES
  *   Outside modules can create a lock table and acquire/release
@@ -50,8 +50,7 @@
 #include "utils/trace.h"
 #include "utils/ps_status.h"
 
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-          TransactionId xid);
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 /*
  * lockDebugRelation can be used to trace unconditionally a single relation,
@@ -143,12 +142,14 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
 #endif  /* !LOCK_MGR_DEBUG */
 
 static char *lock_types[] = {
-   "",
-   "WRITE",
-   "READ",
-   "WRITE INTENT",
-   "READ INTENT",
-   "EXTEND"
+   "INVALID",
+   "AccessShareLock",
+   "RowShareLock",
+   "RowExclusiveLock",
+   "ShareLock",
+   "ShareRowExclusiveLock",
+   "ExclusiveLock",
+   "AccessExclusiveLock"
 };
 
 SPINLOCK   LockMgrLock;        /* in Shmem or created in
@@ -631,12 +632,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 
    /* --------------------
     * If I'm the only one holding a lock, then there
-    * cannot be a conflict.  Need to subtract one from the
-    * lock's count since we just bumped the count up by 1
-    * above.
+    * cannot be a conflict. The same is true if we already
+    * hold this lock.
     * --------------------
     */
-   if (result->nHolding == lock->nActive)
+   if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
    {
        result->holders[lockmode]++;
        result->nHolding++;
@@ -647,7 +647,39 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        return TRUE;
    }
 
-   status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+   /*
+    * If lock requested conflicts with locks requested by waiters...
+    */
+   if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+   {
+       int     i = 1;
+
+       /*
+        * If I don't hold locks or my locks don't conflict 
+        * with waiters then force to sleep.
+        */
+       if (result->nHolding > 0)
+       {
+           for ( ; i <= lockMethodTable->ctl->numLockModes; i++)
+           {
+               if (result->holders[i] > 0 && 
+                   lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+                   break;  /* conflict */
+           }
+       }
+   
+       if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
+       {
+           XID_PRINT("LockAcquire: higher priority proc waiting",
+                     result);
+           status = STATUS_FOUND;
+       }
+       else
+           status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+   }
+   else
+       status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+
    if (status == STATUS_OK)
        GrantLock(lock, lockmode);
    else if (status == STATUS_FOUND)
@@ -680,7 +712,25 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
            return FALSE;
        }
 #endif
-       status = WaitOnLock(lockmethod, lock, lockmode, xid);
+       /*
+        * Construct bitmask of locks we hold before going to sleep.
+        */
+       MyProc->holdLock = 0;
+       if (result->nHolding > 0)
+       {
+           int     i,
+                   tmpMask = 2;
+
+           for (i = 1; i <= lockMethodTable->ctl->numLockModes; 
+                   i++, tmpMask <<= 1)
+           {
+               if (result->holders[i] > 0)
+                   MyProc->holdLock |= tmpMask;
+           }
+           Assert(MyProc->holdLock != 0);
+       }
+
+       status = WaitOnLock(lockmethod, lock, lockmode);
 
        /*
         * Check the xid entry status, in case something in the ipc
@@ -712,10 +762,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
  * determining whether or not any new lock acquired conflicts with
  * the old ones.
  *
- * For example, if I am already holding a WRITE_INTENT lock,
- * there will not be a conflict with my own READ_LOCK.  If I
- * don't consider the intent lock when checking for conflicts,
- * I find no conflict.
  * ----------------------------
  */
 int
@@ -812,32 +858,6 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
    }
    Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
-   /*
-    * We can control runtime this option. Default is lockReadPriority=0
-    */
-   if (!lockReadPriority)
-   {
-       /* ------------------------
-        * If someone with a greater priority is waiting for the lock,
-        * do not continue and share the lock, even if we can.
-        * Don't do this if the process already has some locks, because
-        * this could hold up other people waiting on our locks, causing
-        * a priority inversion.  bjm
-        * ------------------------
-        */
-       int         myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode];
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-       PROC       *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-
-       if (SHMQueueEmpty(&MyProc->lockQueue) && waitQueue->size &&
-           topproc->prio > myprio)
-       {
-           XID_PRINT("LockResolveConflicts: higher priority proc waiting",
-                     result);
-           return STATUS_FOUND;
-       }
-   }
-
    /* ----------------------------
     * first check for global conflicts: If no locks conflict
     * with mine, then I get the lock.
@@ -909,12 +929,10 @@ GrantLock(LOCK *lock, LOCKMODE lockmode)
 }
 
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-          TransactionId xid)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
 {
    PROC_QUEUE *waitQueue = &(lock->waitProcs);
    LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
-   int         prio = lockMethodTable->ctl->prio[lockmode];
    char        old_status[64],
                new_status[64];
 
@@ -934,11 +952,9 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
    strcat(new_status, " waiting");
    PS_SET_STATUS(new_status);
    if (ProcSleep(waitQueue,
-                 lockMethodTable->ctl->masterLock,
+                 lockMethodTable->ctl,
                  lockmode,
-                 prio,
-                 lock,
-                 xid) != NO_ERROR)
+                 lock) != NO_ERROR)
    {
        /* -------------------
         * This could have happend as a result of a deadlock,
@@ -952,12 +968,16 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
        LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
        Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
        Assert(lock->nActive <= lock->nHolding);
+       if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+           lock->waitMask &= BITS_OFF[lockmode];
        SpinRelease(lockMethodTable->ctl->masterLock);
        elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
 
        /* not reached */
    }
 
+   if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+       lock->waitMask &= BITS_OFF[lockmode];
    PS_SET_STATUS(old_status);
    LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
    return STATUS_OK;
@@ -1129,6 +1149,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    lock->nActive--;
    lock->activeHolders[lockmode]--;
 
+#ifdef NOT_USED
    /* --------------------------
     * If there are still active locks of the type I just released, no one
     * should be woken up.  Whoever is asleep will still conflict
@@ -1138,6 +1159,19 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    if (lock->activeHolders[lockmode])
        wakeupNeeded = false;
    else
+#endif
+   /*
+    * Above is not valid any more (due to MVCC lock modes). 
+    * Actually we should compare activeHolders[lockmode] with 
+    * number of waiters holding lock of this type and try to
+    * wakeup only if these numbers are equal (and lock released 
+    * conflicts with locks requested by waiters). For the moment
+    * we only check the last condition.
+    */
+   if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+       wakeupNeeded = true;
+
+   if (!(lock->activeHolders[lockmode]))
    {
        /* change the conflict mask.  No more of this lock type. */
        lock->mask &= BITS_OFF[lockmode];
@@ -1199,12 +1233,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 
    if (wakeupNeeded)
    {
-       /* --------------------------
-        * Wake the first waiting process and grant him the lock if it
-        * doesn't conflict.  The woken process must record the lock
-        * himself.
-        * --------------------------
-        */
        ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
    }
    else
@@ -1275,6 +1303,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 
    for (;;)
    {
+       bool    wakeupNeeded = false;
 
        /*
         * Sometimes the queue appears to be messed up.
@@ -1380,6 +1409,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                       &&(lock->activeHolders[i] >= 0));
                if (!lock->activeHolders[i])
                    lock->mask &= BITS_OFF[i];
+               /*
+                * Read comments in LockRelease
+                */
+               if (!wakeupNeeded && xidLook->holders[i] > 0 && 
+                   lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+                   wakeupNeeded = true;
            }
            lock->nHolding -= xidLook->nHolding;
            lock->nActive -= xidLook->nHolding;
@@ -1444,14 +1479,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                return FALSE;
            }
        }
-       else
+       else if (wakeupNeeded)
        {
-           /* --------------------
-            * Wake the first waiting process and grant him the lock if it
-            * doesn't conflict.  The woken process must record the lock
-            * him/herself.
-            * --------------------
-            */
            waitQueue = &(lock->waitProcs);
            ProcLockWakeup(waitQueue, lockmethod, lock);
        }
@@ -1534,46 +1563,22 @@ LockingDisabled()
 bool
 DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 {
-   int         done;
-   XIDLookupEnt *xidLook = NULL;
-   XIDLookupEnt *tmp = NULL;
-   SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
-   LOCK       *lock;
+   int                     done;
+   XIDLookupEnt           *xidLook = NULL;
+   XIDLookupEnt           *tmp = NULL;
+   SHMEM_OFFSET            end = MAKE_OFFSET(lockQueue);
+   LOCK                   *lock;
 
-   LOCKMETHODTABLE *lockMethodTable;
-   XIDLookupEnt *result,
-               item;
-   HTAB       *xidTable;
-   bool        found;
-
-   static PROC *checked_procs[MAXBACKENDS];
-   static int  nprocs;
-   static bool MyNHolding;
+   static PROC            *checked_procs[MAXBACKENDS];
+   static int              nprocs;
 
    /* initialize at start of recursion */
    if (skip_check)
    {
        checked_procs[0] = MyProc;
        nprocs = 1;
-
-       lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
-       xidTable = lockMethodTable->xidHash;
-
-       MemSet(&item, 0, XID_TAGSIZE);
-       TransactionIdStore(MyProc->xid, &item.tag.xid);
-       item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
-       item.tag.pid = pid;
-#endif
-
-       if (!(result = (XIDLookupEnt *)
-             hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
-       {
-           elog(NOTICE, "LockAcquire: xid table corrupted");
-           return true;
-       }
-       MyNHolding = result->nHolding;
    }
+
    if (SHMQueueEmpty(lockQueue))
        return false;
 
@@ -1583,12 +1588,6 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 
    for (;;)
    {
-       /* ---------------------------
-        * XXX Here we assume the shared memory queue is circular and
-        * that we know its internal structure.  Should have some sort of
-        * macros to allow one to walk it.  mer 20 July 1991
-        * ---------------------------
-        */
        done = (xidLook->queue.next == end);
        lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
@@ -1613,45 +1612,21 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
            proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
            for (i = 0; i < waitQueue->size; i++)
            {
+               /*
+                * If I hold some locks on findlock and another proc 
+                * waits on it holding locks too - check if we are
+                * waiting one another.
+                */
                if (proc != MyProc &&
-                   lock == findlock && /* skip_check also true */
-                   MyNHolding) /* I already hold some lock on it */
+                   lock == findlock && /* skip_check also true */
+                   MyProc->holdLock)
                {
-
-                   /*
-                    * For findlock's wait queue, we are interested in
-                    * procs who are blocked waiting for a write-lock on
-                    * the table we are waiting on, and already hold a
-                    * lock on it. We first check to see if there is an
-                    * escalation deadlock, where we hold a readlock and
-                    * want a writelock, and someone else holds readlock
-                    * on the same table, and wants a writelock.
-                    *
-                    * Basically, the test is, "Do we both hold some lock on
-                    * findlock, and we are both waiting in the lock
-                    * queue?"   bjm
-                    */
+                   LOCKMETHODCTL  *lockctl = 
+                           LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
 
                    Assert(skip_check);
-                   Assert(MyProc->prio >= 2);
-
-                   lockMethodTable = LockMethodTable[1];
-                   xidTable = lockMethodTable->xidHash;
-
-                   MemSet(&item, 0, XID_TAGSIZE);
-                   TransactionIdStore(proc->xid, &item.tag.xid);
-                   item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
-                   item.tag.pid = pid;
-#endif
-
-                   if (!(result = (XIDLookupEnt *)
-                         hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
-                   {
-                       elog(NOTICE, "LockAcquire: xid table corrupted");
-                       return true;
-                   }
-                   if (result->nHolding)
+                   if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
+                       lockctl->conflictTab[proc->token] & MyProc->holdLock)
                        return true;
                }
 
index 229a78587cbd643389b2c42aac4f00fcbe29ef1f..b80d32e1b44e727c3b4cbb9568936c884781f283 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *     This is so that we can support more backends. (system-wide semaphore
  *     sets run out pretty fast.)                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
  */
 #include 
 #include 
@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid);
 static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
 static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
 
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
 /*
  * InitProcGlobal -
  *   initializes the global process table. We put it here so that
@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue)
  */
 int
 ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
-         SPINLOCK spinlock,
+         LOCKMETHODCTL *lockctl,
          int token,            /* lockmode */
-         int prio,
-         LOCK *lock,
-         TransactionId xid)    /* needed by user locks, see below */
+         LOCK *lock)
 {
    int         i;
+   SPINLOCK    spinlock = lockctl->masterLock;
    PROC       *proc;
+   int         myMask = (1 << token);
+   int         waitMask = lock->waitMask;
+   int         aheadHolders[MAX_LOCKMODES];
+   bool        selfConflict = (lockctl->conflictTab[token] & myMask),
+               prevSame = false;
    bool        deadlock_checked = false;
    struct itimerval timeval,
                dummy;
 
-   /*
-    * If the first entries in the waitQueue have a greater priority than
-    * we have, we must be a reader, and they must be a writers, and we
-    * must be here because the current holder is a writer or a reader but
-    * we don't share shared locks if a writer is waiting. We put
-    * ourselves after the writers.  This way, we have a FIFO, but keep
-    * the readers together to give them decent priority, and no one
-    * starves.  Because we group all readers together, a non-empty queue
-    * only has a few possible configurations:
-    *
-    * [readers] [writers] [readers][writers] [writers][readers]
-    * [writers][readers][writers]
-    *
-    * In a full queue, we would have a reader holding a lock, then a writer
-    * gets the lock, then a bunch of readers, made up of readers who
-    * could not share the first readlock because a writer was waiting,
-    * and new readers arriving while the writer had the lock.  bjm
-    */
+   MyProc->token = token;
+   MyProc->waitLock = lock;
+
    proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-   /* If we are a reader, and they are writers, skip past them */
-   for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
-       proc = (PROC *) MAKE_PTR(proc->links.prev);
+   /* if we don't conflict with any waiter - be first in queue */
+   if (!(lockctl->conflictTab[token] & waitMask))
+       goto ins;
 
-   /* The rest of the queue is FIFO, with readers first, writers last */
-   for (; i < waitQueue->size && proc->prio <= prio; i++)
-       proc = (PROC *) MAKE_PTR(proc->links.prev);
+   for (i = 1; i < MAX_LOCKMODES; i++)
+       aheadHolders[i] = lock->activeHolders[i];
+   (aheadHolders[token])++;
 
-   MyProc->prio = prio;
-   MyProc->token = token;
-   MyProc->waitLock = lock;
+   for (i = 0; i < waitQueue->size; i++)
+   {
+       /* am I waiting for him ? */
+       if (lockctl->conflictTab[token] & proc->holdLock)
+       {
+           /* is he waiting for me ? */
+           if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+           {
+               MyProc->errType = STATUS_ERROR;
+               elog(NOTICE, DeadLockMessage);
+               goto rt;
+           }
+           /* being waiting for him - go past */
+       }
+       /* if he waits for me */
+       else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+       {
+           break;
+       }
+       /* if conflicting locks requested */
+       else if (lockctl->conflictTab[proc->token] & myMask)
+       {
+           /*
+            * If I request non self-conflicting lock and there
+            * are others requesting the same lock just before me -
+            * stay here.
+            */
+           if (!selfConflict && prevSame)
+               break;
+       }
+       /*
+        * Last attempt to don't move any more: if we don't conflict
+        * with rest waiters in queue.
+        */
+       else if (!(lockctl->conflictTab[token] & waitMask))
+           break;
 
-#ifdef USER_LOCKS
-   /* -------------------
-    * Currently, we only need this for the ProcWakeup routines.
-    * This must be 0 for user lock, so we can't just use the value
-    * from GetCurrentTransactionId().
-    * -------------------
-    */
-   TransactionIdStore(xid, &MyProc->xid);
-#else
-#ifndef LowLevelLocking
-   /* -------------------
-    * currently, we only need this for the ProcWakeup routines
-    * -------------------
-    */
-   TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
-#endif
-#endif
+       prevSame = (proc->token == token);
+       (aheadHolders[proc->token])++;
+       if (aheadHolders[proc->token] == lock->holders[proc->token])
+           waitMask &= ~ (1 << proc->token);
+       proc = (PROC *) MAKE_PTR(proc->links.prev);
+   }
 
+ins:;
    /* -------------------
     * assume that these two operations are atomic (because
     * of the spinlock).
@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
    SHMQueueInsertTL(&(proc->links), &(MyProc->links));
    waitQueue->size++;
 
+   lock->waitMask |= myMask;
    SpinRelease(spinlock);
 
    /* --------------
@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
     */
    SpinAcquire(spinlock);
 
+rt:;
+
 #ifdef LOCK_MGR_DEBUG
    /* Just to get meaningful debug messages from DumpLocks() */
    MyProc->waitLock = (LOCK *) NULL;
@@ -655,9 +672,9 @@ int
 ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
    PROC       *proc;
-   int         count;
+   int         count = 0;
    int         trace_flag;
-   int         last_locktype = -1;
+   int         last_locktype = 0;
    int         queue_size = queue->size;
 
    Assert(queue->size >= 0);
@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
        return STATUS_NOT_FOUND;
 
    proc = (PROC *) MAKE_PTR(queue->links.prev);
-   count = 0;
    while ((queue_size--) && (proc))
    {
 
@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
            continue;
 
        /*
-        * This proc conflicts with locks held by others, ignored.
+        * Does this proc conflict with locks held by others ?
         */
        if (LockResolveConflicts(lockmethod,
                                 lock,
@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                                 proc->xid,
                                 (XIDLookupEnt *) NULL) != STATUS_OK)
        {
+           if (count != 0)
+               break;
            last_locktype = proc->token;
            continue;
        }
@@ -828,7 +846,7 @@ HandleDeadLock(int sig)
     */
    UnlockLockTable();
 
-   elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+   elog(NOTICE, DeadLockMessage);
    return;
 }
 
index 0e78c33de9b51f0bae8fdea790075ccfc046de63..3af0071d468f51817a343a723967800786d73b25 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lmgr.h,v 1.18 1999/02/19 06:06:34 tgl Exp $
+ * $Id: lmgr.h,v 1.19 1999/05/07 01:23:05 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,8 +25,6 @@
 #define ExclusiveLock          6
 #define AccessExclusiveLock        7
 
-#define ExtendLock             8
-
 extern LOCKMETHOD LockTableId;
 
 
index da77f1d523ccbe86be493fb7f04c02dad10a0a43..387f164247cacbad8b8457c646d7390f2e76c38c 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.24 1999/03/06 21:17:43 tgl Exp $
+ * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,7 +41,7 @@ typedef int LOCKMODE;
 typedef int LOCKMETHOD;
 
 /* MAX_LOCKMODES cannot be larger than the bits in MASK */
-#define MAX_LOCKMODES  9
+#define MAX_LOCKMODES  8
 
 /*
  * MAX_LOCK_METHODS corresponds to the number of spin locks allocated in
@@ -204,6 +204,7 @@ typedef struct LOCK
 
    /* data */
    int         mask;
+   int         waitMask;
    PROC_QUEUE  waitProcs;
    int         holders[MAX_LOCKMODES];
    int         nHolding;
index 952f50553ca47a6662604ad0eff2c20259e9fc35..53b677858fd59c4d88bef2fdcc7c742addf42219 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.20 1999/02/19 07:10:47 tgl Exp $
+ * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -48,8 +48,9 @@ typedef struct proc
                                 * were starting our xact: vacuum must not
                                 * remove tuples deleted by xid >= xmin ! */
 
-   LOCK       *waitLock;       /* Lock we're sleeping on */
-   int         token;          /* info for proc wakeup routines */
+   LOCK       *waitLock;       /* Lock we're sleeping on ... */
+   int         token;          /* type of lock we sleeping for */
+   int         holdLock;       /* while holding these locks */
    int         pid;            /* This procs process id */
    short       sLocks[MAX_SPINS];      /* Spin lock stats */
    SHM_QUEUE   lockQueue;      /* locks associated with current
@@ -116,8 +117,8 @@ extern bool ProcRemove(int pid);
 /* make static in storage/lmgr/proc.c -- jolly */
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
-         int prio, LOCK *lock, TransactionId xid);
+extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, 
+                   LOCK *lock);
 extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
               LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);