Rework completion of incomplete inserts. Now it writes
author Teodor Sigaev
Fri, 19 May 2006 11:10:25 +0000 (11:10 +0000)
committer Teodor Sigaev
Fri, 19 May 2006 11:10:25 +0000 (11:10 +0000)
WAL log during inserts.

src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c

index 9b32304d1ae7c231ebb789ff4e4991c15422077f..a47d81db78e2f91be16318cbeb5929f118b59a96 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.22 2006/05/19 11:10:25 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
 
    if (!gv->index->rd_istemp)
    {
-       XLogRecData rdata;
+       XLogRecData rdata[2];
        XLogRecPtr  recptr;
        gistxlogPageDelete  xlrec;
 
        xlrec.node = gv->index->rd_node;
        xlrec.blkno = blkno;
 
-       rdata.buffer = InvalidBuffer;
-       rdata.data = (char *) &xlrec;
-       rdata.len = sizeof(gistxlogPageDelete);
-       rdata.next = NULL;
+       rdata[0].buffer = buffer;
+       rdata[0].buffer_std = true;
+       rdata[0].data = NULL;
+       rdata[0].len = 0;
+       rdata[0].next = &(rdata[1]);
 
-       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
+       rdata[1].buffer = InvalidBuffer;
+       rdata[1].data = (char *) &xlrec;
+       rdata[1].len = sizeof(gistxlogPageDelete);
+       rdata[1].next = NULL;
+
+       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata);
        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
    }
index 01dab119b2ef01f1e7c5dcb1fa07043640792375..1126727cd97bec7d3c5947e8a8e730739116beca 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
                     BlockNumber *blkno, int lenblk,
                     PageSplitRecord *xlinfo /* to extract blkno info */ )
 {
-   MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
-   gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
+   MemoryContext oldCxt;
+   gistIncompleteInsert *ninsert;
+
+   if ( !ItemPointerIsValid(&key) )
+       /* 
+        * If the key is not a valid item pointer, we should not record the
+        * insertion as incomplete, because it's a vacuum operation.
+        */
+       return;
+
+   oldCxt = MemoryContextSwitchTo(insertCtx);
+   ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
 
    ninsert->node = node;
    ninsert->key = key;
@@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 {
    ListCell   *l;
 
+   if ( !ItemPointerIsValid(&key) )
+       return;
+
+   if (incomplete_inserts==NIL)
+       return;
+
    foreach(l, incomplete_inserts)
    {
        gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
@@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
    Page        page;
 
    /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
-   if (ItemPointerIsValid(&(xldata->key)))
-   {
-       if (incomplete_inserts != NIL)
-           forgetIncompleteInsert(xldata->node, xldata->key);
+   forgetIncompleteInsert(xldata->node, xldata->key);
 
-       if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
-           pushIncompleteInsert(xldata->node, lsn, xldata->key,
-                                &(xldata->blkno), 1,
-                                NULL);
-   }
+   if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
+       /* operation with root always finalizes insertion */
+       pushIncompleteInsert(xldata->node, lsn, xldata->key,
+                            &(xldata->blkno), 1,
+                            NULL);
 
    /* nothing else to do if page was backed up (and no info to do it with) */
    if (record->xl_info & XLR_BKP_BLOCK_1)
@@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
 
+   /* nothing else to do if page was backed up (and no info to do it with) */
+   if (record->xl_info & XLR_BKP_BLOCK_1)
+       return;
+
    reln = XLogOpenRelation(xldata->node);
    buffer = XLogReadBuffer(reln, xldata->blkno, false);
    if (!BufferIsValid(buffer))
        return;
 
-   GISTInitBuffer( buffer, 0 );
    page = (Page) BufferGetPage(buffer);
    GistPageSetDeleted(page);
 
@@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
        UnlockReleaseBuffer(buffer);
    }
 
-   if (ItemPointerIsValid(&(xlrec.data->key)))
-   {
-       if (incomplete_inserts != NIL)
-           forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+   forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
 
-       pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
-                            NULL, 0,
-                            &xlrec);
-   }
+   pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
+                        NULL, 0,
+                        &xlrec);
 }
 
 static void
@@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
            insert->path[i++] = ptr->blkno;
    }
    else
-       elog(LOG, "lost parent for block %u", insert->origblkno);
+       elog(ERROR, "lost parent for block %u", insert->origblkno);
+}
+
+static SplitedPageLayout*
+gistMakePageLayout(Buffer *buffers, int nbuffers) {
+   SplitedPageLayout   *res=NULL, *resptr;
+
+   while( nbuffers-- > 0 ) {
+       Page page = BufferGetPage( buffers[ nbuffers ] );
+       IndexTuple  idxtup;
+       OffsetNumber    i;
+       char *ptr;
+
+       resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
+
+       resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
+       resptr->block.num = PageGetMaxOffsetNumber( page );
+
+       for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+           idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+           resptr->lenlist += IndexTupleSize(idxtup);
+       }
+
+       resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
+       ptr = (char*)(resptr->list);
+
+       for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
+           idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+           memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
+           ptr += IndexTupleSize(idxtup);
+       }
+
+       resptr->next = res;
+       res = resptr;
+   }
+
+   return res;
 }
 
 /*
@@ -548,11 +596,11 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
  * Note that we assume the index is now in a valid state, except for the
  * unfinished insertion.  In particular it's safe to invoke gistFindPath();
  * there shouldn't be any garbage pages for it to run into.
- *
- * Although stored LSN in gistIncompleteInsert is a LSN of child page,
- * we can compare it with LSN of parent, because parent is always locked
- * while we change child page (look at gistmakedeal). So if parent's LSN is
- * less than stored lsn then changes in parent aren't done yet.
+ * 
+ * To complete an insert we can't use the basic insertion algorithm, because
+ * during this insertion we can't call the user-defined support functions of
+ * the opclass. So we insert 'invalid' tuples, which carry no real key, using
+ * a separate algorithm. Such 'invalid' tuples should later be updated by
+ * VACUUM FULL.
  */
 static void
 gistContinueInsert(gistIncompleteInsert *insert)
@@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert)
    for (i = 0; i < insert->lenblk; i++)
        itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
 
+   /*
+    * Any insertion of itup[] should make a LOG message about the need for
+    * VACUUM FULL or REINDEX (see the ereport at the end of this routine).
+    */
+
    if (insert->origblkno == GIST_ROOT_BLKNO)
    {
        /*
         * it was split root, so we should only make new root. it can't be
-        * simple insert into root, look at call pushIncompleteInsert in
-        * gistRedoPageSplitRecord
+        * simple insert into root, we should replace all content of root.
         */
        Buffer      buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
-       Page        page;
-
-       Assert(BufferIsValid(buffer));
-       page = BufferGetPage(buffer);
 
-       GISTInitBuffer(buffer, 0);
-       gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
-
-       PageSetLSN(page, insert->lsn);
-       PageSetTLI(page, ThisTimeLineID);
-
-       MarkBufferDirty(buffer);
+       gistnewroot(index, buffer, itup, lenitup, NULL);
        UnlockReleaseBuffer(buffer);
-
-       /*
-        * XXX fall out to avoid making LOG message at bottom of routine.
-        * I think the logic for when to emit that message is all wrong...
-        */
-       return;
    }
    else
    {
        Buffer     *buffers;
        Page       *pages;
        int         numbuffer;
+       OffsetNumber    *todelete;
 
        /* construct path */
        gistxlogFindPath(index, insert);
@@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
        buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
        pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
+       todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
 
        for (i = 0; i < insert->pathlen; i++)
        {
            int         j,
                        k,
-                       pituplen = 0,
-                       childfound = 0;
+                       pituplen = 0;
+           XLogRecData     *rdata;
+           XLogRecPtr      recptr;
+           Buffer  tempbuffer = InvalidBuffer;
+           int     ntodelete = 0;
 
            numbuffer = 1;
-           buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
-           LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
-           pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
+           buffers[0] = ReadBuffer(index, insert->path[i]);
+           LockBuffer(buffers[0], GIST_EXCLUSIVE);
+           /*
+            * we check buffer, because we restored page earlier
+            */
+           gistcheckpage(index, buffers[0]);
 
-           if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
-           {
-               UnlockReleaseBuffer(buffers[numbuffer - 1]);
-               return;
-           }
+           pages[0] = BufferGetPage(buffers[0]);
+           Assert( !GistPageIsLeaf(pages[0]) );
 
-           pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);
+           pituplen = PageGetMaxOffsetNumber(pages[0]);
 
-           /* remove old IndexTuples */
-           for (j = 0; j < pituplen && childfound < lenitup; j++)
+           /* find old IndexTuples to remove */
+           for (j = 0; j < pituplen && ntodelete < lenitup; j++)
            {
                BlockNumber blkno;
-               ItemId      iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
-               IndexTuple  idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);
+               ItemId      iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
+               IndexTuple  idxtup = (IndexTuple) PageGetItem(pages[0], iid);
 
                blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
 
                for (k = 0; k < lenitup; k++)
                    if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
                    {
-                       PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);
-                       j--;
-                       pituplen--;
-                       childfound++;
+                       todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
+                       ntodelete++;
                        break;
                    }
            }
 
-           if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
+           if ( ntodelete == 0 ) 
+               elog(PANIC,"gistContinueInsert: can't find pointer to page(s)");
+
+           /*
+            * We check free space taking into account only the first tuple to
+            * be deleted; hopefully that will be enough space.
+            */
+
+           if (gistnospace(pages[0], itup, lenitup, *todelete))
            {
+
                /* no space left on page, so we must split */
                buffers[numbuffer] = ReadBuffer(index, P_NEW);
                LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
@@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
                {
-                   IndexTuple *parentitup;
+                   Buffer tmp;
 
                    /*
-                    * we split root, just copy tuples from old root to new
-                    * page
+                    * we split root, just copy content from root to new page
                     */
-                   parentitup = gistextractpage(pages[numbuffer - 1],
-                                                  &pituplen);
 
                    /* sanity check */
                    if (i + 1 != insert->pathlen)
                        elog(PANIC, "unexpected pathlen in index \"%s\"",
                             RelationGetRelationName(index));
 
-                   /* fill new page */
-                   buffers[numbuffer] = ReadBuffer(index, P_NEW);
-                   LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
-                   GISTInitBuffer(buffers[numbuffer], 0);
-                   pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
-                   gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
-                   numbuffer++;
-
-                   /* fill root page */
-                   GISTInitBuffer(buffers[0], 0);
-                   for (j = 1; j < numbuffer; j++)
-                   {
-                       IndexTuple  tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-
-                       if (PageAddItem(pages[0],
-                                       (Item) tuple,
-                                       IndexTupleSize(tuple),
-                                       (OffsetNumber) j,
-                                       LP_USED) == InvalidOffsetNumber)
-                           elog(PANIC, "failed to add item to index page in \"%s\"",
-                                RelationGetRelationName(index));
-                   }
+                   /* fill new page, root will be changed later */
+                   tempbuffer = ReadBuffer(index, P_NEW);
+                   LockBuffer(tempbuffer, GIST_EXCLUSIVE);
+                   memcpy( BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer) );
+
+                   /* swap buffers[0] (was root) and temp buffer */
+                   tmp = buffers[0];
+                   buffers[0] = tempbuffer;
+                   tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, it is still unchanged */
+
+                   pages[0] = BufferGetPage(buffers[0]);
                }
+
+               START_CRIT_SECTION();
+
+               for(j=0;j<ntodelete;j++)
+                   PageIndexTupleDelete(pages[0], todelete[j]);
+
+               rdata = formSplitRdata(index->rd_node, insert->path[i],
+                                       false, &(insert->key), 
+                                       gistMakePageLayout( buffers, numbuffer ) );
+
+           } else {
+               START_CRIT_SECTION();
+
+               for(j=0;j<ntodelete;j++)
+                   PageIndexTupleDelete(pages[0], todelete[j]);
+               gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);
+
+               rdata = formUpdateRdata(index->rd_node, buffers[0], 
+                           todelete, ntodelete,
+                           itup, lenitup, &(insert->key)); 
            }
-           else
-               gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
 
-           lenitup = numbuffer;
+           /* 
+            * Use insert->key (passed to form*Rdata() above) as the mark that
+            * the insert has been completed, for possible subsequent replays.
+            */
+
+           /* write pages with XLOG LSN */
+           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
            for (j = 0; j < numbuffer; j++)
            {
-               itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
-               PageSetLSN(pages[j], insert->lsn);
+               PageSetLSN(pages[j], recptr);
                PageSetTLI(pages[j], ThisTimeLineID);
                GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
                MarkBufferDirty(buffers[j]);
+           }
+
+           END_CRIT_SECTION();
+
+           lenitup = numbuffer;
+           for (j = 0; j < numbuffer; j++) {
+               itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
                UnlockReleaseBuffer(buffers[j]);
            }
+
+           if ( tempbuffer != InvalidBuffer ) {
+               /*
+                * it was a root split, so fill it by new values
+                */
+               gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
+               UnlockReleaseBuffer(tempbuffer);
+           }
        }
    }
 
    ereport(LOG,
-   (errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
+   (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
            insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
     errdetail("Incomplete insertion detected during crash replay.")));
 }
@@ -747,6 +818,7 @@ gist_xlog_cleanup(void)
    MemoryContext oldCxt;
 
    oldCxt = MemoryContextSwitchTo(opCtx);
+
    foreach(l, incomplete_inserts)
    {
        gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);