Fix two distinct errors in creation of GIN_INSERT_LISTPAGE xlog records.
authorTom Lane
Tue, 15 Sep 2009 20:31:30 +0000 (20:31 +0000)
committerTom Lane
Tue, 15 Sep 2009 20:31:30 +0000 (20:31 +0000)
In practice these mistakes were always masked when full_page_writes was on,
because XLogInsert would always choose to log the full page, and then
ginRedoInsertListPage wouldn't try to do anything.  But with full_page_writes
off a WAL replay failure was certain.

The GIN_INSERT_LISTPAGE record type could probably be eliminated entirely
in favor of using XLOG_HEAP_NEWPAGE, but I refrained from doing that now
since it would have required a significantly more invasive patch.

In passing do a little bit of code cleanup, including making the accounting
for free space on GIN list pages more precise.  (This wasn't a bug as the
errors were always in the conservative direction.)

Per report from Simon.  Back-patch to 8.4 which contains the identical code.

src/backend/access/gin/ginfast.c

index 20887ba56cf5f7f3c2aaefd924dc7e4c0cab0f31..9d2351af52914d66a3eec0dda5509ca23732fec7 100644 (file)
@@ -11,7 +11,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *         $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.3 2009/06/11 14:48:53 momjian Exp $
+ *         $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.4 2009/09/15 20:31:30 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,13 +41,15 @@ typedef struct DatumArray
 
 /*
  * Build a pending-list page from the given array of tuples, and write it out.
+ *
+ * Returns amount of free space left on the page.
  */
 static int32
 writeListPage(Relation index, Buffer buffer,
              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
 {
    Page        page = BufferGetPage(buffer);
-   int         i,
+   int32       i,
                freesize,
                size = 0;
    OffsetNumber l,
@@ -100,8 +102,6 @@ writeListPage(Relation index, Buffer buffer,
        GinPageGetOpaque(page)->maxoff = 0;
    }
 
-   freesize = PageGetFreeSpace(page);
-
    MarkBufferDirty(buffer);
 
    if (!index->rd_istemp)
@@ -110,26 +110,30 @@ writeListPage(Relation index, Buffer buffer,
        ginxlogInsertListPage data;
        XLogRecPtr  recptr;
 
-       rdata[0].buffer = buffer;
-       rdata[0].buffer_std = true;
+       data.node = index->rd_node;
+       data.blkno = BufferGetBlockNumber(buffer);
+       data.rightlink = rightlink;
+       data.ntuples = ntuples;
+
+       rdata[0].buffer = InvalidBuffer;
        rdata[0].data = (char *) &data;
        rdata[0].len = sizeof(ginxlogInsertListPage);
        rdata[0].next = rdata + 1;
 
-       rdata[1].buffer = InvalidBuffer;
+       rdata[1].buffer = buffer;
+       rdata[1].buffer_std = true;
        rdata[1].data = workspace;
        rdata[1].len = size;
        rdata[1].next = NULL;
 
-       data.blkno = BufferGetBlockNumber(buffer);
-       data.rightlink = rightlink;
-       data.ntuples = ntuples;
-
        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
    }
 
+   /* get free space before releasing buffer */
+   freesize = PageGetExactFreeSpace(page);
+
    UnlockReleaseBuffer(buffer);
 
    END_CRIT_SECTION();
@@ -165,7 +169,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
            {
                res->nPendingPages++;
                writeListPage(index, prevBuffer,
-                             tuples + startTuple, i - startTuple,
+                             tuples + startTuple,
+                             i - startTuple,
                              BufferGetBlockNumber(curBuffer));
            }
            else
@@ -180,7 +185,7 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
 
        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
 
-       if (size + tupsize >= GinListPageSize)
+       if (size + tupsize > GinListPageSize)
        {
            /* won't fit, force a new page and reprocess */
            i--;
@@ -197,7 +202,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
     */
    res->tail = BufferGetBlockNumber(curBuffer);
    res->tailFreeSize = writeListPage(index, curBuffer,
-                                  tuples + startTuple, ntuples - startTuple,
+                                     tuples + startTuple,
+                                     ntuples - startTuple,
                                      InvalidBlockNumber);
    res->nPendingPages++;
    /* that was only one heap tuple */
@@ -237,7 +243,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
    metapage = BufferGetPage(metabuffer);
 
-   if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
+   if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
    {
        /*
         * Total size is greater than one page => make sublist
@@ -265,13 +271,12 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
 
    if (separateList)
    {
-       GinMetaPageData sublist;
-
        /*
         * We should make sublist separately and append it to the tail
         */
-       memset(&sublist, 0, sizeof(GinMetaPageData));
+       GinMetaPageData sublist;
 
+       memset(&sublist, 0, sizeof(GinMetaPageData));
        makeSublist(index, collector->tuples, collector->ntuples, &sublist);
 
        /*
@@ -283,45 +288,44 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
        if (metadata->head == InvalidBlockNumber)
        {
            /*
-            * Sublist becomes main list
+            * Main list is empty, so just copy sublist into main list
             */
            START_CRIT_SECTION();
+
            memcpy(metadata, &sublist, sizeof(GinMetaPageData));
-           memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
        }
        else
        {
            /*
-            * merge lists
+            * Merge lists
             */
-
            data.prevTail = metadata->tail;
+           data.newRightlink = sublist.head;
+
            buffer = ReadBuffer(index, metadata->tail);
            LockBuffer(buffer, GIN_EXCLUSIVE);
            page = BufferGetPage(buffer);
+
            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
 
            START_CRIT_SECTION();
 
            GinPageGetOpaque(page)->rightlink = sublist.head;
+
+           MarkBufferDirty(buffer);
+
            metadata->tail = sublist.tail;
            metadata->tailFreeSize = sublist.tailFreeSize;
 
            metadata->nPendingPages += sublist.nPendingPages;
            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
-
-           memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
-           data.newRightlink = sublist.head;
-
-           MarkBufferDirty(buffer);
        }
    }
    else
    {
        /*
-        * Insert into tail page, metapage is already locked
+        * Insert into tail page.  Metapage is already locked
         */
-
        OffsetNumber l,
                    off;
        int         i,
@@ -331,6 +335,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
        buffer = ReadBuffer(index, metadata->tail);
        LockBuffer(buffer, GIN_EXCLUSIVE);
        page = BufferGetPage(buffer);
+
        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
            OffsetNumberNext(PageGetMaxOffsetNumber(page));
 
@@ -368,20 +373,24 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
            off++;
        }
 
-       metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
-       memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+       Assert((ptr - rdata[1].data) <= collector->sumsize);
+
+       metadata->tailFreeSize = PageGetExactFreeSpace(page);
+
        MarkBufferDirty(buffer);
    }
 
    /*
-    * Make real write
+    * Write metabuffer, make xlog entry
     */
-
    MarkBufferDirty(metabuffer);
+
    if (!index->rd_istemp)
    {
        XLogRecPtr  recptr;
 
+       memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
        PageSetLSN(metapage, recptr);
        PageSetTLI(metapage, ThisTimeLineID);
@@ -552,7 +561,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
            metadata->nPendingPages = 0;
            metadata->nPendingHeapTuples = 0;
        }
-       memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
 
        MarkBufferDirty(metabuffer);
 
@@ -567,6 +575,8 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
        {
            XLogRecPtr  recptr;
 
+           memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
            PageSetLSN(metapage, recptr);
            PageSetTLI(metapage, ThisTimeLineID);