/*-------------------------------------------------------------------------
*
* lock.c
- * POSTGRES low-level lock mechanism
+ * POSTGRES primary lock mechanism
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.161 2005/12/09 01:22:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.162 2005/12/11 21:02:18 tgl Exp $
*
* NOTES
* A lock table is a shared memory hash table. When
/*
- * Links to hash tables containing lock state
+ * Pointers to hash tables containing lock state
+ *
+ * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
+ * shared memory; LockMethodLocalHash is local to each backend.
*/
-static HTAB *LockMethodLockHash;
-static HTAB *LockMethodProcLockHash;
+static HTAB *LockMethodLockHash[NUM_LOCK_PARTITIONS];
+static HTAB *LockMethodProcLockHash[NUM_LOCK_PARTITIONS];
static HTAB *LockMethodLocalHash;
static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
-static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
- ResourceOwner owner);
+static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
-static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
- PROCLOCK *proclock, bool wakeupNeeded);
+static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
+ LockMethod lockMethodTable, int partition,
+ bool wakeupNeeded);
/*
- * InitLocks -- Initialize the lock module's shared memory.
+ * InitLocks -- Initialize the lock manager's data structures.
+ *
+ * This is called from CreateSharedMemoryAndSemaphores(), which see for
+ * more comments. In the normal postmaster case, the shared hash tables
+ * are created here, as well as a locallock hash table that will remain
+ * unused and empty in the postmaster itself. Backends inherit the pointers
+ * to the shared tables via fork(), and also inherit an image of the locallock
+ * hash table, which they proceed to use. In the EXEC_BACKEND case, each
+ * backend re-executes this code to obtain pointers to the already existing
+ * shared hash tables and to create its locallock hash table.
*/
void
InitLocks(void)
int hash_flags;
long init_table_size,
max_table_size;
+ int i;
- /* Compute init/max size to request for lock hashtables */
+ /*
+ * Compute init/max size to request for lock hashtables. Note these
+ * calculations must agree with LockShmemSize!
+ */
max_table_size = NLOCKENTS();
+ max_table_size = (max_table_size - 1) / NUM_LOCK_PARTITIONS + 1;
init_table_size = max_table_size / 2;
/*
- * allocate a hash table for LOCK structs. This is used to store
+ * Allocate hash tables for LOCK structs. These are used to store
* per-locked-object information.
*/
MemSet(&info, 0, sizeof(info));
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
- sprintf(shmemName, "LOCK hash");
- LockMethodLockHash = ShmemInitHash(shmemName,
- init_table_size,
- max_table_size,
- &info,
- hash_flags);
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ {
+ sprintf(shmemName, "LOCK hash %d", i);
+ LockMethodLockHash[i] = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+ if (!LockMethodLockHash[i])
+ elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
+ }
- if (!LockMethodLockHash)
- elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
+ /* Assume an average of 2 holders per lock */
+ max_table_size *= 2;
+ init_table_size *= 2;
/*
- * allocate a hash table for PROCLOCK structs. This is used to store
- * per-lock-holder information.
+ * Allocate hash tables for PROCLOCK structs. These are used to store
+ * per-lock-per-holder information.
*/
info.keysize = sizeof(PROCLOCKTAG);
info.entrysize = sizeof(PROCLOCK);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
- sprintf(shmemName, "PROCLOCK hash");
- LockMethodProcLockHash = ShmemInitHash(shmemName,
- init_table_size,
- max_table_size,
- &info,
- hash_flags);
-
- if (!LockMethodProcLockHash)
- elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ {
+ sprintf(shmemName, "PROCLOCK hash %d", i);
+ LockMethodProcLockHash[i] = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+ if (!LockMethodProcLockHash[i])
+ elog(FATAL, "could not initialize lock table \"%s\"", shmemName);
+ }
/*
- * allocate a non-shared hash table for LOCALLOCK structs. This is used
+ * Allocate one non-shared hash table for LOCALLOCK structs. This is used
* to store lock counts and resource owner information.
*
* The non-shared table could already exist in this process (this occurs
}
+/*
+ * Given a LOCKTAG, determine which partition the lock belongs in.
+ *
+ * Basically what we want to do here is hash the locktag. However, it
+ * seems unwise to use hash_any() because that is the same function that
+ * will be used to distribute the locks within each partition's hash table;
+ * if we use it, we run a big risk of having uneven distribution of hash
+ * codes within each hash table. Instead, we use a simple linear XOR of the
+ * bits of the locktag.
+ */
+int
+LockTagToPartition(const LOCKTAG *locktag)
+{
+ const uint8 *ptr = (const uint8 *) locktag;
+ int result = 0;
+ int i;
+
+ for (i = 0; i < sizeof(LOCKTAG); i++)
+ result ^= *ptr++;
+#if NUM_LOCK_PARTITIONS == 16
+ result ^= result >> 4;
+ result &= 0x0F;
+#elif NUM_LOCK_PARTITIONS == 4
+ result ^= result >> 4;
+ result ^= result >> 2;
+ result &= 0x03;
+#else
+#error unsupported NUM_LOCK_PARTITIONS
+#endif
+ return result;
+}
+
+
/*
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
PROCLOCKTAG proclocktag;
bool found;
ResourceOwner owner;
- LWLockId masterLock;
+ int partition;
+ LWLockId partitionLock;
int status;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
locallock->lock = NULL;
locallock->proclock = NULL;
locallock->isTempObject = isTempObject;
+ locallock->partition = LockTagToPartition(&(localtag.lock));
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
/*
* Otherwise we've got to mess with the shared lock table.
*/
- masterLock = LockMgrLock;
+ partition = locallock->partition;
+ partitionLock = FirstLockMgrLock + partition;
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Find or create a lock with this tag.
* pointer is valid, since a lock object with no locks can go away
* anytime.
*/
- lock = (LOCK *) hash_search(LockMethodLockHash,
+ lock = (LOCK *) hash_search(LockMethodLockHash[partition],
(void *) locktag,
HASH_ENTER_NULL, &found);
if (!lock)
{
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
/*
* Find or create a proclock entry with this tag
*/
- proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
(void *) &proclocktag,
HASH_ENTER_NULL, &found);
if (!proclock)
* anyone to release the lock object later.
*/
Assert(SHMQueueEmpty(&(lock->procLocks)));
- if (!hash_search(LockMethodLockHash,
+ if (!hash_search(LockMethodLockHash[partition],
(void *) &(lock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
}
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
- SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
+ SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
+ &proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock);
}
else
{
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
- if (!hash_search(LockMethodProcLockHash,
+ if (!hash_search(LockMethodProcLockHash[partition],
(void *) &(proclock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "proclock table corrupted");
LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
return LOCKACQUIRE_NOT_AVAIL;
/*
* Sleep till someone wakes me up.
*/
- WaitOnLock(lockmethodid, locallock, owner);
+ WaitOnLock(locallock, owner);
/*
* NOTE: do not do any material change of state between here and
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
elog(ERROR, "LockAcquire failed");
}
PROCLOCK_PRINT("LockAcquire: granted", proclock);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
return LOCKACQUIRE_OK;
}
* should be called after UnGrantLock, and wakeupNeeded is the result from
* UnGrantLock.)
*
- * The locktable's masterLock must be held at entry, and will be
+ * The lock table's partition lock must be held at entry, and will be
* held at exit.
*/
static void
-CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock, PROCLOCK *proclock,
+CleanUpLock(LOCK *lock, PROCLOCK *proclock,
+ LockMethod lockMethodTable, int partition,
bool wakeupNeeded)
{
/*
PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
- if (!hash_search(LockMethodProcLockHash,
+ if (!hash_search(LockMethodProcLockHash[partition],
(void *) &(proclock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "proclock table corrupted");
*/
LOCK_PRINT("CleanUpLock: deleting", lock, 0);
Assert(SHMQueueEmpty(&(lock->procLocks)));
- if (!hash_search(LockMethodLockHash,
+ if (!hash_search(LockMethodLockHash[partition],
(void *) &(lock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
else if (wakeupNeeded)
{
/* There are waiters on this lock, so wake them up. */
- ProcLockWakeup(LockMethods[lockmethodid], lock);
+ ProcLockWakeup(lockMethodTable, lock);
}
}
* Caller must have set MyProc->heldLocks to reflect locks already held
* on the lockable object by this process.
*
- * The locktable's masterLock must be held at entry.
+ * The appropriate partition lock must be held at entry.
*/
static void
-WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
- ResourceOwner owner)
+WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
+ LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
LockMethod lockMethodTable = LockMethods[lockmethodid];
const char *old_status;
char *new_status;
* will also happen in the cancel/die case.
*/
- if (ProcSleep(lockMethodTable,
- locallock->tag.mode,
- locallock->lock,
- locallock->proclock) != STATUS_OK)
+ if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
{
/*
* We failed as a result of a deadlock, see CheckDeadLock(). Quit now.
awaitedLock = NULL;
LOCK_PRINT("WaitOnLock: aborting on lock",
locallock->lock, locallock->tag.mode);
- LWLockRelease(LockMgrLock);
+ LWLockRelease(FirstLockMgrLock + locallock->partition);
/*
- * Now that we aren't holding the LockMgrLock, we can give an error
+ * Now that we aren't holding the partition lock, we can give an error
* report including details about the detected deadlock.
*/
DeadLockReport();
* Remove a proc from the wait-queue it is on
* (caller must know it is on one).
*
- * Locktable lock must be held by caller.
+ * Appropriate partition lock must be held by caller.
*
* NB: this does not clean up any locallock object that may exist for the lock.
*/
void
-RemoveFromWaitQueue(PGPROC *proc)
+RemoveFromWaitQueue(PGPROC *proc, int partition)
{
LOCK *waitLock = proc->waitLock;
PROCLOCK *proclock = proc->waitProcLock;
* LockRelease expects there to be no remaining proclocks.) Then see if
* any other waiters for the lock can be woken up now.
*/
- CleanUpLock(lockmethodid, waitLock, proclock, true);
+ CleanUpLock(waitLock, proclock,
+ LockMethods[lockmethodid], partition,
+ true);
}
/*
LOCALLOCK *locallock;
LOCK *lock;
PROCLOCK *proclock;
- LWLockId masterLock;
+ int partition;
+ LWLockId partitionLock;
bool wakeupNeeded;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
/*
* Otherwise we've got to mess with the shared lock table.
*/
- masterLock = LockMgrLock;
+ partition = locallock->partition;
+ partitionLock = FirstLockMgrLock + partition;
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* We don't need to re-find the lock or proclock, since we kept their
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
elog(WARNING, "you don't own a lock of type %s",
lockMethodTable->lockModeNames[lockmode]);
RemoveLocalLock(locallock);
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
- CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
+ CleanUpLock(lock, proclock,
+ lockMethodTable, partition,
+ wakeupNeeded);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
RemoveLocalLock(locallock);
return TRUE;
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
HASH_SEQ_STATUS status;
- SHM_QUEUE *procLocks = &(MyProc->procLocks);
- LWLockId masterLock;
LockMethod lockMethodTable;
int i,
numLockModes;
LOCALLOCK *locallock;
- PROCLOCK *proclock;
LOCK *lock;
+ PROCLOCK *proclock;
+ int partition;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
#endif
numLockModes = lockMethodTable->numLockModes;
- masterLock = LockMgrLock;
/*
* First we run through the locallock table and get rid of unwanted
RemoveLocalLock(locallock);
}
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ /*
+ * Now, scan each lock partition separately.
+ */
+ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
+ {
+ LWLockId partitionLock = FirstLockMgrLock + partition;
+ SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
- offsetof(PROCLOCK, procLink));
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
- while (proclock)
- {
- bool wakeupNeeded = false;
- PROCLOCK *nextplock;
+ if (!proclock)
+ continue; /* needn't examine this partition */
- /* Get link first, since we may unlink/delete this proclock */
- nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
- offsetof(PROCLOCK, procLink));
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
- Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
+ while (proclock)
+ {
+ bool wakeupNeeded = false;
+ PROCLOCK *nextplock;
- lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+ /* Get link first, since we may unlink/delete this proclock */
+ nextplock = (PROCLOCK *)
+ SHMQueueNext(procLocks, &proclock->procLink,
+ offsetof(PROCLOCK, procLink));
- /* Ignore items that are not of the lockmethod to be removed */
- if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
- goto next_item;
+ Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
- /*
- * In allLocks mode, force release of all locks even if locallock
- * table had problems
- */
- if (allLocks)
- proclock->releaseMask = proclock->holdMask;
- else
- Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
- /*
- * Ignore items that have nothing to be released, unless they have
- * holdMask == 0 and are therefore recyclable
- */
- if (proclock->releaseMask == 0 && proclock->holdMask != 0)
- goto next_item;
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
+ goto next_item;
- PROCLOCK_PRINT("LockReleaseAll", proclock);
- LOCK_PRINT("LockReleaseAll", lock, 0);
- Assert(lock->nRequested >= 0);
- Assert(lock->nGranted >= 0);
- Assert(lock->nGranted <= lock->nRequested);
- Assert((proclock->holdMask & ~lock->grantMask) == 0);
+ /*
+ * In allLocks mode, force release of all locks even if locallock
+ * table had problems
+ */
+ if (allLocks)
+ proclock->releaseMask = proclock->holdMask;
+ else
+ Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
- /*
- * Release the previously-marked lock modes
- */
- for (i = 1; i <= numLockModes; i++)
- {
- if (proclock->releaseMask & LOCKBIT_ON(i))
- wakeupNeeded |= UnGrantLock(lock, i, proclock,
- lockMethodTable);
- }
- Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
- Assert(lock->nGranted <= lock->nRequested);
- LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+ /*
+ * Ignore items that have nothing to be released, unless they have
+ * holdMask == 0 and are therefore recyclable
+ */
+ if (proclock->releaseMask == 0 && proclock->holdMask != 0)
+ goto next_item;
- proclock->releaseMask = 0;
+ PROCLOCK_PRINT("LockReleaseAll", proclock);
+ LOCK_PRINT("LockReleaseAll", lock, 0);
+ Assert(lock->nRequested >= 0);
+ Assert(lock->nGranted >= 0);
+ Assert(lock->nGranted <= lock->nRequested);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
+
+ /*
+ * Release the previously-marked lock modes
+ */
+ for (i = 1; i <= numLockModes; i++)
+ {
+ if (proclock->releaseMask & LOCKBIT_ON(i))
+ wakeupNeeded |= UnGrantLock(lock, i, proclock,
+ lockMethodTable);
+ }
+ Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
+ LOCK_PRINT("LockReleaseAll: updated", lock, 0);
- /* CleanUpLock will wake up waiters if needed. */
- CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
+ proclock->releaseMask = 0;
-next_item:
- proclock = nextplock;
- }
+ /* CleanUpLock will wake up waiters if needed. */
+ CleanUpLock(lock, proclock,
+ lockMethodTable, partition,
+ wakeupNeeded);
- LWLockRelease(masterLock);
+ next_item:
+ proclock = nextplock;
+ } /* loop over PROCLOCKs within this partition */
+
+ LWLockRelease(partitionLock);
+ } /* loop over partitions */
#ifdef LOCK_DEBUG
if (*(lockMethodTable->trace_flag))
{
PGPROC *newproc = TwoPhaseGetDummyProc(xid);
HASH_SEQ_STATUS status;
- SHM_QUEUE *procLocks = &(MyProc->procLocks);
- LWLockId masterLock;
LOCALLOCK *locallock;
+ LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
- LOCK *lock;
+ int partition;
/* This is a critical section: any error means big trouble */
START_CRIT_SECTION();
- masterLock = LockMgrLock;
-
/*
* First we run through the locallock table and get rid of unwanted
* entries, then we scan the process's proclocks and transfer them to the
RemoveLocalLock(locallock);
}
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ /*
+ * Now, scan each lock partition separately.
+ */
+ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
+ {
+ LWLockId partitionLock = FirstLockMgrLock + partition;
+ SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]);
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
- offsetof(PROCLOCK, procLink));
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
- while (proclock)
- {
- PROCLOCK *nextplock;
- LOCKMASK holdMask;
- PROCLOCK *newproclock;
+ if (!proclock)
+ continue; /* needn't examine this partition */
- /* Get link first, since we may unlink/delete this proclock */
- nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
- offsetof(PROCLOCK, procLink));
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
- Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
+ while (proclock)
+ {
+ PROCLOCK *nextplock;
+ LOCKMASK holdMask;
+ PROCLOCK *newproclock;
- lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+ /* Get link first, since we may unlink/delete this proclock */
+ nextplock = (PROCLOCK *)
+ SHMQueueNext(procLocks, &proclock->procLink,
+ offsetof(PROCLOCK, procLink));
- /* Ignore nontransactional locks */
- if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
- goto next_item;
+ Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
- PROCLOCK_PRINT("PostPrepare_Locks", proclock);
- LOCK_PRINT("PostPrepare_Locks", lock, 0);
- Assert(lock->nRequested >= 0);
- Assert(lock->nGranted >= 0);
- Assert(lock->nGranted <= lock->nRequested);
- Assert((proclock->holdMask & ~lock->grantMask) == 0);
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
- /*
- * Since there were no session locks, we should be releasing all locks
- */
- if (proclock->releaseMask != proclock->holdMask)
- elog(PANIC, "we seem to have dropped a bit somewhere");
+ /* Ignore nontransactional locks */
+ if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
+ goto next_item;
- holdMask = proclock->holdMask;
+ PROCLOCK_PRINT("PostPrepare_Locks", proclock);
+ LOCK_PRINT("PostPrepare_Locks", lock, 0);
+ Assert(lock->nRequested >= 0);
+ Assert(lock->nGranted >= 0);
+ Assert(lock->nGranted <= lock->nRequested);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
- /*
- * We cannot simply modify proclock->tag.proc to reassign ownership of
- * the lock, because that's part of the hash key and the proclock
- * would then be in the wrong hash chain. So, unlink and delete the
- * old proclock; create a new one with the right contents; and link it
- * into place. We do it in this order to be certain we won't run out
- * of shared memory (the way dynahash.c works, the deleted object is
- * certain to be available for reallocation).
- */
- SHMQueueDelete(&proclock->lockLink);
- SHMQueueDelete(&proclock->procLink);
- if (!hash_search(LockMethodProcLockHash,
- (void *) &(proclock->tag),
- HASH_REMOVE, NULL))
- elog(PANIC, "proclock table corrupted");
+ /*
+ * Since there were no session locks, we should be releasing all
+ * locks
+ */
+ if (proclock->releaseMask != proclock->holdMask)
+ elog(PANIC, "we seem to have dropped a bit somewhere");
- /*
- * Create the hash key for the new proclock table.
- */
- MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));
- proclocktag.lock = MAKE_OFFSET(lock);
- proclocktag.proc = MAKE_OFFSET(newproc);
-
- newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
- (void *) &proclocktag,
- HASH_ENTER_NULL, &found);
- if (!newproclock)
- ereport(PANIC, /* should not happen */
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of shared memory"),
- errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
+ holdMask = proclock->holdMask;
- /*
- * If new, initialize the new entry
- */
- if (!found)
- {
- newproclock->holdMask = 0;
- newproclock->releaseMask = 0;
- /* Add new proclock to appropriate lists */
- SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
- SHMQueueInsertBefore(&newproc->procLocks, &newproclock->procLink);
- PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
- }
- else
- {
- PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
- Assert((newproclock->holdMask & ~lock->grantMask) == 0);
- }
+ /*
+ * We cannot simply modify proclock->tag.proc to reassign
+ * ownership of the lock, because that's part of the hash key and
+ * the proclock would then be in the wrong hash chain. So, unlink
+ * and delete the old proclock; create a new one with the right
+ * contents; and link it into place. We do it in this order to be
+ * certain we won't run out of shared memory (the way dynahash.c
+ * works, the deleted object is certain to be available for
+ * reallocation).
+ */
+ SHMQueueDelete(&proclock->lockLink);
+ SHMQueueDelete(&proclock->procLink);
+ if (!hash_search(LockMethodProcLockHash[partition],
+ (void *) &(proclock->tag),
+ HASH_REMOVE, NULL))
+ elog(PANIC, "proclock table corrupted");
- /*
- * Pass over the identified lock ownership.
- */
- Assert((newproclock->holdMask & holdMask) == 0);
- newproclock->holdMask |= holdMask;
+ /*
+ * Create the hash key for the new proclock table.
+ */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));
+ proclocktag.lock = MAKE_OFFSET(lock);
+ proclocktag.proc = MAKE_OFFSET(newproc);
+
+ newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
+ (void *) &proclocktag,
+ HASH_ENTER_NULL, &found);
+ if (!newproclock)
+ ereport(PANIC, /* should not happen */
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
-next_item:
- proclock = nextplock;
- }
+ /*
+ * If new, initialize the new entry
+ */
+ if (!found)
+ {
+ newproclock->holdMask = 0;
+ newproclock->releaseMask = 0;
+ /* Add new proclock to appropriate lists */
+ SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
+ SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
+ &newproclock->procLink);
+ PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
+ }
+ else
+ {
+ PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
+ Assert((newproclock->holdMask & ~lock->grantMask) == 0);
+ }
+
+ /*
+ * Pass over the identified lock ownership.
+ */
+ Assert((newproclock->holdMask & holdMask) == 0);
+ newproclock->holdMask |= holdMask;
+
+ next_item:
+ proclock = nextplock;
+ } /* loop over PROCLOCKs within this partition */
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
+ } /* loop over partitions */
END_CRIT_SECTION();
}
LockShmemSize(void)
{
Size size = 0;
- long max_table_size = NLOCKENTS();
+ Size tabsize;
+ long max_table_size;
- /* lockHash table */
- size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
+ /* lock hash tables */
+ max_table_size = NLOCKENTS();
+ max_table_size = (max_table_size - 1) / NUM_LOCK_PARTITIONS + 1;
+ tabsize = hash_estimate_size(max_table_size, sizeof(LOCK));
+ size = add_size(size, mul_size(tabsize, NUM_LOCK_PARTITIONS));
- /* proclockHash table */
- size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
+ /* proclock hash tables */
+ max_table_size *= 2;
+ tabsize = hash_estimate_size(max_table_size, sizeof(PROCLOCK));
+ size = add_size(size, mul_size(tabsize, NUM_LOCK_PARTITIONS));
/*
- * Note we count only one pair of hash tables, since the userlocks table
- * actually overlays the main one.
- *
- * Since the lockHash entry count above is only an estimate, add 10%
- * safety margin.
+ * Since there is likely to be some space wastage due to uneven use
+ * of the partitions, add 10% safety margin.
*/
size = add_size(size, size / 10);
* copies of the same PGPROC and/or LOCK objects are likely to appear.
* It is the caller's responsibility to match up duplicates if wanted.
*
- * The design goal is to hold the LockMgrLock for as short a time as possible;
+ * The design goal is to hold the LWLocks for as short a time as possible;
* thus, this function simply makes a copy of the necessary data and releases
- * the lock, allowing the caller to contemplate and format the data for as
+ * the locks, allowing the caller to contemplate and format the data for as
* long as it pleases.
*/
LockData *
HTAB *proclockTable;
PROCLOCK *proclock;
HASH_SEQ_STATUS seqstat;
+ int els;
+ int el;
int i;
data = (LockData *) palloc(sizeof(LockData));
- LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
-
- proclockTable = LockMethodProcLockHash;
-
- data->nelements = i = proclockTable->hctl->nentries;
+ /*
+ * Acquire lock on the entire shared lock data structures. We can't
+ * operate one partition at a time if we want to deliver a self-consistent
+ * view of the state.
+ *
+ * Since this is a read-only operation, we take shared instead of exclusive
+ * lock. There's not a whole lot of point to this, because all the normal
+ * operations require exclusive lock, but it doesn't hurt anything either.
+ * It will at least allow two backends to do GetLockStatusData in parallel.
+ *
+ * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
+ *
+ * Use same loop to count up the total number of PROCLOCK objects.
+ */
+ els = 0;
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ {
+ LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+ proclockTable = LockMethodProcLockHash[i];
+ els += proclockTable->hctl->nentries;
+ }
- data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
- data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
- data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
- data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
+ data->nelements = els;
+ data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * els);
+ data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
+ data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
+ data->locks = (LOCK *) palloc(sizeof(LOCK) * els);
- hash_seq_init(&seqstat, proclockTable);
+ el = 0;
- i = 0;
- while ((proclock = hash_seq_search(&seqstat)))
+ /* Now scan the tables to copy the data */
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
{
- PGPROC *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
- LOCK *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+ proclockTable = LockMethodProcLockHash[i];
+ hash_seq_init(&seqstat, proclockTable);
- data->proclockaddrs[i] = MAKE_OFFSET(proclock);
- memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
- memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
- memcpy(&(data->locks[i]), lock, sizeof(LOCK));
+ while ((proclock = hash_seq_search(&seqstat)))
+ {
+ PGPROC *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
+ LOCK *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+
+ data->proclockaddrs[el] = MAKE_OFFSET(proclock);
+ memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
+ memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
+ memcpy(&(data->locks[el]), lock, sizeof(LOCK));
- i++;
+ el++;
+ }
}
- LWLockRelease(LockMgrLock);
+ /* And release locks */
+ for (i = NUM_LOCK_PARTITIONS; --i >= 0; )
+ LWLockRelease(FirstLockMgrLock + i);
- Assert(i == data->nelements);
+ Assert(el == data->nelements);
return data;
}
#ifdef LOCK_DEBUG
/*
- * Dump all locks in the given proc's procLocks list.
+ * Dump all locks in the given proc's myProcLocks lists.
*
* Caller is responsible for having acquired appropriate LWLocks.
*/
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
+ int i;
if (proc == NULL)
return;
- procLocks = &proc->procLocks;
-
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
- offsetof(PROCLOCK, procLink));
-
- while (proclock)
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
{
- Assert(proclock->tag.proc == MAKE_OFFSET(proc));
+ procLocks = &(proc->myProcLocks[i]);
- lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
- PROCLOCK_PRINT("DumpLocks", proclock);
- LOCK_PRINT("DumpLocks", lock, 0);
+ while (proclock)
+ {
+ Assert(proclock->tag.proc == MAKE_OFFSET(proc));
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
- offsetof(PROCLOCK, procLink));
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+
+ PROCLOCK_PRINT("DumpLocks", proclock);
+ LOCK_PRINT("DumpLocks", lock, 0);
+
+ proclock = (PROCLOCK *)
+ SHMQueueNext(procLocks, &proclock->procLink,
+ offsetof(PROCLOCK, procLink));
+ }
}
}
LOCK *lock;
HTAB *proclockTable;
HASH_SEQ_STATUS status;
+ int i;
proc = MyProc;
- proclockTable = LockMethodProcLockHash;
if (proc && proc->waitLock)
LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
- hash_seq_init(&status, proclockTable);
- while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
{
- PROCLOCK_PRINT("DumpAllLocks", proclock);
+ proclockTable = LockMethodProcLockHash[i];
+ hash_seq_init(&status, proclockTable);
- if (proclock->tag.lock)
+ while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
{
- lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
- LOCK_PRINT("DumpAllLocks", lock, 0);
+ PROCLOCK_PRINT("DumpAllLocks", proclock);
+
+ if (proclock->tag.lock)
+ {
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+ LOCK_PRINT("DumpAllLocks", lock, 0);
+ }
+ else
+ elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
}
- else
- elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
}
}
#endif /* LOCK_DEBUG */
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
bool found;
- LWLockId masterLock;
+ int partition;
+ LWLockId partitionLock;
LockMethod lockMethodTable;
Assert(len == sizeof(TwoPhaseLockRecord));
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
lockMethodTable = LockMethods[lockmethodid];
- masterLock = LockMgrLock;
+ partition = LockTagToPartition(locktag);
+ partitionLock = FirstLockMgrLock + partition;
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Find or create a lock with this tag.
*/
- lock = (LOCK *) hash_search(LockMethodLockHash,
+ lock = (LOCK *) hash_search(LockMethodLockHash[partition],
(void *) locktag,
HASH_ENTER_NULL, &found);
if (!lock)
{
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
/*
* Find or create a proclock entry with this tag
*/
- proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
(void *) &proclocktag,
HASH_ENTER_NULL, &found);
if (!proclock)
* anyone to release the lock object later.
*/
Assert(SHMQueueEmpty(&(lock->procLocks)));
- if (!hash_search(LockMethodLockHash,
+ if (!hash_search(LockMethodLockHash[partition],
(void *) &(lock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
}
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
- SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink);
+ SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
+ &proclock->procLink);
PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
}
else
*/
GrantLock(lock, proclock, lockmode);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
}
/*
LOCKTAG *locktag;
LOCKMODE lockmode;
LOCKMETHODID lockmethodid;
- PROCLOCKTAG proclocktag;
LOCK *lock;
PROCLOCK *proclock;
- LWLockId masterLock;
+ PROCLOCKTAG proclocktag;
+ int partition;
+ LWLockId partitionLock;
LockMethod lockMethodTable;
bool wakeupNeeded;
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
lockMethodTable = LockMethods[lockmethodid];
- masterLock = LockMgrLock;
+ partition = LockTagToPartition(locktag);
+ partitionLock = FirstLockMgrLock + partition;
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Re-find the lock object (it had better be there).
*/
- lock = (LOCK *) hash_search(LockMethodLockHash,
+ lock = (LOCK *) hash_search(LockMethodLockHash[partition],
(void *) locktag,
HASH_FIND, NULL);
if (!lock)
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(proc);
- proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[partition],
(void *) &proclocktag,
HASH_FIND, NULL);
if (!proclock)
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
elog(WARNING, "you don't own a lock of type %s",
lockMethodTable->lockModeNames[lockmode]);
return;
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
- CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
+ CleanUpLock(lock, proclock,
+ lockMethodTable, partition,
+ wakeupNeeded);
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
}
/*
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.169 2005/12/09 01:22:04 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.170 2005/12/11 21:02:18 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* ProcQueueAlloc() -- create a shm queue for sleeping processes
* ProcQueueInit() -- create a queue without allocing memory
*
- * Locking and waiting for buffers can cause the backend to be
- * put to sleep. Whoever releases the lock, etc. wakes the
- * process up again (and gives it an error code so it knows
+ * Waiting for a lock causes the backend to be put to sleep. Whoever releases
+ * the lock wakes the process up again (and gives it an error code so it knows
* whether it was awoken on an error condition).
*
* Interface (b):
* ProcReleaseLocks -- frees the locks associated with current transaction
*
* ProcKill -- destroys the shared memory state (and locks)
- * associated with the process.
+ * associated with the process.
*/
#include "postgres.h"
static PROC_HDR *ProcGlobal = NULL;
static PGPROC *DummyProcs = NULL;
-static bool waitingForLock = false;
+/* If we are waiting for a lock, this points to the associated LOCALLOCK */
+static LOCALLOCK *lockAwaited = NULL;
/* Mark these volatile because they can be changed by signal handler */
static volatile bool statement_timeout_active = false;
void
InitProcess(void)
{
- SHMEM_OFFSET myOffset;
-
/* use volatile pointer to prevent code rearrangement */
volatile PROC_HDR *procglobal = ProcGlobal;
+ SHMEM_OFFSET myOffset;
+ int i;
/*
* ProcGlobal should be set by a previous call to InitProcGlobal (if we
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
- SHMQueueInit(&(MyProc->procLocks));
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ SHMQueueInit(&(MyProc->myProcLocks[i]));
/*
* Add our PGPROC to the PGPROC array in shared memory.
InitDummyProcess(int proctype)
{
PGPROC *dummyproc;
+ int i;
/*
* ProcGlobal should be set by a previous call to InitProcGlobal (we
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
- SHMQueueInit(&(MyProc->procLocks));
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ SHMQueueInit(&(MyProc->myProcLocks[i]));
/*
* Arrange to clean up at process exit.
bool
LockWaitCancel(void)
{
+ LWLockId partitionLock;
+
/* Nothing to do if we weren't waiting for a lock */
- if (!waitingForLock)
+ if (lockAwaited == NULL)
return false;
/* Turn off the deadlock timer, if it's still running (see ProcSleep) */
disable_sig_alarm(false);
/* Unlink myself from the wait queue, if on it (might not be anymore!) */
- LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
+ partitionLock = FirstLockMgrLock + lockAwaited->partition;
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
if (MyProc->links.next != INVALID_OFFSET)
{
/* We could not have been granted the lock yet */
Assert(MyProc->waitStatus == STATUS_ERROR);
- RemoveFromWaitQueue(MyProc);
+ RemoveFromWaitQueue(MyProc, lockAwaited->partition);
}
else
{
GrantAwaitedLock();
}
- waitingForLock = false;
+ lockAwaited = NULL;
- LWLockRelease(LockMgrLock);
+ LWLockRelease(partitionLock);
/*
* Reset the proc wait semaphore to zero. This is necessary in the
/*
- * ProcSleep -- put a process to sleep
+ * ProcSleep -- put a process to sleep on the specified lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
* on the lockable object by this process (under all XIDs).
*
- * Locktable's masterLock must be held at entry, and will be held
+ * The lock table's partition lock must be held at entry, and will be held
* at exit.
*
* Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
*
* ASSUME: that no one will fiddle with the queue until after
- * we release the masterLock.
+ * we release the partition lock.
*
* NOTES: The process queue is now a priority queue for locking.
*
* semaphore is normally zero, so when we try to acquire it, we sleep.
*/
int
-ProcSleep(LockMethod lockMethodTable,
- LOCKMODE lockmode,
- LOCK *lock,
- PROCLOCK *proclock)
+ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
{
- LWLockId masterLock = LockMgrLock;
+ LOCKMODE lockmode = locallock->tag.mode;
+ LOCK *lock = locallock->lock;
+ PROCLOCK *proclock = locallock->proclock;
+ int partition = locallock->partition;
+ LWLockId partitionLock = FirstLockMgrLock + partition;
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMASK myHeldLocks = MyProc->heldLocks;
bool early_deadlock = false;
*/
if (early_deadlock)
{
- RemoveFromWaitQueue(MyProc);
+ RemoveFromWaitQueue(MyProc, partition);
return STATUS_ERROR;
}
/* mark that we are waiting for a lock */
- waitingForLock = true;
+ lockAwaited = locallock;
/*
- * Release the locktable's masterLock.
+ * Release the lock table's partition lock.
*
* NOTE: this may also cause us to exit critical-section state, possibly
* allowing a cancel/die interrupt to be accepted. This is OK because we
* have recorded the fact that we are waiting for a lock, and so
* LockWaitCancel will clean up if cancel/die happens.
*/
- LWLockRelease(masterLock);
+ LWLockRelease(partitionLock);
/*
* Set timer so we can wake up after awhile and check for a deadlock. If a
elog(FATAL, "could not disable timer for process wakeup");
/*
- * Re-acquire the locktable's masterLock. We have to do this to hold off
- * cancel/die interrupts before we can mess with waitingForLock (else we
- * might have a missed or duplicated locallock update).
+ * Re-acquire the lock table's partition lock. We have to do this to
+ * hold off cancel/die interrupts before we can mess with lockAwaited
+ * (else we might have a missed or duplicated locallock update).
*/
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
+ LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* We no longer want LockWaitCancel to do anything.
*/
- waitingForLock = false;
+ lockAwaited = NULL;
/*
* If we got the lock, be sure to remember it in the locallock table.
* Also remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
*
+ * The appropriate lock partition lock must be held by caller.
+ *
* XXX: presently, this code is only used for the "success" case, and only
* works correctly for that case. To clean up in failure case, would need
* to twiddle the lock's request counts too --- see RemoveFromWaitQueue.
{
PGPROC *retProc;
- /* assume that masterLock has been acquired */
-
/* Proc should be sleeping ... */
if (proc->links.prev == INVALID_OFFSET ||
proc->links.next == INVALID_OFFSET)
* ProcLockWakeup -- routine for waking up processes when a lock is
* released (or a prior waiter is aborted). Scan all waiters
* for lock, waken any that are no longer blocked.
+ *
+ * The appropriate lock partition lock must be held by caller.
*/
void
ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
Assert(waitQueue->size >= 0);
}
-/* --------------------
+/*
+ * CheckDeadLock
+ *
* We only get to this routine if we got SIGALRM after DeadlockTimeout
* while waiting for a lock to be released by some other process. Look
* to see if there's a deadlock; if not, just return and continue waiting.
* If we have a real deadlock, remove ourselves from the lock's wait queue
* and signal an error to ProcSleep.
- * --------------------
*/
static void
CheckDeadLock(void)
{
+ int i;
+
/*
- * Acquire locktable lock. Note that the deadlock check interrupt had
- * better not be enabled anywhere that this process itself holds the
- * locktable lock, else this will wait forever. Also note that
- * LWLockAcquire creates a critical section, so that this routine cannot
- * be interrupted by cancel/die interrupts.
+ * Acquire exclusive lock on the entire shared lock data structures.
+ * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
+ *
+ * Note that the deadlock check interrupt had better not be enabled
+ * anywhere that this process itself holds lock partition locks, else this
+ * will wait forever. Also note that LWLockAcquire creates a critical
+ * section, so that this routine cannot be interrupted by cancel/die
+ * interrupts.
*/
- LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
+ for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+ LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE);
/*
* Check to see if we've been awoken by anyone in the interim.
*
* We check by looking to see if we've been unlinked from the wait queue.
* This is quicker than checking our semaphore's state, since no kernel
- * call is needed, and it is safe because we hold the locktable lock.
+ * call is needed, and it is safe because we hold the lock partition lock.
*/
if (MyProc->links.prev == INVALID_OFFSET ||
MyProc->links.next == INVALID_OFFSET)
- {
- LWLockRelease(LockMgrLock);
- return;
- }
+ goto check_done;
#ifdef LOCK_DEBUG
if (Debug_deadlocks)
if (!DeadLockCheck(MyProc))
{
/* No deadlock, so keep waiting */
- LWLockRelease(LockMgrLock);
- return;
+ goto check_done;
}
/*
* Oops. We have a deadlock.
*
- * Get this process out of wait state.
+ * Get this process out of wait state. (Note: we could do this more
+ * efficiently by relying on lockAwaited, but use this coding to preserve
+ * the flexibility to kill some other transaction than the one detecting
+ * the deadlock.)
*/
- RemoveFromWaitQueue(MyProc);
+ Assert(MyProc->waitLock != NULL);
+ RemoveFromWaitQueue(MyProc, LockTagToPartition(&(MyProc->waitLock->tag)));
/*
* Set MyProc->waitStatus to STATUS_ERROR so that ProcSleep will report an
* them anymore. However, RemoveFromWaitQueue took care of waking up any
* such processes.
*/
- LWLockRelease(LockMgrLock);
+
+ /*
+ * Release locks acquired at head of routine. Order is not critical,
+ * so do it back-to-front to avoid waking another CheckDeadLock instance
+ * before it can get all the locks.
+ */
+check_done:
+ for (i = NUM_LOCK_PARTITIONS; --i >= 0; )
+ LWLockRelease(FirstLockMgrLock + i);
}