Reimplement hash index locking algorithms, per my recent proposal to
authorTom Lane
Thu, 4 Sep 2003 22:06:27 +0000 (22:06 +0000)
committerTom Lane
Thu, 4 Sep 2003 22:06:27 +0000 (22:06 +0000)
pghackers.  This fixes the problem recently reported by Markus KrÌutner
(hash bucket split corrupts the state of scans being done concurrently),
and I believe it also fixes all the known problems with deadlocks in
hash index operations.  Hash indexes are still not really ready for prime
time (since they aren't WAL-logged), but this is a step forward.

src/backend/access/hash/README
src/backend/access/hash/hash.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashovfl.c
src/backend/access/hash/hashpage.c
src/backend/access/hash/hashscan.c
src/backend/access/hash/hashsearch.c
src/backend/access/hash/hashutil.c
src/backend/storage/lmgr/lmgr.c
src/include/access/hash.h
src/include/storage/lmgr.h

index 118d4348796c5bdbc5bd149a0d4ffae5bb0311e9..ce195eae2cd7814aa3d12f0adbedb295c6846b9a 100644 (file)
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.2 2003/09/02 03:29:01 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.3 2003/09/04 22:06:27 tgl Exp $
 
 This directory contains an implementation of hash indexing for Postgres.
 
@@ -229,8 +229,8 @@ existing bucket in two, thereby lowering the fill ratio:
    check split still needed
    if split not needed anymore, drop locks and exit
    decide which bucket to split
-   Attempt to X-lock new bucket number (shouldn't fail, but...)
    Attempt to X-lock old bucket number (definitely could fail)
+   Attempt to X-lock new bucket number (shouldn't fail, but...)
    if above fail, drop locks and exit
    update meta page to reflect new number of buckets
    write/release meta page
@@ -261,12 +261,6 @@ not be overfull and split attempts will stop.  (We could make a successful
 splitter loop to see if the index is still overfull, but it seems better to
 distribute the split overhead across successive insertions.)
 
-It may be wise to make the initial exclusive-lock-page-zero operation a
-conditional one as well, although the odds of a deadlock failure are quite
-low.  (AFAICS it could only deadlock against a VACUUM operation that is
-trying to X-lock a bucket that the current process has a stopped indexscan
-in.)
-
 A problem is that if a split fails partway through (eg due to insufficient
 disk space) the index is left corrupt.  The probability of that could be
 made quite low if we grab a free page or two before we update the meta
index 7e30754c88f7d25408a623dcc45d805773afb2da..190c95e2c85919943d59bb1acef5cb4ee077cd2a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.67 2003/09/02 18:13:29 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.68 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *   This file contains only the public interface routines.
@@ -27,9 +27,6 @@
 #include "miscadmin.h"
 
 
-bool       BuildingHash = false;
-
-
 /* Working state for hashbuild and its callback */
 typedef struct
 {
@@ -61,9 +58,6 @@ hashbuild(PG_FUNCTION_ARGS)
    double      reltuples;
    HashBuildState buildstate;
 
-   /* set flag to disable locking */
-   BuildingHash = true;
-
    /*
     * We expect to be called exactly once for any index relation. If
     * that's not the case, big trouble's what we have.
@@ -82,9 +76,6 @@ hashbuild(PG_FUNCTION_ARGS)
    reltuples = IndexBuildHeapScan(heap, index, indexInfo,
                                hashbuildCallback, (void *) &buildstate);
 
-   /* all done */
-   BuildingHash = false;
-
    /*
     * Since we just counted the tuples in the heap, we update its stats
     * in pg_class to guarantee that the planner takes advantage of the
@@ -212,10 +203,18 @@ hashgettuple(PG_FUNCTION_ARGS)
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
+   Relation    rel = scan->indexRelation;
    Page        page;
    OffsetNumber offnum;
    bool        res;
 
+   /*
+    * We hold pin but not lock on current buffer while outside the hash AM.
+    * Reacquire the read lock here.
+    */
+   if (BufferIsValid(so->hashso_curbuf))
+       _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);
+
    /*
     * If we've already initialized this scan, we can just advance it in
     * the appropriate direction.  If we haven't done so yet, we call a
@@ -267,6 +266,10 @@ hashgettuple(PG_FUNCTION_ARGS)
        }
    }
 
+   /* Release read lock on current buffer, but keep it pinned */
+   if (BufferIsValid(so->hashso_curbuf))
+       _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);
+
    PG_RETURN_BOOL(res);
 }
 
@@ -285,6 +288,8 @@ hashbeginscan(PG_FUNCTION_ARGS)
 
    scan = RelationGetIndexScan(rel, keysz, scankey);
    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
+   so->hashso_bucket_valid = false;
+   so->hashso_bucket_blkno = 0;
    so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer;
    scan->opaque = so;
 
@@ -303,28 +308,38 @@ hashrescan(PG_FUNCTION_ARGS)
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    ScanKey     scankey = (ScanKey) PG_GETARG_POINTER(1);
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
-   ItemPointer iptr;
+   Relation    rel = scan->indexRelation;
 
-   /* we hold a read lock on the current page in the scan */
-   if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
+   /* if we are called from beginscan, so is still NULL */
+   if (so)
    {
-       _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
+       /* release any pins we still hold */
+       if (BufferIsValid(so->hashso_curbuf))
+           _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
-   }
-   if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-   {
-       _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
+
+       if (BufferIsValid(so->hashso_mrkbuf))
+           _hash_dropbuf(rel, so->hashso_mrkbuf);
        so->hashso_mrkbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
+
+       /* release lock on bucket, too */
+       if (so->hashso_bucket_blkno)
+           _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
+       so->hashso_bucket_blkno = 0;
    }
 
+   /* set positions invalid (this will cause _hash_first call) */
+   ItemPointerSetInvalid(&(scan->currentItemData));
+   ItemPointerSetInvalid(&(scan->currentMarkData));
+
    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
+       if (so)
+           so->hashso_bucket_valid = false;
    }
 
    PG_RETURN_VOID();
@@ -337,32 +352,32 @@ Datum
 hashendscan(PG_FUNCTION_ARGS)
 {
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-   ItemPointer iptr;
-   HashScanOpaque so;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
+   Relation    rel = scan->indexRelation;
 
-   so = (HashScanOpaque) scan->opaque;
+   /* don't need scan registered anymore */
+   _hash_dropscan(scan);
 
-   /* release any locks we still hold */
-   if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
-   {
-       _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
-       so->hashso_curbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
-   }
+   /* release any pins we still hold */
+   if (BufferIsValid(so->hashso_curbuf))
+       _hash_dropbuf(rel, so->hashso_curbuf);
+   so->hashso_curbuf = InvalidBuffer;
 
-   if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-   {
-       if (BufferIsValid(so->hashso_mrkbuf))
-           _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
-       so->hashso_mrkbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
-   }
+   if (BufferIsValid(so->hashso_mrkbuf))
+       _hash_dropbuf(rel, so->hashso_mrkbuf);
+   so->hashso_mrkbuf = InvalidBuffer;
 
-   /* don't need scan registered anymore */
-   _hash_dropscan(scan);
+   /* release lock on bucket, too */
+   if (so->hashso_bucket_blkno)
+       _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
+   so->hashso_bucket_blkno = 0;
 
    /* be tidy */
-   pfree(scan->opaque);
+   ItemPointerSetInvalid(&(scan->currentItemData));
+   ItemPointerSetInvalid(&(scan->currentMarkData));
+
+   pfree(so);
+   scan->opaque = NULL;
 
    PG_RETURN_VOID();
 }
@@ -374,25 +389,21 @@ Datum
 hashmarkpos(PG_FUNCTION_ARGS)
 {
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-   ItemPointer iptr;
-   HashScanOpaque so;
-
-   so = (HashScanOpaque) scan->opaque;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
+   Relation    rel = scan->indexRelation;
 
-   /* release lock on old marked data, if any */
-   if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-   {
-       _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
-       so->hashso_mrkbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
-   }
+   /* release pin on old marked data, if any */
+   if (BufferIsValid(so->hashso_mrkbuf))
+       _hash_dropbuf(rel, so->hashso_mrkbuf);
+   so->hashso_mrkbuf = InvalidBuffer;
+   ItemPointerSetInvalid(&(scan->currentMarkData));
 
-   /* bump lock on currentItemData and copy to currentMarkData */
+   /* bump pin count on currentItemData and copy to currentMarkData */
    if (ItemPointerIsValid(&(scan->currentItemData)))
    {
-       so->hashso_mrkbuf = _hash_getbuf(scan->indexRelation,
+       so->hashso_mrkbuf = _hash_getbuf(rel,
                                 BufferGetBlockNumber(so->hashso_curbuf),
-                                        HASH_READ);
+                                        HASH_NOLOCK);
        scan->currentMarkData = scan->currentItemData;
    }
 
@@ -406,26 +417,21 @@ Datum
 hashrestrpos(PG_FUNCTION_ARGS)
 {
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-   ItemPointer iptr;
-   HashScanOpaque so;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
+   Relation    rel = scan->indexRelation;
 
-   so = (HashScanOpaque) scan->opaque;
+   /* release pin on current data, if any */
+   if (BufferIsValid(so->hashso_curbuf))
+       _hash_dropbuf(rel, so->hashso_curbuf);
+   so->hashso_curbuf = InvalidBuffer;
+   ItemPointerSetInvalid(&(scan->currentItemData));
 
-   /* release lock on current data, if any */
-   if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
-   {
-       _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
-       so->hashso_curbuf = InvalidBuffer;
-       ItemPointerSetInvalid(iptr);
-   }
-
-   /* bump lock on currentMarkData and copy to currentItemData */
+   /* bump pin count on currentMarkData and copy to currentItemData */
    if (ItemPointerIsValid(&(scan->currentMarkData)))
    {
-       so->hashso_curbuf = _hash_getbuf(scan->indexRelation,
+       so->hashso_curbuf = _hash_getbuf(rel,
                                 BufferGetBlockNumber(so->hashso_mrkbuf),
-                                        HASH_READ);
-
+                                        HASH_NOLOCK);
        scan->currentItemData = scan->currentMarkData;
    }
 
@@ -474,7 +480,7 @@ hashbulkdelete(PG_FUNCTION_ARGS)
    orig_maxbucket = metap->hashm_maxbucket;
    orig_ntuples = metap->hashm_ntuples;
    memcpy(&local_metapage, metap, sizeof(local_metapage));
-   _hash_relbuf(rel, metabuf, HASH_READ);
+   _hash_relbuf(rel, metabuf);
 
    /* Scan the buckets that we know exist */
    cur_bucket = 0;
@@ -490,7 +496,12 @@ loop_top:
        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);
 
-       /* XXX lock bucket here */
+       /* Exclusive-lock the bucket so we can shrink it */
+       _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);
+
+       /* Shouldn't have any active scans locally, either */
+       if (_hash_has_active_scan(rel, cur_bucket))
+           elog(ERROR, "hash index has active scan during VACUUM");
 
        /* Scan each page in bucket */
        blkno = bucket_blkno;
@@ -522,13 +533,6 @@ loop_top:
                htup = &(hitem->hash_itup.t_tid);
                if (callback(htup, callback_state))
                {
-                   ItemPointerData indextup;
-
-                   /* adjust any active scans that will be affected */
-                   /* (this should be unnecessary) */
-                   ItemPointerSet(&indextup, blkno, offno);
-                   _hash_adjscans(rel, &indextup);
-
                    /* delete the item from the page */
                    PageIndexTupleDelete(page, offno);
                    bucket_dirty = page_dirty = true;
@@ -547,24 +551,22 @@ loop_top:
            }
 
            /*
-            * Write or free page if needed, advance to next page.  We want
-            * to preserve the invariant that overflow pages are nonempty.
+            * Write page if needed, advance to next page.
             */
            blkno = opaque->hasho_nextblkno;
 
-           if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
-               _hash_freeovflpage(rel, buf);
-           else if (page_dirty)
+           if (page_dirty)
                _hash_wrtbuf(rel, buf);
            else
-               _hash_relbuf(rel, buf, HASH_WRITE);
+               _hash_relbuf(rel, buf);
        }
 
        /* If we deleted anything, try to compact free space */
        if (bucket_dirty)
            _hash_squeezebucket(rel, cur_bucket, bucket_blkno);
 
-       /* XXX unlock bucket here */
+       /* Release bucket lock */
+       _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);
 
        /* Advance to next bucket */
        cur_bucket++;
@@ -580,7 +582,7 @@ loop_top:
        /* There's been a split, so process the additional bucket(s) */
        cur_maxbucket = metap->hashm_maxbucket;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
-       _hash_relbuf(rel, metabuf, HASH_WRITE);
+       _hash_relbuf(rel, metabuf);
        goto loop_top;
    }
 
index 20cdcabfaa4083414916168744235760e292ddf6..00b3d60b28c11df69b147f074b169ebdf148e08f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.29 2003/09/02 18:13:30 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.30 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "storage/lmgr.h"
+
+
+static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
+                                  Size itemsize, HashItem hitem);
 
-static InsertIndexResult _hash_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, HashItem hitem, Buffer metabuf);
-static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, HashItem hitem);
 
 /*
  * _hash_doinsert() -- Handle insertion of a single HashItem in the table.
  *
  *     This routine is called by the public interface routines, hashbuild
- *     and hashinsert.  By here, hashitem is filled in, and has a unique
- *     (xid, seqno) pair. The datum to be used as a "key" is in the
- *     hashitem.
+ *     and hashinsert.  By here, hashitem is completely filled in.
+ *     The datum to be used as a "key" is in the hashitem.
  */
 InsertIndexResult
 _hash_doinsert(Relation rel, HashItem hitem)
 {
    Buffer      buf;
    Buffer      metabuf;
-   BlockNumber blkno;
    HashMetaPage metap;
    IndexTuple  itup;
+   BlockNumber itup_blkno;
+   OffsetNumber itup_off;
    InsertIndexResult res;
-   ScanKey     itup_scankey;
-   int         natts;
+   BlockNumber blkno;
    Page        page;
-
-   metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
-   /* we need a scan key to do our search, so build one */
-   itup = &(hitem->hash_itup);
-   if ((natts = rel->rd_rel->relnatts) != 1)
-       elog(ERROR, "Hash indexes support only one index key");
-   itup_scankey = _hash_mkscankey(rel, itup);
+   HashPageOpaque pageopaque;
+   Size        itemsz;
+   bool        do_expand;
+   uint32      hashkey;
+   Bucket      bucket;
+   Datum       datum;
+   bool        isnull;
 
    /*
-    * find the first page in the bucket chain containing this key and
-    * place it in buf.  _hash_search obtains a read lock for us.
+    * Compute the hash key for the item.  We do this first so as not to
+    * need to hold any locks while running the hash function.
     */
-   _hash_search(rel, natts, itup_scankey, &buf, metap);
-   page = BufferGetPage(buf);
-   _hash_checkpage(rel, page, LH_BUCKET_PAGE);
+   itup = &(hitem->hash_itup);
+   if (rel->rd_rel->relnatts != 1)
+       elog(ERROR, "hash indexes support only one index key");
+   datum = index_getattr(itup, 1, RelationGetDescr(rel), &isnull);
+   Assert(!isnull);
+   hashkey = _hash_datum2hashkey(rel, datum);
+
+   /* compute item size too */
+   itemsz = IndexTupleDSize(hitem->hash_itup)
+       + (sizeof(HashItemData) - sizeof(IndexTupleData));
+
+   itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but
+                                * we need to be consistent */
 
    /*
-    * trade in our read lock for a write lock so that we can do the
-    * insertion.
+    * Acquire shared split lock so we can compute the target bucket
+    * safely (see README).
     */
-   blkno = BufferGetBlockNumber(buf);
-   _hash_relbuf(rel, buf, HASH_READ);
-   buf = _hash_getbuf(rel, blkno, HASH_WRITE);
+   _hash_getlock(rel, 0, HASH_SHARE);
 
+   /* Read the metapage */
+   metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
+   metap = (HashMetaPage) BufferGetPage(metabuf);
+   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
    /*
-    * XXX btree comment (haven't decided what to do in hash): don't think
-    * the bucket can be split while we're reading the metapage.
-    *
-    * If the page was split between the time that we surrendered our read
-    * lock and acquired our write lock, then this page may no longer be
-    * the right place for the key we want to insert.
+    * Check whether the item can fit on a hash page at all. (Eventually,
+    * we ought to try to apply TOAST methods if not.)  Note that at this
+    * point, itemsz doesn't include the ItemId.
     */
+   if (itemsz > HashMaxItemSize((Page) metap))
+       ereport(ERROR,
+               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                errmsg("index tuple size %lu exceeds hash maximum, %lu",
+                       (unsigned long) itemsz,
+                       (unsigned long) HashMaxItemSize((Page) metap))));
 
-   /* do the insertion */
-   res = _hash_insertonpg(rel, buf, natts, itup_scankey,
-                          hitem, metabuf);
+   /*
+    * Compute the target bucket number, and convert to block number.
+    */
+   bucket = _hash_hashkey2bucket(hashkey,
+                                 metap->hashm_maxbucket,
+                                 metap->hashm_highmask,
+                                 metap->hashm_lowmask);
 
-   /* be tidy */
-   _hash_freeskey(itup_scankey);
+   blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-   return res;
-}
+   /* release lock on metapage, but keep pin since we'll need it again */
+   _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
 
-/*
- * _hash_insertonpg() -- Insert a tuple on a particular page in the table.
- *
- *     This recursive procedure does the following things:
- *
- *         +  if necessary, splits the target page.
- *         +  inserts the tuple.
- *
- *     On entry, we must have the right buffer on which to do the
- *     insertion, and the buffer must be pinned and locked.  On return,
- *     we will have dropped both the pin and the write lock on the buffer.
- *
- */
-static InsertIndexResult
-_hash_insertonpg(Relation rel,
-                Buffer buf,
-                int keysz,
-                ScanKey scankey,
-                HashItem hitem,
-                Buffer metabuf)
-{
-   InsertIndexResult res;
-   Page        page;
-   BlockNumber itup_blkno;
-   OffsetNumber itup_off;
-   Size        itemsz;
-   HashPageOpaque pageopaque;
-   bool        do_expand = false;
-   Buffer      ovflbuf;
-   HashMetaPage metap;
-   Bucket      bucket;
+   /*
+    * Acquire share lock on target bucket; then we can release split lock.
+    */
+   _hash_getlock(rel, blkno, HASH_SHARE);
 
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+   _hash_droplock(rel, 0, HASH_SHARE);
 
+   /* Fetch the primary bucket page for the bucket */
+   buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    page = BufferGetPage(buf);
-   _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+   _hash_checkpage(rel, page, LH_BUCKET_PAGE);
    pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-   bucket = pageopaque->hasho_bucket;
-
-   itemsz = IndexTupleDSize(hitem->hash_itup)
-       + (sizeof(HashItemData) - sizeof(IndexTupleData));
-   itemsz = MAXALIGN(itemsz);
+   Assert(pageopaque->hasho_bucket == bucket);
 
+   /* Do the insertion */
    while (PageGetFreeSpace(page) < itemsz)
    {
        /*
         * no space on this page; check for an overflow page
         */
-       if (BlockNumberIsValid(pageopaque->hasho_nextblkno))
+       BlockNumber nextblkno = pageopaque->hasho_nextblkno;
+
+       if (BlockNumberIsValid(nextblkno))
        {
            /*
             * ovfl page exists; go get it.  if it doesn't have room,
             * we'll find out next pass through the loop test above.
             */
-           ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno,
-                                  HASH_WRITE);
-           _hash_relbuf(rel, buf, HASH_WRITE);
-           buf = ovflbuf;
+           _hash_relbuf(rel, buf);
+           buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
            page = BufferGetPage(buf);
        }
        else
@@ -154,65 +142,72 @@ _hash_insertonpg(Relation rel,
             * we're at the end of the bucket chain and we haven't found a
             * page with enough room.  allocate a new overflow page.
             */
-           do_expand = true;
-           ovflbuf = _hash_addovflpage(rel, metabuf, buf);
-           _hash_relbuf(rel, buf, HASH_WRITE);
-           buf = ovflbuf;
+
+           /* release our write lock without modifying buffer */
+           _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+
+           /* chain to a new overflow page */
+           buf = _hash_addovflpage(rel, metabuf, buf);
            page = BufferGetPage(buf);
 
-           if (PageGetFreeSpace(page) < itemsz)
-           {
-               /* it doesn't fit on an empty page -- give up */
-               elog(ERROR, "hash item too large");
-           }
+           /* should fit now, given test above */
+           Assert(PageGetFreeSpace(page) >= itemsz);
        }
        _hash_checkpage(rel, page, LH_OVERFLOW_PAGE);
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        Assert(pageopaque->hasho_bucket == bucket);
    }
 
-   itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem);
+   /* found page with enough space, so add the item here */
+   itup_off = _hash_pgaddtup(rel, buf, itemsz, hitem);
    itup_blkno = BufferGetBlockNumber(buf);
 
-   /* by here, the new tuple is inserted */
-   res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
+   /* write and release the modified page */
+   _hash_wrtbuf(rel, buf);
 
-   ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+   /* We can drop the bucket lock now */
+   _hash_droplock(rel, blkno, HASH_SHARE);
 
-   if (res != NULL)
-   {
-       /*
-        * Increment the number of keys in the table. We switch lock
-        * access type just for a moment to allow greater accessibility to
-        * the metapage.
-        */
-       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
-       metap->hashm_ntuples += 1;
-       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
-   }
+   /*
+    * Write-lock the metapage so we can increment the tuple count.
+    * After incrementing it, check to see if it's time for a split.
+    */
+   _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
-   _hash_wrtbuf(rel, buf);
+   metap->hashm_ntuples += 1;
 
-   if (do_expand ||
-       (metap->hashm_ntuples / (metap->hashm_maxbucket + 1))
-       > (double) metap->hashm_ffactor)
+   /* Make sure this stays in sync with _hash_expandtable() */
+   do_expand = metap->hashm_ntuples >
+       (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
+
+   /* Write out the metapage and drop lock, but keep pin */
+   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+
+   /* Attempt to split if a split is needed */
+   if (do_expand)
        _hash_expandtable(rel, metabuf);
-   _hash_relbuf(rel, metabuf, HASH_READ);
+
+   /* Finally drop our pin on the metapage */
+   _hash_dropbuf(rel, metabuf);
+
+   /* Create the return data structure */
+   res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
+
+   ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+
    return res;
 }
 
 /*
  * _hash_pgaddtup() -- add a tuple to a particular page in the index.
  *
- *     This routine adds the tuple to the page as requested, and keeps the
- *     write lock and reference associated with the page's buffer.  It is
- *     an error to call pgaddtup() without a write lock and reference.
+ *     This routine adds the tuple to the page as requested; it does
+ *     not write out the page.  It is an error to call pgaddtup() without
+ *     a write lock and pin.
  */
 static OffsetNumber
 _hash_pgaddtup(Relation rel,
               Buffer buf,
-              int keysz,
-              ScanKey itup_scankey,
               Size itemsize,
               HashItem hitem)
 {
@@ -228,8 +223,5 @@ _hash_pgaddtup(Relation rel,
        elog(ERROR, "failed to add index item to \"%s\"",
             RelationGetRelationName(rel));
 
-   /* write the buffer, but hold our lock */
-   _hash_wrtnorelbuf(buf);
-
    return itup_off;
 }
index 388a711832a9814c03a0450f847bdcaeab1ed8cc..fe5e5e95958704aa6fb219530c33bcbc5e0026f9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.40 2003/09/02 18:13:30 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.41 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *   Overflow pages look like ordinary relation pages.
@@ -77,39 +77,68 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
 /*
  * _hash_addovflpage
  *
- * Add an overflow page to the page currently pointed to by the buffer
- * argument 'buf'.
+ * Add an overflow page to the bucket whose last page is pointed to by 'buf'.
  *
- * metabuf has a read lock upon entering the function; buf has a
- * write lock.  The same is true on exit.  The returned overflow page
- * is write-locked.
+ * On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
+ * dropped before exiting (we assume the caller is not interested in 'buf'
+ * anymore).  The returned overflow page will be pinned and write-locked;
+ * it is guaranteed to be empty.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * That buffer is returned in the same state.
+ *
+ * The caller must hold at least share lock on the bucket, to ensure that
+ * no one else tries to compact the bucket meanwhile.  This guarantees that
+ * 'buf' won't stop being part of the bucket while it's unlocked.
+ *
+ * NB: since this could be executed concurrently by multiple processes,
+ * one should not assume that the returned overflow page will be the
+ * immediate successor of the originally passed 'buf'.  Additional overflow
+ * pages might have been added to the bucket chain in between.
  */
 Buffer
 _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 {
    BlockNumber ovflblkno;
    Buffer      ovflbuf;
-   HashMetaPage metap;
-   HashPageOpaque ovflopaque;
-   HashPageOpaque pageopaque;
    Page        page;
    Page        ovflpage;
-
-   /* this had better be the last page in a bucket chain */
-   page = BufferGetPage(buf);
-   _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
-   pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-   Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));
-
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+   HashPageOpaque pageopaque;
+   HashPageOpaque ovflopaque;
 
    /* allocate an empty overflow page */
    ovflblkno = _hash_getovflpage(rel, metabuf);
+
+   /* lock the overflow page */
    ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE);
    ovflpage = BufferGetPage(ovflbuf);
 
-   /* initialize the new overflow page */
+   /*
+    * Write-lock the tail page.  It is okay to hold two buffer locks here
+    * since there cannot be anyone else contending for access to ovflbuf.
+    */
+   _hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE);
+
+   /* loop to find current tail page, in case someone else inserted too */
+   for (;;)
+   {
+       BlockNumber nextblkno;
+
+       page = BufferGetPage(buf);
+       _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+       pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+       nextblkno = pageopaque->hasho_nextblkno;
+
+       if (!BlockNumberIsValid(nextblkno))
+           break;
+
+       /* we assume we do not need to write the unmodified page */
+       _hash_relbuf(rel, buf);
+
+       buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
+   }
+
+   /* now that we have correct backlink, initialize new overflow page */
    _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
@@ -117,11 +146,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
    ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
    ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
    ovflopaque->hasho_filler = HASHO_FILL;
-   _hash_wrtnorelbuf(ovflbuf);
+   _hash_wrtnorelbuf(rel, ovflbuf);
 
    /* logically chain overflow page to previous page */
    pageopaque->hasho_nextblkno = ovflblkno;
-   _hash_wrtnorelbuf(buf);
+   _hash_wrtbuf(rel, buf);
+
    return ovflbuf;
 }
 
@@ -130,9 +160,8 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
  *
  * Find an available overflow page and return its block number.
  *
- * When we enter this function, we have a read lock on metabuf which
- * we change to a write lock immediately. Before exiting, the write lock
- * is exchanged for a read lock.
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.
  */
 static BlockNumber
 _hash_getovflpage(Relation rel, Buffer metabuf)
@@ -140,6 +169,7 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
    HashMetaPage metap;
    Buffer      mapbuf = 0;
    BlockNumber blkno;
+   uint32      orig_firstfree;
    uint32      splitnum;
    uint32     *freep = NULL;
    uint32      max_ovflpg;
@@ -150,51 +180,66 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
    uint32      i,
                j;
 
-   _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   splitnum = metap->hashm_ovflpoint;
+   /* Get exclusive lock on the meta page */
+   _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
-   /* end search with the last existing overflow page */
-   max_ovflpg = metap->hashm_spares[splitnum] - 1;
-   last_page = max_ovflpg >> BMPG_SHIFT(metap);
-   last_bit = max_ovflpg & BMPG_MASK(metap);
+   metap = (HashMetaPage) BufferGetPage(metabuf);
+   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
    /* start search at hashm_firstfree */
-   first_page = metap->hashm_firstfree >> BMPG_SHIFT(metap);
-   bit = metap->hashm_firstfree & BMPG_MASK(metap);
+   orig_firstfree = metap->hashm_firstfree;
+   first_page = orig_firstfree >> BMPG_SHIFT(metap);
+   bit = orig_firstfree & BMPG_MASK(metap);
+   i = first_page;
    j = bit / BITS_PER_MAP;
    bit &= ~(BITS_PER_MAP - 1);
 
-   for (i = first_page; i <= last_page; i++)
+   /* outer loop iterates once per bitmap page */
+   for (;;)
    {
        BlockNumber mapblkno;
        Page        mappage;
        uint32      last_inpage;
 
-       mapblkno = metap->hashm_mapp[i];
-       mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
-       mappage = BufferGetPage(mapbuf);
-       _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
-       freep = HashPageGetBitmap(mappage);
+       /* want to end search with the last existing overflow page */
+       splitnum = metap->hashm_ovflpoint;
+       max_ovflpg = metap->hashm_spares[splitnum] - 1;
+       last_page = max_ovflpg >> BMPG_SHIFT(metap);
+       last_bit = max_ovflpg & BMPG_MASK(metap);
 
-       if (i != first_page)
-       {
-           bit = 0;
-           j = 0;
-       }
+       if (i > last_page)
+           break;
+
+       Assert(i < metap->hashm_nmaps);
+       mapblkno = metap->hashm_mapp[i];
 
        if (i == last_page)
            last_inpage = last_bit;
        else
            last_inpage = BMPGSZ_BIT(metap) - 1;
 
+       /* Release exclusive lock on metapage while reading bitmap page */
+       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+       mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
+       mappage = BufferGetPage(mapbuf);
+       _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
+       freep = HashPageGetBitmap(mappage);
+
        for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
        {
            if (freep[j] != ALL_SET)
                goto found;
        }
 
-       _hash_relbuf(rel, mapbuf, HASH_WRITE);
+       /* No free space here, try to advance to next map page */
+       _hash_relbuf(rel, mapbuf);
+       i++;
+       j = 0;                  /* scan from start of next map page */
+       bit = 0;
+
+       /* Reacquire exclusive lock on the meta page */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
    }
 
    /* No Free Page Found - have to allocate a new page */
@@ -225,13 +270,19 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
         */
    }
 
-   /* mark new page as first free so we don't search much next time */
-   metap->hashm_firstfree = bit;
-
    /* Calculate address of the new overflow page */
    blkno = bitno_to_blkno(metap, bit);
 
-   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+   /*
+    * Adjust hashm_firstfree to avoid redundant searches.  But don't
+    * risk changing it if someone moved it while we were searching
+    * bitmap pages.
+    */
+   if (metap->hashm_firstfree == orig_firstfree)
+       metap->hashm_firstfree = bit + 1;
+
+   /* Write updated metapage and release lock, but not pin */
+   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
 
    return blkno;
 
@@ -239,20 +290,36 @@ found:
    /* convert bit to bit number within page */
    bit += _hash_firstfreebit(freep[j]);
 
-   /* mark page "in use" */
+   /* mark page "in use" in the bitmap */
    SETBIT(freep, bit);
    _hash_wrtbuf(rel, mapbuf);
 
+   /* Reacquire exclusive lock on the meta page */
+   _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
    /* convert bit to absolute bit number */
    bit += (i << BMPG_SHIFT(metap));
 
-   /* adjust hashm_firstfree to avoid redundant searches */
-   if (bit > metap->hashm_firstfree)
-       metap->hashm_firstfree = bit;
-
+   /* Calculate address of the new overflow page */
    blkno = bitno_to_blkno(metap, bit);
 
-   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+   /*
+    * Adjust hashm_firstfree to avoid redundant searches.  But don't
+    * risk changing it if someone moved it while we were searching
+    * bitmap pages.
+    */
+   if (metap->hashm_firstfree == orig_firstfree)
+   {
+       metap->hashm_firstfree = bit + 1;
+
+       /* Write updated metapage and release lock, but not pin */
+       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+   }
+   else
+   {
+       /* We didn't change the metapage, so no need to write */
+       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+   }
 
    return blkno;
 }
@@ -275,7 +342,10 @@ _hash_firstfreebit(uint32 map)
            return i;
        mask <<= 1;
    }
-   return i;
+
+   elog(ERROR, "firstfreebit found no free bit");
+
+   return 0;                   /* keep compiler quiet */
 }
 
 /*
@@ -287,7 +357,9 @@ _hash_firstfreebit(uint32 map)
  * Returns the block number of the page that followed the given page
  * in the bucket, or InvalidBlockNumber if no following page.
  *
- * NB: caller must not hold lock on metapage.
+ * NB: caller must not hold lock on metapage, nor on either page that's
+ * adjacent in the bucket chain.  The caller had better hold exclusive lock
+ * on the bucket, too.
  */
 BlockNumber
 _hash_freeovflpage(Relation rel, Buffer ovflbuf)
@@ -308,10 +380,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
                bitmapbit;
    Bucket      bucket;
 
-   metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
+   /* Get information from the doomed page */
    ovflblkno = BufferGetBlockNumber(ovflbuf);  
    ovflpage = BufferGetPage(ovflbuf);
    _hash_checkpage(rel, ovflpage, LH_OVERFLOW_PAGE);
@@ -319,17 +388,16 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
    nextblkno = ovflopaque->hasho_nextblkno;
    prevblkno = ovflopaque->hasho_prevblkno;
    bucket = ovflopaque->hasho_bucket;
+
+   /* Zero the page for debugging's sake; then write and release it */
    MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
    _hash_wrtbuf(rel, ovflbuf);
 
    /*
-    * fix up the bucket chain.  this is a doubly-linked list, so we must
+    * Fix up the bucket chain.  this is a doubly-linked list, so we must
     * fix up the bucket chain members behind and ahead of the overflow
-    * page being deleted.
-    *
-    * XXX this should look like: - lock prev/next - modify/write prev/next
-    * (how to do write ordering with a doubly-linked list?) - unlock
-    * prev/next
+    * page being deleted.  No concurrency issues since we hold exclusive
+    * lock on the entire bucket.
     */
    if (BlockNumberIsValid(prevblkno))
    {
@@ -354,9 +422,12 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
        _hash_wrtbuf(rel, nextbuf);
    }
 
-   /*
-    * Clear the bitmap bit to indicate that this overflow page is free.
-    */
+   /* Read the metapage so we can determine which bitmap page to use */
+   metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
+   metap = (HashMetaPage) BufferGetPage(metabuf);
+   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+
+   /* Identify which bit to set */
    ovflbitno = blkno_to_bitno(metap, ovflblkno);
 
    bitmappage = ovflbitno >> BMPG_SHIFT(metap);
@@ -366,18 +437,32 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
        elog(ERROR, "invalid overflow bit number %u", ovflbitno);
    blkno = metap->hashm_mapp[bitmappage];
 
+   /* Release metapage lock while we access the bitmap page */
+   _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+   /* Clear the bitmap bit to indicate that this overflow page is free */
    mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
    mappage = BufferGetPage(mapbuf);
    _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
    freep = HashPageGetBitmap(mappage);
+   Assert(ISSET(freep, bitmapbit));
    CLRBIT(freep, bitmapbit);
    _hash_wrtbuf(rel, mapbuf);
 
+   /* Get write-lock on metapage to update firstfree */
+   _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
    /* if this is now the first free page, update hashm_firstfree */
    if (ovflbitno < metap->hashm_firstfree)
+   {
        metap->hashm_firstfree = ovflbitno;
-
-   _hash_wrtbuf(rel, metabuf);
+       _hash_wrtbuf(rel, metabuf);
+   }
+   else
+   {
+       /* no need to change metapage */
+       _hash_relbuf(rel, metabuf);
+   }
 
    return nextblkno;
 }
@@ -401,9 +486,18 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
    HashPageOpaque op;
    uint32     *freep;
 
-   /* initialize the page */
+   /*
+    * It is okay to write-lock the new bitmap page while holding metapage
+    * write lock, because no one else could be contending for the new page.
+    *
+    * There is some loss of concurrency in possibly doing I/O for the new
+    * page while holding the metapage lock, but this path is taken so
+    * seldom that it's not worth worrying about.
+    */
    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    pg = BufferGetPage(buf);
+
+   /* initialize the page */
    _hash_pageinit(pg, BufferGetPageSize(buf));
    op = (HashPageOpaque) PageGetSpecialPointer(pg);
    op->hasho_prevblkno = InvalidBlockNumber;
@@ -416,7 +510,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
    freep = HashPageGetBitmap(pg);
    MemSet((char *) freep, 0xFF, BMPGSZ_BYTE(metap));
 
-   /* write out the new bitmap page (releasing write lock) */
+   /* write out the new bitmap page (releasing write lock and pin) */
    _hash_wrtbuf(rel, buf);
 
    /* add the new bitmap page to the metapage's list of bitmaps */
@@ -445,7 +539,14 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
  * the write page works forward; the procedure terminates when the
  * read page and write page are the same page.
  *
- * Caller must hold exclusive lock on the target bucket.
+ * At completion of this procedure, it is guaranteed that all pages in
+ * the bucket are nonempty, unless the bucket is totally empty (in
+ * which case all overflow pages will be freed).  The original implementation
+ * required that to be true on entry as well, but it's a lot easier for
+ * callers to leave empty overflow pages and let this guy clean it up.
+ *
+ * Caller must hold exclusive lock on the target bucket.  This allows
+ * us to safely lock multiple pages in the bucket.
  */
 void
 _hash_squeezebucket(Relation rel,
@@ -479,7 +580,7 @@ _hash_squeezebucket(Relation rel,
     */
    if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
    {
-       _hash_relbuf(rel, wbuf, HASH_WRITE);
+       _hash_relbuf(rel, wbuf);
        return;
    }
 
@@ -492,11 +593,10 @@ _hash_squeezebucket(Relation rel,
    {
        rblkno = ropaque->hasho_nextblkno;
        if (ropaque != wopaque)
-           _hash_relbuf(rel, rbuf, HASH_WRITE);
+           _hash_relbuf(rel, rbuf);
        rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
        rpage = BufferGetPage(rbuf);
        _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
-       Assert(!PageIsEmpty(rpage));
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    } while (BlockNumberIsValid(ropaque->hasho_nextblkno));
@@ -507,81 +607,97 @@ _hash_squeezebucket(Relation rel,
    roffnum = FirstOffsetNumber;
    for (;;)
    {
-       hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum));
-       itemsz = IndexTupleDSize(hitem->hash_itup)
-           + (sizeof(HashItemData) - sizeof(IndexTupleData));
-       itemsz = MAXALIGN(itemsz);
-
-       /*
-        * walk up the bucket chain, looking for a page big enough for
-        * this item.
-        */
-       while (PageGetFreeSpace(wpage) < itemsz)
+       /* this test is needed in case page is empty on entry */
+       if (roffnum <= PageGetMaxOffsetNumber(rpage))
        {
-           wblkno = wopaque->hasho_nextblkno;
+           hitem = (HashItem) PageGetItem(rpage,
+                                          PageGetItemId(rpage, roffnum));
+           itemsz = IndexTupleDSize(hitem->hash_itup)
+               + (sizeof(HashItemData) - sizeof(IndexTupleData));
+           itemsz = MAXALIGN(itemsz);
+
+           /*
+            * Walk up the bucket chain, looking for a page big enough for
+            * this item.  Exit if we reach the read page.
+            */
+           while (PageGetFreeSpace(wpage) < itemsz)
+           {
+               Assert(!PageIsEmpty(wpage));
 
-           _hash_wrtbuf(rel, wbuf);
+               wblkno = wopaque->hasho_nextblkno;
+               Assert(BlockNumberIsValid(wblkno));
 
-           if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno))
-           {
-               _hash_wrtbuf(rel, rbuf);
-               /* wbuf is already released */
-               return;
+               _hash_wrtbuf(rel, wbuf);
+
+               if (rblkno == wblkno)
+               {
+                   /* wbuf is already released */
+                   _hash_wrtbuf(rel, rbuf);
+                   return;
+               }
+
+               wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
+               wpage = BufferGetPage(wbuf);
+               _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
+               wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
+               Assert(wopaque->hasho_bucket == bucket);
            }
 
-           wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
-           wpage = BufferGetPage(wbuf);
-           _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
-           Assert(!PageIsEmpty(wpage));
-           wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
-           Assert(wopaque->hasho_bucket == bucket);
+           /*
+            * we have found room so insert on the "write" page.
+            */
+           woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
+           if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
+               == InvalidOffsetNumber)
+               elog(ERROR, "failed to add index item to \"%s\"",
+                    RelationGetRelationName(rel));
+
+           /*
+            * delete the tuple from the "read" page. PageIndexTupleDelete
+            * repacks the ItemId array, so 'roffnum' will be "advanced" to
+            * the "next" ItemId.
+            */
+           PageIndexTupleDelete(rpage, roffnum);
        }
 
        /*
-        * if we're here, we have found room so insert on the "write"
-        * page.
+        * if the "read" page is now empty because of the deletion (or
+        * because it was empty when we got to it), free it.
+        *
+        * Tricky point here: if our read and write pages are adjacent in the
+        * bucket chain, our write lock on wbuf will conflict with
+        * _hash_freeovflpage's attempt to update the sibling links of the
+        * removed page.  However, in that case we are done anyway, so we can
+        * simply drop the write lock before calling _hash_freeovflpage.
         */
-       woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
-       if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
-           == InvalidOffsetNumber)
-           elog(ERROR, "failed to add index item to \"%s\"",
-                RelationGetRelationName(rel));
-
-       /*
-        * delete the tuple from the "read" page. PageIndexTupleDelete
-        * repacks the ItemId array, so 'roffnum' will be "advanced" to
-        * the "next" ItemId.
-        */
-       PageIndexTupleDelete(rpage, roffnum);
-       _hash_wrtnorelbuf(rbuf);
-
-       /*
-        * if the "read" page is now empty because of the deletion, free
-        * it.
-        */
-       if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE))
+       if (PageIsEmpty(rpage))
        {
            rblkno = ropaque->hasho_prevblkno;
            Assert(BlockNumberIsValid(rblkno));
 
-           /* free this overflow page */
-           _hash_freeovflpage(rel, rbuf);
-
+           /* are we freeing the page adjacent to wbuf? */
            if (rblkno == wblkno)
            {
-               /* rbuf is already released */
+               /* yes, so release wbuf lock first */
                _hash_wrtbuf(rel, wbuf);
+               /* free this overflow page (releases rbuf) */
+               _hash_freeovflpage(rel, rbuf);
+               /* done */
                return;
            }
 
+           /* free this overflow page, then get the previous one */
+           _hash_freeovflpage(rel, rbuf);
+
            rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
            rpage = BufferGetPage(rbuf);
            _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
-           Assert(!PageIsEmpty(rpage));
            ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
            Assert(ropaque->hasho_bucket == bucket);
 
            roffnum = FirstOffsetNumber;
        }
    }
+
+   /* NOTREACHED */
 }
index 1c16df33cd350b3ac0b59967f11a75b3a5cfd710..5b9d19acf1b9eadee8e19b81955c83f1cd3eaeb4 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.41 2003/09/02 18:13:31 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.42 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *   Postgres hash pages look like ordinary relation pages.  The opaque
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/genam.h"
 #include "access/hash.h"
-#include "miscadmin.h"
 #include "storage/lmgr.h"
+#include "utils/lsyscache.h"
+
+
+static void _hash_splitbucket(Relation rel, Buffer metabuf,
+                             Bucket obucket, Bucket nbucket,
+                             BlockNumber start_oblkno,
+                             BlockNumber start_nblkno,
+                             uint32 maxbucket,
+                             uint32 highmask, uint32 lowmask);
+
+
+/*
+ * We use high-concurrency locking on hash indexes (see README for an overview
+ * of the locking rules).  There are two cases in which we don't do locking.
+ * One is when the index is newly created in the current transaction.  Since
+ * the creating transaction has not committed, no one else can see the index,
+ * and there's no reason to take locks.  The second case is for temp
+ * relations, which no one else can see either.  (We still take buffer-level
+ * locks, but not lmgr locks.)
+ */
+#define USELOCKING(rel)        (!((rel)->rd_isnew || (rel)->rd_istemp))
 
 
 /*
- * We use high-concurrency locking on hash indices.  There are two cases in
- * which we don't do locking.  One is when we're building the index.
- * Since the creating transaction has not committed, no one can see
- * the index, and there's no reason to share locks.  The second case
- * is when we're just starting up the database system.  We use some
- * special-purpose initialization code in the relation cache manager
- * (see utils/cache/relcache.c) to allow us to do indexed scans on
- * the system catalogs before we'd normally be able to.  This happens
- * before the lock table is fully initialized, so we can't use it.
- * Strictly speaking, this violates 2pl, but we don't do 2pl on the
- * system catalogs anyway.
+ * _hash_getlock() -- Acquire an lmgr lock.
  *
- * Note that our page locks are actual lockmanager locks, not buffer
- * locks (as are used by btree, for example).  This is a good idea because
- * the algorithms are not deadlock-free, and we'd better be able to detect
- * and recover from deadlocks.
+ * 'whichlock' should be zero to acquire the split-control lock, or the
+ * block number of a bucket's primary bucket page to acquire the per-bucket
+ * lock.  (See README for details of the use of these locks.)
  *
- * Another important difference from btree is that a hash indexscan
- * retains both a lock and a buffer pin on the current index page
- * between hashgettuple() calls (btree keeps only a buffer pin).
- * Because of this, it's safe to do item deletions with only a regular
- * write lock on a hash page --- there cannot be an indexscan stopped on
- * the page being deleted, other than an indexscan of our own backend,
- * which will be taken care of by _hash_adjscans.
+ * 'access' must be HASH_SHARE or HASH_EXCLUSIVE.
  */
-#define USELOCKING     (!BuildingHash && !IsInitProcessingMode())
+void
+_hash_getlock(Relation rel, BlockNumber whichlock, int access)
+{
+   if (USELOCKING(rel))
+       LockPage(rel, whichlock, access);
+}
 
+/*
+ * _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free.
+ *
+ * Same as above except we return FALSE without blocking if lock isn't free.
+ */
+bool
+_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
+{
+   if (USELOCKING(rel))
+       return ConditionalLockPage(rel, whichlock, access);
+   else
+       return true;
+}
 
-static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
-static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
-static void _hash_splitbucket(Relation rel, Buffer metabuf,
-                             Bucket obucket, Bucket nbucket);
+/*
+ * _hash_droplock() -- Release an lmgr lock.
+ */
+void
+_hash_droplock(Relation rel, BlockNumber whichlock, int access)
+{
+   if (USELOCKING(rel))
+       UnlockPage(rel, whichlock, access);
+}
+
+/*
+ * _hash_getbuf() -- Get a buffer by block number for read or write.
+ *
+ *     'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
+ *
+ *     When this routine returns, the appropriate lock is set on the
+ *     requested buffer and its reference count has been incremented
+ *     (ie, the buffer is "locked and pinned").
+ *
+ *     XXX P_NEW is not used because, unlike the tree structures, we
+ *     need the bucket blocks to be at certain block numbers.  we must
+ *     depend on the caller to call _hash_pageinit on the block if it
+ *     knows that this is a new block.
+ */
+Buffer
+_hash_getbuf(Relation rel, BlockNumber blkno, int access)
+{
+   Buffer      buf;
+
+   if (blkno == P_NEW)
+       elog(ERROR, "hash AM does not use P_NEW");
+
+   buf = ReadBuffer(rel, blkno);
+
+   if (access != HASH_NOLOCK)
+       LockBuffer(buf, access);
+
+   /* ref count and lock type are correct */
+   return buf;
+}
+
+/*
+ * _hash_relbuf() -- release a locked buffer.
+ *
+ * Lock and pin (refcount) are both dropped.  Note that either read or
+ * write lock can be dropped this way, but if we modified the buffer,
+ * this is NOT the right way to release a write lock.
+ */
+void
+_hash_relbuf(Relation rel, Buffer buf)
+{
+   LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+   ReleaseBuffer(buf);
+}
+
+/*
+ * _hash_dropbuf() -- release an unlocked buffer.
+ *
+ * This is used to unpin a buffer on which we hold no lock.  It is assumed
+ * that the buffer is not dirty.
+ */
+void
+_hash_dropbuf(Relation rel, Buffer buf)
+{
+   ReleaseBuffer(buf);
+}
+
+/*
+ * _hash_wrtbuf() -- write a hash page to disk.
+ *
+ *     This routine releases the lock held on the buffer and our refcount
+ *     for it.  It is an error to call _hash_wrtbuf() without a write lock
+ *     and a pin on the buffer.
+ *
+ * NOTE: actually, the buffer manager just marks the shared buffer page
+ * dirty here; the real I/O happens later. This is okay since we are not
+ * relying on write ordering anyway.  The WAL mechanism is responsible for
+ * guaranteeing correctness after a crash.
+ */
+void
+_hash_wrtbuf(Relation rel, Buffer buf)
+{
+   LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+   WriteBuffer(buf);
+}
+
+/*
+ * _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
+ *                      our reference or lock.
+ *
+ *     It is an error to call _hash_wrtnorelbuf() without a write lock
+ *     and a pin on the buffer.
+ *
+ * See above NOTE.
+ */
+void
+_hash_wrtnorelbuf(Relation rel, Buffer buf)
+{
+   WriteNoReleaseBuffer(buf);
+}
+
+/*
+ * _hash_chgbufaccess() -- Change the lock type on a buffer, without
+ *         dropping our pin on it.
+ *
+ * from_access and to_access may be HASH_READ, HASH_WRITE, or HASH_NOLOCK,
+ * the last indicating that no buffer-level lock is held or wanted.
+ *
+ * When from_access == HASH_WRITE, we assume the buffer is dirty and tell
+ * bufmgr it must be written out.  If the caller wants to release a write
+ * lock on a page that's not been modified, it's okay to pass from_access
+ * as HASH_READ (a bit ugly, but handy in some places).
+ */
+void
+_hash_chgbufaccess(Relation rel,
+                  Buffer buf,
+                  int from_access,
+                  int to_access)
+{
+   if (from_access != HASH_NOLOCK)
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+   if (from_access == HASH_WRITE)
+       WriteNoReleaseBuffer(buf);
+
+   if (to_access != HASH_NOLOCK)
+       LockBuffer(buf, to_access);
+}
 
 
 /*
  * _hash_metapinit() -- Initialize the metadata page of a hash index,
  *             the two buckets that we begin with and the initial
  *             bitmap page.
+ *
+ * We are fairly cavalier about locking here, since we know that no one else
+ * could be accessing this index.  In particular the rule about not holding
+ * multiple buffer locks is ignored.
  */
 void
 _hash_metapinit(Relation rel)
@@ -83,16 +230,31 @@ _hash_metapinit(Relation rel)
    Buffer      metabuf;
    Buffer      buf;
    Page        pg;
+   int32       data_width;
+   int32       item_width;
+   int32       ffactor;
    uint16      i;
 
-   /* can't be sharing this with anyone, now... */
-   if (USELOCKING)
-       LockRelation(rel, AccessExclusiveLock);
-
+   /* safety check */
    if (RelationGetNumberOfBlocks(rel) != 0)
        elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
             RelationGetRelationName(rel));
 
+   /*
+    * Determine the target fill factor (tuples per bucket) for this index.
+    * The idea is to make the fill factor correspond to pages about 3/4ths
+    * full.  We can compute it exactly if the index datatype is fixed-width,
+    * but for var-width there's some guessing involved.
+    */
+   data_width = get_typavgwidth(RelationGetDescr(rel)->attrs[0]->atttypid,
+                                RelationGetDescr(rel)->attrs[0]->atttypmod);
+   item_width = MAXALIGN(sizeof(HashItemData)) + MAXALIGN(data_width) +
+       sizeof(ItemIdData);     /* include the line pointer */
+   ffactor = (BLCKSZ * 3 / 4) / item_width;
+   /* keep to a sane range */
+   if (ffactor < 10)
+       ffactor = 10;
+
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    pg = BufferGetPage(metabuf);
    _hash_pageinit(pg, BufferGetPageSize(metabuf));
@@ -110,7 +272,7 @@ _hash_metapinit(Relation rel)
    metap->hashm_version = HASH_VERSION;
    metap->hashm_ntuples = 0;
    metap->hashm_nmaps = 0;
-   metap->hashm_ffactor = DEFAULT_FFACTOR;
+   metap->hashm_ffactor = ffactor;
    metap->hashm_bsize = BufferGetPageSize(metabuf);
    /* find largest bitmap array size that will fit in page size */
    for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
@@ -142,7 +304,7 @@ _hash_metapinit(Relation rel)
    metap->hashm_firstfree = 0;
 
    /*
-    * initialize the first two buckets
+    * Initialize the first two buckets
     */
    for (i = 0; i <= 1; i++)
    {
@@ -159,135 +321,17 @@ _hash_metapinit(Relation rel)
    }
 
    /*
-    * Initialize bitmap page.  Can't do this until we
+    * Initialize first bitmap page.  Can't do this until we
     * create the first two buckets, else smgr will complain.
     */
    _hash_initbitmap(rel, metap, 3);
 
    /* all done */
    _hash_wrtbuf(rel, metabuf);
-
-   if (USELOCKING)
-       UnlockRelation(rel, AccessExclusiveLock);
 }
 
 /*
- * _hash_getbuf() -- Get a buffer by block number for read or write.
- *
- *     When this routine returns, the appropriate lock is set on the
- *     requested buffer its reference count is correct.
- *
- *     XXX P_NEW is not used because, unlike the tree structures, we
- *     need the bucket blocks to be at certain block numbers.  we must
- *     depend on the caller to call _hash_pageinit on the block if it
- *     knows that this is a new block.
- */
-Buffer
-_hash_getbuf(Relation rel, BlockNumber blkno, int access)
-{
-   Buffer      buf;
-
-   if (blkno == P_NEW)
-       elog(ERROR, "hash AM does not use P_NEW");
-   switch (access)
-   {
-       case HASH_WRITE:
-       case HASH_READ:
-           _hash_setpagelock(rel, blkno, access);
-           break;
-       default:
-           elog(ERROR, "unrecognized hash access code: %d", access);
-           break;
-   }
-   buf = ReadBuffer(rel, blkno);
-
-   /* ref count and lock type are correct */
-   return buf;
-}
-
-/*
- * _hash_relbuf() -- release a locked buffer.
- */
-void
-_hash_relbuf(Relation rel, Buffer buf, int access)
-{
-   BlockNumber blkno;
-
-   blkno = BufferGetBlockNumber(buf);
-
-   switch (access)
-   {
-       case HASH_WRITE:
-       case HASH_READ:
-           _hash_unsetpagelock(rel, blkno, access);
-           break;
-       default:
-           elog(ERROR, "unrecognized hash access code: %d", access);
-           break;
-   }
-
-   ReleaseBuffer(buf);
-}
-
-/*
- * _hash_wrtbuf() -- write a hash page to disk.
- *
- *     This routine releases the lock held on the buffer and our reference
- *     to it.  It is an error to call _hash_wrtbuf() without a write lock
- *     or a reference to the buffer.
- */
-void
-_hash_wrtbuf(Relation rel, Buffer buf)
-{
-   BlockNumber blkno;
-
-   blkno = BufferGetBlockNumber(buf);
-   WriteBuffer(buf);
-   _hash_unsetpagelock(rel, blkno, HASH_WRITE);
-}
-
-/*
- * _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
- *                      our reference or lock.
- *
- *     It is an error to call _hash_wrtnorelbuf() without a write lock
- *     or a reference to the buffer.
- */
-void
-_hash_wrtnorelbuf(Buffer buf)
-{
-   BlockNumber blkno;
-
-   blkno = BufferGetBlockNumber(buf);
-   WriteNoReleaseBuffer(buf);
-}
-
-/*
- * _hash_chgbufaccess() -- Change from read to write access or vice versa.
- *
- * When changing from write to read, we assume the buffer is dirty and tell
- * bufmgr it must be written out.
- */
-void
-_hash_chgbufaccess(Relation rel,
-                  Buffer buf,
-                  int from_access,
-                  int to_access)
-{
-   BlockNumber blkno;
-
-   blkno = BufferGetBlockNumber(buf);
-
-   if (from_access == HASH_WRITE)
-       _hash_wrtnorelbuf(buf);
-
-   _hash_unsetpagelock(rel, blkno, from_access);
-
-   _hash_setpagelock(rel, blkno, to_access);
-}
-
-/*
- * _hash_pageinit() -- Initialize a new page.
+ * _hash_pageinit() -- Initialize a new hash index page.
  */
 void
 _hash_pageinit(Page page, Size size)
@@ -297,57 +341,14 @@ _hash_pageinit(Page page, Size size)
 }
 
 /*
- *  _hash_setpagelock() -- Acquire the requested type of lock on a page.
- */
-static void
-_hash_setpagelock(Relation rel,
-                 BlockNumber blkno,
-                 int access)
-{
-   if (USELOCKING)
-   {
-       switch (access)
-       {
-           case HASH_WRITE:
-               LockPage(rel, blkno, ExclusiveLock);
-               break;
-           case HASH_READ:
-               LockPage(rel, blkno, ShareLock);
-               break;
-           default:
-               elog(ERROR, "unrecognized hash access code: %d", access);
-               break;
-       }
-   }
-}
-
-/*
- *  _hash_unsetpagelock() -- Release the specified type of lock on a page.
- */
-static void
-_hash_unsetpagelock(Relation rel,
-                   BlockNumber blkno,
-                   int access)
-{
-   if (USELOCKING)
-   {
-       switch (access)
-       {
-           case HASH_WRITE:
-               UnlockPage(rel, blkno, ExclusiveLock);
-               break;
-           case HASH_READ:
-               UnlockPage(rel, blkno, ShareLock);
-               break;
-           default:
-               elog(ERROR, "unrecognized hash access code: %d", access);
-               break;
-       }
-   }
-}
-
-/*
- * Expand the hash table by creating one new bucket.
+ * Attempt to expand the hash table by creating one new bucket.
+ *
+ * This will silently do nothing if it cannot get the needed locks.
+ *
+ * The caller should hold no locks on the hash index.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.
  */
 void
 _hash_expandtable(Relation rel, Buffer metabuf)
@@ -356,15 +357,72 @@ _hash_expandtable(Relation rel, Buffer metabuf)
    Bucket      old_bucket;
    Bucket      new_bucket;
    uint32      spare_ndx;
+   BlockNumber start_oblkno;
+   BlockNumber start_nblkno;
+   uint32      maxbucket;
+   uint32      highmask;
+   uint32      lowmask;
+
+   /*
+    * Obtain the page-zero lock to assert the right to begin a split
+    * (see README).
+    *
+    * Note: deadlock should be impossible here. Our own backend could only
+    * be holding bucket sharelocks due to stopped indexscans; those will not
+    * block other holders of the page-zero lock, who are only interested in
+    * acquiring bucket sharelocks themselves.  Exclusive bucket locks are
+    * only taken here and in hashbulkdelete, and neither of these operations
+    * needs any additional locks to complete.  (If, due to some flaw in this
+    * reasoning, we manage to deadlock anyway, it's okay to error out; the
+    * index will be left in a consistent state.)
+    */
+   _hash_getlock(rel, 0, HASH_EXCLUSIVE);
+
+   /* Write-lock the meta page */
+   _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
-   _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
+   /*
+    * Check to see if split is still needed; someone else might have already
+    * done one while we waited for the lock.
+    *
+    * Make sure this stays in sync with_hash_doinsert()
+    */
+   if (metap->hashm_ntuples <=
+       (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
+       goto fail;
 
-   new_bucket = ++metap->hashm_maxbucket;
+   /*
+    * Determine which bucket is to be split, and attempt to lock the old
+    * bucket.  If we can't get the lock, give up.
+    *
+    * The lock protects us against other backends, but not against our own
+    * backend.  Must check for active scans separately.
+    *
+    * Ideally we would lock the new bucket too before proceeding, but if
+    * we are about to cross a splitpoint then the BUCKET_TO_BLKNO mapping
+    * isn't correct yet.  For simplicity we update the metapage first and
+    * then lock.  This should be okay because no one else should be trying
+    * to lock the new bucket yet...
+    */
+   new_bucket = metap->hashm_maxbucket + 1;
    old_bucket = (new_bucket & metap->hashm_lowmask);
 
+   start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+   if (_hash_has_active_scan(rel, old_bucket))
+       goto fail;
+
+   if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
+       goto fail;
+
+   /*
+    * Okay to proceed with split.  Update the metapage bucket mapping info.
+    */
+   metap->hashm_maxbucket = new_bucket;
+
    if (new_bucket > metap->hashm_highmask)
    {
        /* Starting a new doubling */
@@ -379,7 +437,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
     * this new batch of bucket pages.
     *
     * XXX should initialize new bucket pages to prevent out-of-order
-    * page creation.
+    * page creation?  Don't wanna do it right here though.
     */
    spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
    if (spare_ndx > metap->hashm_ovflpoint)
@@ -389,10 +447,50 @@ _hash_expandtable(Relation rel, Buffer metabuf)
        metap->hashm_ovflpoint = spare_ndx;
    }
 
-   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+   /* now we can compute the new bucket's primary block number */
+   start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
+
+   Assert(!_hash_has_active_scan(rel, new_bucket));
+
+   if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
+       elog(PANIC, "could not get lock on supposedly new bucket");
+
+   /*
+    * Copy bucket mapping info now; this saves re-accessing the meta page
+    * inside _hash_splitbucket's inner loop.  Note that once we drop the
+    * split lock, other splits could begin, so these values might be out of
+    * date before _hash_splitbucket finishes.  That's okay, since all it
+    * needs is to tell which of these two buckets to map hashkeys into.
+    */
+   maxbucket = metap->hashm_maxbucket;
+   highmask = metap->hashm_highmask;
+   lowmask = metap->hashm_lowmask;
+
+   /* Write out the metapage and drop lock, but keep pin */
+   _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+
+   /* Release split lock; okay for other splits to occur now */
+   _hash_droplock(rel, 0, HASH_EXCLUSIVE);
 
    /* Relocate records to the new bucket */
-   _hash_splitbucket(rel, metabuf, old_bucket, new_bucket);
+   _hash_splitbucket(rel, metabuf, old_bucket, new_bucket,
+                     start_oblkno, start_nblkno,
+                     maxbucket, highmask, lowmask);
+
+   /* Release bucket locks, allowing others to access them */
+   _hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
+   _hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
+
+   return;
+
+   /* Here if decide not to split or fail to acquire old bucket lock */
+fail:
+
+   /* We didn't write the metapage, so just drop lock */
+   _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+   /* Release split lock */
+   _hash_droplock(rel, 0, HASH_EXCLUSIVE);
 }
 
 
@@ -403,27 +501,35 @@ _hash_expandtable(Relation rel, Buffer metabuf)
  * or more overflow (bucket chain) pages.  We must relocate tuples that
  * belong in the new bucket, and compress out any free space in the old
  * bucket.
+ *
+ * The caller must hold exclusive locks on both buckets to ensure that
+ * no one else is trying to access them (see README).
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.  (The metapage is only
+ * touched if it becomes necessary to add or remove overflow pages.)
  */
 static void
 _hash_splitbucket(Relation rel,
                  Buffer metabuf,
                  Bucket obucket,
-                 Bucket nbucket)
+                 Bucket nbucket,
+                 BlockNumber start_oblkno,
+                 BlockNumber start_nblkno,
+                 uint32 maxbucket,
+                 uint32 highmask,
+                 uint32 lowmask)
 {
    Bucket      bucket;
    Buffer      obuf;
    Buffer      nbuf;
-   Buffer      ovflbuf;
    BlockNumber oblkno;
    BlockNumber nblkno;
-   BlockNumber start_oblkno;
-   BlockNumber start_nblkno;
    bool        null;
    Datum       datum;
    HashItem    hitem;
    HashPageOpaque oopaque;
    HashPageOpaque nopaque;
-   HashMetaPage metap;
    IndexTuple  itup;
    Size        itemsz;
    OffsetNumber ooffnum;
@@ -433,12 +539,11 @@ _hash_splitbucket(Relation rel,
    Page        npage;
    TupleDesc   itupdesc = RelationGetDescr(rel);
 
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
-   /* get the buffers & pages */
-   start_oblkno = BUCKET_TO_BLKNO(metap, obucket);
-   start_nblkno = BUCKET_TO_BLKNO(metap, nbucket);
+   /*
+    * It should be okay to simultaneously write-lock pages from each
+    * bucket, since no one else can be trying to acquire buffer lock
+    * on pages of either bucket.
+    */
    oblkno = start_oblkno;
    nblkno = start_nblkno;
    obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
@@ -446,7 +551,10 @@ _hash_splitbucket(Relation rel,
    opage = BufferGetPage(obuf);
    npage = BufferGetPage(nbuf);
 
-   /* initialize the new bucket page */
+   _hash_checkpage(rel, opage, LH_BUCKET_PAGE);
+   oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+   /* initialize the new bucket's primary page */
    _hash_pageinit(npage, BufferGetPageSize(nbuf));
    nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
    nopaque->hasho_prevblkno = InvalidBlockNumber;
@@ -454,44 +562,11 @@ _hash_splitbucket(Relation rel,
    nopaque->hasho_bucket = nbucket;
    nopaque->hasho_flag = LH_BUCKET_PAGE;
    nopaque->hasho_filler = HASHO_FILL;
-   _hash_wrtnorelbuf(nbuf);
-
-   /*
-    * make sure the old bucket isn't empty.  advance 'opage' and friends
-    * through the overflow bucket chain until we find a non-empty page.
-    *
-    * XXX we should only need this once, if we are careful to preserve the
-    * invariant that overflow pages are never empty.
-    */
-   _hash_checkpage(rel, opage, LH_BUCKET_PAGE);
-   oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-   if (PageIsEmpty(opage))
-   {
-       oblkno = oopaque->hasho_nextblkno;
-       _hash_relbuf(rel, obuf, HASH_WRITE);
-       if (!BlockNumberIsValid(oblkno))
-       {
-           /*
-            * the old bucket is completely empty; of course, the new
-            * bucket will be as well, but since it's a base bucket page
-            * we don't care.
-            */
-           _hash_relbuf(rel, nbuf, HASH_WRITE);
-           return;
-       }
-       obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-       opage = BufferGetPage(obuf);
-       _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-       if (PageIsEmpty(opage))
-           elog(ERROR, "empty hash overflow page %u", oblkno);
-       oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-   }
 
    /*
-    * we are now guaranteed that 'opage' is not empty.  partition the
-    * tuples in the old bucket between the old bucket and the new bucket,
-    * advancing along their respective overflow bucket chains and adding
-    * overflow pages as needed.
+    * Partition the tuples in the old bucket between the old bucket and the
+    * new bucket, advancing along the old bucket's overflow bucket chain
+    * and adding overflow pages to the new bucket as needed.
     */
    ooffnum = FirstOffsetNumber;
    omaxoffnum = PageGetMaxOffsetNumber(opage);
@@ -505,48 +580,39 @@ _hash_splitbucket(Relation rel,
        /* check if we're at the end of the page */
        if (ooffnum > omaxoffnum)
        {
-           /* at end of page, but check for overflow page */
+           /* at end of page, but check for an(other) overflow page */
            oblkno = oopaque->hasho_nextblkno;
-           if (BlockNumberIsValid(oblkno))
-           {
-               /*
-                * we ran out of tuples on this particular page, but we
-                * have more overflow pages; re-init values.
-                */
-               _hash_wrtbuf(rel, obuf);
-               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-               opage = BufferGetPage(obuf);
-               _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-               /* we're guaranteed that an ovfl page has at least 1 tuple */
-               if (PageIsEmpty(opage))
-                   elog(ERROR, "empty hash overflow page %u", oblkno);
-               ooffnum = FirstOffsetNumber;
-               omaxoffnum = PageGetMaxOffsetNumber(opage);
-           }
-           else
-           {
-               /*
-                * We're at the end of the bucket chain, so now we're
-                * really done with everything.  Before quitting, call
-                * _hash_squeezebucket to ensure the tuples remaining in the
-                * old bucket (including the overflow pages) are packed as
-                * tightly as possible.  The new bucket is already tight.
-                */
-               _hash_wrtbuf(rel, obuf);
-               _hash_wrtbuf(rel, nbuf);
-               _hash_squeezebucket(rel, obucket, start_oblkno);
-               return;
-           }
+           if (!BlockNumberIsValid(oblkno))
+               break;
+           /*
+            * we ran out of tuples on this particular page, but we
+            * have more overflow pages; advance to next page.
+            */
+           _hash_wrtbuf(rel, obuf);
+
+           obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
+           opage = BufferGetPage(obuf);
+           _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
+           oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+           ooffnum = FirstOffsetNumber;
+           omaxoffnum = PageGetMaxOffsetNumber(opage);
+           continue;
        }
 
-       /* hash on the tuple */
+       /*
+        * Re-hash the tuple to determine which bucket it now belongs in.
+        *
+        * It is annoying to call the hash function while holding locks,
+        * but releasing and relocking the page for each tuple is unappealing
+        * too.
+        */
        hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
        itup = &(hitem->hash_itup);
        datum = index_getattr(itup, 1, itupdesc, &null);
        Assert(!null);
 
-       bucket = _hash_call(rel, metap, datum);
+       bucket = _hash_hashkey2bucket(_hash_datum2hashkey(rel, datum),
+                                     maxbucket, highmask, lowmask);
 
        if (bucket == nbucket)
        {
@@ -562,11 +628,13 @@ _hash_splitbucket(Relation rel,
 
            if (PageGetFreeSpace(npage) < itemsz)
            {
-               ovflbuf = _hash_addovflpage(rel, metabuf, nbuf);
-               _hash_wrtbuf(rel, nbuf);
-               nbuf = ovflbuf;
+               /* write out nbuf and drop lock, but keep pin */
+               _hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
+               /* chain to a new overflow page */
+               nbuf = _hash_addovflpage(rel, metabuf, nbuf);
                npage = BufferGetPage(nbuf);
-               _hash_checkpage(rel, npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+               _hash_checkpage(rel, npage, LH_OVERFLOW_PAGE);
+               /* we don't need nopaque within the loop */
            }
 
            noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
@@ -574,7 +642,6 @@ _hash_splitbucket(Relation rel,
                == InvalidOffsetNumber)
                elog(ERROR, "failed to add index item to \"%s\"",
                     RelationGetRelationName(rel));
-           _hash_wrtnorelbuf(nbuf);
 
            /*
             * now delete the tuple from the old bucket.  after this
@@ -586,40 +653,7 @@ _hash_splitbucket(Relation rel,
             * instead of calling PageGetMaxOffsetNumber.
             */
            PageIndexTupleDelete(opage, ooffnum);
-           _hash_wrtnorelbuf(obuf);
            omaxoffnum = OffsetNumberPrev(omaxoffnum);
-
-           /*
-            * tidy up.  if the old page was an overflow page and it is
-            * now empty, we must free it (we want to preserve the
-            * invariant that overflow pages cannot be empty).
-            */
-           if (PageIsEmpty(opage) &&
-               (oopaque->hasho_flag & LH_OVERFLOW_PAGE))
-           {
-               oblkno = _hash_freeovflpage(rel, obuf);
-
-               /* check that we're not through the bucket chain */
-               if (!BlockNumberIsValid(oblkno))
-               {
-                   _hash_wrtbuf(rel, nbuf);
-                   _hash_squeezebucket(rel, obucket, start_oblkno);
-                   return;
-               }
-
-               /*
-                * re-init. again, we're guaranteed that an ovfl page has
-                * at least one tuple.
-                */
-               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-               opage = BufferGetPage(obuf);
-               _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-               if (PageIsEmpty(opage))
-                   elog(ERROR, "empty hash overflow page %u", oblkno);
-               ooffnum = FirstOffsetNumber;
-               omaxoffnum = PageGetMaxOffsetNumber(opage);
-           }
        }
        else
        {
@@ -632,5 +666,15 @@ _hash_splitbucket(Relation rel,
            ooffnum = OffsetNumberNext(ooffnum);
        }
    }
-   /* NOTREACHED */
+
+   /*
+    * We're at the end of the old bucket chain, so we're done partitioning
+    * the tuples.  Before quitting, call _hash_squeezebucket to ensure the
+    * tuples remaining in the old bucket (including the overflow pages) are
+    * packed as tightly as possible.  The new bucket is already tight.
+    */
+   _hash_wrtbuf(rel, obuf);
+   _hash_wrtbuf(rel, nbuf);
+
+   _hash_squeezebucket(rel, obucket, start_oblkno);
 }
index a0b124cbee40d24ea6ae0c974dd7f9e8087b2096..35ac0622b5051ecaf94ec36b88fc85bef4deadf4 100644 (file)
@@ -8,22 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.30 2003/08/04 02:39:57 momjian Exp $
- *
- * NOTES
- *   Because we can be doing an index scan on a relation while we
- *   update it, we need to avoid missing data that moves around in
- *   the index.  The routines and global variables in this file
- *   guarantee that all scans in the local address space stay
- *   correctly positioned.  This is all we need to worry about, since
- *   write locking guarantees that no one else will be on the same
- *   page at the same time as we are.
- *
- *   The scheme is to manage a list of active scans in the current
- *   backend.  Whenever we add or remove records from an index, we
- *   check the list of active scans to see if any has been affected.
- *   A scan is affected only if it is on the same relation, and the
- *   same page, as the update.
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.31 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -44,10 +29,6 @@ typedef HashScanListData *HashScanList;
 static HashScanList HashScans = (HashScanList) NULL;
 
 
-static void _hash_scandel(IndexScanDesc scan,
-             BlockNumber blkno, OffsetNumber offno);
-
-
 /*
  * AtEOXact_hash() --- clean up hash subsystem at xact abort or commit.
  *
@@ -67,9 +48,6 @@ AtEOXact_hash(void)
     * at end of transaction anyway.
     */
    HashScans = NULL;
-
-   /* If we were building a hash, we ain't anymore. */
-   BuildingHash = false;
 }
 
 /*
@@ -112,70 +90,26 @@ _hash_dropscan(IndexScanDesc scan)
    pfree(chk);
 }
 
-void
-_hash_adjscans(Relation rel, ItemPointer tid)
+/*
+ * Is there an active scan in this bucket?
+ */
+bool
+_hash_has_active_scan(Relation rel, Bucket bucket)
 {
+   Oid         relid = RelationGetRelid(rel);
    HashScanList l;
-   Oid         relid;
 
-   relid = RelationGetRelid(rel);
-   for (l = HashScans; l != (HashScanList) NULL; l = l->hashsl_next)
+   for (l = HashScans; l != NULL; l = l->hashsl_next)
    {
        if (relid == l->hashsl_scan->indexRelation->rd_id)
-           _hash_scandel(l->hashsl_scan, ItemPointerGetBlockNumber(tid),
-                         ItemPointerGetOffsetNumber(tid));
-   }
-}
+       {
+           HashScanOpaque so = (HashScanOpaque) l->hashsl_scan->opaque;
 
-static void
-_hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno)
-{
-   ItemPointer current;
-   ItemPointer mark;
-   Buffer      buf;
-   Buffer      metabuf;
-   HashScanOpaque so;
-
-   so = (HashScanOpaque) scan->opaque;
-   current = &(scan->currentItemData);
-   mark = &(scan->currentMarkData);
-
-   if (ItemPointerIsValid(current)
-       && ItemPointerGetBlockNumber(current) == blkno
-       && ItemPointerGetOffsetNumber(current) >= offno)
-   {
-       metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
-       buf = so->hashso_curbuf;
-       _hash_step(scan, &buf, BackwardScanDirection, metabuf);
+           if (so->hashso_bucket_valid &&
+               so->hashso_bucket == bucket)
+               return true;
+       }
    }
 
-   if (ItemPointerIsValid(mark)
-       && ItemPointerGetBlockNumber(mark) == blkno
-       && ItemPointerGetOffsetNumber(mark) >= offno)
-   {
-       /*
-        * The idea here is to exchange the current and mark positions,
-        * then step backwards (affecting current), then exchange again.
-        */
-       ItemPointerData tmpitem;
-       Buffer      tmpbuf;
-
-       tmpitem = *mark;
-       *mark = *current;
-       *current = tmpitem;
-       tmpbuf = so->hashso_mrkbuf;
-       so->hashso_mrkbuf = so->hashso_curbuf;
-       so->hashso_curbuf = tmpbuf;
-
-       metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
-       buf = so->hashso_curbuf;
-       _hash_step(scan, &buf, BackwardScanDirection, metabuf);
-
-       tmpitem = *mark;
-       *mark = *current;
-       *current = tmpitem;
-       tmpbuf = so->hashso_mrkbuf;
-       so->hashso_mrkbuf = so->hashso_curbuf;
-       so->hashso_curbuf = tmpbuf;
-   }
+   return false;
 }
index c5321e4b6b479c59cca7fefa0a90370f60028966..d8982ffdbc9a1f8f92e82cf4dc377ef17f026045 100644 (file)
@@ -8,55 +8,16 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.33 2003/09/02 18:13:31 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.34 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "storage/lmgr.h"
 
 
-/*
- * _hash_search() -- Find the bucket that contains the scankey
- *     and fetch its primary bucket page into *bufP.
- *
- * the buffer has a read lock.
- */
-void
-_hash_search(Relation rel,
-            int keysz,
-            ScanKey scankey,
-            Buffer *bufP,
-            HashMetaPage metap)
-{
-   BlockNumber blkno;
-   Bucket      bucket;
-
-   if (scankey == NULL ||
-       (scankey[0].sk_flags & SK_ISNULL))
-   {
-       /*
-        * If the scankey is empty, all tuples will satisfy the
-        * scan so we start the scan at the first bucket (bucket 0).
-        *
-        * If the scankey is NULL, no tuples will satisfy the search;
-        * this should have been checked already, but arbitrarily return
-        * bucket zero.
-        */
-       bucket = 0;
-   }
-   else
-   {
-       bucket = _hash_call(rel, metap, scankey[0].sk_argument);
-   }
-
-   blkno = BUCKET_TO_BLKNO(metap, bucket);
-
-   *bufP = _hash_getbuf(rel, blkno, HASH_READ);
-}
-
 /*
  * _hash_next() -- Get the next item in a scan.
  *
@@ -69,31 +30,23 @@ _hash_search(Relation rel,
 bool
 _hash_next(IndexScanDesc scan, ScanDirection dir)
 {
-   Relation    rel;
+   Relation    rel = scan->indexRelation;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Buffer      buf;
-   Buffer      metabuf;
    Page        page;
    OffsetNumber offnum;
    ItemPointer current;
    HashItem    hitem;
    IndexTuple  itup;
-   HashScanOpaque so;
-
-   rel = scan->indexRelation;
-   so = (HashScanOpaque) scan->opaque;
 
-   /* we still have the buffer pinned and locked */
+   /* we still have the buffer pinned and read-locked */
    buf = so->hashso_curbuf;
    Assert(BufferIsValid(buf));
 
-   metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
-
    /*
-    * step to next valid tuple.  note that _hash_step releases our lock
-    * on 'metabuf'; if we switch to a new 'buf' while looking for the
-    * next tuple, we come back with a lock on that buffer.
+    * step to next valid tuple.
     */
-   if (!_hash_step(scan, &buf, dir, metabuf))
+   if (!_hash_step(scan, &buf, dir))
        return false;
 
    /* if we're here, _hash_step found a valid tuple */
@@ -108,6 +61,9 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
    return true;
 }
 
+/*
+ * Advance to next page in a bucket, if any.
+ */
 static void
 _hash_readnext(Relation rel,
               Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@@ -115,7 +71,7 @@ _hash_readnext(Relation rel,
    BlockNumber blkno;
 
    blkno = (*opaquep)->hasho_nextblkno;
-   _hash_relbuf(rel, *bufp, HASH_READ);
+   _hash_relbuf(rel, *bufp);
    *bufp = InvalidBuffer;
    if (BlockNumberIsValid(blkno))
    {
@@ -123,10 +79,12 @@ _hash_readnext(Relation rel,
        *pagep = BufferGetPage(*bufp);
        _hash_checkpage(rel, *pagep, LH_OVERFLOW_PAGE);
        *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
-       Assert(!PageIsEmpty(*pagep));
    }
 }
 
+/*
+ * Advance to previous page in a bucket, if any.
+ */
 static void
 _hash_readprev(Relation rel,
               Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@@ -134,7 +92,7 @@ _hash_readprev(Relation rel,
    BlockNumber blkno;
 
    blkno = (*opaquep)->hasho_prevblkno;
-   _hash_relbuf(rel, *bufp, HASH_READ);
+   _hash_relbuf(rel, *bufp);
    *bufp = InvalidBuffer;
    if (BlockNumberIsValid(blkno))
    {
@@ -142,28 +100,26 @@ _hash_readprev(Relation rel,
        *pagep = BufferGetPage(*bufp);
        _hash_checkpage(rel, *pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
        *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
-       if (PageIsEmpty(*pagep))
-       {
-           Assert((*opaquep)->hasho_flag & LH_BUCKET_PAGE);
-           _hash_relbuf(rel, *bufp, HASH_READ);
-           *bufp = InvalidBuffer;
-       }
    }
 }
 
 /*
  * _hash_first() -- Find the first item in a scan.
  *
- *     Find the first item in the tree that
+ *     Find the first item in the index that
  *     satisfies the qualification associated with the scan descriptor. On
- *     exit, the page containing the current index tuple is read locked
+ *     success, the page containing the current index tuple is read locked
  *     and pinned, and the scan's opaque data entry is updated to
  *     include the buffer.
  */
 bool
 _hash_first(IndexScanDesc scan, ScanDirection dir)
 {
-   Relation    rel;
+   Relation    rel = scan->indexRelation;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
+   uint32      hashkey;
+   Bucket      bucket;
+   BlockNumber blkno;
    Buffer      buf;
    Buffer      metabuf;
    Page        page;
@@ -173,70 +129,89 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
    IndexTuple  itup;
    ItemPointer current;
    OffsetNumber offnum;
-   HashScanOpaque so;
 
-   rel = scan->indexRelation;
-   so = (HashScanOpaque) scan->opaque;
    current = &(scan->currentItemData);
+   ItemPointerSetInvalid(current);
+
+   /*
+    * We do not support hash scans with no index qualification, because
+    * we would have to read the whole index rather than just one bucket.
+    * That creates a whole raft of problems, since we haven't got a
+    * practical way to lock all the buckets against splits or compactions.
+    */
+   if (scan->numberOfKeys < 1)
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("hash indexes do not support whole-index scans")));
+
+   /*
+    * If the constant in the index qual is NULL, assume it cannot match
+    * any items in the index.
+    */
+   if (scan->keyData[0].sk_flags & SK_ISNULL)
+       return false;
+
+   /*
+    * Okay to compute the hash key.  We want to do this before acquiring
+    * any locks, in case a user-defined hash function happens to be slow.
+    */
+   hashkey = _hash_datum2hashkey(rel, scan->keyData[0].sk_argument);
 
+   /*
+    * Acquire shared split lock so we can compute the target bucket
+    * safely (see README).
+    */
+   _hash_getlock(rel, 0, HASH_SHARE);
+
+   /* Read the metapage */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
    /*
-    * XXX -- The attribute number stored in the scan key is the attno in
-    * the heap relation.  We need to transmogrify this into the index
-    * relation attno here.  For the moment, we have hardwired attno == 1.
+    * Compute the target bucket number, and convert to block number.
     */
+   bucket = _hash_hashkey2bucket(hashkey,
+                                 metap->hashm_maxbucket,
+                                 metap->hashm_highmask,
+                                 metap->hashm_lowmask);
+
+   blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-   /* find the correct bucket page and load it into buf */
-   _hash_search(rel, 1, scan->keyData, &buf, metap);
+   /* done with the metapage */
+   _hash_relbuf(rel, metabuf);
+
+   /*
+    * Acquire share lock on target bucket; then we can release split lock.
+    */
+   _hash_getlock(rel, blkno, HASH_SHARE);
+
+   _hash_droplock(rel, 0, HASH_SHARE);
+
+   /* Update scan opaque state to show we have lock on the bucket */
+   so->hashso_bucket = bucket;
+   so->hashso_bucket_valid = true;
+   so->hashso_bucket_blkno = blkno;
+
+   /* Fetch the primary bucket page for the bucket */
+   buf = _hash_getbuf(rel, blkno, HASH_READ);
    page = BufferGetPage(buf);
    _hash_checkpage(rel, page, LH_BUCKET_PAGE);
    opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+   Assert(opaque->hasho_bucket == bucket);
 
-   /*
-    * if we are scanning forward, we need to find the first non-empty
-    * page (if any) in the bucket chain.  since overflow pages are never
-    * empty, this had better be either the bucket page or the first
-    * overflow page.
-    *
-    * if we are scanning backward, we always go all the way to the end of
-    * the bucket chain.
-    */
-   if (PageIsEmpty(page))
-   {
-       if (BlockNumberIsValid(opaque->hasho_nextblkno))
-           _hash_readnext(rel, &buf, &page, &opaque);
-       else
-       {
-           ItemPointerSetInvalid(current);
-           so->hashso_curbuf = InvalidBuffer;
-
-           /*
-            * If there is no scankeys, all tuples will satisfy the scan -
-            * so we continue in _hash_step to get tuples from all
-            * buckets. - vadim 04/29/97
-            */
-           if (scan->numberOfKeys >= 1)
-           {
-               _hash_relbuf(rel, buf, HASH_READ);
-               _hash_relbuf(rel, metabuf, HASH_READ);
-               return false;
-           }
-       }
-   }
+   /* If a backwards scan is requested, move to the end of the chain */
    if (ScanDirectionIsBackward(dir))
    {
        while (BlockNumberIsValid(opaque->hasho_nextblkno))
            _hash_readnext(rel, &buf, &page, &opaque);
    }
 
-   if (!_hash_step(scan, &buf, dir, metabuf))
+   /* Now find the first tuple satisfying the qualification */
+   if (!_hash_step(scan, &buf, dir))
        return false;
 
    /* if we're here, _hash_step found a valid tuple */
-   current = &(scan->currentItemData);
    offnum = ItemPointerGetOffsetNumber(current);
    page = BufferGetPage(buf);
    _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
@@ -254,19 +229,16 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
  *     false.  Else, return true and set the CurrentItemData for the
  *     scan to the right thing.
  *
- *     'bufP' points to the buffer which contains the current page
- *     that we'll step through.
- *
- *     'metabuf' is released when this returns.
+ *     'bufP' points to the current buffer, which is pinned and read-locked.
+ *     On success exit, we have pin and read-lock on whichever page
+ *     contains the right item; on failure, we have released all buffers.
  */
 bool
-_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
+_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 {
-   Relation    rel;
+   Relation    rel = scan->indexRelation;
+   HashScanOpaque so = (HashScanOpaque) scan->opaque;
    ItemPointer current;
-   HashScanOpaque so;
-   int         allbuckets;
-   HashMetaPage metap;
    Buffer      buf;
    Page        page;
    HashPageOpaque opaque;
@@ -277,18 +249,13 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
    HashItem    hitem;
    IndexTuple  itup;
 
-   rel = scan->indexRelation;
    current = &(scan->currentItemData);
-   so = (HashScanOpaque) scan->opaque;
-   allbuckets = (scan->numberOfKeys < 1);
-
-   metap = (HashMetaPage) BufferGetPage(metabuf);
-   _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
    buf = *bufP;
    page = BufferGetPage(buf);
    _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+   bucket = opaque->hasho_bucket;
 
    /*
     * If _hash_step is called from _hash_first, current will not be
@@ -309,107 +276,63 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
     */
    do
    {
-       bucket = opaque->hasho_bucket;
-
        switch (dir)
        {
            case ForwardScanDirection:
                if (offnum != InvalidOffsetNumber)
-               {
                    offnum = OffsetNumberNext(offnum);  /* move forward */
-               }
                else
-               {
                    offnum = FirstOffsetNumber; /* new page */
-               }
+
                while (offnum > maxoff)
                {
-
-                   /*--------
+                   /*
                     * either this page is empty
                     * (maxoff == InvalidOffsetNumber)
                     * or we ran off the end.
-                    *--------
                     */
                    _hash_readnext(rel, &buf, &page, &opaque);
-                   if (BufferIsInvalid(buf))
-                   {           /* end of chain */
-                       if (allbuckets && bucket < metap->hashm_maxbucket)
-                       {
-                           ++bucket;
-                           blkno = BUCKET_TO_BLKNO(metap, bucket);
-                           buf = _hash_getbuf(rel, blkno, HASH_READ);
-                           page = BufferGetPage(buf);
-                           _hash_checkpage(rel, page, LH_BUCKET_PAGE);
-                           opaque = (HashPageOpaque) PageGetSpecialPointer(page);
-                           Assert(opaque->hasho_bucket == bucket);
-                           while (PageIsEmpty(page) &&
-                            BlockNumberIsValid(opaque->hasho_nextblkno))
-                               _hash_readnext(rel, &buf, &page, &opaque);
-                           maxoff = PageGetMaxOffsetNumber(page);
-                           offnum = FirstOffsetNumber;
-                       }
-                       else
-                       {
-                           maxoff = offnum = InvalidOffsetNumber;
-                           break;      /* while */
-                       }
-                   }
-                   else
+                   if (BufferIsValid(buf))
                    {
-                       /* _hash_readnext never returns an empty page */
                        maxoff = PageGetMaxOffsetNumber(page);
                        offnum = FirstOffsetNumber;
                    }
+                   else
+                   {
+                       /* end of bucket */
+                       maxoff = offnum = InvalidOffsetNumber;
+                       break;  /* exit while */
+                   }
                }
                break;
+
            case BackwardScanDirection:
                if (offnum != InvalidOffsetNumber)
-               {
                    offnum = OffsetNumberPrev(offnum);  /* move back */
-               }
                else
-               {
                    offnum = maxoff;    /* new page */
-               }
+
                while (offnum < FirstOffsetNumber)
                {
-
-                   /*---------
+                   /*
                     * either this page is empty
                     * (offnum == InvalidOffsetNumber)
                     * or we ran off the end.
-                    *---------
                     */
                    _hash_readprev(rel, &buf, &page, &opaque);
-                   if (BufferIsInvalid(buf))
-                   {           /* end of chain */
-                       if (allbuckets && bucket > 0)
-                       {
-                           --bucket;
-                           blkno = BUCKET_TO_BLKNO(metap, bucket);
-                           buf = _hash_getbuf(rel, blkno, HASH_READ);
-                           page = BufferGetPage(buf);
-                           _hash_checkpage(rel, page, LH_BUCKET_PAGE);
-                           opaque = (HashPageOpaque) PageGetSpecialPointer(page);
-                           Assert(opaque->hasho_bucket == bucket);
-                           while (BlockNumberIsValid(opaque->hasho_nextblkno))
-                               _hash_readnext(rel, &buf, &page, &opaque);
-                           maxoff = offnum = PageGetMaxOffsetNumber(page);
-                       }
-                       else
-                       {
-                           maxoff = offnum = InvalidOffsetNumber;
-                           break;      /* while */
-                       }
+                   if (BufferIsValid(buf))
+                   {
+                       maxoff = offnum = PageGetMaxOffsetNumber(page);
                    }
                    else
                    {
-                       /* _hash_readprev never returns an empty page */
-                       maxoff = offnum = PageGetMaxOffsetNumber(page);
+                       /* end of bucket */
+                       maxoff = offnum = InvalidOffsetNumber;
+                       break;  /* exit while */
                    }
                }
                break;
+
            default:
                /* NoMovementScanDirection */
                /* this should not be reached */
@@ -419,7 +342,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
        /* we ran off the end of the world without finding a match */
        if (offnum == InvalidOffsetNumber)
        {
-           _hash_relbuf(rel, metabuf, HASH_READ);
            *bufP = so->hashso_curbuf = InvalidBuffer;
            ItemPointerSetInvalid(current);
            return false;
@@ -431,7 +353,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
    } while (!_hash_checkqual(scan, itup));
 
    /* if we made it to here, we've found a valid tuple */
-   _hash_relbuf(rel, metabuf, HASH_READ);
    blkno = BufferGetBlockNumber(buf);
    *bufP = so->hashso_curbuf = buf;
    ItemPointerSet(current, blkno, offnum);
index ce62a3a84415167d75bcb21692f8982a5816f6d0..0cfbe5e7a12ce675355582d12eaea98c6ab4df76 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.35 2003/09/02 18:13:31 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.36 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/iqual.h"
 
 
-/*
- * _hash_mkscankey -- build a scan key matching the given indextuple
- *
- * Note: this is prepared for multiple index columns, but very little
- * else in access/hash is ...
- */
-ScanKey
-_hash_mkscankey(Relation rel, IndexTuple itup)
-{
-   ScanKey     skey;
-   TupleDesc   itupdesc = RelationGetDescr(rel);
-   int         natts = rel->rd_rel->relnatts;
-   AttrNumber  i;
-   Datum       arg;
-   FmgrInfo   *procinfo;
-   bool        isnull;
-
-   skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
-
-   for (i = 0; i < natts; i++)
-   {
-       arg = index_getattr(itup, i + 1, itupdesc, &isnull);
-       procinfo = index_getprocinfo(rel, i + 1, HASHPROC);
-       ScanKeyEntryInitializeWithInfo(&skey[i],
-                                      isnull ? SK_ISNULL : 0x0,
-                                      (AttrNumber) (i + 1),
-                                      procinfo,
-                                      CurrentMemoryContext,
-                                      arg);
-   }
-
-   return skey;
-}
-
-void
-_hash_freeskey(ScanKey skey)
-{
-   pfree(skey);
-}
-
 /*
  * _hash_checkqual -- does the index tuple satisfy the scan conditions?
  */
@@ -102,24 +62,31 @@ _hash_formitem(IndexTuple itup)
 }
 
 /*
- * _hash_call -- given a Datum, call the index's hash procedure
- *
- * Returns the bucket number that the hash key maps to.
+ * _hash_datum2hashkey -- given a Datum, call the index's hash procedure
  */
-Bucket
-_hash_call(Relation rel, HashMetaPage metap, Datum key)
+uint32
+_hash_datum2hashkey(Relation rel, Datum key)
 {
    FmgrInfo   *procinfo;
-   uint32      n;
-   Bucket      bucket;
 
    /* XXX assumes index has only one attribute */
    procinfo = index_getprocinfo(rel, 1, HASHPROC);
-   n = DatumGetUInt32(FunctionCall1(procinfo, key));
 
-   bucket = n & metap->hashm_highmask;
-   if (bucket > metap->hashm_maxbucket)
-       bucket = bucket & metap->hashm_lowmask;
+   return DatumGetUInt32(FunctionCall1(procinfo, key));
+}
+
+/*
+ * _hash_hashkey2bucket -- determine which bucket the hashkey maps to.
+ */
+Bucket
+_hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
+                    uint32 highmask, uint32 lowmask)
+{
+   Bucket      bucket;
+
+   bucket = hashkey & highmask;
+   if (bucket > maxbucket)
+       bucket = bucket & lowmask;
 
    return bucket;
 }
index 12845f5593d24b7454aa8f10f72caccfeb93b78e..c4fceb009658cebd66db53c7f8c4d36122975411 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.59 2003/08/17 22:41:12 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.60 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -153,7 +153,7 @@ LockRelation(Relation relation, LOCKMODE lockmode)
  * As above, but only lock if we can get the lock without blocking.
  * Returns TRUE iff the lock was acquired.
  *
- * NOTE: we do not currently need conditional versions of the other
+ * NOTE: we do not currently need conditional versions of all the
  * LockXXX routines in this file, but they could easily be added if needed.
  */
 bool
@@ -264,6 +264,26 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
        elog(ERROR, "LockAcquire failed");
 }
 
+/*
+ *     ConditionalLockPage
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ */
+bool
+ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
+{
+   LOCKTAG     tag;
+
+   MemSet(&tag, 0, sizeof(tag));
+   tag.relId = relation->rd_lockInfo.lockRelId.relId;
+   tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
+   tag.objId.blkno = blkno;
+
+   return LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+                      lockmode, true);
+}
+
 /*
  *     UnlockPage
  */
index 7edbdad09846399f81fa0623ab4957afa7842c75..beffa806ea1e72a7bf56f48bcd4288e94d81a4b8 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: hash.h,v 1.52 2003/09/02 18:13:32 tgl Exp $
+ * $Id: hash.h,v 1.53 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *     modeled after Margo Seltzer's hash implementation for unix.
@@ -70,13 +70,27 @@ typedef HashPageOpaqueData *HashPageOpaque;
 #define HASHO_FILL     0x1234
 
 /*
- * ScanOpaqueData is used to remember which buffers we're currently
- * examining in the scan.  We keep these buffers locked and pinned and
- * recorded in the opaque entry of the scan in order to avoid doing a
- * ReadBuffer() for every tuple in the index.
+ * HashScanOpaqueData is private state for a hash index scan.
  */
 typedef struct HashScanOpaqueData
 {
+   /*
+    * By definition, a hash scan should be examining only one bucket.
+    * We record the bucket number here as soon as it is known.
+    */
+   Bucket      hashso_bucket;
+   bool        hashso_bucket_valid;
+   /*
+    * If we have a share lock on the bucket, we record it here.  When
+    * hashso_bucket_blkno is zero, we have no such lock.
+    */
+   BlockNumber hashso_bucket_blkno;
+   /*
+    * We also want to remember which buffers we're currently examining in the
+    * scan. We keep these buffers pinned (but not locked) across hashgettuple
+    * calls, in order to avoid doing a ReadBuffer() for every tuple in the
+    * index.
+    */
    Buffer      hashso_curbuf;
    Buffer      hashso_mrkbuf;
 } HashScanOpaqueData;
@@ -148,10 +162,18 @@ typedef struct HashItemData
 
 typedef HashItemData *HashItem;
 
+/*
+ * Maximum size of a hash index item (it's okay to have only one per page)
+ */
+#define HashMaxItemSize(page) \
+   (PageGetPageSize(page) - \
+    sizeof(PageHeaderData) - \
+    MAXALIGN(sizeof(HashPageOpaqueData)) - \
+    sizeof(ItemIdData))
+
 /*
  * Constants
  */
-#define DEFAULT_FFACTOR            300
 #define BYTE_TO_BIT                3       /* 2^3 bits/byte */
 #define ALL_SET                    ((uint32) ~0)
 
@@ -180,10 +202,14 @@ typedef HashItemData *HashItem;
 #define ISSET(A, N)        ((A)[(N)/BITS_PER_MAP] & (1<<((N)%BITS_PER_MAP)))
 
 /*
- * page locking modes
+ * page-level and high-level locking modes (see README)
  */
-#define HASH_READ      0
-#define HASH_WRITE     1
+#define HASH_READ      BUFFER_LOCK_SHARE
+#define HASH_WRITE     BUFFER_LOCK_EXCLUSIVE
+#define HASH_NOLOCK        (-1)
+
+#define HASH_SHARE     ShareLock
+#define HASH_EXCLUSIVE ExclusiveLock
 
 /*
  * Strategy number. There's only one valid strategy for hashing: equality.
@@ -199,8 +225,6 @@ typedef HashItemData *HashItem;
 #define HASHPROC       1
 
 
-extern bool BuildingHash;
-
 /* public routines */
 
 extern Datum hashbuild(PG_FUNCTION_ARGS);
@@ -250,36 +274,37 @@ extern void _hash_squeezebucket(Relation rel,
                                Bucket bucket, BlockNumber bucket_blkno);
 
 /* hashpage.c */
-extern void _hash_metapinit(Relation rel);
+extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access);
+extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access);
+extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access);
 extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno, int access);
-extern void _hash_relbuf(Relation rel, Buffer buf, int access);
+extern void _hash_relbuf(Relation rel, Buffer buf);
+extern void _hash_dropbuf(Relation rel, Buffer buf);
 extern void _hash_wrtbuf(Relation rel, Buffer buf);
-extern void _hash_wrtnorelbuf(Buffer buf);
+extern void _hash_wrtnorelbuf(Relation rel, Buffer buf);
 extern void _hash_chgbufaccess(Relation rel, Buffer buf, int from_access,
                   int to_access);
+extern void _hash_metapinit(Relation rel);
 extern void _hash_pageinit(Page page, Size size);
 extern void _hash_expandtable(Relation rel, Buffer metabuf);
 
 /* hashscan.c */
 extern void _hash_regscan(IndexScanDesc scan);
 extern void _hash_dropscan(IndexScanDesc scan);
-extern void _hash_adjscans(Relation rel, ItemPointer tid);
+extern bool _hash_has_active_scan(Relation rel, Bucket bucket);
 extern void AtEOXact_hash(void);
 
 /* hashsearch.c */
-extern void _hash_search(Relation rel, int keysz, ScanKey scankey,
-            Buffer *bufP, HashMetaPage metap);
 extern bool _hash_next(IndexScanDesc scan, ScanDirection dir);
 extern bool _hash_first(IndexScanDesc scan, ScanDirection dir);
-extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir,
-          Buffer metabuf);
+extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir);
 
 /* hashutil.c */
-extern ScanKey _hash_mkscankey(Relation rel, IndexTuple itup);
-extern void _hash_freeskey(ScanKey skey);
 extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
 extern HashItem _hash_formitem(IndexTuple itup);
-extern Bucket _hash_call(Relation rel, HashMetaPage metap, Datum key);
+extern uint32 _hash_datum2hashkey(Relation rel, Datum key);
+extern Bucket _hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
+                                  uint32 highmask, uint32 lowmask);
 extern uint32 _hash_log2(uint32 num);
 extern void _hash_checkpage(Relation rel, Page page, int flags);
 
index d7a557d2b5764bac63c4abaccfd856e1a6828586..19bda76d725c7f979dc7435850b0f00012d6137f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lmgr.h,v 1.39 2003/08/04 02:40:14 momjian Exp $
+ * $Id: lmgr.h,v 1.40 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -54,8 +54,9 @@ extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
 extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
 
-/* Lock a page (mainly used for indices) */
+/* Lock a page (mainly used for indexes) */
 extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
+extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
 extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
 
 /* Lock an XID (used to wait for a transaction to finish) */