Add a function GetLockConflicts() to lock.c to report xacts holding
authorTom Lane
Sun, 27 Aug 2006 19:14:34 +0000 (19:14 +0000)
committerTom Lane
Sun, 27 Aug 2006 19:14:34 +0000 (19:14 +0000)
locks that would conflict with a specified lock request, without
actually trying to get that lock.  Use this instead of the former ad hoc
method of doing the first wait step in CREATE INDEX CONCURRENTLY.
Fixes problem with undetected deadlock and in many cases will allow the
index creation to proceed sooner than it otherwise could've.  Per
discussion with Greg Stark.

src/backend/commands/indexcmds.c
src/backend/storage/lmgr/lock.c
src/include/storage/lock.h

index e96235c6f53371484a282eea93f49f55aa34d3ec..81246768bc9f72a420bff364ac77b31f131dfee5 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.147 2006/08/25 04:06:48 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.148 2006/08/27 19:14:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,8 +119,11 @@ DefineIndex(RangeVar *heapRelation,
    Datum       reloptions;
    IndexInfo  *indexInfo;
    int         numberOfAttributes;
+   List       *old_xact_list;
+   ListCell   *lc;
    uint32      ixcnt;
    LockRelId   heaprelid;
+   LOCKTAG     heaplocktag;
    Snapshot    snapshot;
    Relation pg_index;
    HeapTuple indexTuple;
@@ -466,33 +469,26 @@ DefineIndex(RangeVar *heapRelation,
    CommitTransactionCommand();
    StartTransactionCommand();
 
-   /* Establish transaction snapshot ... else GetLatestSnapshot complains */
-   (void) GetTransactionSnapshot();
-
    /*
     * Now we must wait until no running transaction could have the table open
-    * with the old list of indexes.  If we can take an exclusive lock then
-    * there are none now and anybody who opens it later will get the new
-    * index in their relcache entry.  Alternatively, if our Xmin reaches our
-    * own (new) transaction then we know no transactions that started before
-    * the index was visible are left anyway.
+    * with the old list of indexes.  To do this, inquire which xacts currently
+    * would conflict with ShareLock on the table -- ie, which ones have
+    * a lock that permits writing the table.  Then wait for each of these
+    * xacts to commit or abort.  Note we do not need to worry about xacts
+    * that open the table for writing after this point; they will see the
+    * new index when they open it.
+    *
+    * Note: GetLockConflicts() never reports our own xid,
+    * hence we need not check for that.
     */
-   for (;;)
-   {
-       CHECK_FOR_INTERRUPTS();
+   SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
+   old_xact_list = GetLockConflicts(&heaplocktag, ShareLock);
 
-       if (ConditionalLockRelationOid(relationId, ExclusiveLock))
-       {
-           /* Release the lock right away to avoid blocking anyone */
-           UnlockRelationOid(relationId, ExclusiveLock);
-           break;
-       }
-
-       if (TransactionIdEquals(GetLatestSnapshot()->xmin,
-                               GetTopTransactionId()))
-           break;
+   foreach(lc, old_xact_list)
+   {
+       TransactionId xid = lfirst_xid(lc);
 
-       pg_usleep(1000000L);        /* 1 sec */
+       XactLockTableWait(xid);
    }
 
    /*
index 7fe011a2baf0bba541f524927aa3dea1f84c3405..c59999e20ca96430382af845d69214f2504fce8d 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.171 2006/08/19 01:36:28 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.172 2006/08/27 19:14:34 tgl Exp $
  *
  * NOTES
  *   A lock table is a shared memory hash table.  When
@@ -1685,6 +1685,104 @@ LockReassignCurrentOwner(void)
 }
 
 
+/*
+ * GetLockConflicts
+ *     Get a list of TransactionIds of xacts currently holding locks
+ *     that would conflict with the specified lock/lockmode.
+ *     xacts merely awaiting such a lock are NOT reported.
+ *
+ * Of course, the result could be out of date by the time it's returned,
+ * so use of this function has to be thought about carefully.
+ *
+ * Only top-level XIDs are reported.  Note we never include the current xact
+ * in the result list, since an xact never blocks itself.
+ */
+List *
+GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
+{
+   List       *result = NIL;
+   LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+   LockMethod  lockMethodTable;
+   LOCK       *lock;
+   LOCKMASK    conflictMask;
+   SHM_QUEUE  *procLocks;
+   PROCLOCK   *proclock;
+   uint32      hashcode;
+   LWLockId    partitionLock;
+
+   if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+       elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+   lockMethodTable = LockMethods[lockmethodid];
+   if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
+       elog(ERROR, "unrecognized lock mode: %d", lockmode);
+
+   /*
+    * Look up the lock object matching the tag.
+    */
+   hashcode = LockTagHashCode(locktag);
+   partitionLock = LockHashPartitionLock(hashcode);
+
+   LWLockAcquire(partitionLock, LW_SHARED);
+
+   lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
+                                               (void *) locktag,
+                                               hashcode,
+                                               HASH_FIND,
+                                               NULL);
+   if (!lock)
+   {
+       /*
+        * If the lock object doesn't exist, there is nothing holding a
+        * lock on this lockable object.
+        */
+       LWLockRelease(partitionLock);
+       return NIL;
+   }
+
+   /*
+    * Examine each existing holder (or awaiter) of the lock.
+    */
+   conflictMask = lockMethodTable->conflictTab[lockmode];
+
+   procLocks = &(lock->procLocks);
+
+   proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                        offsetof(PROCLOCK, lockLink));
+
+   while (proclock)
+   {
+       if (conflictMask & proclock->holdMask)
+       {
+           PGPROC *proc = proclock->tag.myProc;
+
+           /* A backend never blocks itself */
+           if (proc != MyProc)
+           {
+               /* Fetch xid just once - see GetNewTransactionId */
+               TransactionId xid = proc->xid;
+
+               /*
+                * Race condition: during xact commit/abort we zero out
+                * PGPROC's xid before we mark its locks released.  If we see
+                * zero in the xid field, assume the xact is in process of
+                * shutting down and act as though the lock is already
+                * released.
+                */
+               if (TransactionIdIsValid(xid))
+                   result = lappend_xid(result, xid);
+           }
+       }
+
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
+                                            offsetof(PROCLOCK, lockLink));
+   }
+
+   LWLockRelease(partitionLock);
+
+   return result;
+}
+
+
 /*
  * AtPrepare_Locks
  *     Do the preparatory work for a PREPARE: make 2PC state file records
index 208b4a93ccc6fac824e6d76f381aa9db83dd4b92..120fd4f3ca4180d16f83842d2735b165a3b01307 100644 (file)
@@ -7,13 +7,14 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.97 2006/07/31 20:09:05 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.98 2006/08/27 19:14:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef LOCK_H_
 #define LOCK_H_
 
+#include "nodes/pg_list.h"
 #include "storage/itemptr.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -412,6 +413,7 @@ extern bool LockRelease(const LOCKTAG *locktag,
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
 extern void LockReleaseCurrentOwner(void);
 extern void LockReassignCurrentOwner(void);
+extern List *GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode);
 extern void AtPrepare_Locks(void);
 extern void PostPrepare_Locks(TransactionId xid);
 extern int LockCheckConflicts(LockMethod lockMethodTable,
@@ -421,13 +423,6 @@ extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
 extern void GrantAwaitedLock(void);
 extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
 extern Size LockShmemSize(void);
-extern bool DeadLockCheck(PGPROC *proc);
-extern void DeadLockReport(void);
-extern void RememberSimpleDeadLock(PGPROC *proc1,
-                      LOCKMODE lockmode,
-                      LOCK *lock,
-                      PGPROC *proc2);
-extern void InitDeadLockChecking(void);
 extern LockData *GetLockStatusData(void);
 extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode);
 
@@ -438,6 +433,14 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
 extern void lock_twophase_postabort(TransactionId xid, uint16 info,
                        void *recdata, uint32 len);
 
+extern bool DeadLockCheck(PGPROC *proc);
+extern void DeadLockReport(void);
+extern void RememberSimpleDeadLock(PGPROC *proc1,
+                      LOCKMODE lockmode,
+                      LOCK *lock,
+                      PGPROC *proc2);
+extern void InitDeadLockChecking(void);
+
 #ifdef LOCK_DEBUG
 extern void DumpLocks(PGPROC *proc);
 extern void DumpAllLocks(void);