Duplicates handling...
authorVadim B. Mikheev
Tue, 10 Jun 1997 07:28:50 +0000 (07:28 +0000)
committerVadim B. Mikheev
Tue, 10 Jun 1997 07:28:50 +0000 (07:28 +0000)
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtsearch.c

index 890a921f6532bb0d6a07f6161c1184c8e18c262a..b1aa7342a9566c3a050a8e0090fa977f9b3aa2b4 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.15 1997/06/06 03:11:42 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.16 1997/06/10 07:28:47 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #endif
 
 static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
-static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem);
+static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);
 static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
 static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
 static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
 static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
 static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
+#if 0
 static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
+#endif
 
 /*
  *  _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -52,14 +54,13 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
     BTStack stack;
     Buffer buf;
     BlockNumber blkno;
-    int natts;
+    int natts = rel->rd_rel->relnatts;
     InsertIndexResult res;
     
     itup = &(btitem->bti_itup);
     
     /* we need a scan key to do our search, so build one */
     itup_scankey = _bt_mkscankey(rel, itup);
-    natts = rel->rd_rel->relnatts;
     
     /* find the page containing this key */
     stack = _bt_search(rel, natts, itup_scankey, &buf);
@@ -223,17 +224,13 @@ _bt_insertonpg(Relation rel,
 {
     InsertIndexResult res;
     Page page;
-    Buffer rbuf;
-    Buffer pbuf;
-    Page rpage;
-    BTItem ritem;
     BTPageOpaque lpageop;
-    BTPageOpaque rpageop;
-    BlockNumber rbknum, itup_blkno;
+    BlockNumber itup_blkno;
     OffsetNumber itup_off;
+    OffsetNumber firstright = InvalidOffsetNumber;
     int itemsz;
-    Page ppage;
-    BTPageOpaque ppageop;
+    bool do_split = false;
+    bool keys_equal = false;
     
     page = BufferGetPage(buf);
     lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -248,8 +245,9 @@ _bt_insertonpg(Relation rel,
      * page in the chain of duplicates then:
      * 1. if scankey == hikey (i.e. - new duplicate item) then
      *    insert it here;
-     * 2. if scankey < hikey then we grab new page, copy current page
-     *    content there and insert new item on the current page.
+     * 2. if scankey < hikey then:
+     *    2.a if there is duplicate key(s) here - we force splitting;
+     *    2.b else - we may "eat" this page from duplicates chain.
      */
     if ( lpageop->btpo_flags & BTP_CHAIN )
     {
@@ -274,26 +272,105 @@ _bt_insertonpg(Relation rel,
            if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, 
                            BTLessStrategyNumber) )
            elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
-       return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem));
+       if ( maxoff > P_HIKEY ) /* have duplicate(s) */
+       {
+           firstright = P_FIRSTKEY;
+           do_split = true;
+       }
+       else            /* "eat" page */
+       {
+           Buffer pbuf;
+           Page ppage;
+           
+           itup_blkno = BufferGetBlockNumber(buf);
+           itup_off = PageAddItem(page, (Item) btitem, itemsz, 
+                           P_FIRSTKEY, LP_USED);
+           if ( itup_off == InvalidOffsetNumber )
+               elog (FATAL, "btree: failed to add item");
+           lpageop->btpo_flags &= ~BTP_CHAIN;
+           pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+           ppage = BufferGetPage(pbuf);
+               PageIndexTupleDelete(ppage, stack->bts_offset);
+       pfree(stack->bts_btitem);
+       stack->bts_btitem = _bt_formitem(&(btitem->bti_itup));
+       ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                  itup_blkno, P_HIKEY);
+       _bt_wrtbuf(rel, buf);
+       res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
+                   keysz, scankey, stack->bts_btitem,
+                   NULL);
+           ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+           return (res);
+       }
    }
+   else
+   {
+       keys_equal = true;
+           if ( PageGetFreeSpace(page) < itemsz )
+               do_split = true;
+       }
+    }
+    else if ( PageGetFreeSpace(page) < itemsz )
+       do_split = true;
+    else if ( PageGetFreeSpace(page) < 3*itemsz + 2*sizeof(ItemIdData) )
+    {
+       OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
+       OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
+       ItemId itid;
+       BTItem previtem, chkitem;
+       Size maxsize;
+       Size currsize;
+       
+       itid = PageGetItemId(page, offnum);
+       previtem = (BTItem) PageGetItem(page, itid);
+       maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData));
+       for (offnum = OffsetNumberNext(offnum);
+           offnum <= maxoff; offnum = OffsetNumberNext(offnum) )
+   {
+           itid = PageGetItemId(page, offnum);
+       chkitem = (BTItem) PageGetItem(page, itid);
+           if ( !_bt_itemcmp (rel, keysz, previtem, chkitem,
+                                   BTEqualStrategyNumber) )
+           {
+               if ( currsize > maxsize )
+                   maxsize = currsize;
+               currsize = 0;
+               previtem = chkitem;
+           }
+           currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData));
+   }
+       if ( currsize > maxsize )
+           maxsize = currsize;
+       maxsize += sizeof (PageHeaderData) + 
+               DOUBLEALIGN (sizeof (BTPageOpaqueData));
+       if ( maxsize >= PageGetPageSize (page) / 2 )
+           do_split = true;
     }
-
     
-    if (PageGetFreeSpace(page) < itemsz)
+    if ( do_split )
     {
+       Buffer rbuf;
+       Page rpage;
+       BTItem ritem;
+       BlockNumber rbknum;
+       BTPageOpaque rpageop;
+       Buffer pbuf;
+       Page ppage;
+       BTPageOpaque ppageop;
        BlockNumber bknum = BufferGetBlockNumber(buf);
        BTItem lowLeftItem;
-       BTItem hiRightItem = NULL;
+       OffsetNumber maxoff;
+       bool shifted = false;
+       bool left_chained = ( lpageop->btpo_flags & BTP_CHAIN ) ? true : false;
    
    /* 
-    * If we have to split leaf page in the chain of duplicates
-    * then we try to look at our right sibling first.
+    * If we have to split leaf page in the chain of duplicates by
+    * new duplicate then we try to look at our right sibling first.
     */
    if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
-           ( lpageop->btpo_flags & BTP_LEAF ) )
+       ( lpageop->btpo_flags & BTP_LEAF ) && keys_equal )
    {
            bool use_left = true;
-           bool keys_equal = false;
            
        rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
        rpage = BufferGetPage(rbuf);
@@ -309,29 +386,20 @@ _bt_insertonpg(Relation rel,
                {
                    if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
                        elog (FATAL, "btree: lost page in the chain of duplicates");
-                   keys_equal = true;
                }
                else if ( _bt_skeycmp (rel, keysz, scankey, rpage, 
                        PageGetItemId(rpage, P_HIKEY), 
                        BTGreaterStrategyNumber) )
                    elog (FATAL, "btree: hikey is out of order");
-           /*
-            * If hikey > scankey and BTP_CHAIN is ON 
-        * then it's first page of the chain of higher keys:
-            * our left sibling hikey was lying! We can't add new 
-            * item here, but we can turn BTP_CHAIN off on our
-            * left page and overwrite its hikey.
+               else if ( rpageop->btpo_flags & BTP_CHAIN )
+           /* 
+            * If hikey > scankey then it's last page in chain and
+            * BTP_CHAIN must be OFF 
             */
-           if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) )
-           {
-               BTItem tmp;
-               
-               lpageop->btpo_flags &= ~BTP_CHAIN;
-               tmp = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY));
-               hiRightItem = _bt_formitem(&(tmp->bti_itup));
-               }
+                   elog (FATAL, "btree: lost last page in the chain of duplicates");
+               
                /* if there is room here then we use this page. */
-               else if ( PageGetFreeSpace (rpage) > itemsz )
+               if ( PageGetFreeSpace (rpage) > itemsz )
                use_left = false;
        }
        else    /* rightmost page */
@@ -349,12 +417,70 @@ _bt_insertonpg(Relation rel,
        }
        _bt_relbuf(rel, rbuf, BT_WRITE);
    }
+   /*
+    * If after splitting un-chained page we'll got chain of pages 
+    * with duplicates then we want to know 
+    * 1. on which of two pages new btitem will go (current
+    *    _bt_findsplitloc is quite bad);
+    * 2. what parent (if there's one) thinking about it
+    *    (remember about deletions)
+    */
+   else if ( !( lpageop->btpo_flags & BTP_CHAIN ) )
+   {
+           OffsetNumber start = ( P_RIGHTMOST(lpageop) ) ? P_HIKEY : P_FIRSTKEY;
+           Size llimit;
+           
+           maxoff = PageGetMaxOffsetNumber (page);
+           llimit = PageGetPageSize(page) - sizeof (PageHeaderData) - 
+                   DOUBLEALIGN (sizeof (BTPageOpaqueData)) 
+                   + sizeof(ItemIdData);
+           llimit /= 2;
+           firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit);
+           
+           if ( _bt_itemcmp (rel, keysz, 
+           (BTItem) PageGetItem(page, PageGetItemId(page, start)),
+           (BTItem) PageGetItem(page, PageGetItemId(page, firstright)), 
+                   BTEqualStrategyNumber) )
+       {
+               if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                           PageGetItemId(page, firstright), 
+                           BTLessStrategyNumber) )
+               /* 
+                * force moving current items to the new page:
+                * new item will go on the current page.
+                */
+                   firstright = start;
+               else
+               /* 
+                * new btitem >= firstright, start item == firstright -
+                * new chain of duplicates: if this non-leftmost leaf
+                * page and parent item < start item then force moving
+                * all items to the new page - current page will be
+                * "empty" after it.
+                */
+               {
+                   if ( !P_LEFTMOST (lpageop) && 
+                       ( lpageop->btpo_flags & BTP_LEAF ) )
+                   {
+               ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
+                  bknum, P_HIKEY);
+               pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+               if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, 
+                       (BTItem) PageGetItem(page, 
+                           PageGetItemId(page, start)),
+                               BTLessStrategyNumber) )
+                   {
+                           firstright = start;
+                           shifted = true;
+                       }
+           _bt_relbuf(rel, pbuf, BT_WRITE);
+                   }
+               }
+           }   /* else - no new chain if start item < firstright one */
+   }
    
    /* split the buffer into left and right halves */
-   rbuf = _bt_split(rel, buf, hiRightItem);
-   
-   if ( hiRightItem != (BTItem) NULL )
-       pfree (hiRightItem);
+   rbuf = _bt_split(rel, buf, firstright);
    
    /* which new page (left half or right half) gets the tuple? */
    if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
@@ -369,13 +495,24 @@ _bt_insertonpg(Relation rel,
        itup_blkno = BufferGetBlockNumber(rbuf);
    }
    
-   lowLeftItem = (BTItem) PageGetItem(page,
+       maxoff = PageGetMaxOffsetNumber (page);
+       if ( shifted )
+       {
+           if ( maxoff > P_FIRSTKEY )
+               elog (FATAL, "btree: shifted page is not empty");
+           lowLeftItem = (BTItem) NULL;
+       }
+       else
+       {
+           if ( maxoff < P_FIRSTKEY )
+               elog (FATAL, "btree: un-shifted page is empty");
+       lowLeftItem = (BTItem) PageGetItem(page,
                     PageGetItemId(page, P_FIRSTKEY));
-   
-       if ( _bt_itemcmp (rel, keysz, lowLeftItem, 
+       if ( _bt_itemcmp (rel, keysz, lowLeftItem, 
            (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), 
                    BTEqualStrategyNumber) ) 
-       lpageop->btpo_flags |= BTP_CHAIN;
+           lpageop->btpo_flags |= BTP_CHAIN;
+   }
 
    /*
     *  By here,
@@ -405,6 +542,8 @@ _bt_insertonpg(Relation rel,
            BTItem new_item;
            OffsetNumber upditem_offset = P_HIKEY;
            bool do_update = false;
+           bool update_in_place = true;
+           bool parent_chained;
 
        /* form a index tuple that points at the new right page */
        rbknum = BufferGetBlockNumber(rbuf);
@@ -449,6 +588,10 @@ _bt_insertonpg(Relation rel,
        pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
        ppage = BufferGetPage(pbuf);
        ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+       parent_chained = (( ppageop->btpo_flags & BTP_CHAIN )) ? true : false;
+       
+       if ( parent_chained && !left_chained )
+           elog (FATAL, "nbtree: unexpected chained parent of unchained page");
        
        /*
         *  If the key of new_item is < than the key of the item
@@ -472,7 +615,8 @@ _bt_insertonpg(Relation rel,
         */
        if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
                                    BTGreaterStrategyNumber) ||
-               ( _bt_itemcmp(rel, keysz, stack->bts_btitem, 
+               ( !shifted && 
+                 _bt_itemcmp(rel, keysz, stack->bts_btitem, 
                        new_item, BTEqualStrategyNumber) &&
                  _bt_itemcmp(rel, keysz, lowLeftItem, 
                        new_item, BTLessStrategyNumber) ) )
@@ -491,34 +635,17 @@ _bt_insertonpg(Relation rel,
            elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)",
                P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset);
        }
-       /*
-        * There was bug caused by deletion all minimum keys (K1) from 
-        * an index page and insertion there (up to page splitting) 
-        * higher duplicate keys (K2): after it parent item for left
-        * page contained K1 and the next item (for new right page) - K2, 
-        * - and scan for the key = K2 lost items on the left page.
-        * So, we have to update parent item if its key < minimum
-        * key on the left and minimum keys on the left and on the right
-        * are equal. It would be nice to update hikey on the previous
-        * page of the left one too, but we may get deadlock here
-        * (read comments in _bt_split), so we leave previous page
-        * hikey _inconsistent_, but there should to be BTP_CHAIN flag
-        * on it, which privents _bt_moveright from dangerous movings 
-        * from there.  - vadim 05/27/97
-        */
-       else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, 
-                       lowLeftItem, BTLessStrategyNumber) && 
-               _bt_itemcmp (rel, keysz, new_item, 
-                       lowLeftItem, BTEqualStrategyNumber) ) 
-       {
-           do_update = true;
-           upditem_offset = stack->bts_offset;
-       }
        
        if ( do_update )
        {
-       /* Try to update in place. */
-           if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
+           if ( shifted )
+                   elog (FATAL, "btree: attempt to update parent for shifted page");
+       /* 
+        * Try to update in place. If out parent page is chained
+        * then we must forse insertion.
+        */
+           if ( !parent_chained && 
+               DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
                    DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) 
                {
            _bt_updateitem(rel, keysz, pbuf, 
@@ -528,6 +655,7 @@ _bt_insertonpg(Relation rel,
        }
        else
        {
+           update_in_place = false;
                    PageIndexTupleDelete(ppage, upditem_offset);
        
           /* 
@@ -543,14 +671,18 @@ _bt_insertonpg(Relation rel,
            ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), 
                   bknum, P_HIKEY);
        
-           /* unlock the children before doing this */
+           /* 
+            * Unlock the children before doing this
+            *
+            * Mmm ... I foresee problems here. - vadim 06/10/97
+            */
            _bt_relbuf(rel, buf, BT_WRITE);
            _bt_relbuf(rel, rbuf, BT_WRITE);
        
            /* 
             * A regular _bt_binsrch should find the right place to
-            * put the new entry, since it should be either lower 
-            * than any other key on the page or unique. 
+            * put the new entry, since it should be lower than any 
+            * other key on the page. 
             * Therefore set afteritem to NULL.
             */
            newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
@@ -575,9 +707,49 @@ _bt_insertonpg(Relation rel,
        }
        
        newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
+       
+       afteritem = stack->bts_btitem;
+       if ( parent_chained && !update_in_place )
+       {
+           ppage = BufferGetPage(pbuf);
+           ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+           if ( ppageop->btpo_flags & BTP_CHAIN )
+               elog (FATAL, "btree: unexpected BTP_CHAIN flag in parent after update");
+           if ( P_RIGHTMOST (ppageop) )
+               elog (FATAL, "btree: chained parent is RIGHTMOST after update");
+           maxoff = PageGetMaxOffsetNumber (ppage);
+           if ( maxoff != P_FIRSTKEY )
+               elog (FATAL, "btree: FIRSTKEY was unexpected in parent after update");
+               if ( _bt_skeycmp (rel, keysz, newskey, ppage, 
+                           PageGetItemId(ppage, P_FIRSTKEY), 
+                           BTLessEqualStrategyNumber) )
+               elog (FATAL, "btree: parent FIRSTKEY is >= duplicate key after update");
+               if ( !_bt_skeycmp (rel, keysz, newskey, ppage, 
+                           PageGetItemId(ppage, P_HIKEY), 
+                           BTEqualStrategyNumber) )
+               elog (FATAL, "btree: parent HIGHKEY is not equal duplicate key after update");
+           afteritem = (BTItem) NULL;
+       }
+       else if ( left_chained && !update_in_place )
+       {
+           ppage = BufferGetPage(pbuf);
+           ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
+           if ( !P_RIGHTMOST (ppageop) &&
+                   _bt_skeycmp (rel, keysz, newskey, ppage, 
+                           PageGetItemId(ppage, P_HIKEY), 
+                           BTGreaterStrategyNumber) )
+               afteritem = (BTItem) NULL;
+       }
+       if ( afteritem == (BTItem) NULL)
+       {
+       rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE);
+       _bt_relbuf(rel, pbuf, BT_WRITE);
+       pbuf = rbuf;
+       }
+       
        newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
                    keysz, newskey, new_item,
-                   stack->bts_btitem);
+                   afteritem);
        
        /* be tidy */
        pfree(newres);
@@ -607,7 +779,7 @@ _bt_insertonpg(Relation rel,
  * pin and lock on buf are maintained.
  */
 static Buffer
-_bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
+_bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
 {
     Buffer rbuf;
     Page origpage;
@@ -622,9 +794,7 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
     OffsetNumber leftoff, rightoff;
     OffsetNumber start;
     OffsetNumber maxoff;
-    OffsetNumber firstright;
     OffsetNumber i;
-    Size llimit;
     
     rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
     origpage = BufferGetPage(buf);
@@ -666,23 +836,9 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
    /* splitting a non-rightmost page, start at the first data item */
    start = P_FIRSTKEY;
 
-   /* 
-    * Copy the original high key to the new page if high key
-    * was not passed by caller.
-    */
-   if ( hiRightItem == NULL )
-   {
-       itemid = PageGetItemId(origpage, P_HIKEY);
-       itemsz = ItemIdGetLength(itemid);
-       item = (BTItem) PageGetItem(origpage, itemid);
-   }
-   else
-   {
-       item = hiRightItem;
-           itemsz = IndexTupleDSize(hiRightItem->bti_itup)
-           + (sizeof(BTItemData) - sizeof(IndexTupleData));
-           itemsz = DOUBLEALIGN(itemsz);
-   }
+   itemid = PageGetItemId(origpage, P_HIKEY);
+   itemsz = ItemIdGetLength(itemid);
+   item = (BTItem) PageGetItem(origpage, itemid);
    if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber )
        elog (FATAL, "btree: failed to add hikey to the right sibling");
    rightoff = P_FIRSTKEY;
@@ -694,8 +850,11 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
    rightoff = P_HIKEY;
     }
     maxoff = PageGetMaxOffsetNumber(origpage);
-    llimit = PageGetFreeSpace(leftpage) / 2;
-    firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+    if ( firstright == InvalidOffsetNumber )
+    {
+       Size llimit = PageGetFreeSpace(leftpage) / 2;
+       firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
+    }
     
     for (i = start; i <= maxoff; i = OffsetNumberNext(i)) {
    itemid = PageGetItemId(origpage, i);
@@ -814,6 +973,9 @@ _bt_findsplitloc(Relation rel,
     Size nbytes;
     int natts;
     
+    if ( start >= maxoff )
+       elog (FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
+                   start, maxoff);
     natts = rel->rd_rel->relnatts;
     saferight = start;
     safeitemid = PageGetItemId(page, saferight);
@@ -822,8 +984,8 @@ _bt_findsplitloc(Relation rel,
     
     i = OffsetNumberNext(start);
     
-    while (nbytes < llimit) {
-   
+    while (nbytes < llimit)
+    {
    /* check the next item on the page */
    nxtitemid = PageGetItemId(page, i);
    nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
@@ -840,7 +1002,10 @@ _bt_findsplitloc(Relation rel,
        safeitem = nxtitem;
        saferight = i;
    }
-   i = OffsetNumberNext(i);
+   if ( i < maxoff )
+       i = OffsetNumberNext(i);
+   else
+       break;
     }
     
     /*
@@ -851,6 +1016,9 @@ _bt_findsplitloc(Relation rel,
     if (saferight == start)
    saferight = i;
     
+    if ( saferight == maxoff  && ( maxoff - start ) > 1 )
+       saferight = start + ( maxoff - start ) / 2;
+    
     return (saferight);
 }
 
@@ -1051,10 +1219,22 @@ _bt_goesonpg(Relation rel,
      *  If we have no adjacency information, and the item is equal to the
      *  high key on the page (by here it is), then the item does not belong
      *  on this page.
+     *
+     *  Now it's not true in all cases.        - vadim 06/10/97
      */
     
     if (afteritem == (BTItem) NULL)
+    {
+       if ( opaque->btpo_flags & BTP_LEAF )
+       return (false);
+       if ( opaque->btpo_flags & BTP_CHAIN )
+       return (true);
+       if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                   PageGetItemId(page, P_FIRSTKEY), 
+                   BTEqualStrategyNumber) )
+       return (true);
    return (false);
+    }
     
     /* damn, have to work for it.  i hate that. */
     maxoff = PageGetMaxOffsetNumber(page);
@@ -1269,6 +1449,7 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
     return (true);
 }
 
+#if 0
 /*
  * _bt_shift - insert btitem on the passed page after shifting page 
  *        to the right in the tree.
@@ -1404,3 +1585,4 @@ _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz,
     
     return (res);
 }
+#endif
index 45f4f7604d012d3cff6be2a263d99ac94bdc8c69..a78acd0f04b28f470962c2560d91a29221465f32 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.20 1997/05/30 18:35:37 vadim Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.21 1997/06/10 07:28:50 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -159,6 +159,7 @@ _bt_moveright(Relation rel,
     BTPageOpaque opaque;
     ItemId hikey;
     BlockNumber rblkno;
+    int natts = rel->rd_rel->relnatts;
         
     page = BufferGetPage(buf);
     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -195,26 +196,9 @@ _bt_moveright(Relation rel,
         *  on this page to do not lose "good" tuples if number
         *  of attrs > keysize. Example: (2,0) - last items on 
         *  this page, (2,1) - first item on next page (hikey), 
-        *  our scankey is x = 2. Scankey >= (2,1) because of 
+        *  our scankey is x = 2. Scankey == (2,1) because of 
         *  we compare first attrs only, but we shouldn't to move
         *  right of here.      - vadim 04/15/97
-        *
-        *  XXX
-        *  This code changed again! Actually, we break our
-        *  duplicates handling in single case: if we insert 
-        *  new minimum key into leftmost page with duplicates
-        *  and splitting doesn't occure then _bt_insertonpg doesn't 
-        *  worry about duplicates-rule. Fix _bt_insertonpg ?
-        *  But I don't see why don't compare scankey with _last_
-        *  item on the page instead of first one, in any cases.
-        *  So - we do it in that way now.  - vadim 05/26/97
-        *
-        *  Also, if we are on an "pseudo-empty" leaf page (i.e. there is
-        *  only hikey here) and scankey == hikey then we don't move
-        *  right! It's fix for bug described in _bt_insertonpg(). It's 
-        *  right - at least till index cleanups are perfomed by vacuum 
-        *  in exclusive mode: so, though this page may be just splitted, 
-        *  it may not be "emptied" before we got here. - vadim 05/27/97
         */
        
        if ( _bt_skeycmp (rel, keysz, scankey, page, hikey, 
@@ -227,14 +211,27 @@ _bt_moveright(Relation rel,
            }
            if ( offmax > P_HIKEY )
            {
-           if ( _bt_skeycmp (rel, keysz, scankey, page, 
+               if ( natts == keysz )   /* sanity checks */
+               {
+               if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                       PageGetItemId (page, P_FIRSTKEY),
+                   BTEqualStrategyNumber) )
+               elog (FATAL, "btree: BTP_CHAIN flag was expected");
+               if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                       PageGetItemId (page, offmax),
+                   BTEqualStrategyNumber) )
+               elog (FATAL, "btree: unexpected equal last item");
+               if ( _bt_skeycmp (rel, keysz, scankey, page, 
+                       PageGetItemId (page, offmax),
+                   BTLessStrategyNumber) )
+               elog (FATAL, "btree: unexpected greater last item");
+           /* move right */
+           }
+           else if ( _bt_skeycmp (rel, keysz, scankey, page, 
                    PageGetItemId (page, offmax),
                BTLessEqualStrategyNumber) )
                break;
            }
-           else if ( offmax == P_HIKEY && 
-               ( opaque->btpo_flags & BTP_LEAF ) )
-               break;
        }
        
        /* step right one page */