WAL
author Vadim B. Mikheev
Fri, 13 Oct 2000 02:03:02 +0000 (02:03 +0000)
committer Vadim B. Mikheev
Fri, 13 Oct 2000 02:03:02 +0000 (02:03 +0000)
src/backend/access/heap/heapam.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c
src/include/access/nbtree.h

index dbcefbf273376e12b61f22b3a070d426c2ad3dde..3e1de33bfe40e4649904386f2fa51a050c86f1a9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.86 2000/10/04 00:04:41 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -2016,6 +2016,22 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record)
        elog(STOP, "heap_redo: unknown op code %u", info);
 }
 
+/*
+ * heap_undo -- hand a heap WAL record to its handler in undo mode.
+ *
+ * The op code is xl_info with the XLR_INFO_MASK bits cleared.  Every
+ * handler is called with redo = false; an op code that matches none of
+ * them is reported through elog(STOP).
+ */
+void heap_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+   uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+   switch (opcode)
+   {
+       case XLOG_HEAP_INSERT:
+           heap_xlog_insert(false, lsn, record);
+           break;
+       case XLOG_HEAP_DELETE:
+           heap_xlog_delete(false, lsn, record);
+           break;
+       case XLOG_HEAP_UPDATE:
+           heap_xlog_update(false, lsn, record);
+           break;
+       case XLOG_HEAP_MOVE:
+           heap_xlog_move(false, lsn, record);
+           break;
+       default:
+           elog(STOP, "heap_undo: unknown op code %u", opcode);
+           break;
+   }
+}
+
 void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
 {
    xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record);
@@ -2199,7 +2215,7 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
    else    /* we can't delete tuple right now */
    {
        lp->lp_flags |= LP_DELETE;  /* mark for deletion */
-       MarkBufferForCleanup(buffer, PageCleanup);
+       MarkBufferForCleanup(buffer, HeapPageCleanup);
    }
 
 }
index e454a989ee4d33c6e2d4bb69e45111a6881cd297..c72b8ca3df613289213f466596e65f81fe8773ee 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.64 2000/10/05 20:10:20 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,6 +61,10 @@ static void _bt_pgaddtup(Relation rel, Page page,
 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
                        int keysz, ScanKey scankey);
 
+#ifdef XLOG
+static Relation        _xlheapRel; /* temporary hack */
+#endif
+
 /*
  * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
  *
@@ -119,6 +123,10 @@ top:
        }
    }
 
+#ifdef XLOG
+   _xlheapRel = heapRel;   /* temporary hack */
+#endif
+
    /* do the insertion */
    res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
 
@@ -517,21 +525,38 @@ _bt_insertonpg(Relation rel,
 #ifdef XLOG
        /* XLOG stuff */
        {
-           char                xlbuf[sizeof(xl_btree_insert) + 2 * sizeof(CommandId)];
+           char                xlbuf[sizeof(xl_btree_insert) + 
+                   sizeof(CommandId) + sizeof(RelFileNode)];
            xl_btree_insert    *xlrec = xlbuf;
            int                 hsize = SizeOfBtreeInsert;
+           BTItemData          truncitem;
+           BTItem              xlitem = btitem;
+           Size                xlsize = IndexTupleDSize(btitem->bti_itup) + 
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
 
            xlrec->target.node = rel->rd_node;
            ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff);
            if (P_ISLEAF(lpageop))
-           {
+           {
                CommandId   cid = GetCurrentCommandId();
-               memcpy(xlbuf + SizeOfBtreeInsert, &(char*)cid, sizeof(CommandId));
+               memcpy(xlbuf + hsize, &cid, sizeof(CommandId));
                hsize += sizeof(CommandId);
+               memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+               hsize += sizeof(RelFileNode);
+           }
+           /*
+            * Read comments in _bt_pgaddtup
+            */
+           else if (newitemoff == P_FIRSTDATAKEY(lpageop))
+           {
+               truncitem = *btitem;
+               truncitem.bti_itup.t_info = sizeof(BTItemData);
+               xlitem = &truncitem;
+               xlsize = sizeof(BTItemData);
            }
 
            XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT,
-               xlbuf, hsize, (char*) btitem, itemsz);
+               xlbuf, hsize, (char*) xlitem, xlsize);
 
            PageSetLSN(page, recptr);
            PageSetSUI(page, ThisStartUpID);
@@ -752,7 +777,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
     */
    {
        char                xlbuf[sizeof(xl_btree_split) + 
-           2 * sizeof(CommandId) + BLCKSZ];
+           sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ];
        xl_btree_split     *xlrec = xlbuf;
        int                 hsize = SizeOfBtreeSplit;
        int                 flag = (newitemonleft) ? 
@@ -765,11 +790,30 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
            CommandId   cid = GetCurrentCommandId();
            memcpy(xlbuf + hsize, &(char*)cid, sizeof(CommandId));
            hsize += sizeof(CommandId);
+           memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
+           hsize += sizeof(RelFileNode);
        }
        if (newitemonleft)
        {
-           memcpy(xlbuf + hsize, (char*) newitem, newitemsz);
-           hsize += newitemsz;
+           /*
+            * Read comments in _bt_pgaddtup.
+            * Actually, seems that in non-leaf splits newitem shouldn't
+            * go to first data key position.
+            */
+           if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque))
+           {
+               BTItemData  truncitem = *newitem;
+               truncitem.bti_itup.t_info = sizeof(BTItemData);
+               memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData));
+               hsize += sizeof(BTItemData);
+           }
+           else
+           {
+               Size    itemsz = IndexTupleDSize(newitem->bti_itup) + 
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+               memcpy(xlbuf + hsize, (char*) newitem, itemsz);
+               hsize += itemsz;
+           }
            xlrec->otherblk = BufferGetBlockNumber(rbuf);
        }
        else
@@ -1012,7 +1056,7 @@ static Buffer
 _bt_getstackbuf(Relation rel, BTStack stack)
 {
    BlockNumber blkno;
-   Buffer      buf;
+   Buffer      buf, newbuf;
    OffsetNumber start,
                offnum,
                maxoff;
@@ -1101,11 +1145,18 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
    Size        itemsz;
    BTItem      new_item;
 
+#ifdef XLOG
+   Buffer      metabuf;
+#endif
+
    /* get a new root page */
    rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
    rootpage = BufferGetPage(rootbuf);
    rootblknum = BufferGetBlockNumber(rootbuf);
 
+#ifdef XLOG
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE);
+#endif
 
    /* NO ELOG(ERROR) from here till newroot op is logged */
 
@@ -1168,9 +1219,12 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 #ifdef XLOG
    /* XLOG stuff */
    {
-       xl_btree_newroot       xlrec;
+       xl_btree_newroot    xlrec;
+       Page                metapg = BufferGetPage(metabuf);
+       BTMetaPageData     *metad = BTPageGetMeta(metapg);
+
        xlrec.node = rel->rd_node;
-       xlrec.rootblk = rootblknum;
+       BlockIdSet(&(xlrec.rootblk), rootblknum);
 
        /* 
         * Dirrect access to page is not good but faster - we should 
@@ -1181,16 +1235,25 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
            (char*)rootpage + (PageHeader) rootpage)->pd_upper,
            ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->upper);
 
+       metad->btm_root = rootblknum;
+       (metad->btm_level)++;
+
        PageSetLSN(rootpage, recptr);
        PageSetSUI(rootpage, ThisStartUpID);
+       PageSetLSN(metapg, recptr);
+       PageSetSUI(metapg, ThisStartUpID);
+
+       _bt_wrtbuf(rel, metabuf);
    }
 #endif
 
    /* write and let go of the new root buffer */
    _bt_wrtbuf(rel, rootbuf);
 
+#ifndef XLOG
    /* update metadata page with new root block number */
    _bt_metaproot(rel, rootblknum, 0);
+#endif
 
    /* update and release new sibling, and finally the old root */
    _bt_wrtbuf(rel, rbuf);
index 2da74219010543275f7bed0ca84c6f3b7d7889b1..41acd11659c13496bc83bfaa6563a3708ec9bf2e 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.38 2000/10/04 00:04:42 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.39 2000/10/13 02:03:00 vadim Exp $
  *
  * NOTES
  *    Postgres btree pages look like ordinary relation pages.  The opaque
 #include "access/nbtree.h"
 #include "miscadmin.h"
 
-#define BTREE_METAPAGE 0
-#define BTREE_MAGIC        0x053162
-
-#define BTREE_VERSION  1
-
-typedef struct BTMetaPageData
-{
-   uint32      btm_magic;
-   uint32      btm_version;
-   BlockNumber btm_root;
-   int32       btm_level;
-} BTMetaPageData;
-
-#define BTPageGetMeta(p) \
-   ((BTMetaPageData *) &((PageHeader) p)->pd_linp[0])
-
-
 /*
  * We use high-concurrency locking on btrees.  There are two cases in
  * which we don't do locking.  One is when we're building the btree.
@@ -188,14 +171,18 @@ _bt_getroot(Relation rel, int access)
 #ifdef XLOG
            /* XLOG stuff */
            {
-               xl_btree_insert    xlrec;
+               xl_btree_newroot       xlrec;
+
                xlrec.node = rel->rd_node;
+               BlockIdSet(&(xlrec.rootblk), rootblkno);
 
                XLogRecPtr recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT,
                    &xlrec, SizeOfBtreeNewroot, NULL, 0);
 
                PageSetLSN(rootpage, recptr);
                PageSetSUI(rootpage, ThisStartUpID);
+               PageSetLSN(metapg, recptr);
+               PageSetSUI(metapg, ThisStartUpID);
            }
 #endif
 
index 7fec982fa2d9e59388ca0f9121255c75ff9fcc8e..1064c2bb1075c7ff6c9f2b71227687569d9a6c17 100644 (file)
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan)
        so->btso_curbuf = buf;
    }
 }
+
+#ifdef XLOG
+/*
+ * btree_redo -- hand a btree WAL record to its handler in redo mode.
+ *
+ * The op code is xl_info with the XLR_INFO_MASK bits cleared.  The two
+ * split op codes share one handler and differ only in the "onleft" flag.
+ * An op code that matches no handler is reported through elog(STOP).
+ */
+void btree_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+   uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+   switch (opcode)
+   {
+       case XLOG_BTREE_DELETE:
+           btree_xlog_delete(true, lsn, record);
+           break;
+       case XLOG_BTREE_INSERT:
+           btree_xlog_insert(true, lsn, record);
+           break;
+       case XLOG_BTREE_SPLIT:
+           /* new item on the right */
+           btree_xlog_split(true, false, lsn, record);
+           break;
+       case XLOG_BTREE_SPLEFT:
+           /* new item on the left */
+           btree_xlog_split(true, true, lsn, record);
+           break;
+       case XLOG_BTREE_NEWROOT:
+           btree_xlog_newroot(true, lsn, record);
+           break;
+       default:
+           elog(STOP, "btree_redo: unknown op code %u", opcode);
+           break;
+   }
+}
+
+/*
+ * btree_undo -- hand a btree WAL record to its handler in undo mode.
+ *
+ * Same dispatch as for redo, but every handler is called with
+ * redo = false.  An op code that matches no handler is reported through
+ * elog(STOP).
+ */
+void btree_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+   uint8   opcode = record->xl_info & ~XLR_INFO_MASK;
+
+   switch (opcode)
+   {
+       case XLOG_BTREE_DELETE:
+           btree_xlog_delete(false, lsn, record);
+           break;
+       case XLOG_BTREE_INSERT:
+           btree_xlog_insert(false, lsn, record);
+           break;
+       case XLOG_BTREE_SPLIT:
+           /* new item on the right */
+           btree_xlog_split(false, false, lsn, record);
+           break;
+       case XLOG_BTREE_SPLEFT:
+           /* new item on the left */
+           btree_xlog_split(false, true, lsn, record);
+           break;
+       case XLOG_BTREE_NEWROOT:
+           btree_xlog_newroot(false, lsn, record);
+           break;
+       default:
+           elog(STOP, "btree_undo: unknown op code %u", opcode);
+           break;
+   }
+}
+
+/*
+ * btree_xlog_delete -- replay removal of an index tuple.
+ *
+ * Nothing is done for undo.  On redo the page named by target.tid is
+ * read; unless the page LSN shows the record was already applied, the
+ * tuple at target.tid is removed, the page is stamped with the record's
+ * LSN and the buffer is written.  In every case the buffer obtained from
+ * XLogReadBuffer is let go before returning, the same way the redo path
+ * of btree_xlog_insert does it.
+ */
+static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+   xl_btree_delete    *xlrec;
+   Relation            reln;
+   Buffer              buffer;
+   Page                page;
+
+   if (!redo)
+       return;
+
+   xlrec = (xl_btree_delete*) XLogRecGetData(record);
+   reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer(false, reln,
+               ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+   if (!BufferIsValid(buffer))
+       elog(STOP, "btree_delete_redo: block unfound");
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+       elog(STOP, "btree_delete_redo: uninitialized page");
+
+   /* change already on the page - just let go of the buffer */
+   if (XLByteLE(lsn, PageGetLSN(page)))
+   {
+       UnlockAndReleaseBuffer(buffer);
+       return;
+   }
+
+   PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
+
+   PageSetLSN(page, lsn);
+   PageSetSUI(page, ThisStartUpID);
+   UnlockAndWriteBuffer(buffer);
+}
+
+/*
+ * btree_xlog_insert -- redo or undo insertion of a single btitem.
+ *
+ * redo: unless the page LSN shows the record was already applied, the
+ * logged item is put back at target.tid through _bt_add_item and the page
+ * is stamped with lsn.  For a leaf page the record carries a CommandId
+ * and the heap RelFileNode ahead of the item; both are skipped here and
+ * the heap node is handed to _bt_add_item.
+ *
+ * undo: only leaf pages are processed.  The logged item header is passed
+ * to _bt_del_item, which takes over the buffer.
+ */
+static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+   xl_btree_insert    *xlrec;
+   Relation            reln;   /* Relation is already a pointer type */
+   Buffer              buffer;
+   Page                page;
+   BTPageOpaque        pageop;
+
+   xlrec = (xl_btree_insert*) XLogRecGetData(record);
+   reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer((redo) ? true : false, reln,
+               ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+   if (!BufferIsValid(buffer))
+       return;
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+       elog(STOP, "btree_insert_%s: uninitialized page",
+           (redo) ? "redo" : "undo");
+   pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+   if (redo)
+   {
+       if (XLByteLE(lsn, PageGetLSN(page)))
+           UnlockAndReleaseBuffer(buffer);     /* already applied */
+       else
+       {
+           Size        hsize = SizeOfBtreeInsert;
+           RelFileNode hnode;  /* set, and used by _bt_add_item, for leaf pages only */
+
+           if (P_ISLEAF(pageop))
+           {
+               hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+               memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert +
+                           sizeof(CommandId), sizeof(RelFileNode));
+           }
+
+           /* everything after the hsize-byte header is the item itself */
+           if (! _bt_add_item(page,
+                   ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+                   (char*)xlrec + hsize,
+                   record->xl_len - hsize,
+                   &hnode))
+               elog(STOP, "btree_insert_redo: failed to add item");
+
+           PageSetLSN(page, lsn);
+           PageSetSUI(page, ThisStartUpID);
+           UnlockAndWriteBuffer(buffer);
+       }
+   }
+   else
+   {
+       BTItemData      btdata;
+
+       if (XLByteLT(PageGetLSN(page), lsn))
+           elog(STOP, "btree_insert_undo: bad page LSN");
+
+       if (! P_ISLEAF(pageop))
+       {
+           UnlockAndReleaseBuffer(buffer);
+           return;
+       }
+
+       /* leaf record: item follows CommandId and heap RelFileNode */
+       memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert +
+           sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+       _bt_del_item(reln, buffer, &btdata, true, lsn, record);
+
+   }
+
+   return;
+}
+
+/*
+ * btree_xlog_split -- redo or undo a page split.
+ *
+ * onleft tells whether the new item went to the left (original) page.
+ * If so, target.tid addresses the left page and otherblk the new right
+ * sibling; otherwise the roles of the two are swapped.  Up to three pages
+ * are visited in turn: the left sibling, the new right sibling and the
+ * page following the right sibling (rightblk), if there is one.
+ *
+ * redo: each page whose LSN is older than lsn is brought up to date and
+ * stamped with lsn.
+ * undo: for leaf pages the new item is looked up and removed through
+ * _bt_del_item; nothing else is reverted here.
+ *
+ * NOTE(review): BlockIdGetBlockNumber() is given xlrec->otherblk and
+ * xlrec->rightblk by value here but &(xlrec->rootblk) in
+ * btree_xlog_newroot - confirm against the xl_btree_split declaration.
+ */
+static void
+btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
+{
+   xl_btree_split     *xlrec;
+   Relation            reln;   /* Relation is already a pointer type */
+   BlockNumber         blkno;
+   BlockNumber         parent;
+   Buffer              buffer;
+   Page                page;
+   BTPageOpaque        pageop;
+   char               *op = (redo) ? "redo" : "undo";
+   bool                isleaf;
+
+   xlrec = (xl_btree_split*) XLogRecGetData(record);
+   reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
+   if (!RelationIsValid(reln))
+       return;
+
+   /* Left (original) sibling */
+   blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+                   BlockIdGetBlockNumber(xlrec->otherblk);
+   buffer = XLogReadBuffer(false, reln, blkno);
+   if (!BufferIsValid(buffer))
+       elog(STOP, "btree_split_%s: lost left sibling", op);
+
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+       elog(STOP, "btree_split_%s: uninitialized left sibling", op);
+
+   pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+   isleaf = P_ISLEAF(pageop);
+   parent = pageop->btpo_parent;
+
+   if (redo)
+   {
+       if (XLByteLE(lsn, PageGetLSN(page)))
+           UnlockAndReleaseBuffer(buffer);
+       else
+       {
+           /* Delete items related to new right sibling */
+           _bt_thin_left_page(page, record);
+
+           if (onleft)
+           {
+               BTItemData  btdata;
+               Size        hsize = SizeOfBtreeSplit;
+               Size        itemsz;
+               RelFileNode hnode;  /* set for leaf pages only */
+
+               pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
+               if (isleaf)
+               {
+                   hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+                   memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
+                               sizeof(CommandId), sizeof(RelFileNode));
+               }
+
+               /* the new item is the first btitem after the header */
+               memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+
+               if (! _bt_add_item(page,
+                       ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
+                       (char*)xlrec + hsize,
+                       itemsz,
+                       &hnode))
+                   elog(STOP, "btree_split_redo: failed to add item");
+           }
+           else
+               pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+           PageSetLSN(page, lsn);
+           PageSetSUI(page, ThisStartUpID);
+           UnlockAndWriteBuffer(buffer);
+       }
+   }
+   else    /* undo */
+   {
+       if (XLByteLT(PageGetLSN(page), lsn))
+           elog(STOP, "btree_split_undo: bad left sibling LSN");
+
+       if (! isleaf || ! onleft)
+           UnlockAndReleaseBuffer(buffer);
+       else
+       {
+           BTItemData      btdata;
+
+           memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit +
+               sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
+
+           _bt_del_item(reln, buffer, &btdata, false, lsn, record);
+       }
+   }
+
+   /* Right (new) sibling */
+   blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+                   ItemPointerGetBlockNumber(&(xlrec->target.tid));
+   buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
+   if (!BufferIsValid(buffer))
+       elog(STOP, "btree_split_%s: lost right sibling", op);
+
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+   {
+       if (!redo)
+           elog(STOP, "btree_split_undo: uninitialized right sibling");
+       PageInit(page, BufferGetPageSize(buffer), 0);
+   }
+
+   if (redo)
+   {
+       if (XLByteLE(lsn, PageGetLSN(page)))
+           UnlockAndReleaseBuffer(buffer);
+       else
+       {
+           Size        hsize = SizeOfBtreeSplit;
+           BTItemData  btdata;
+           Size        itemsz;
+           char       *item;
+           /* xl_len is measured from the start of the record data */
+           char       *end = (char*)xlrec + record->xl_len;
+
+           _bt_pageinit(page, BufferGetPageSize(buffer));
+           pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+           if (isleaf)
+           {
+               pageop->btpo_flags |= BTP_LEAF;
+               hsize += (sizeof(CommandId) + sizeof(RelFileNode));
+           }
+           if (onleft)     /* skip target item */
+           {
+               memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
+               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+               hsize += itemsz;
+           }
+
+           /* the rest of the record is the right sibling's items */
+           for (item = (char*)xlrec + hsize; item < end; )
+           {
+               memcpy(&btdata, item, sizeof(BTItemData));
+               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+               itemsz = MAXALIGN(itemsz);
+               if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+                       LP_USED) == InvalidOffsetNumber)
+                   elog(STOP, "btree_split_redo: can't add item to right sibling");
+               item += itemsz;
+           }
+
+           pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
+                   BlockIdGetBlockNumber(xlrec->otherblk);
+           pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
+           pageop->btpo_parent = parent;
+
+           PageSetLSN(page, lsn);
+           PageSetSUI(page, ThisStartUpID);
+           UnlockAndWriteBuffer(buffer);
+       }
+   }
+   else    /* undo */
+   {
+       if (XLByteLT(PageGetLSN(page), lsn))
+           elog(STOP, "btree_split_undo: bad right sibling LSN");
+
+       if (! isleaf || onleft)
+           UnlockAndReleaseBuffer(buffer);
+       else
+       {
+           char        tbuf[BLCKSZ];
+           int         cnt;
+           char       *item;
+           /* xl_len is measured from the start of the record data */
+           char       *end = (char*)xlrec + record->xl_len;
+           Size        itemsz;
+
+           /* collect the headers of all logged right-sibling items */
+           item = (char*)xlrec + SizeOfBtreeSplit +
+                   sizeof(CommandId) + sizeof(RelFileNode);
+           for (cnt = 0; item < end; )
+           {
+               BTItem  btitem = (BTItem)
+                   (tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
+               memcpy(btitem, item, sizeof(BTItemData));
+               itemsz = IndexTupleDSize(btitem->bti_itup) +
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+               itemsz = MAXALIGN(itemsz);
+               item += itemsz;
+               cnt++;
+           }
+           cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+           if (cnt < 0)
+               elog(STOP, "btree_split_undo: target item unfound in right sibling");
+
+           item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData)));
+
+           _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record);
+       }
+   }
+
+   /* Right (next) page */
+   blkno = BlockIdGetBlockNumber(xlrec->rightblk);
+   if (blkno == P_NONE)    /* split page was the rightmost one */
+       return;
+   buffer = XLogReadBuffer(false, reln, blkno);
+   if (!BufferIsValid(buffer))
+       elog(STOP, "btree_split_%s: lost next right page", op);
+
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+       elog(STOP, "btree_split_%s: uninitialized next right page", op);
+
+   if (redo)
+   {
+       if (XLByteLE(lsn, PageGetLSN(page)))
+           UnlockAndReleaseBuffer(buffer);
+       else
+       {
+           /* make the next page point back at the new right sibling */
+           pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+           pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
+                   ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+           PageSetLSN(page, lsn);
+           PageSetSUI(page, ThisStartUpID);
+           UnlockAndWriteBuffer(buffer);
+       }
+   }
+   else    /* undo */
+   {
+       if (XLByteLT(PageGetLSN(page), lsn))
+           elog(STOP, "btree_split_undo: bad next right page LSN");
+
+       UnlockAndReleaseBuffer(buffer);
+   }
+
+}
+
+/*
+ * btree_xlog_newroot -- replay creation of a new root page.
+ *
+ * Nothing is done for undo.  On redo:
+ * - the root page is rebuilt from the logged items when it is new or its
+ *   LSN is older than lsn; a record without items (xl_len equal to
+ *   SizeOfBtreeNewroot) yields an empty root that is also a leaf;
+ * - the metapage is initialized if it is new, and when its LSN is older
+ *   than lsn it is pointed at the new root and btm_level is incremented.
+ */
+static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+   xl_btree_newroot   *xlrec;
+   Relation            reln;   /* Relation is already a pointer type */
+   Buffer              buffer;
+   Page                page;
+   BTPageOpaque        pageop;
+   Buffer              metabuf;
+   Page                metapg;
+
+   if (!redo)
+       return;
+
+   xlrec = (xl_btree_newroot*) XLogRecGetData(record);
+   reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
+   if (!BufferIsValid(buffer))
+       elog(STOP, "btree_newroot_redo: no root page");
+   metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
+   if (!BufferIsValid(metabuf))
+       elog(STOP, "btree_newroot_redo: no metapage");
+   page = (Page) BufferGetPage(buffer);
+
+   if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
+   {
+       _bt_pageinit(page, BufferGetPageSize(buffer));
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+       pageop->btpo_flags |= BTP_ROOT;
+       pageop->btpo_prev = pageop->btpo_next = P_NONE;
+       pageop->btpo_parent = BTREE_METAPAGE;
+
+       if (record->xl_len == SizeOfBtreeNewroot)   /* no children */
+           pageop->btpo_flags |= BTP_LEAF;
+       else
+       {
+           BTItemData  btdata;
+           Size        itemsz;
+           char       *item;
+           /* xl_len is measured from the start of the record data */
+           char       *end = (char*)xlrec + record->xl_len;
+
+           for (item = (char*)xlrec + SizeOfBtreeNewroot; item < end; )
+           {
+               memcpy(&btdata, item, sizeof(BTItemData));
+               itemsz = IndexTupleDSize(btdata.bti_itup) +
+                           (sizeof(BTItemData) - sizeof(IndexTupleData));
+               itemsz = MAXALIGN(itemsz);
+               if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
+                       LP_USED) == InvalidOffsetNumber)
+                   elog(STOP, "btree_newroot_redo: can't add item");
+               item += itemsz;
+           }
+       }
+
+       PageSetLSN(page, lsn);
+       PageSetSUI(page, ThisStartUpID);
+       UnlockAndWriteBuffer(buffer);
+   }
+   else
+       UnlockAndReleaseBuffer(buffer);
+
+   metapg = BufferGetPage(metabuf);
+   if (PageIsNew((PageHeader) metapg))
+   {
+       BTMetaPageData  md;
+
+       _bt_pageinit(metapg, BufferGetPageSize(metabuf));
+       md.btm_magic = BTREE_MAGIC;
+       md.btm_version = BTREE_VERSION;
+       md.btm_root = P_NONE;
+       md.btm_level = 0;
+       memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
+   }
+
+   if (XLByteLT(PageGetLSN(metapg), lsn))
+   {
+       BTMetaPageData     *metad = BTPageGetMeta(metapg);
+
+       metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
+       (metad->btm_level)++;
+       PageSetLSN(metapg, lsn);
+       PageSetSUI(metapg, ThisStartUpID);
+       UnlockAndWriteBuffer(metabuf);
+   }
+   else
+       UnlockAndReleaseBuffer(metabuf);
+
+   return;
+}
+
+/*
+ * UNDO insertion on *leaf* page: 
+ * - find inserted tuple;
+ * - delete it if heap tuple was inserted by the same xaction
+ *
+ * The search starts on the page held in "buffer" and moves right along
+ * btpo_next until _bt_find_btitem locates btitem, the rightmost page has
+ * been searched, or a page whose LSN is older than lsn is reached.
+ *
+ * "insert" tells whether the record is a plain insert (true) or a split
+ * (false); it selects the record header size and the wording of the
+ * error messages.
+ *
+ * Every path that returns normally has either released/written the buffer
+ * or handed it to MarkBufferForCleanup.
+ */
+static void
+_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, 
+               XLogRecPtr lsn, XLogRecord *record)
+{
+   char           *xlrec = (char*) XLogRecGetData(record);
+   Page            page = (Page) BufferGetPage(buffer);
+   BTPageOpaque    pageop;
+   BlockNumber     blkno;
+   OffsetNumber    offno;
+   ItemId          lp;
+
+   /* look for the item, stepping to right siblings while it is not found */
+   for ( ; ; )
+   {
+       offno = _bt_find_btitem(page, btitem);
+       if (offno != InvalidOffsetNumber)
+           break;
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+       if (P_RIGHTMOST(pageop))
+           break;
+       blkno = pageop->btpo_next;
+       UnlockAndReleaseBuffer(buffer);
+       buffer = XLogReadBuffer(false, reln, blkno);
+       if (!BufferIsValid(buffer))
+           elog(STOP, "btree_%s_undo: lost right sibling",
+               (insert) ? "insert" : "split");
+       page = (Page) BufferGetPage(buffer);
+       if (PageIsNew((PageHeader) page))
+           elog(STOP, "btree_%s_undo: uninitialized right sibling",
+               (insert) ? "insert" : "split");
+       /* page LSN older than the record: stop searching here */
+       if (XLByteLT(PageGetLSN(page), lsn))
+           break;
+   }
+
+   if (offno == InvalidOffsetNumber)   /* not found */
+   {
+       /* a missing item is tolerated only during recovery */
+       if (!InRecovery)
+           elog(STOP, "btree_%s_undo: lost target tuple in rollback",
+               (insert) ? "insert" : "split");
+       UnlockAndReleaseBuffer(buffer);
+       return;
+   }
+
+   lp = PageGetItemId(page, offno);
+   if (ItemIdDeleted(lp))  /* marked for deletion */
+   {
+       /* in recovery, control falls through to the physical delete below */
+       if (!InRecovery)
+           elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
+               (insert) ? "insert" : "split");
+   }
+   else if (InRecovery)    /* check heap tuple */
+   {
+       int         result;
+       CommandId   cid;
+       RelFileNode hnode;
+       Size        hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
+
+       /* CommandId and heap RelFileNode follow the fixed record header */
+       memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
+       memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
+       result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid),
+                   record->xl_xid, cid);
+       if (result <= 0)    /* no tuple or not owner */
+       {
+           UnlockAndReleaseBuffer(buffer);
+           return;
+       }
+   }
+   else if (! BufferIsUpdatable(buffer))   /* normal rollback */
+   {
+       /* can't remove the item now: flag it and leave it to page cleanup */
+       lp->lp_flags |= LP_DELETE;
+       MarkBufferForCleanup(buffer, IndexPageCleanup);
+       return;
+   }
+
+   PageIndexTupleDelete(page, offno);
+   if (InRecovery)
+   {
+       /* flag the page with BTP_REORDER after a delete done in recovery */
+       pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+       pageop->btpo_flags |= BTP_REORDER;
+   }
+   UnlockAndWriteBuffer(buffer);
+
+   return;
+}
+
+/*
+ * _bt_add_item -- place an item on a btree page.
+ *
+ * If offno lies beyond the slot just after the last item, the item is
+ * appended instead and BTP_REORDER is set on the page (with a NOTICE if
+ * the flag was not already there).  When the page has no room, a leaf
+ * page is cleaned up through _bt_cleanup_page and the insertion is tried
+ * once more; a non-leaf page fails right away.
+ *
+ * Returns true if the item was added, false otherwise.
+ */
+static bool
+_bt_add_item(Page page, OffsetNumber offno, 
+   char* item, Size size, RelFileNode* hnode)
+{
+   BTPageOpaque    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+   bool            cleaned;
+
+   if (offno > PageGetMaxOffsetNumber(page) + 1)
+   {
+       /* requested slot does not exist - fall back to appending */
+       if ((opaque->btpo_flags & BTP_REORDER) == 0)
+       {
+           elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
+           opaque->btpo_flags |= BTP_REORDER;
+       }
+       offno = PageGetMaxOffsetNumber(page) + 1;
+   }
+
+   /* first attempt */
+   if (PageAddItem(page, (Item) item, size, offno,
+           LP_USED) != InvalidOffsetNumber)
+       return(true);
+
+   /* not enough space: only a leaf page can be cleaned of dead tuples */
+   if (! P_ISLEAF(opaque))
+       return(false);
+
+   cleaned = _bt_cleanup_page(page, hnode);
+   if (!cleaned)
+       return(false);
+
+   /* second and last attempt */
+   if (PageAddItem(page, (Item) item, size, offno,
+           LP_USED) == InvalidOffsetNumber)
+       return(false);
+
+   return(true);
+}
+
+#endif
index 437b6637b2438bbe47f798b5c2c3d4e027a11fb1..4ca61e0c630c5f6c6d725f7652ffdcaf3ab4fa92 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.43 2000/10/04 00:04:43 vadim Exp $
+ * $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -42,11 +42,28 @@ typedef struct BTPageOpaqueData
 #define BTP_FREE       (1 << 2)    /* not currently used... */
 #define BTP_META       (1 << 3)    /* Set in the meta-page only */
 
+#ifdef XLOG
+#define    BTP_REORDER     (1 << 4)    /* items must be re-ordered */
+#endif
 } BTPageOpaqueData;
 
 typedef BTPageOpaqueData *BTPageOpaque;
 
 #define BTREE_METAPAGE 0   /* first page is meta */
+#define BTREE_MAGIC        0x053162
+
+#define BTREE_VERSION  1
+
+/*
+ * Contents of the btree metapage (block BTREE_METAPAGE), as reached
+ * through BTPageGetMeta():
+ *   btm_magic   - set to BTREE_MAGIC when the metapage is initialized
+ *   btm_version - set to BTREE_VERSION when the metapage is initialized
+ *   btm_root    - block number of the current root page (P_NONE when
+ *                 the metapage has just been initialized)
+ *   btm_level   - starts at 0 and is incremented each time a new root
+ *                 is installed
+ */
+typedef struct BTMetaPageData
+{
+   uint32      btm_magic;
+   uint32      btm_version;
+   BlockNumber btm_root;
+   int32       btm_level;
+} BTMetaPageData;
+
+/*
+ * BTPageGetMeta -- address of the BTMetaPageData stored on page p; the
+ * data sits where the line pointer array (pd_linp) starts.  The argument
+ * is parenthesized so that any pointer expression can be passed.
+ */
+#define BTPageGetMeta(p) \
+   ((BTMetaPageData *) &((PageHeader) (p))->pd_linp[0])
 
 /*
  * BTScanOpaqueData is used to remember which buffers we're currently
@@ -228,13 +245,13 @@ typedef struct xl_btree_delete
 
 /* 
  * This is what we need to know about pure (without split) insert - 
- * 14 + [4] + btitem with key data. Note that we need in CommandID
- * (4 bytes) only for leaf page insert.
+ * 14 + [4+8] + btitem with key data. Note that we need CommandID
+ * and HeapNode (4 + 8 bytes) only for leaf page insert.
  */
 typedef struct xl_btree_insert
 {
    xl_btreetid         target;     /* inserted tuple id */
-   /* [CommandID and ] BTITEM FOLLOWS AT END OF STRUCT */
+   /* [CommandID, HeapNode and ] BTITEM FOLLOWS AT END OF STRUCT */
 } xl_btree_insert;
 
 #define SizeOfBtreeInsert  (offsetof(xl_btreetid, tid) + SizeOfIptrData)
@@ -242,8 +259,8 @@ typedef struct xl_btree_insert
 
 /* 
  * This is what we need to know about insert with split - 
- * 22 + [4] + [btitem] + right sibling btitems. Note that we need in
- * CommandID (4 bytes) only for leaf page insert.
+ * 22 + [4+8] + [btitem] + right sibling btitems. Note that we need
+ * CommandID and HeapNode (4 + 8 bytes) only for leaf page insert.
  */
 typedef struct xl_btree_split
 {
@@ -255,7 +272,7 @@ typedef struct xl_btree_split
     * We log all btitems from the right sibling. If new btitem goes on
     * the left sibling then we log it too and it will be the first
     * BTItemData at the end of this struct, but after (for the leaf
-    * pages) CommandId.
+    * pages) CommandId and HeapNode.
     */
 } xl_btree_split;