More GIN refactoring.
authorHeikki Linnakangas
Wed, 20 Nov 2013 15:00:53 +0000 (17:00 +0200)
committerHeikki Linnakangas
Wed, 20 Nov 2013 15:01:33 +0000 (17:01 +0200)
Split off the portion of ginInsertValue that inserts the tuple to current
level into a separate function, ginPlaceToPage. ginInsertValue's charter
is now to recurse up the tree to insert the downlink, when a page split is
required.

This is in preparation for a patch to change the way incomplete splits are
handled, which will need to do these operations separately. And IMHO makes
the code more readable anyway.

src/backend/access/gin/ginbtree.c

index c4801d5cd8937fbb3977733ab756427e5b5fe847..7248b06326fdb279d7cf7ce28369e472c73a17d7 100644 (file)
@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
 }
 
 /*
- * Insert value (stored in GinBtree) to tree described by stack
- *
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * Returns true if the insertion is done, false if the page was split and
+ * downlink insertion is pending.
  *
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * stack->buffer is locked on entry, and is kept locked.
  */
-void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static bool
+ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+              GinStatsData *buildStats)
 {
-   GinBtreeStack *parent;
-   BlockNumber rootBlkno;
-   Page        page,
-               rpage,
-               lpage;
+   Page        page = BufferGetPage(stack->buffer);
+   XLogRecData *rdata;
+   bool        fit;
 
-   /* extract root BlockNumber from stack */
-   Assert(stack != NULL);
-   parent = stack;
-   while (parent->parent)
-       parent = parent->parent;
-   rootBlkno = parent->blkno;
-   Assert(BlockNumberIsValid(rootBlkno));
+   START_CRIT_SECTION();
+   fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+   if (fit)
+   {
+       MarkBufferDirty(stack->buffer);
 
-   /* this loop crawls up the stack until the insertion is complete */
-   for (;;)
+       if (RelationNeedsWAL(btree->index))
+       {
+           XLogRecPtr  recptr;
+
+           recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+           PageSetLSN(page, recptr);
+       }
+
+       END_CRIT_SECTION();
+
+       return true;
+   }
+   else
    {
-       XLogRecData *rdata;
+       /* Didn't fit, have to split */
+       Buffer      rbuffer;
+       Page        newlpage;
        BlockNumber savedRightLink;
-       bool        fit;
+       GinBtreeStack *parent;
+       Page        lpage,
+                   rpage;
+
+       END_CRIT_SECTION();
+
+       rbuffer = GinNewBuffer(btree->index);
 
-       page = BufferGetPage(stack->buffer);
        savedRightLink = GinPageGetOpaque(page)->rightlink;
 
-       START_CRIT_SECTION();
-       fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
-       if (fit)
+       /*
+        * newlpage is a pointer to memory page, it is not associated with
+        * a buffer. stack->buffer is not touched yet.
+        */
+       newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
+
+       ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
+
+       /* During index build, count the newly-split page */
+       if (buildStats)
        {
+           if (btree->isData)
+               buildStats->nDataPages++;
+           else
+               buildStats->nEntryPages++;
+       }
+
+       parent = stack->parent;
+
+       if (parent == NULL)
+       {
+           /*
+            * split root, so we need to allocate new left page and place
+            * pointer on root to left and right page
+            */
+           Buffer      lbuffer = GinNewBuffer(btree->index);
+
+           ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
+           ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
+
+           lpage = BufferGetPage(lbuffer);
+           rpage = BufferGetPage(rbuffer);
+
+           GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+           GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+           ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
+
+           START_CRIT_SECTION();
+
+           GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
+           PageRestoreTempPage(newlpage, lpage);
+           btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
+
+           MarkBufferDirty(rbuffer);
+           MarkBufferDirty(lbuffer);
            MarkBufferDirty(stack->buffer);
 
            if (RelationNeedsWAL(btree->index))
            {
                XLogRecPtr  recptr;
 
-               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
+               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
                PageSetLSN(page, recptr);
+               PageSetLSN(lpage, recptr);
+               PageSetLSN(rpage, recptr);
            }
 
-           LockBuffer(stack->buffer, GIN_UNLOCK);
-           END_CRIT_SECTION();
-
-           freeGinBtreeStack(stack);
-
-           return;
-       }
-       else
-       {
-           /* Didn't fit, have to split */
-           Buffer      rbuffer;
-           Page        newlpage;
-
+           UnlockReleaseBuffer(rbuffer);
+           UnlockReleaseBuffer(lbuffer);
            END_CRIT_SECTION();
 
-           rbuffer = GinNewBuffer(btree->index);
-
-           /*
-            * newlpage is a pointer to memory page, it is not associated with
-            * a buffer. stack->buffer is not touched yet.
-            */
-           newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
-
-           ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
-           /* During index build, count the newly-split page */
+           /* During index build, count the newly-added root page */
            if (buildStats)
            {
                if (btree->isData)
@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
                    buildStats->nEntryPages++;
            }
 
-           parent = stack->parent;
-
-           if (parent == NULL)
-           {
-               /*
-                * split root, so we need to allocate new left page and place
-                * pointer on root to left and right page
-                */
-               Buffer      lbuffer = GinNewBuffer(btree->index);
-
-               ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
-               ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
-               page = BufferGetPage(stack->buffer);
-               lpage = BufferGetPage(lbuffer);
-               rpage = BufferGetPage(rbuffer);
-
-               GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
-               GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
-               ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
-               START_CRIT_SECTION();
-
-               GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
-               PageRestoreTempPage(newlpage, lpage);
-               btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
-               MarkBufferDirty(rbuffer);
-               MarkBufferDirty(lbuffer);
-               MarkBufferDirty(stack->buffer);
+           return true;
+       }
+       else
+       {
+           /* split non-root page */
+           ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
+           ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
 
-               if (RelationNeedsWAL(btree->index))
-               {
-                   XLogRecPtr  recptr;
+           lpage = BufferGetPage(stack->buffer);
+           rpage = BufferGetPage(rbuffer);
 
-                   recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                   PageSetLSN(page, recptr);
-                   PageSetLSN(lpage, recptr);
-                   PageSetLSN(rpage, recptr);
-               }
+           GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+           GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
 
-               UnlockReleaseBuffer(rbuffer);
-               UnlockReleaseBuffer(lbuffer);
-               LockBuffer(stack->buffer, GIN_UNLOCK);
-               END_CRIT_SECTION();
+           START_CRIT_SECTION();
+           PageRestoreTempPage(newlpage, lpage);
 
-               freeGinBtreeStack(stack);
+           MarkBufferDirty(rbuffer);
+           MarkBufferDirty(stack->buffer);
 
-               /* During index build, count the newly-added root page */
-               if (buildStats)
-               {
-                   if (btree->isData)
-                       buildStats->nDataPages++;
-                   else
-                       buildStats->nEntryPages++;
-               }
+           if (RelationNeedsWAL(btree->index))
+           {
+               XLogRecPtr  recptr;
 
-               return;
+               recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+               PageSetLSN(lpage, recptr);
+               PageSetLSN(rpage, recptr);
            }
-           else
-           {
-               /* split non-root page */
-               ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
-               ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
+           UnlockReleaseBuffer(rbuffer);
+           END_CRIT_SECTION();
 
-               lpage = BufferGetPage(stack->buffer);
-               rpage = BufferGetPage(rbuffer);
+           return false;
+       }
+   }
+}
 
-               GinPageGetOpaque(rpage)->rightlink = savedRightLink;
-               GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+/*
+ * Insert value (stored in GinBtree) to tree described by stack
+ *
+ * During an index build, buildStats is non-null and the counters
+ * it contains are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+{
+   GinBtreeStack *parent;
+   BlockNumber rootBlkno;
+   Page        page;
+
+   /* extract root BlockNumber from stack */
+   Assert(stack != NULL);
+   parent = stack;
+   while (parent->parent)
+       parent = parent->parent;
+   rootBlkno = parent->blkno;
+   Assert(BlockNumberIsValid(rootBlkno));
 
-               START_CRIT_SECTION();
-               PageRestoreTempPage(newlpage, lpage);
+   /* this loop crawls up the stack until the insertion is complete */
+   for (;;)
+   {
+       bool done;
 
-               MarkBufferDirty(rbuffer);
-               MarkBufferDirty(stack->buffer);
+       done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
 
-               if (RelationNeedsWAL(btree->index))
-               {
-                   XLogRecPtr  recptr;
+       /* just to be extra sure we don't delete anything by accident... */
+       btree->isDelete = FALSE;
 
-                   recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
-                   PageSetLSN(lpage, recptr);
-                   PageSetLSN(rpage, recptr);
-               }
-               UnlockReleaseBuffer(rbuffer);
-               END_CRIT_SECTION();
-           }
+       if (done)
+       {
+           LockBuffer(stack->buffer, GIN_UNLOCK);
+           freeGinBtreeStack(stack);
+           break;
        }
 
        btree->prepareDownlink(btree, stack->buffer);
-       btree->isDelete = FALSE;
 
        /* search parent to lock */
        LockBuffer(parent->buffer, GIN_EXCLUSIVE);