From: Massimo Dal Zotto
authorMarc G. Fournier
Tue, 25 Aug 1998 21:20:32 +0000 (21:20 +0000)
committerMarc G. Fournier
Tue, 25 Aug 1998 21:20:32 +0000 (21:20 +0000)
lock.patch

        I have rewritten lock.c cleaning up the code and adding better
        assert checking I have also added some fields to the lock and
        xid tags for better support of user locks. There is also a new
        function which returns an array of pids owning a lock.
        I'm using this code from over six months and it works fine.

src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/multi.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index 4b83c9c0d16ac3a0dc65f9d15843f49d43aa56ba..b35008a3bb8afc84d393eddbd4d8e3a07c7ade8e 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $
  *
  * NOTES
  *   Outside modules can create a lock table and acquire/release
  *
  * Interface:
  *
- * LockAcquire(), LockRelease(), LockMethodTableInit().
- *
- * LockReplace() is called only within this module and by the
- *     lkchain module.  It releases a lock without looking
- *     the lock up in the lock table.
+ * LockAcquire(), LockRelease(), LockMethodTableInit(),
+ * LockMethodTableRename(), LockReleaseAll, LockOwners()
+ * LockResolveConflicts(), GrantLock()
  *
  * NOTE: This module is used to define new lock tables.  The
  *     multi-level lock table (multi.c) used by the heap
@@ -35,6 +33,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "postgres.h"
 #include "miscadmin.h"
 #include "utils/palloc.h"
 #include "access/xact.h"
 #include "access/transam.h"
+#include "utils/trace.h"
+#include "utils/ps_status.h"
 
-static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
-
-/*#define LOCK_MGR_DEBUG*/
-
-#ifndef LOCK_MGR_DEBUG
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+                     TransactionId xid);
 
-#define LOCK_PRINT(where,tag,type)
-#define LOCK_DUMP(where,lock,type)
-#define LOCK_DUMP_AUX(where,lock,type)
+/*
+ * lockDebugRelation can be used to trace unconditionally a single relation,
+ * for example pg_listener, if you suspect there are locking problems.
+ *
+ * lockDebugOidMin is is used to avoid tracing postgres relations, which
+ * would produce a lot of output. Unfortunately most system relations are
+ * created after bootstrap and have oid greater than BootstrapObjectIdData.
+ * If you are using tprintf you should specify a value greater than the max
+ * oid of system relations, which can be found with the following query:
+ *
+ *   select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
+ *
+ * To get a useful lock trace you can use the following pg_options:
+ *
+ *   -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
+ */
+#define LOCKDEBUG(lockmethod)  (pg_options[TRACE_SHORTLOCKS+lockmethod])
+#define lockDebugRelation      (pg_options[TRACE_LOCKRELATION])
+#define lockDebugOidMin            (pg_options[TRACE_LOCKOIDMIN])
+#define lockReadPriority       (pg_options[OPT_LOCKREADPRIORITY])
+
+#ifdef LOCK_MGR_DEBUG
+#define LOCK_PRINT(where,lock,type) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+        && (lock->tag.relId >= lockDebugOidMin)) \
+       || (lock->tag.relId == lockDebugRelation)) \
+       LOCK_PRINT_AUX(where,lock,type)
+
+#define LOCK_PRINT_AUX(where,lock,type) \
+   TPRINTF(TRACE_ALL, \
+        "%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \
+        "hold(%d,%d,%d,%d,%d)=%d " \
+        "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+        where, \
+        MAKE_OFFSET(lock), \
+        lock->tag.lockmethod, \
+        lock->tag.relId, \
+        lock->tag.dbId, \
+        ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \
+         lock->tag.tupleId.ip_blkid.bi_lo), \
+        lock->tag.tupleId.ip_posid, \
+        lock->mask, \
+        lock->holders[1], \
+        lock->holders[2], \
+        lock->holders[3], \
+        lock->holders[4], \
+        lock->holders[5], \
+        lock->nHolding, \
+        lock->activeHolders[1], \
+        lock->activeHolders[2], \
+        lock->activeHolders[3], \
+        lock->activeHolders[4], \
+        lock->activeHolders[5], \
+        lock->nActive, \
+        lock->waitProcs.size, \
+        lock_types[type])
+
+#define XID_PRINT(where,xidentP) \
+   if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
+        && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+            >= lockDebugOidMin)) \
+       || (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+           == lockDebugRelation)) \
+       XID_PRINT_AUX(where,xidentP)
+
+#define XID_PRINT_AUX(where,xidentP) \
+   TPRINTF(TRACE_ALL, \
+        "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
+        "hold(%d,%d,%d,%d,%d)=%d", \
+        where, \
+        MAKE_OFFSET(xidentP), \
+        xidentP->tag.lock, \
+        XIDENT_LOCKMETHOD(*(xidentP)), \
+        xidentP->tag.pid, \
+        xidentP->tag.xid, \
+        xidentP->holders[1], \
+        xidentP->holders[2], \
+        xidentP->holders[3], \
+        xidentP->holders[4], \
+        xidentP->holders[5], \
+        xidentP->nHolding)
+
+#define LOCK_TPRINTF(lock, args...) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+        && (lock->tag.relId >= lockDebugOidMin)) \
+       || (lock->tag.relId == lockDebugRelation)) \
+       TPRINTF(TRACE_ALL, args)
+
+#else  /* !LOCK_MGR_DEBUG */
+#define LOCK_PRINT(where,lock,type)
+#define LOCK_PRINT_AUX(where,lock,type)
 #define XID_PRINT(where,xidentP)
+#define XID_PRINT_AUX(where,xidentP)
+#define LOCK_TPRINTF(lock, args...)
+#endif /* !LOCK_MGR_DEBUG */
 
-#else                          /* LOCK_MGR_DEBUG */
-
-int            lockDebug = 0;
-unsigned int lock_debug_oid_min = BootstrapObjectIdData;
 static char *lock_types[] = {
-   "NONE",
+   "",
    "WRITE",
    "READ",
    "WRITE INTENT",
@@ -75,59 +159,6 @@ static char *lock_types[] = {
    "EXTEND"
 };
 
-#define LOCK_PRINT(where,tag,type)\
-   if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
-       elog(DEBUG, \
-            "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
-            MyProcPid,\
-            tag->relId, tag->dbId, \
-            ((tag->tupleId.ip_blkid.bi_hi<<16)+\
-             tag->tupleId.ip_blkid.bi_lo),\
-            tag->tupleId.ip_posid, \
-            lock_types[type])
-
-#define LOCK_DUMP(where,lock,type)\
-   if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
-       LOCK_DUMP_AUX(where,lock,type)
-
-#define LOCK_DUMP_AUX(where,lock,type)\
-       elog(DEBUG, \
-            "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
-            "holders (%d,%d,%d,%d,%d) type (%s)",where, \
-            MyProcPid,\
-            lock->tag.relId, lock->tag.dbId, \
-            ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
-             lock->tag.tupleId.ip_blkid.bi_lo),\
-            lock->tag.tupleId.ip_posid, \
-            lock->nHolding,\
-            lock->holders[1],\
-            lock->holders[2],\
-            lock->holders[3],\
-            lock->holders[4],\
-            lock->holders[5],\
-            lock_types[type])
-
-#define XID_PRINT(where,xidentP)\
-   if ((lockDebug >= 2) && \
-       (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
-        >= lock_debug_oid_min)) \
-       elog(DEBUG,\
-            "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
-            "holders (%d,%d,%d,%d,%d)",\
-            where,\
-            MyProcPid,\
-            xidentP->tag.xid,\
-            xidentP->tag.pid,\
-            xidentP->tag.lock,\
-            xidentP->nHolding,\
-            xidentP->holders[1],\
-            xidentP->holders[2],\
-            xidentP->holders[3],\
-            xidentP->holders[4],\
-            xidentP->holders[5])
-
-#endif                         /* LOCK_MGR_DEBUG */
-
 SPINLOCK   LockMgrLock;        /* in Shmem or created in
                                 * CreateSpinlocks() */
 
@@ -171,6 +202,16 @@ InitLocks()
        BITS_ON[i] = bit;
        BITS_OFF[i] = ~bit;
    }
+
+#ifdef LOCK_MGR_DEBUG
+   /*
+    * If lockDebugOidMin value has not been specified
+    * in pg_options set a default value.
+    */
+   if (!lockDebugOidMin) {
+       lockDebugOidMin = BootstrapObjectIdData;
+   }
+#endif
 }
 
 /* -------------------
@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName,
    {
        elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
             numModes, MAX_LOCKMODES);
-       return (INVALID_TABLEID);
+       return (INVALID_LOCKMETHOD);
    }
 
    /* allocate a string for the shmem index table lookup */
@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName,
    if (!shmemName)
    {
        elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
-       return (INVALID_TABLEID);
+       return (INVALID_LOCKMETHOD);
    }
 
    /* each lock table has a non-shared header */
@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName,
    {
        elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
        pfree(shmemName);
-       return (INVALID_TABLEID);
+       return (INVALID_LOCKMETHOD);
    }
 
    /* ------------------------
@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName,
    if (status)
        return (lockMethodTable->ctl->lockmethod);
    else
-       return (INVALID_TABLEID);
+       return (INVALID_LOCKMETHOD);
 }
 
 /*
@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName,
  *     client to use different lockmethods when acquiring/releasing
  *     short term and long term locks.
  */
-#ifdef NOT_USED
+
 LOCKMETHOD
 LockMethodTableRename(LOCKMETHOD lockmethod)
 {
    LOCKMETHOD newLockMethod;
 
    if (NumLockMethods >= MAX_LOCK_METHODS)
-       return (INVALID_TABLEID);
-   if (LockMethodTable[lockmethod] == INVALID_TABLEID)
-       return (INVALID_TABLEID);
+       return (INVALID_LOCKMETHOD);
+   if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+       return (INVALID_LOCKMETHOD);
 
    /* other modules refer to the lock table by a lockmethod */
    newLockMethod = NumLockMethods;
@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
    LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
    return (newLockMethod);
 }
-#endif
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  * Side Effects: The lock is always acquired.  No way to abort
  *     a lock acquisition other than aborting the transaction.
  *     Lock is recorded in the lkchain.
+ *
 #ifdef USER_LOCKS
+ *
  * Note on User Locks:
+ *
  *     User locks are handled totally on the application side as
  *     long term cooperative locks which extend beyond the normal
  *     transaction boundaries.  Their purpose is to indicate to an
@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *     acquired if already held by another process.  They must be
  *     released explicitly by the application but they are released
  *     automatically when a backend terminates.
- *     They are indicated by a dummy lockmethod 0 which doesn't have
- *     any table allocated but uses the normal lock table, and are
- *     distinguished from normal locks for the following differences:
+ *     They are indicated by a lockmethod 2 which is an alias for the
+ *     normal lock table, and are distinguished from normal locks
+ *     for the following differences:
  *
  *                                     normal lock     user lock
  *
- *     lockmethod                      1               0
+ *     lockmethod                      1               2
  *     tag.relId                       rel oid         0
  *     tag.ItemPointerData.ip_blkid    block id        lock id2
  *     tag.ItemPointerData.ip_posid    tuple offset    lock id1
  *     xid.pid                         0               backend pid
- *     xid.xid                         current xid     0
+ *     xid.xid                         xid or 0        0
  *     persistence                     transaction     user or backend
  *
  *     The lockmode parameter can have the same values for normal locks
  *     although probably only WRITE_LOCK can have some practical use.
  *
- *                                                     DZ - 4 Oct 1996
+ *                                                     DZ - 22 Nov 1997
 #endif
  */
 
@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    SPINLOCK    masterLock;
    LOCKMETHODTABLE    *lockMethodTable;
    int         status;
-   TransactionId myXid;
+   TransactionId xid;
 
 #ifdef USER_LOCKS
    int         is_user_lock;
 
-   is_user_lock = (lockmethod == 0);
+   is_user_lock = (lockmethod == USER_LOCKMETHOD);
    if (is_user_lock)
    {
-       lockmethod = 1;
 #ifdef USER_LOCKS_DEBUG
-       elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
-            locktag->tupleId.ip_posid,
-            ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-             locktag->tupleId.ip_blkid.bi_lo),
-            lockmode);
+       TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s",
+               locktag->tupleId.ip_posid,
+               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                locktag->tupleId.ip_blkid.bi_lo),
+               lock_types[lockmode]);
 #endif
    }
 #endif
 
+   /* ???????? This must be changed when short term locks will be used */
+   locktag->lockmethod = lockmethod;
+
    Assert(lockmethod < NumLockMethods);
    lockMethodTable = LockMethodTable[lockmethod];
    if (!lockMethodTable)
@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    if (LockingIsDisabled)
        return (TRUE);
 
-   LOCK_PRINT("Acquire", locktag, lockmode);
    masterLock = lockMethodTable->ctl->masterLock;
 
    SpinAcquire(masterLock);
 
+   /*
+    * Find or create a lock with this tag
+    */
    Assert(lockMethodTable->lockHash->hash == tag_hash);
-   lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
-
+   lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                               HASH_ENTER, &found);
    if (!lock)
    {
        SpinRelease(masterLock);
@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    if (!found)
    {
        lock->mask = 0;
-       ProcQueueInit(&(lock->waitProcs));
-       MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
-       MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
        lock->nHolding = 0;
        lock->nActive = 0;
-
+       MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
+       MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
+       ProcQueueInit(&(lock->waitProcs));
        Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
                             &(locktag->tupleId.ip_blkid)));
-
+       LOCK_PRINT("LockAcquire: new", lock, lockmode);
+   } else {
+       LOCK_PRINT("LockAcquire: found", lock, lockmode);
+       Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+       Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+       Assert(lock->nActive <= lock->nHolding);
    }
 
    /* ------------------
@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     * ------------------
     */
    xidTable = lockMethodTable->xidHash;
-   myXid = GetCurrentTransactionId();
 
    /* ------------------
     * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     * ------------------
     */
    MemSet(&item, 0, XID_TAGSIZE);      /* must clear padding, needed */
-   TransactionIdStore(myXid, &item.tag.xid);
    item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-   item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+   item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
    if (is_user_lock)
    {
        item.tag.pid = MyProcPid;
-       item.tag.xid = myXid = 0;
-#ifdef USER_LOCKS_DEBUG
-       elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
-            item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+       item.tag.xid = xid = 0;
+   } else {
+       xid = GetCurrentTransactionId();
+       TransactionIdStore(xid, &item.tag.xid);
    }
+#else
+   xid = GetCurrentTransactionId();
+   TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-   result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
+   /*
+    * Find or create an xid entry with this tag
+    */
+   result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                         HASH_ENTER, &found);
    if (!result)
    {
        elog(NOTICE, "LockAcquire: xid table corrupted");
        return (STATUS_ERROR);
    }
+
+   /*
+    * If not found initialize the new entry
+    */
    if (!found)
    {
-       XID_PRINT("LockAcquire: queueing XidEnt", result);
-       ProcAddLock(&result->queue);
        result->nHolding = 0;
        MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
+       ProcAddLock(&result->queue);
+       XID_PRINT("LockAcquire: new", result);
+   } else {
+       XID_PRINT("LockAcquire: found", result);
+       Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
+       Assert(result->nHolding <= lock->nActive);
    }
 
    /* ----------------
@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     */
    lock->nHolding++;
    lock->holders[lockmode]++;
+   Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
 
    /* --------------------
     * If I'm the only one holding a lock, then there
@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    {
        result->holders[lockmode]++;
        result->nHolding++;
+       XID_PRINT("LockAcquire: owning", result);
+       Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
        GrantLock(lock, lockmode);
        SpinRelease(masterLock);
        return (TRUE);
    }
 
-   Assert(result->nHolding <= lock->nActive);
-
-   status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
-
+   status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
    if (status == STATUS_OK)
        GrantLock(lock, lockmode);
    else if (status == STATUS_FOUND)
    {
 #ifdef USER_LOCKS
-
        /*
-        * User locks are non blocking. If we can't acquire a lock remove
-        * the xid entry and return FALSE without waiting.
+        * User locks are non blocking. If we can't acquire a lock we must
+        * remove the xid entry and return FALSE without waiting.
         */
        if (is_user_lock)
        {
            if (!result->nHolding)
            {
                SHMQueueDelete(&result->queue);
-               hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
+               result = (XIDLookupEnt *) hash_search(xidTable,
+                                                     (Pointer) &result,
+                                                     HASH_REMOVE, &found);
+               if (!result || !found) {
+                   elog(NOTICE, "LockAcquire: remove xid, table corrupted");
+               }
+           } else {
+               XID_PRINT_AUX("LockAcquire: NHOLDING", result);
            }
            lock->nHolding--;
            lock->holders[lockmode]--;
+           LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+           Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+           Assert(lock->nActive <= lock->nHolding);
            SpinRelease(masterLock);
-#ifdef USER_LOCKS_DEBUG
-           elog(NOTICE, "LockAcquire: user lock failed");
-#endif
            return (FALSE);
        }
 #endif
-       status = WaitOnLock(lockmethod, lock, lockmode);
-       XID_PRINT("Someone granted me the lock", result);
+       status = WaitOnLock(lockmethod, lock, lockmode, xid);
+       /*
+        * Check the xid entry status, in case something in the
+        * ipc communication doesn't work correctly.
+        */
+       if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) {
+           XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
+           LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
+           /* Should we retry ? */
+           return (FALSE);
+       }
+       XID_PRINT("LockAcquire: granted", result);
+       LOCK_PRINT("LockAcquire: granted", lock, lockmode);
    }
 
    SpinRelease(masterLock);
@@ -646,7 +725,8 @@ int
 LockResolveConflicts(LOCKMETHOD lockmethod,
                     LOCK *lock,
                     LOCKMODE lockmode,
-                    TransactionId xid)
+                    TransactionId xid,
+                    XIDLookupEnt *xidentP)     /* xident ptr or NULL */
 {
    XIDLookupEnt *result,
                item;
@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
    int         bitmask;
    int         i,
                tmpMask;
+#ifdef USER_LOCKS
+   int         is_user_lock;
+#endif
 
    numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
    xidTable = LockMethodTable[lockmethod]->xidHash;
 
-   /* ---------------------
-    * read my own statistics from the xid table.  If there
-    * isn't an entry, then we'll just add one.
-    *
-    * Zero out the tag, this clears the padding bytes for long
-    * word alignment and ensures hashing consistency.
-    * ------------------
-    */
-   MemSet(&item, 0, XID_TAGSIZE);
-   TransactionIdStore(xid, &item.tag.xid);
-   item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-   item.tag.pid = pid;
+   if (xidentP) {
+       /*
+        * A pointer to the xid entry was supplied from the caller.
+        * Actually only LockAcquire can do it.
+        */
+       result = xidentP;
+   } else {
+       /* ---------------------
+        * read my own statistics from the xid table.  If there
+        * isn't an entry, then we'll just add one.
+        *
+        * Zero out the tag, this clears the padding bytes for long
+        * word alignment and ensures hashing consistency.
+        * ------------------
+        */
+       MemSet(&item, 0, XID_TAGSIZE);
+       item.tag.lock = MAKE_OFFSET(lock);
+#ifdef USE_XIDTAG_LOCKMETHOD
+       item.tag.lockmethod = lockmethod;
+#endif
+#ifdef USER_LOCKS
+       is_user_lock = (lockmethod == 2);
+       if (is_user_lock) {
+           item.tag.pid = MyProcPid;
+           item.tag.xid = 0;
+       } else {
+           TransactionIdStore(xid, &item.tag.xid);
+       }
+#else
+       TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-   if (!(result = (XIDLookupEnt *)
-         hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
-   {
-       elog(NOTICE, "LockResolveConflicts: xid table corrupted");
-       return (STATUS_ERROR);
-   }
-   myHolders = result->holders;
+       /*
+        * Find or create an xid entry with this tag
+        */
+       result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                             HASH_ENTER, &found);
+       if (!result)
+       {
+           elog(NOTICE, "LockResolveConflicts: xid table corrupted");
+           return (STATUS_ERROR);
+       }
 
-   if (!found)
-   {
-       /* ---------------
-        * we're not holding any type of lock yet.  Clear
-        * the lock stats.
-        * ---------------
+       /*
+        * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN,
+        * if we are trying to resolve a conflict we must already have
+        * allocated an xid entry for this lock.    dz 21-11-1997
         */
-       MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
-       result->nHolding = 0;
+       if (!found)
+       {
+           /* ---------------
+            * we're not holding any type of lock yet.  Clear
+            * the lock stats.
+            * ---------------
+            */
+           MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders)));
+           result->nHolding = 0;
+           XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
+       } else {
+           XID_PRINT("LockResolveConflicts: found", result);
+       }
    }
+   Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
+   /*
+    * We can control runtime this option. Default is lockReadPriority=0
+    */
+   if (!lockReadPriority)
    {
        /* ------------------------
         * If someone with a greater priority is waiting for the lock,
@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
        PROC_QUEUE *waitQueue = &(lock->waitProcs);
        PROC       *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-       if (waitQueue->size && topproc->prio > myprio)
+       if (waitQueue->size && topproc->prio > myprio) {
+           XID_PRINT("LockResolveConflicts: higher priority proc waiting",
+                     result);
            return STATUS_FOUND;
+       }
    }
 
    /* ----------------------------
@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
     */
    if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
    {
-
        result->holders[lockmode]++;
        result->nHolding++;
-
-       XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+       XID_PRINT("LockResolveConflicts: no conflict", result);
+       Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
        return (STATUS_OK);
    }
 
@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
     * that does not reflect our own locks.
     * ------------------------
     */
+   myHolders = result->holders;
    bitmask = 0;
    tmpMask = 2;
    for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
     */
    if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
    {
-
        /* no conflict. Get the lock and go on */
-
        result->holders[lockmode]++;
        result->nHolding++;
-
-       XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+       XID_PRINT("LockResolveConflicts: resolved", result);
+       Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
        return (STATUS_OK);
-
    }
 
+   XID_PRINT("LockResolveConflicts: conflicting", result);
    return (STATUS_FOUND);
 }
 
+/*
+ * GrantLock -- update the lock data structure to show
+ *     the new lock holder.
+ */
+void
+GrantLock(LOCK *lock, LOCKMODE lockmode)
+{
+   lock->nActive++;
+   lock->activeHolders[lockmode]++;
+   lock->mask |= BITS_ON[lockmode];
+   LOCK_PRINT("GrantLock", lock, lockmode);
+   Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
+   Assert(lock->nActive <= lock->nHolding);
+}
+
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+          TransactionId xid)
 {
    PROC_QUEUE *waitQueue = &(lock->waitProcs);
    LOCKMETHODTABLE    *lockMethodTable = LockMethodTable[lockmethod];
    int         prio = lockMethodTable->ctl->prio[lockmode];
+   char        old_status[64],
+               new_status[64];
 
    Assert(lockmethod < NumLockMethods);
 
@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
     * synchronization for this queue.  That will not be true if/when
     * people can be deleted from the queue by a SIGINT or something.
     */
-   LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+   LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+   strcpy(old_status, PS_STATUS);
+   strcpy(new_status, PS_STATUS);
+   strcat(new_status, " waiting");
+   PS_SET_STATUS(new_status);
    if (ProcSleep(waitQueue,
                  lockMethodTable->ctl->masterLock,
                  lockmode,
                  prio,
-                 lock) != NO_ERROR)
+                 lock,
+                 xid) != NO_ERROR)
    {
        /* -------------------
-        * This could have happend as a result of a deadlock, see HandleDeadLock()
-        * Decrement the lock nHolding and holders fields as we are no longer
-        * waiting on this lock.
+        * This could have happend as a result of a deadlock,
+        * see HandleDeadLock().
+        * Decrement the lock nHolding and holders fields as
+        * we are no longer waiting on this lock.
         * -------------------
         */
        lock->nHolding--;
        lock->holders[lockmode]--;
-       LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+       LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+       Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+       Assert(lock->nActive <= lock->nHolding);
        SpinRelease(lockMethodTable->ctl->masterLock);
        elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
+
+       /* not reached */
    }
 
-   LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
+   PS_SET_STATUS(old_status);
+   LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
    return (STATUS_OK);
 }
 
@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    XIDLookupEnt *result,
                item;
    HTAB       *xidTable;
+   TransactionId xid;
    bool        wakeupNeeded = true;
+   int         trace_flag;
 
 #ifdef USER_LOCKS
    int         is_user_lock;
 
-   is_user_lock = (lockmethod == 0);
+   is_user_lock = (lockmethod == USER_LOCKMETHOD);
    if (is_user_lock)
    {
-       lockmethod = 1;
-#ifdef USER_LOCKS_DEBUG
-       elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
-            locktag->tupleId.ip_posid,
-            ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-             locktag->tupleId.ip_blkid.bi_lo),
-            lockmode);
-#endif
+       TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d",
+               locktag->tupleId.ip_posid,
+               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                locktag->tupleId.ip_blkid.bi_lo),
+               lockmode);
    }
 #endif
 
+   /* ???????? This must be changed when short term locks will be used */
+   locktag->lockmethod = lockmethod;
+
+#ifdef USER_LOCKS
+   trace_flag = \
+       (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+   trace_flag = TRACE_LOCKS;
+#endif
+
    Assert(lockmethod < NumLockMethods);
    lockMethodTable = LockMethodTable[lockmethod];
    if (!lockMethodTable)
@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    if (LockingIsDisabled)
        return (TRUE);
 
-   LOCK_PRINT("Release", locktag, lockmode);
-
    masterLock = lockMethodTable->ctl->masterLock;
-   xidTable = lockMethodTable->xidHash;
-
    SpinAcquire(masterLock);
 
-   Assert(lockMethodTable->lockHash->hash == tag_hash);
-   lock = (LOCK *)
-       hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
-
-#ifdef USER_LOCKS
-
    /*
-    * If the entry is not found hash_search returns TRUE instead of NULL,
-    * so we must check it explicitly.
+    * Find a lock with this tag
     */
-   if ((is_user_lock) && (lock == (LOCK *) TRUE))
-   {
-       SpinRelease(masterLock);
-       elog(NOTICE, "LockRelease: there are no locks with this tag");
-       return (FALSE);
-   }
-#endif
+   Assert(lockMethodTable->lockHash->hash == tag_hash);
+   lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                               HASH_FIND, &found);
 
    /*
-    * let the caller print its own error message, too. Do not
-    * elog(ERROR).
+    * let the caller print its own error message, too. Do not elog(ERROR).
     */
    if (!lock)
    {
@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
    if (!found)
    {
        SpinRelease(masterLock);
+#ifdef USER_LOCKS
+       if (is_user_lock) {
+           TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+           return (FALSE);
+       }
+#endif
        elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
        return (FALSE);
    }
+   LOCK_PRINT("LockRelease: found", lock, lockmode);
+   Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+   Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+   Assert(lock->nActive <= lock->nHolding);
 
-   Assert(lock->nHolding > 0);
-
-#ifdef USER_LOCKS
-
-   /*
-    * If this is an user lock it can be removed only after checking that
-    * it was acquired by the current process, so this code is skipped and
-    * executed later.
-    */
-   if (!is_user_lock)
-   {
-#endif
-
-       /*
-        * fix the general lock stats
-        */
-       lock->nHolding--;
-       lock->holders[lockmode]--;
-       lock->nActive--;
-       lock->activeHolders[lockmode]--;
-
-       Assert(lock->nActive >= 0);
-
-       if (!lock->nHolding)
-       {
-           /* ------------------
-            * if there's no one waiting in the queue,
-            * we just released the last lock.
-            * Delete it from the lock table.
-            * ------------------
-            */
-           Assert(lockMethodTable->lockHash->hash == tag_hash);
-           lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                       (Pointer) &(lock->tag),
-                                       HASH_REMOVE_SAVED,
-                                       &found);
-           Assert(lock && found);
-           wakeupNeeded = false;
-       }
-#ifdef USER_LOCKS
-   }
-#endif
 
    /* ------------------
     * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     * ------------------
     */
    MemSet(&item, 0, XID_TAGSIZE);
-
-   TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
    item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-   item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+   item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
-   if (is_user_lock)
-   {
+   if (is_user_lock) {
        item.tag.pid = MyProcPid;
-       item.tag.xid = 0;
-#ifdef USER_LOCKS_DEBUG
-       elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
-            item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+       item.tag.xid = xid = 0;
+   } else {
+       xid = GetCurrentTransactionId();
+       TransactionIdStore(xid, &item.tag.xid);
    }
+#else
+   xid = GetCurrentTransactionId();
+   TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-   if (!(result = (XIDLookupEnt *) hash_search(xidTable,
-                                               (Pointer) &item,
-                                               HASH_FIND_SAVE,
-                                               &found))
-       || !found)
+   /*
+    * Find an xid entry with this tag
+    */
+   xidTable = lockMethodTable->xidHash;
+   result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                         HASH_FIND_SAVE, &found);
+   if (!result || !found)
    {
        SpinRelease(masterLock);
 #ifdef USER_LOCKS
-       if ((is_user_lock) && (result))
-           elog(NOTICE, "LockRelease: you don't have a lock on this tag");
-       else
-           elog(NOTICE, "LockRelease: find xid, table corrupted");
-#else
-       elog(NOTICE, "LockReplace: xid table corrupted");
+       if (!found && is_user_lock) {
+           TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+       } else
 #endif
+       elog(NOTICE, "LockReplace: xid table corrupted");
        return (FALSE);
    }
+   XID_PRINT("LockRelease: found", result);
+   Assert(result->tag.lock == MAKE_OFFSET(lock));
+
+   /*
+    * Check that we are actually holding a lock of the type we want
+    * to release.
+    */
+   if (!(result->holders[lockmode] > 0)) {
+       SpinRelease(masterLock);
+       XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
+       elog(NOTICE, "LockRelease: you don't own a lock of type %s",
+            lock_types[lockmode]);
+       Assert(result->holders[lockmode] >= 0);
+       return (FALSE);
+   }
+   Assert(result->nHolding > 0);
+
+   /*
+    * fix the general lock stats
+    */
+   lock->nHolding--;
+   lock->holders[lockmode]--;
+   lock->nActive--;
+    lock->activeHolders[lockmode]--;
+
+   /* --------------------------
+    * If there are still active locks of the type I just released, no one
+    * should be woken up.  Whoever is asleep will still conflict
+    * with the remaining locks.
+    * --------------------------
+    */
+   if (lock->activeHolders[lockmode])
+   {
+       wakeupNeeded = false;
+   }
+   else
+   {
+       /* change the conflict mask.  No more of this lock type. */
+       lock->mask &= BITS_OFF[lockmode];
+   }
+
+   LOCK_PRINT("LockRelease: updated", lock, lockmode);
+   Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+   Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
+   Assert(lock->nActive <= lock->nHolding);
+
+   if (!lock->nHolding)
+   {
+       /* ------------------
+        * if there's no one waiting in the queue,
+        * we just released the last lock.
+        * Delete it from the lock table.
+        * ------------------
+        */
+       Assert(lockMethodTable->lockHash->hash == tag_hash);
+       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+                                   (Pointer) &(lock->tag),
+                                   HASH_REMOVE,
+                                   &found);
+       Assert(lock && found);
+       wakeupNeeded = false;
+   }
 
    /*
     * now check to see if I have any private locks.  If I do, decrement
@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     */
    result->holders[lockmode]--;
    result->nHolding--;
-
-   XID_PRINT("LockRelease updated xid stats", result);
+   XID_PRINT("LockRelease: updated", result);
+   Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
    /*
     * If this was my last hold on this lock, delete my entry in the XID
@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
     */
    if (!result->nHolding)
    {
-#ifdef USER_LOCKS
-       if (result->queue.prev == INVALID_OFFSET)
+       if (result->queue.prev == INVALID_OFFSET) {
            elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
-       if (result->queue.next == INVALID_OFFSET)
+       }
+       if (result->queue.next == INVALID_OFFSET) {
            elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
-#endif
+       }
        if (result->queue.next != INVALID_OFFSET)
            SHMQueueDelete(&result->queue);
-       if (!(result = (XIDLookupEnt *)
-             hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
-           !found)
+       XID_PRINT("LockRelease: deleting", result);
+       result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
+                                             HASH_REMOVE_SAVED, &found);
+       if (!result || !found)
        {
            SpinRelease(masterLock);
-#ifdef USER_LOCKS
            elog(NOTICE, "LockRelease: remove xid, table corrupted");
-#else
-           elog(NOTICE, "LockReplace: xid table corrupted");
-#endif
            return (FALSE);
        }
    }
 
-#ifdef USER_LOCKS
-
-   /*
-    * If this is an user lock remove it now, after the corresponding xid
-    * entry has been found and deleted.
-    */
-   if (is_user_lock)
-   {
-
-       /*
-        * fix the general lock stats
-        */
-       lock->nHolding--;
-       lock->holders[lockmode]--;
-       lock->nActive--;
-       lock->activeHolders[lockmode]--;
-
-       Assert(lock->nActive >= 0);
-
-       if (!lock->nHolding)
-       {
-           /* ------------------
-            * if there's no one waiting in the queue,
-            * we just released the last lock.
-            * Delete it from the lock table.
-            * ------------------
-            */
-           Assert(lockMethodTable->lockHash->hash == tag_hash);
-           lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                       (Pointer) &(lock->tag),
-                                       HASH_REMOVE,
-                                       &found);
-           Assert(lock && found);
-           wakeupNeeded = false;
-       }
-   }
-#endif
-
-   /* --------------------------
-    * If there are still active locks of the type I just released, no one
-    * should be woken up.  Whoever is asleep will still conflict
-    * with the remaining locks.
-    * --------------------------
-    */
-   if (!(lock->activeHolders[lockmode]))
-   {
-       /* change the conflict mask.  No more of this lock type. */
-       lock->mask &= BITS_OFF[lockmode];
-   }
-
    if (wakeupNeeded)
    {
        /* --------------------------
@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         */
        ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
    }
+   else
+   {
+       LOCK_TPRINTF(lock, "LockRelease: no wakeup needed");
+   }
 
    SpinRelease(masterLock);
    return (TRUE);
 }
 
-/*
- * GrantLock -- update the lock data structure to show
- *     the new lock holder.
- */
-void
-GrantLock(LOCK *lock, LOCKMODE lockmode)
-{
-   lock->nActive++;
-   lock->activeHolders[lockmode]++;
-   lock->mask |= BITS_ON[lockmode];
-}
-
-#ifdef USER_LOCKS
 /*
  * LockReleaseAll -- Release all locks in a process lock queue.
- *
- * Note: This code is a little complicated by the presence in the
- *      same queue of user locks which can't be removed from the
- *      normal lock queue at the end of a transaction. They must
- *      however be removed when the backend exits.
- *      A dummy lockmethod 0 is used to indicate that we are releasing
- *      the user locks, from the code added to ProcKill().
  */
-#endif
 bool
 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 {
@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
    int         done;
    XIDLookupEnt *xidLook = NULL;
    XIDLookupEnt *tmp = NULL;
+   XIDLookupEnt *result;
    SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
    SPINLOCK    masterLock;
    LOCKMETHODTABLE    *lockMethodTable;
@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                numLockModes;
    LOCK       *lock;
    bool        found;
+   int         trace_flag;
+   int         xidtag_lockmethod;
 
 #ifdef USER_LOCKS
    int         is_user_lock_table,
                count,
-               nskip;
+               nleft;
 
-   is_user_lock_table = (lockmethod == 0);
-#ifdef USER_LOCKS_DEBUG
-   elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
-#endif
-   if (is_user_lock_table)
-       lockmethod = 1;
+   count = nleft = 0;
+
+   is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
+   trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+   trace_flag = TRACE_LOCKS;
 #endif
+   TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
+           lockmethod, MyProcPid);
 
    Assert(lockmethod < NumLockMethods);
    lockMethodTable = LockMethodTable[lockmethod];
-   if (!lockMethodTable)
+   if (!lockMethodTable) {
+       elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
        return (FALSE);
-
-   numLockModes = lockMethodTable->ctl->numLockModes;
-   masterLock = lockMethodTable->ctl->masterLock;
+   }
 
    if (SHMQueueEmpty(lockQueue))
        return TRUE;
 
-#ifdef USER_LOCKS
+   numLockModes = lockMethodTable->ctl->numLockModes;
+   masterLock = lockMethodTable->ctl->masterLock;
+
    SpinAcquire(masterLock);
-#endif
    SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
 
-   XID_PRINT("LockReleaseAll", xidLook);
-
-#ifndef USER_LOCKS
-   SpinAcquire(masterLock);
-#else
-   count = nskip = 0;
-#endif
    for (;;)
    {
+       /*
+        * Sometimes the queue appears to be messed up.
+        */
+       if (count++ > 1000)
+       {
+           elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
+           nleft = 0;
+           break;
+       }
+
        /* ---------------------------
         * XXX Here we assume the shared memory queue is circular and
         * that we know its internal structure.  Should have some sort of
@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
        done = (xidLook->queue.next == end);
        lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-       LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
-
-#ifdef USER_LOCKS
+       xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
+       if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) {
+           XID_PRINT("LockReleaseAll", xidLook);
+           LOCK_PRINT("LockReleaseAll", lock, 0);
+       }
 
-       /*
-        * Sometimes the queue appears to be messed up.
-        */
-       if (count++ > 2000)
-       {
-           elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
-           nskip = 0;
-           break;
+#ifdef USE_XIDTAG_LOCKMETHOD
+       if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
+           elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
+                xidtag_lockmethod, lock->tag.lockmethod);
+#endif
+       if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) {
+           TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
+           nleft++;
+           goto next_item;
        }
+
+       Assert(lock->nHolding > 0);
+       Assert(lock->nActive > 0);
+       Assert(lock->nActive <= lock->nHolding);
+       Assert(xidLook->nHolding >= 0);
+       Assert(xidLook->nHolding <= lock->nHolding);
+
+#ifdef USER_LOCKS
        if (is_user_lock_table)
        {
            if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
            {
-#ifdef USER_LOCKS_DEBUG
-               elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
-                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-               nskip++;
+               TPRINTF(TRACE_USERLOCKS,
+                       "LockReleaseAll: skiping normal lock [%d,%d,%d]",
+                       xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+               nleft++;
                goto next_item;
            }
            if (xidLook->tag.pid != MyProcPid)
            {
-               /* This should never happen */
-#ifdef USER_LOCKS_DEBUG
+               /* Should never happen */
                elog(NOTICE,
-                    "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+                    "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]",
                     lock->tag.tupleId.ip_posid,
                     ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
                      lock->tag.tupleId.ip_blkid.bi_lo),
-                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-               nskip++;
+                    xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+               nleft++;
                goto next_item;
            }
-#ifdef USER_LOCKS_DEBUG
-           elog(NOTICE,
-                "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
-                lock->tag.tupleId.ip_posid,
-                ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-                 lock->tag.tupleId.ip_blkid.bi_lo),
-                xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
+           TPRINTF(TRACE_USERLOCKS,
+                   "LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]",
+                   lock->tag.tupleId.ip_posid,
+                   ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+                    lock->tag.tupleId.ip_blkid.bi_lo),
+                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
        }
        else
        {
-           if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
+           /* Can't check xidLook->tag.xid, can be 0 also for normal locks */
+           if (xidLook->tag.pid != 0)
            {
-#ifdef USER_LOCKS_DEBUG
-               elog(NOTICE,
-                    "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
-                    lock->tag.tupleId.ip_posid,
-                    ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-                     lock->tag.tupleId.ip_blkid.bi_lo),
-                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-               nskip++;
+               TPRINTF(TRACE_LOCKS,
+                       "LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]",
+                       lock->tag.tupleId.ip_posid,
+                       ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+                        lock->tag.tupleId.ip_blkid.bi_lo),
+                       xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+               nleft++;
                goto next_item;
            }
-#ifdef USER_LOCKS_DEBUG
-           elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
-                xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
        }
 #endif
 
@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
         */
        if (lock->nHolding != xidLook->nHolding)
        {
-           lock->nHolding -= xidLook->nHolding;
-           lock->nActive -= xidLook->nHolding;
-           Assert(lock->nActive >= 0);
            for (i = 1; i <= numLockModes; i++)
            {
+               Assert(xidLook->holders[i] >= 0);
                lock->holders[i] -= xidLook->holders[i];
                lock->activeHolders[i] -= xidLook->holders[i];
+               Assert((lock->holders[i] >= 0) \
+                      && (lock->activeHolders[i] >= 0));
                if (!lock->activeHolders[i])
                    lock->mask &= BITS_OFF[i];
            }
+           lock->nHolding -= xidLook->nHolding;
+           lock->nActive -= xidLook->nHolding;
+           Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
+           Assert(lock->nActive <= lock->nHolding);
        }
        else
        {
@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
             * --------------
             */
            lock->nHolding = 0;
+           /* Fix the lock status, just for next LOCK_PRINT message. */
+           for (i=1; i<=numLockModes; i++) {
+               Assert(lock->holders[i] == lock->activeHolders[i]);
+               lock->holders[i] = lock->activeHolders[i] = 0;
+           }
        }
+       LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+
+       /*
+        * Remove the xid from the process lock queue
+        */
+       SHMQueueDelete(&xidLook->queue);
+
        /* ----------------
         * always remove the xidLookup entry, we're done with it now
         * ----------------
         */
-#ifdef USER_LOCKS
-       SHMQueueDelete(&xidLook->queue);
-#endif
-       if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
-           || !found)
+
+       XID_PRINT("LockReleaseAll: deleting", xidLook);
+       result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
+                                             (Pointer) xidLook, 
+                                             HASH_REMOVE,
+                                             &found);
+       if (!result || !found)
        {
            SpinRelease(masterLock);
            elog(NOTICE, "LockReleaseAll: xid table corrupted");
@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
             * the last lock.
             * --------------------
             */
-
+           LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
            Assert(lockMethodTable->lockHash->hash == tag_hash);
-           lock = (LOCK *)
-               hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
+           lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+                                       (Pointer) &(lock->tag),
+                                       HASH_REMOVE, &found);
            if ((!lock) || (!found))
            {
                SpinRelease(masterLock);
@@ -1324,15 +1448,18 @@ next_item:
        SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
        xidLook = tmp;
    }
-   SpinRelease(masterLock);
-#ifdef USER_LOCKS
 
    /*
     * Reinitialize the queue only if nothing has been left in.
     */
-   if (nskip == 0)
-#endif
+   if (nleft == 0) {
+       TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
        SHMQueueInit(lockQueue);
+   }
+
+   SpinRelease(masterLock);
+   TPRINTF(trace_flag, "LockReleaseAll: done");
+
    return TRUE;
 }
 
@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
        checked_procs[0] = MyProc;
        nprocs = 1;
 
-       lockMethodTable = LockMethodTable[1];
+       lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
        xidTable = lockMethodTable->xidHash;
 
        MemSet(&item, 0, XID_TAGSIZE);
@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
        done = (xidLook->queue.next == end);
        lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-       LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
+       LOCK_PRINT("DeadLockCheck", lock, 0);
 
        /*
         * This is our only check to see if we found the lock we want.
@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
    return false;
 }
 
+/*
+ * Return an array with the pids of all processes owning a lock.
+ * This works only for user locks because normal locks have no
+ * pid information in the corresponding XIDLookupEnt.
+ */
+ArrayType *
+LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
+{
+   XIDLookupEnt *xidLook = NULL;
+   SPINLOCK    masterLock;
+   LOCK        *lock;
+   SHMEM_OFFSET lock_offset;
+   int         count = 0;
+   LOCKMETHODTABLE    *lockMethodTable;
+   HTAB        *xidTable;
+   bool        found;
+   int         ndims,
+               nitems,
+               hdrlen,
+               size;
+   int         lbounds[1],
+               hbounds[1];
+   ArrayType   *array;
+   int         *data_ptr;
+
+   /* Assume that no one will modify the result */
+   static int  empty_array[] = { 20, 1, 0, 0, 0 };
+
+#ifdef USER_LOCKS
+   int         is_user_lock;
+
+   is_user_lock = (lockmethod == USER_LOCKMETHOD);
+   if (is_user_lock)
+   {
+       TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]",
+               locktag->tupleId.ip_posid,
+               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                locktag->tupleId.ip_blkid.bi_lo));
+   }
+#endif
+
+   /* This must be changed when short term locks will be used */
+   locktag->lockmethod = lockmethod;
+
+   Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
+   lockMethodTable = LockMethodTable[lockmethod];
+   if (!lockMethodTable)
+   {
+       elog(NOTICE, "lockMethodTable is null in LockOwners");
+       return ((ArrayType *) &empty_array);
+   }
+
+   if (LockingIsDisabled)
+   {
+       return ((ArrayType *) &empty_array);
+   }
+
+   masterLock = lockMethodTable->ctl->masterLock;
+   SpinAcquire(masterLock);
+
+   /*
+    * Find a lock with this tag
+    */
+   Assert(lockMethodTable->lockHash->hash == tag_hash);
+   lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                               HASH_FIND, &found);
+
+   /*
+    * let the caller print its own error message, too. Do not elog(WARN).
+    */
+   if (!lock)
+   {
+       SpinRelease(masterLock);
+       elog(NOTICE, "LockOwners: locktable corrupted");
+       return ((ArrayType *) &empty_array);
+   }
+
+   if (!found)
+   {
+       SpinRelease(masterLock);
+#ifdef USER_LOCKS
+       if (is_user_lock) {
+           TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
+           return ((ArrayType *) &empty_array);
+       }
+#endif
+       elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
+       return ((ArrayType *) &empty_array);
+   }
+   LOCK_PRINT("LockOwners: found", lock, 0);
+   Assert((lock->nHolding > 0) && (lock->nActive > 0));
+   Assert(lock->nActive <= lock->nHolding);
+   lock_offset = MAKE_OFFSET(lock);
+
+   /* Construct a 1-dimensional array */
+   ndims = 1;
+   hdrlen = ARR_OVERHEAD(ndims);
+   lbounds[0] = 0;
+   hbounds[0] = lock->nActive;
+   size = hdrlen + sizeof(int) * hbounds[0];
+   array = (ArrayType *) palloc(size);
+   MemSet(array, 0, size);
+   memmove((char *) array, (char *) &size, sizeof(int));
+   memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
+   memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+   memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
+   SET_LO_FLAG(false, array);
+   data_ptr = (int *) ARR_DATA_PTR(array);
+
+   xidTable = lockMethodTable->xidHash;
+   hash_seq(NULL);
+   nitems = 0;
+   while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+          (xidLook != (XIDLookupEnt *)TRUE)) {
+       if (count++ > 1000) {
+           elog(NOTICE,"LockOwners: possible loop, giving up");
+           break;
+       }
+
+       if (xidLook->tag.pid == 0) {
+           XID_PRINT("LockOwners: no pid", xidLook);
+           continue;
+       }
+
+       if (!xidLook->tag.lock) {
+           XID_PRINT("LockOwners: NULL LOCK", xidLook);
+           continue;
+       }
+
+       if (xidLook->tag.lock != lock_offset) {
+           XID_PRINT("LockOwners: different lock", xidLook);
+           continue;
+       }
+
+       if (LOCK_LOCKMETHOD(*lock) != lockmethod) {
+           XID_PRINT("LockOwners: other table", xidLook);
+           continue;
+       }
+
+       if (xidLook->nHolding <= 0) {
+           XID_PRINT("LockOwners: not holding", xidLook);
+           continue;
+       }
+
+       if (nitems >= hbounds[0]) {
+           elog(NOTICE,"LockOwners: array size exceeded");
+           break;
+       }
+
+       /*
+        * Check that the holding process is still alive by sending
+        * him an unused (ignored) signal. If the kill fails the
+        * process is not alive.
+        */
+       if ((xidLook->tag.pid != MyProcPid) \
+           && (kill(xidLook->tag.pid, SIGCHLD)) != 0)
+       {
+           /* Return a negative pid to signal that process is dead */
+           data_ptr[nitems++] = - (xidLook->tag.pid);
+           XID_PRINT("LockOwners: not alive", xidLook);
+           /* XXX - TODO: remove this entry and update lock stats */
+           continue;
+       }
+
+       /* Found a process holding the lock */
+       XID_PRINT("LockOwners: holding", xidLook);
+       data_ptr[nitems++] = xidLook->tag.pid;
+   }
+
+   SpinRelease(masterLock);
+
+   /* Adjust the actual size of the array */
+   hbounds[0] = nitems;
+   size = hdrlen + sizeof(int) * hbounds[0];
+   memmove((char *) array, (char *) &size, sizeof(int));
+   memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+
+   return (array);
+}
+
 #ifdef DEADLOCK_DEBUG
 /*
- * Dump all locks. Must have already acquired the masterLock.
+ * Dump all locks in the proc->lockQueue. Must have already acquired
+ * the masterLock.
  */
 void
 DumpLocks()
@@ -1577,9 +1885,8 @@ DumpLocks()
    SPINLOCK    masterLock;
    int         numLockModes;
    LOCK       *lock;
-
-   count;
-   int         lockmethod = 1;
+   int         count = 0;
+   int         lockmethod = DEFAULT_LOCKMETHOD;
    LOCKMETHODTABLE    *lockMethodTable;
 
    ShmemPIDLookup(MyProcPid, &location);
@@ -1604,11 +1911,18 @@ DumpLocks()
    SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
    end = MAKE_OFFSET(lockQueue);
 
-   LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
-   XID_PRINT("DumpLocks", xidLook);
+   if (MyProc->waitLock) {
+       LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
+   }
 
-   for (count = 0;;)
+   for (;;)
    {
+       if (count++ > 2000)
+       {
+           elog(NOTICE, "DumpLocks: xid loop detected, giving up");
+           break;
+       }
+
        /* ---------------------------
         * XXX Here we assume the shared memory queue is circular and
         * that we know its internal structure.  Should have some sort of
@@ -1618,19 +1932,68 @@ DumpLocks()
        done = (xidLook->queue.next == end);
        lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-       LOCK_DUMP("DumpLocks", lock, 0);
-
-       if (count++ > 2000)
-       {
-           elog(NOTICE, "DumpLocks: xid loop detected, giving up");
-           break;
-       }
+       XID_PRINT_AUX("DumpLocks", xidLook);
+       LOCK_PRINT_AUX("DumpLocks", lock, 0);
 
        if (done)
            break;
+
        SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
        xidLook = tmp;
    }
 }
 
+/*
+ * Dump all postgres locks. Must have already acquired the masterLock.
+ */
+void
+DumpAllLocks()
+{
+   SHMEM_OFFSET location;
+   PROC        *proc;
+   XIDLookupEnt *xidLook = NULL;
+   LOCK        *lock;
+   int         pid;
+   int         count = 0;
+   int         lockmethod = DEFAULT_LOCKMETHOD;
+   LOCKMETHODTABLE     *lockMethodTable;
+   HTAB        *xidTable;
+
+   pid = getpid();
+   ShmemPIDLookup(pid,&location);
+   if (location == INVALID_OFFSET)
+       return;
+   proc = (PROC *) MAKE_PTR(location);
+   if (proc != MyProc)
+       return;
+
+   Assert(lockmethod < NumLockMethods);
+   lockMethodTable = LockMethodTable[lockmethod];
+   if (!lockMethodTable)
+       return;
+
+   xidTable = lockMethodTable->xidHash;
+
+   if (MyProc->waitLock) {
+       LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0);
+   }
+
+   hash_seq(NULL);
+   while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+          (xidLook != (XIDLookupEnt *)TRUE)) {
+       XID_PRINT_AUX("DumpAllLocks", xidLook);
+
+       if (xidLook->tag.lock) {
+           lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+           LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
+       } else {
+           elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
+       }
+
+       if (count++ > 2000) {
+           elog(NOTICE,"DumpAllLocks: possible loop, giving up");
+           break;
+       }
+   }
+}
 #endif
index 0998389ffe91ec373a01760c722c3b68fb9f493b..3887a8e37d16b7051cb739a8342b511d15db13d2 100644 (file)
@@ -12,7 +12,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $
  *
  * NOTES:
  *  (1) The lock.c module assumes that the caller here is doing
 #include "utils/rel.h"
 #include "miscadmin.h"         /* MyDatabaseId */
 
-static bool
-MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-            PG_LOCK_LEVEL level);
-static bool
-MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-            PG_LOCK_LEVEL level);
+static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag,
+                        LOCKMODE lockmode, PG_LOCK_LEVEL level);
+static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag,
+                        LOCKMODE lockmode, PG_LOCK_LEVEL level);
 
 #ifdef LowLevelLocking
 
@@ -130,6 +128,7 @@ static int  MultiPrios[] = {
  * lock table is ONE lock table, not three.
  */
 LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL;
+LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
 #ifdef NOT_USED
 LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL;
 #endif
@@ -150,12 +149,25 @@ InitMultiLevelLocks()
    /* -----------------------
     * No short term lock table for now.  -Jeff 15 July 1991
     *
-    * ShortTermTableId = LockTableRename(lockmethod);
+    * ShortTermTableId = LockMethodTableRename(lockmethod);
     * if (! (ShortTermTableId)) {
     *   elog(ERROR,"InitMultiLocks: couldnt rename lock table");
     * }
     * -----------------------
     */
+
+#ifdef USER_LOCKS
+   /*
+    * Allocate another tableId for long-term locks
+    */
+   LongTermTableId = LockMethodTableRename(MultiTableId);
+   if (!(LongTermTableId))
+   {
+       elog(ERROR,
+            "InitMultiLevelLocks: couldn't rename long-term lock table");
+   }
+#endif
+
    return MultiTableId;
 }
 
index fb052582e79d7fc0ba43338270620225fdf4bc64..4cd9602a8b51dae5ecf3beb29407761da02427fb 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *     This is so that we can support more backends. (system-wide semaphore
  *     sets run out pretty fast.)                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  */
 #include 
 #include 
 #include "storage/shmem.h"
 #include "storage/spin.h"
 #include "storage/proc.h"
+#include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
 static PROC *ProcWakeup(PROC *proc, int errType);
 
+#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
+
 /* --------------------
  * Spin lock for manipulating the shared process data structure:
  * ProcGlobal.... Adding an extra spin lock seemed like the smallest
@@ -247,10 +250,7 @@ InitProcess(IPCKey key)
     */
    SpinRelease(ProcStructLock);
 
-   MyProc->pid = 0;
-#if 0
    MyProc->pid = MyProcPid;
-#endif
    MyProc->xid = InvalidTransactionId;
 #ifdef LowLevelLocking
    MyProc->xmin = InvalidTransactionId;
@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid)
     * ---------------
     */
    ProcReleaseSpins(proc);
-   LockReleaseAll(1, &proc->lockQueue);
+   LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
 
 #ifdef USER_LOCKS
-   LockReleaseAll(0, &proc->lockQueue);
+   /*
+    * Assume we have a second lock table.
+    */
+   LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
 #endif
 
    /* ----------------
@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 int
-ProcSleep(PROC_QUEUE *waitQueue,
+ProcSleep(PROC_QUEUE *waitQueue,       /* lock->waitProcs */
          SPINLOCK spinlock,
-         int token,
+         int token,                    /* lockmode */
          int prio,
-         LOCK *lock)
+         LOCK *lock,
+         TransactionId xid)            /* needed by user locks, see below */
 {
    int         i;
    PROC       *proc;
@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
    proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
    /* If we are a reader, and they are writers, skip past them */
-
    for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
        proc = (PROC *) MAKE_PTR(proc->links.prev);
 
@@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue,
    MyProc->token = token;
    MyProc->waitLock = lock;
 
+#ifdef USER_LOCKS
+   /* -------------------
+    * Currently, we only need this for the ProcWakeup routines.
+    * This must be 0 for user lock, so we can't just use the value
+    * from GetCurrentTransactionId().
+    * -------------------
+    */
+   TransactionIdStore(xid, &MyProc->xid);
+#else
 #ifndef LowLevelLocking
    /* -------------------
     * currently, we only need this for the ProcWakeup routines
     * -------------------
     */
    TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+#endif
 #endif
 
    /* -------------------
@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
     * --------------
     */
    MemSet(&timeval, 0, sizeof(struct itimerval));
-   timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+   timeval.it_value.tv_sec = \
+       (DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER);
 
    do
    {
@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
         * the semaphore implementation.
         * --------------
         */
-       IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+       IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+                        IpcExclusiveLock);
    } while (MyProc->errType == STATUS_NOT_FOUND);      /* sleep after deadlock
                                                         * check */
 
@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
     * ---------------
     */
    timeval.it_value.tv_sec = 0;
-
-
    if (setitimer(ITIMER_REAL, &timeval, &dummy))
        elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
 
@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue,
     */
    SpinAcquire(spinlock);
 
+#ifdef LOCK_MGR_DEBUG
+   /* Just to get meaningful debug messages from DumpLocks() */
+   MyProc->waitLock = (LOCK *)NULL;
+#endif
+
    return (MyProc->errType);
 }
 
@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
    PROC       *proc;
    int         count;
+   int         trace_flag;
+   int         last_locktype = -1;
+   int         queue_size = queue->size;
+
+   Assert(queue->size >= 0);
 
    if (!queue->size)
        return (STATUS_NOT_FOUND);
 
    proc = (PROC *) MAKE_PTR(queue->links.prev);
    count = 0;
-   while ((LockResolveConflicts(lockmethod,
+   while ((queue_size--) && (proc))
+   {
+       /*
+        * This proc will conflict as the previous one did, don't even try.
+        */
+       if (proc->token == last_locktype)
+       {
+           continue;
+       }
+
+       /*
+        * This proc conflicts with locks held by others, ignored.
+        */
+       if (LockResolveConflicts(lockmethod,
                                 lock,
                                 proc->token,
-                                proc->xid) == STATUS_OK))
-   {
+                                proc->xid,
+                                (XIDLookupEnt *) NULL) != STATUS_OK)
+       {
+           last_locktype = proc->token;
+           continue;
+       }
 
        /*
         * there was a waiting process, grant it the lock before waking it
@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
         * time that the awoken process begins executing again.
         */
        GrantLock(lock, proc->token);
-       queue->size--;
 
        /*
         * ProcWakeup removes proc from the lock waiting process queue and
         * returns the next proc in chain.
         */
-       proc = ProcWakeup(proc, NO_ERROR);
 
        count++;
-       if (!proc || queue->size == 0)
-           break;
+       queue->size--;
+       proc = ProcWakeup(proc, NO_ERROR);
    }
 
+   Assert(queue->size >= 0);
+
    if (count)
        return (STATUS_OK);
-   else
+   else {
        /* Something is still blocking us.  May have deadlocked. */
+       trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \
+           TRACE_USERLOCKS : TRACE_LOCKS;
+       TPRINTF(trace_flag,
+               "ProcLockWakeup: lock(%x) can't wake up any process",
+               MAKE_OFFSET(lock));
+#ifdef DEADLOCK_DEBUG
+       if (pg_options[trace_flag] >= 2)
+           DumpAllLocks();
+#endif
        return (STATUS_NOT_FOUND);
+   }
 }
 
 void
@@ -685,7 +735,7 @@ HandleDeadLock(int sig)
    }
 
 #ifdef DEADLOCK_DEBUG
-   DumpLocks();
+   DumpAllLocks();
 #endif
 
    if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
@@ -711,7 +761,8 @@ HandleDeadLock(int sig)
     * I was awoken by a signal, not by someone unlocking my semaphore.
     * ------------------
     */
-   IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+   IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+                      IpcExclusiveLock);
 
    /* -------------
     * Set MyProc->errType to STATUS_ERROR so that we abort after
index fd1f95aab988368eef0b1b5d5bed73f5d541384c..e989e57b4404ce8e47c8bf0b14bfbfb594cdd625 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.16 1998/08/01 15:26:37 vadim Exp $
+ * $Id: lock.h,v 1.17 1998/08/25 21:20:31 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,6 +15,8 @@
 
 #include 
 #include 
+#include 
+#include 
 
 extern SPINLOCK LockMgrLock;
 typedef int MASK;
@@ -32,7 +34,7 @@ typedef int MASK;
  * NLOCKENTS - The maximum number of lock entries in the lock table.
  * ----------------------
  */
-#define NBACKENDS 50
+#define NBACKENDS MaxBackendId
 #define NLOCKS_PER_XACT 40
 #define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS
 
@@ -51,9 +53,14 @@ typedef int LOCKMETHOD;
  * CreateSpinLocks() or the number of shared memory locations allocated
  * for lock table spin locks in the case of machines with TAS instructions.
  */
-#define MAX_LOCK_METHODS 2
+#define MAX_LOCK_METHODS   3
 
-#define INVALID_TABLEID 0
+#define INVALID_TABLEID        0
+
+#define INVALID_LOCKMETHOD INVALID_TABLEID
+#define DEFAULT_LOCKMETHOD 1
+#define USER_LOCKMETHOD        2
+#define MIN_LOCKMETHOD     DEFAULT_LOCKMETHOD
 
 /*typedef struct LOCK LOCK; */
 
@@ -63,9 +70,11 @@ typedef struct LTAG
    Oid         relId;
    Oid         dbId;
    ItemPointerData tupleId;
+   uint16      lockmethod;             /* needed by user locks */
 } LOCKTAG;
 
 #define TAGSIZE (sizeof(LOCKTAG))
+#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod)
 
 /* This is the control structure for a lock table. It
  * lives in shared memory:
@@ -143,8 +152,18 @@ typedef struct XIDTAG
    SHMEM_OFFSET lock;
    int         pid;
    TransactionId xid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+   uint16      lockmethod;             /* for debug or consistency checking */
+#endif
 } XIDTAG;
 
+#ifdef USE_XIDTAG_LOCKMETHOD
+#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod)
+#else
+#define XIDTAG_LOCKMETHOD(xidtag) \
+       (((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod)
+#endif
+
 typedef struct XIDLookupEnt
 {
    /* tag */
@@ -157,6 +176,7 @@ typedef struct XIDLookupEnt
 } XIDLookupEnt;
 
 #define XID_TAGSIZE (sizeof(XIDTAG))
+#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag))
 
 /* originally in procq.h */
 typedef struct PROC_QUEUE
@@ -191,14 +211,16 @@ typedef struct LOCK
    int         nActive;
 } LOCK;
 
-#define LockGetLock_nHolders(l) l->nHolders
+#define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag))
 
+#define LockGetLock_nHolders(l) l->nHolders
+#ifdef NOT_USED
 #define LockDecrWaitHolders(lock, lockmode) \
 ( \
   lock->nHolding--, \
   lock->holders[lockmode]-- \
 )
-
+#endif
 #define LockLockTable() SpinAcquire(LockMgrLock);
 #define UnlockLockTable() SpinRelease(LockMgrLock);
 
@@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock;
  */
 extern void InitLocks(void);
 extern void LockDisable(int status);
-extern LOCKMETHOD
-LockMethodTableInit(char *tabName, MASK *conflictsP, int *prioP,
-           int numModes);
-extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
-extern int
-LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-                    TransactionId xid);
-extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
+extern LOCKMETHOD LockMethodTableInit(char *tabName, MASK *conflictsP,
+                                     int *prioP, int numModes);
+extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
+extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+                       LOCKMODE lockmode);
+extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock,
+                               LOCKMODE lockmode, TransactionId xid,
+                               XIDLookupEnt *xidentP);
+extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+                       LOCKMODE lockmode);
 extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int LockShmemSize(void);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check);
+extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
+                         bool skip_check);
+ArrayType* LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
-
+extern void DumpAllLocks(void);
 #endif
 
 #endif                         /* LOCK_H */
index 8b8b7e28488daea7a1956b5d886a9a3115dd0c67..50f7b03ef633d37b4208e0af2ee6593609c063d7 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.13 1998/07/27 19:38:38 vadim Exp $
+ * $Id: proc.h,v 1.14 1998/08/25 21:20:32 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -105,10 +105,10 @@ extern bool ProcRemove(int pid);
 /* make static in storage/lmgr/proc.c -- jolly */
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int
-ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
-         int prio, LOCK *lock);
-extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock);
+extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
+                    int prio, LOCK *lock, TransactionId xid);
+extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
+                          LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);
 extern void ProcReleaseSpins(PROC *proc);
 extern void ProcFreeAllSemaphores(void);