WAL for GiST. It work for online backup and so on, but on
authorTeodor Sigaev
Tue, 14 Jun 2005 11:45:14 +0000 (11:45 +0000)
committerTeodor Sigaev
Tue, 14 Jun 2005 11:45:14 +0000 (11:45 +0000)
recovery after crash (power loss etc) it may say that it can't restore
index and index should be reindexed.

Some refactoring code.

src/backend/access/gist/Makefile
src/backend/access/gist/gist.c
src/backend/access/gist/gistget.c
src/backend/access/gist/gistutil.c [new file with mode: 0644]
src/backend/access/gist/gistxlog.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/include/access/gist_private.h

index f2ec10a90af3a538eb7026d91b962f391f3d14f5..b22f846a23ddc841bf10cb2fbdb1fa2356631fd9 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for access/gist
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.12 2003/11/29 19:51:39 pgsql Exp $
+#    $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/access/gist
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = gist.o gistget.o gistscan.o
+OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o
 
 all: SUBSYS.o
 
index 2a55c91f0df347138cec17f62d19b2b399fee7ca..4e3faccdf92566c4cdd2df8319bac9dde55f23a3 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "miscadmin.h"
 #include "utils/memutils.h"
 
-
-#undef GIST_PAGEADDITEM
-
-#define ATTSIZE(datum, tupdesc, i, isnull) \
-   ( \
-       (isnull) ? 0 : \
-           att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
-   )
-
-/* result's status */
-#define INSERTED   0x01
-#define SPLITED        0x02
-
-/* group flags ( in gistSplit ) */
-#define LEFT_ADDED 0x01
-#define RIGHT_ADDED 0x02
-#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
-
-/*
- * This defines only for shorter code, used in gistgetadjusted
- * and gistadjsubkey only
- */
-#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb)  do { \
-   if (isnullkey) {                                              \
-       gistentryinit((evp), rkey, r, NULL,                       \
-                     (OffsetNumber) 0, rkeyb, FALSE);            \
-   } else {                                                      \
-       gistentryinit((evp), okey, r, NULL,                       \
-                     (OffsetNumber) 0, okeyb, FALSE);            \
-   }                                                             \
-} while(0)
-
-#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
-   FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b);     \
-   FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b);     \
-} while(0);
-
 /* Working state for gistbuild and its callback */
 typedef struct
 {
@@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index,
 static void gistdoinsert(Relation r,
             IndexTuple itup,
             GISTSTATE *GISTstate);
-static int gistlayerinsert(Relation r, BlockNumber blkno,
-               IndexTuple **itup,
-               int *len,
-               GISTSTATE *giststate);
-static OffsetNumber gistwritebuffer(Relation r,
-               Page page,
-               IndexTuple *itup,
-               int len,
-               OffsetNumber off);
-static bool gistnospace(Page page, IndexTuple *itvec, int len);
-static IndexTuple *gistreadbuffer(Buffer buffer, int *len);
-static IndexTuple *gistjoinvector(
-              IndexTuple *itvec, int *len,
-              IndexTuple *additvec, int addlen);
-static IndexTuple gistunion(Relation r, IndexTuple *itvec,
-         int len, GISTSTATE *giststate);
-
-static IndexTuple gistgetadjusted(Relation r,
-               IndexTuple oldtup,
-               IndexTuple addtup,
+static void gistfindleaf(GISTInsertState *state,
                GISTSTATE *giststate);
-static int gistfindgroup(GISTSTATE *giststate,
-             GISTENTRY *valvec, GIST_SPLITVEC *spl);
-static void gistadjsubkey(Relation r,
-             IndexTuple *itup, int *len,
-             GIST_SPLITVEC *v,
-             GISTSTATE *giststate);
-static IndexTuple gistFormTuple(GISTSTATE *giststate,
-           Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+
+typedef struct PageLayout {
+   gistxlogPage    block;
+   OffsetNumber    *list;
+   Buffer      buffer; /* to write after all proceed */
+
+   struct PageLayout *next;
+} PageLayout;
+
+
+#define ROTATEDIST(d) do { \
+   PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
+   memset(tmp,0,sizeof(PageLayout)); \
+   tmp->next = (d); \
+   (d)=tmp; \
+} while(0)
+   
+
 static IndexTuple *gistSplit(Relation r,
          Buffer buffer,
          IndexTuple *itup,
          int *len,
+         PageLayout    **dist,
          GISTSTATE *giststate);
-static void gistnewroot(Relation r, IndexTuple *itup, int len);
-static void GISTInitBuffer(Buffer b, uint32 f);
-static OffsetNumber gistchoose(Relation r, Page p,
-          IndexTuple it,
-          GISTSTATE *giststate);
-static void gistdelete(Relation r, ItemPointer tid);
-
-#ifdef GIST_PAGEADDITEM
-static IndexTuple gist_tuple_replacekey(Relation r,
-                     GISTENTRY entry, IndexTuple t);
-#endif
-static void gistcentryinit(GISTSTATE *giststate, int nkey,
-              GISTENTRY *e, Datum k,
-              Relation r, Page pg,
-              OffsetNumber o, int b, bool l, bool isNull);
-static void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
-                             IndexTuple tuple, Page p, OffsetNumber o,
-                             GISTENTRY *attdata, bool *isnull);
-static void gistpenalty(GISTSTATE *giststate, int attno,
-           GISTENTRY *key1, bool isNull1,
-           GISTENTRY *key2, bool isNull2,
-           float *penalty);
+
 
 #undef GISTDEBUG
 
@@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS)
    /* initialize the root page */
    buffer = ReadBuffer(index, P_NEW);
    GISTInitBuffer(buffer, F_LEAF);
+   if ( !index->rd_istemp ) {
+       XLogRecPtr      recptr;
+       XLogRecData     rdata;
+       Page            page;
+
+       rdata.buffer     = InvalidBuffer;
+       rdata.data       = (char*)&(index->rd_node);
+       rdata.len        = sizeof(RelFileNode);
+       rdata.next       = NULL;
+
+       page = BufferGetPage(buffer);
+
+       START_CRIT_SECTION();
+
+       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
+       PageSetLSN(page, recptr);
+       PageSetTLI(page, ThisTimeLineID);
+
+       END_CRIT_SECTION();
+   }
    WriteBuffer(buffer);
 
    /* build the index */
@@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS)
    PG_RETURN_BOOL(true);
 }
 
-#ifdef GIST_PAGEADDITEM
-/*
- * Take a compressed entry, and install it on a page.  Since we now know
- * where the entry will live, we decompress it and recompress it using
- * that knowledge (some compression routines may want to fish around
- * on the page, for example, or do something special for leaf nodes.)
- */
-static OffsetNumber
-gistPageAddItem(GISTSTATE *giststate,
-               Relation r,
-               Page page,
-               Item item,
-               Size size,
-               OffsetNumber offsetNumber,
-               ItemIdFlags flags,
-               GISTENTRY *dentry,
-               IndexTuple *newtup)
-{
-   GISTENTRY   tmpcentry;
-   IndexTuple  itup = (IndexTuple) item;
-   OffsetNumber retval;
-   Datum       datum;
-   bool        IsNull;
-
-   /*
-    * recompress the item given that we now know the exact page and
-    * offset for insertion
-    */
-   datum = index_getattr(itup, 1, r->rd_att, &IsNull);
-   gistdentryinit(giststate, 0, dentry, datum,
-                  (Relation) 0, (Page) 0,
-                  (OffsetNumber) InvalidOffsetNumber,
-                  ATTSIZE(datum, r, 1, IsNull),
-                  FALSE, IsNull);
-   gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page,
-                  offsetNumber, dentry->bytes, FALSE);
-   *newtup = gist_tuple_replacekey(r, tmpcentry, itup);
-   retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
-                        offsetNumber, flags);
-   if (retval == InvalidOffsetNumber)
-       elog(ERROR, "failed to add index item to \"%s\"",
-            RelationGetRelationName(r));
-   return retval;
-}
-#endif
 
 /*
  * Workhouse routine for doing insertion into a GiST index. Note that
@@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate,
 static void
 gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
 {
-   IndexTuple *instup;
-   int         ret,
-               len = 1;
+   GISTInsertState state;
 
-   instup = (IndexTuple *) palloc(sizeof(IndexTuple));
-   instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
-   memcpy(instup[0], itup, IndexTupleSize(itup));
+   memset(&state, 0, sizeof(GISTInsertState));
 
-   ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate);
-   if (ret & SPLITED)
-       gistnewroot(r, instup, len);
-}
+   state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
+   state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
+   memcpy(state.itup[0], itup, IndexTupleSize(itup));
+   state.ituplen=1;
+   state.r = r;
+   state.key = itup->t_tid;
+   state.needInsertComplete = true; 
+   state.xlog_mode = false;
 
-static int
-gistlayerinsert(Relation r, BlockNumber blkno,
-               IndexTuple **itup,      /* in - out, has compressed entry */
-               int *len,       /* in - out */
-               GISTSTATE *giststate)
-{
-   Buffer      buffer;
-   Page        page;
-   int         ret;
-   GISTPageOpaque opaque;
+   state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+   memset( state.stack, 0, sizeof(GISTInsertStack));
+   state.stack->blkno=GIST_ROOT_BLKNO;
 
-   buffer = ReadBuffer(r, blkno);
-   page = (Page) BufferGetPage(buffer);
-   opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+   gistfindleaf(&state, giststate);
+   gistmakedeal(&state, giststate);
+}
 
-   if (!(opaque->flags & F_LEAF))
+static bool
+gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
+   bool is_splitted = false;
+
+   if (gistnospace(state->stack->page, state->itup, state->ituplen))
    {
-        /*
-         * This is an internal page, so continue to walk down the
-         * tree. We find the child node that has the minimum insertion
-         * penalty and recursively invoke ourselves to modify that
-         * node. Once the recursive call returns, we may need to
-         * adjust the parent node for two reasons: the child node
-         * split, or the key in this node needs to be adjusted for the
-         * newly inserted key below us.
-         */
-       ItemId      iid;
-       BlockNumber nblkno;
-       ItemPointerData oldtid;
-       IndexTuple  oldtup;
-       OffsetNumber child;
-
-       child = gistchoose(r, page, *(*itup), giststate);
-       iid = PageGetItemId(page, child);
-       oldtup = (IndexTuple) PageGetItem(page, iid);
-       nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+       /* no space for insertion */
+       IndexTuple *itvec,
+                  *newitup;
+       int         tlen,olen;
+       PageLayout  *dist=NULL, *ptr;
+
+       memset(&dist, 0, sizeof(PageLayout));
+       is_splitted = true;
+       itvec = gistextractbuffer(state->stack->buffer, &tlen);
+       olen=tlen;
+       itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
+       newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+
+       if ( !state->r->rd_istemp && !state->xlog_mode) {
+           gistxlogPageSplit   xlrec;
+           XLogRecPtr      recptr;
+           XLogRecData     *rdata;
+           int i, npage = 0, cur=1;
+
+           ptr=dist;
+           while( ptr ) {
+               npage++;
+               ptr=ptr->next;
+           }
 
-       /*
-        * After this call: 1. if child page was splited, then itup
-        * contains keys for each page 2. if  child page wasn't splited,
-        * then itup contains additional for adjustment of current key
-        */
-       ret = gistlayerinsert(r, nblkno, itup, len, giststate);
+           rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
+
+           xlrec.node = state->r->rd_node;
+           xlrec.origblkno = state->stack->blkno;
+           xlrec.npage = npage;
+           xlrec.nitup = state->ituplen;
+           xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+           xlrec.key = state->key;
+           xlrec.pathlen = (uint16)state->pathlen;
+
+           rdata[0].buffer = InvalidBuffer;
+           rdata[0].data   = (char *) &xlrec;
+           rdata[0].len    = sizeof( gistxlogPageSplit );
+           rdata[0].next   = NULL;
+
+           if ( state->pathlen>=0 ) {
+               rdata[0].next   = &(rdata[1]);
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].data   = (char *) (state->path);
+               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
+               rdata[1].next   = NULL;
+               cur++;
+           }
+           
+           /* new tuples */    
+           for(i=0;iituplen;i++) {
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data   = (char*)(state->itup[i]);
+               rdata[cur].len  = IndexTupleSize(state->itup[i]);
+               rdata[cur-1].next = &(rdata[cur]);
+               cur++;
+           }
 
-       /* nothing inserted in child */
-       if (!(ret & INSERTED))
-       {
-           ReleaseBuffer(buffer);
-           return 0x00;
-       }
+           /* new page layout */
+           ptr=dist;
+           while(ptr) {
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data   = (char*)&(ptr->block);
+               rdata[cur].len  = sizeof(gistxlogPage);
+               rdata[cur-1].next = &(rdata[cur]);
+               cur++;
+
+               rdata[cur].buffer = InvalidBuffer;
+               rdata[cur].data   = (char*)(ptr->list);
+               rdata[cur].len    = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+               if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+                   rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+               rdata[cur-1].next = &(rdata[cur]);
+               rdata[cur].next=NULL;
+               cur++;
+               
+               ptr=ptr->next;
+           }
 
-       /* child did not split */
-       if (!(ret & SPLITED))
-       {
-           IndexTuple  newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
+           START_CRIT_SECTION();
 
-           if (!newtup)
-           {
-               /* not need to update key */
-               ReleaseBuffer(buffer);
-               return 0x00;
+           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+           ptr = dist;
+           while(ptr) {    
+               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
+               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+               ptr=ptr->next;
            }
 
-           (*itup)[0] = newtup;
+           END_CRIT_SECTION();
        }
 
-        /*
-         * This node's key has been modified, either because a child
-         * split occurred or because we needed to adjust our key for
-         * an insert in a child node. Therefore, remove the old
-         * version of this node's key.
-         */
-       ItemPointerSet(&oldtid, blkno, child);
-       gistdelete(r, &oldtid);
-
-       /*
-        * if child was splitted, new key for child will be inserted in
-        * the end list of child, so we must say to any scans that page is
-        * changed beginning from 'child' offset
-        */
-       if (ret & SPLITED)
-           gistadjscans(r, GISTOP_SPLIT, blkno, child);
-   }
+       ptr = dist;
+       while(ptr) {
+           WriteBuffer(ptr->buffer);
+           ptr=ptr->next;
+       }
 
-   ret = INSERTED;
+       state->itup = newitup;
+       state->ituplen = tlen;          /* now tlen >= 2 */
 
-   if (gistnospace(page, (*itup), *len))
-   {
-       /* no space for insertion */
-       IndexTuple *itvec,
-                  *newitup;
-       int         tlen,
-                   oldlen;
-
-       ret |= SPLITED;
-       itvec = gistreadbuffer(buffer, &tlen);
-       itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
-       oldlen = *len;
-       newitup = gistSplit(r, buffer, itvec, &tlen, giststate);
-       ReleaseBuffer(buffer);
-       *itup = newitup;
-       *len = tlen;            /* now tlen >= 2 */
+       if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
+           gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
+           state->needInsertComplete=false;
+       }
+       if ( state->xlog_mode ) 
+           LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(state->stack->buffer);
    }
    else
    {
        /* enough space */
-       OffsetNumber off,
-                   l;
+       OffsetNumber off, l;
 
-       off = (PageIsEmpty(page)) ?
+       off = (PageIsEmpty(state->stack->page)) ?
            FirstOffsetNumber
            :
-           OffsetNumberNext(PageGetMaxOffsetNumber(page));
-       l = gistwritebuffer(r, page, *itup, *len, off);
-       WriteBuffer(buffer);
+           OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
+       l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
+       if ( !state->r->rd_istemp && !state->xlog_mode) {
+           gistxlogEntryUpdate xlrec;
+           XLogRecPtr      recptr;
+           XLogRecData     *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
+           int i, cur=0;
+           
+           xlrec.node = state->r->rd_node;
+           xlrec.blkno = state->stack->blkno;
+           xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
+           xlrec.key = state->key;
+           xlrec.pathlen = (uint16)state->pathlen;
+
+           rdata[0].buffer = InvalidBuffer;
+           rdata[0].data   = (char *) &xlrec;
+           rdata[0].len    = sizeof( gistxlogEntryUpdate );
+           rdata[0].next   = NULL;
+
+           if ( state->pathlen>=0 ) {
+               rdata[0].next   = &(rdata[1]);
+               rdata[1].buffer = InvalidBuffer;
+               rdata[1].data   = (char *) (state->path);
+               rdata[1].len    = sizeof( BlockNumber ) * state->pathlen;
+               rdata[1].next   = NULL;
+               cur++;
+           }
+
+           for(i=1; i<=state->ituplen; i++) { /* adding tuples */
+               rdata[i+cur].buffer = InvalidBuffer;
+               rdata[i+cur].data   = (char*)(state->itup[i-1]);
+               rdata[i+cur].len    = IndexTupleSize(state->itup[i-1]);
+               rdata[i+cur].next   = NULL;
+               rdata[i-1+cur].next = &(rdata[i+cur]);
+           }   
+           
+           START_CRIT_SECTION();
+
+           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+           PageSetLSN(state->stack->page, recptr);
+           PageSetTLI(state->stack->page, ThisTimeLineID);
+
+           END_CRIT_SECTION();
+       }
+
+       if ( state->stack->blkno == GIST_ROOT_BLKNO ) 
+                        state->needInsertComplete=false;
 
-       if (*len > 1)
-       {                       /* previous insert ret & SPLITED != 0 */
+       if ( state->xlog_mode ) 
+           LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(state->stack->buffer);
+
+       if (state->ituplen > 1)
+       {                       /* previous is_splitted==true */
            /*
             * child was splited, so we must form union for insertion in
             * parent
             */
-           IndexTuple  newtup = gistunion(r, (*itup), *len, giststate);
-           ItemPointerSet(&(newtup->t_tid), blkno, 1);
-           (*itup)[0] = newtup;
-           *len = 1;
+           IndexTuple  newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
+           ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
+           state->itup[0] = newtup;
+           state->ituplen = 1;
        }
    }
-
-   return ret;
-}
-
-/*
- * Write itup vector to page, has no control of free space
- */
-static OffsetNumber
-gistwritebuffer(Relation r, Page page, IndexTuple *itup,
-               int len, OffsetNumber off)
-{
-   OffsetNumber l = InvalidOffsetNumber;
-   int         i;
-
-   for (i = 0; i < len; i++)
-   {
-#ifdef GIST_PAGEADDITEM
-       GISTENTRY   tmpdentry;
-       IndexTuple  newtup;
-       bool        IsNull;
-
-       l = gistPageAddItem(giststate, r, page,
-                           (Item) itup[i], IndexTupleSize(itup[i]),
-                           off, LP_USED, &tmpdentry, &newtup);
-       off = OffsetNumberNext(off);
-#else
-       l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
-                       off, LP_USED);
-       if (l == InvalidOffsetNumber)
-           elog(ERROR, "failed to add index item to \"%s\"",
-                RelationGetRelationName(r));
-#endif
-   }
-   return l;
+   return is_splitted;
 }
 
-/*
- * Check space for itup vector on page
- */
-static bool
-gistnospace(Page page, IndexTuple *itvec, int len)
-{
-   unsigned int size = 0;
-   int         i;
-
-   for (i = 0; i < len; i++)
-       size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
-
-   return (PageGetFreeSpace(page) < size);
-}
-
-/*
- * Read buffer into itup vector
- */
-static IndexTuple *
-gistreadbuffer(Buffer buffer, int *len /* out */ )
-{
-   OffsetNumber i,
-               maxoff;
-   IndexTuple *itvec;
-   Page        p = (Page) BufferGetPage(buffer);
-
-   maxoff = PageGetMaxOffsetNumber(p);
-   *len = maxoff;
-   itvec = palloc(sizeof(IndexTuple) * maxoff);
-   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-       itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
-
-   return itvec;
-}
-
-/*
- * join two vectors into one
- */
-static IndexTuple *
-gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
-{
-   itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
-   memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
-   *len += addlen;
-   return itvec;
-}
-
-/*
- * Return an IndexTuple containing the result of applying the "union"
- * method to the specified IndexTuple vector.
- */
-static IndexTuple
-gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+static void
+gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
 {
-   Datum       attr[INDEX_MAX_KEYS];
-   bool        isnull[INDEX_MAX_KEYS];
-   GistEntryVector *evec;
-   int         i;
-   GISTENTRY   centry[INDEX_MAX_KEYS];
-
-   evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
-   for (i = 0; i < r->rd_att->natts; i++)
-   {
-       Datum       datum;
-       int         j;
-       int         real_len;
+   ItemId      iid;
+   IndexTuple  oldtup;
+   GISTInsertStack *ptr;
 
-       real_len = 0;
-       for (j = 0; j < len; j++)
-       {
-           bool        IsNull;
-           datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
-           if (IsNull)
-               continue;
-
-           gistdentryinit(giststate, i,
-                          &(evec->vector[real_len]),
-                          datum,
-                          NULL, NULL, (OffsetNumber) 0,
-                          ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                          FALSE, IsNull);
-           real_len++;
-       }
+   /* walk down */
+   while( true ) { 
+       GISTPageOpaque opaque;
 
-       /* If this tuple vector was all NULLs, the union is NULL */
-       if (real_len == 0)
-       {
-           attr[i] = (Datum) 0;
-           isnull[i] = TRUE;
-       }
-       else
+       state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
+       state->stack->page = (Page) BufferGetPage(state->stack->buffer);
+       opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
+   
+       if (!(opaque->flags & F_LEAF))
        {
-           int datumsize;
-
-           if (real_len == 1)
-           {
-               evec->n = 2;
-               gistentryinit(evec->vector[1],
-                             evec->vector[0].key, r, NULL,
-                             (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
-           }
-           else
-               evec->n = real_len;
-
-           /* Compress the result of the union and store in attr array */
-           datum = FunctionCall2(&giststate->unionFn[i],
-                                 PointerGetDatum(evec),
-                                 PointerGetDatum(&datumsize));
-
-           gistcentryinit(giststate, i, ¢ry[i], datum,
-                          NULL, NULL, (OffsetNumber) 0,
-                          datumsize, FALSE, FALSE);
-           isnull[i] = FALSE;
-           attr[i] = centry[i].key;
-       }
+               /*
+               * This is an internal page, so continue to walk down the
+               * tree. We find the child node that has the minimum insertion
+               * penalty and recursively invoke ourselves to modify that
+               * node. Once the recursive call returns, we may need to
+               * adjust the parent node for two reasons: the child node
+               * split, or the key in this node needs to be adjusted for the
+               * newly inserted key below us.
+               */
+           GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack));
+   
+           state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
+
+           iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+           oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+           item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
+           item->parent = state->stack;
+           item->todelete = false;
+           state->stack = item;
+       } else 
+           break;
    }
 
-   return index_form_tuple(giststate->tupdesc, attr, isnull);
-}
-
-
-/*
- * Forms union of oldtup and addtup, if union == oldtup then return NULL
- */
-static IndexTuple
-gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
-{
-   GistEntryVector *evec;
-   bool        neednew = false;
-   bool        isnull[INDEX_MAX_KEYS];
-   Datum       attr[INDEX_MAX_KEYS];
-   GISTENTRY   centry[INDEX_MAX_KEYS],
-               oldatt[INDEX_MAX_KEYS],
-               addatt[INDEX_MAX_KEYS],
-              *ev0p,
-              *ev1p;
-   bool        oldisnull[INDEX_MAX_KEYS],
-               addisnull[INDEX_MAX_KEYS];
-   IndexTuple  newtup = NULL;
-   int         i;
-
-   evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
-   evec->n = 2;
-   ev0p = &(evec->vector[0]);
-   ev1p = &(evec->vector[1]);
-
-   gistDeCompressAtt(giststate, r, oldtup, NULL,
-                     (OffsetNumber) 0, oldatt, oldisnull);
-
-   gistDeCompressAtt(giststate, r, addtup, NULL,
-                     (OffsetNumber) 0, addatt, addisnull);
-
-   for (i = 0; i < r->rd_att->natts; i++)
-   {
-       if (oldisnull[i] && addisnull[i])
-       {
-           attr[i] = (Datum) 0;
-           isnull[i] = TRUE;
-       }
-       else
-       {
-           Datum       datum;
-           int         datumsize;
-
-           FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
-                  addisnull[i], addatt[i].key, addatt[i].bytes);
-
-           datum = FunctionCall2(&giststate->unionFn[i],
-                                 PointerGetDatum(evec),
-                                 PointerGetDatum(&datumsize));
-
-           if (oldisnull[i] || addisnull[i])
-           {
-               if (oldisnull[i])
-                   neednew = true;
-           }
-           else
-           {
-               bool    result;
-
-               FunctionCall3(&giststate->equalFn[i],
-                             ev0p->key,
-                             datum,
-                             PointerGetDatum(&result));
+   /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */
 
-               if (!result)
-                   neednew = true;
-           }
-
-           gistcentryinit(giststate, i, ¢ry[i], datum,
-                          NULL, NULL, (OffsetNumber) 0,
-                          datumsize, FALSE, FALSE);
-
-           attr[i] = centry[i].key;
-           isnull[i] = FALSE;
-       }
+   /* form state->path to work xlog */
+   ptr = state->stack;
+   state->pathlen=1;
+   while( ptr ) {
+       state->pathlen++;
+       ptr=ptr->parent;
    }
-
-   if (neednew)
-   {
-       /* need to update key */
-       newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
-       newtup->t_tid = oldtup->t_tid;
+   state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
+   ptr = state->stack;
+   state->pathlen=0;
+   while( ptr ) {
+       state->path[ state->pathlen ] = ptr->blkno;
+       state->pathlen++;
+       ptr=ptr->parent;
    }
-
-   return newtup;
+   state->pathlen--;
+   state->path++;
 }
 
-static void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
-{
-   int lr;
-
-   for (lr = 0; lr < 2; lr++)
-   {
-       OffsetNumber *entries;
-       int         i;
-       Datum      *attr;
-       int         len,
-                   *attrsize;
-       bool       *isnull;
-       GistEntryVector *evec;
-
-       if (lr)
-       {
-           attrsize = spl->spl_lattrsize;
-           attr = spl->spl_lattr;
-           len = spl->spl_nleft;
-           entries = spl->spl_left;
-           isnull = spl->spl_lisnull;
-       }
-       else
-       {
-           attrsize = spl->spl_rattrsize;
-           attr = spl->spl_rattr;
-           len = spl->spl_nright;
-           entries = spl->spl_right;
-           isnull = spl->spl_risnull;
-       }
-
-       evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
-
-       for (i = 1; i < r->rd_att->natts; i++)
-       {
-           int         j;
-           Datum       datum;
-           int         datumsize;
-           int         real_len;
-
-           real_len = 0;
-           for (j = 0; j < len; j++)
-           {
-               bool        IsNull;
-
-               if (spl->spl_idgrp[entries[j]])
-                   continue;
-               datum = index_getattr(itvec[entries[j] - 1], i + 1,
-                                     giststate->tupdesc, &IsNull);
-               if (IsNull)
-                   continue;
-               gistdentryinit(giststate, i,
-                              &(evec->vector[real_len]),
-                              datum,
-                              NULL, NULL, (OffsetNumber) 0,
-                              ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                              FALSE, IsNull);
-               real_len++;
-
-           }
 
-           if (real_len == 0)
-           {
-               datum = (Datum) 0;
-               datumsize = 0;
-               isnull[i] = true;
-           }
-           else
-           {
-               /*
-                * evec->vector[0].bytes may be not defined, so form union
-                * with itself
-                */
-               if (real_len == 1)
-               {
-                   evec->n = 2;
-                   memcpy(&(evec->vector[1]), &(evec->vector[0]),
-                          sizeof(GISTENTRY));
-               }
-               else
-                   evec->n = real_len;
-               datum = FunctionCall2(&giststate->unionFn[i],
-                                     PointerGetDatum(evec),
-                                     PointerGetDatum(&datumsize));
-               isnull[i] = false;
-           }
+void
+gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
+   int         is_splitted;
+   ItemId      iid;
+   IndexTuple  oldtup, newtup;
 
-           attr[i] = datum;
-           attrsize[i] = datumsize;
-       }
-   }
-}
+   /* walk up */
+   while( true ) {
+                /*
+                 * After this call: 1. if child page was splited, then itup
+                 * contains keys for each page 2. if  child page wasn't splited,
+                 * then itup contains additional for adjustment of current key
+                 */
 
-/*
- * find group in vector with equal value
- */
-static int
-gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
-{
-   int         i;
-   int         curid = 1;
-
-   /*
-    * first key is always not null (see gistinsert), so we may not check
-    * for nulls
-    */
-   for (i = 0; i < spl->spl_nleft; i++)
-   {
-       int j;
-       int len;
-       bool result;
-
-       if (spl->spl_idgrp[spl->spl_left[i]])
-           continue;
-       len = 0;
-       /* find all equal value in right part */
-       for (j = 0; j < spl->spl_nright; j++)
-       {
-           if (spl->spl_idgrp[spl->spl_right[j]])
-               continue;
-           FunctionCall3(&giststate->equalFn[0],
-                         valvec[spl->spl_left[i]].key,
-                         valvec[spl->spl_right[j]].key,
-                         PointerGetDatum(&result));
-           if (result)
-           {
-               spl->spl_idgrp[spl->spl_right[j]] = curid;
-               len++;
-           }
-       }
-       /* find all other equal value in left part */
-       if (len)
-       {
-           /* add current val to list of equal values */
-           spl->spl_idgrp[spl->spl_left[i]] = curid;
-           /* searching .. */
-           for (j = i + 1; j < spl->spl_nleft; j++)
-           {
-               if (spl->spl_idgrp[spl->spl_left[j]])
-                   continue;
-               FunctionCall3(&giststate->equalFn[0],
-                             valvec[spl->spl_left[i]].key,
-                             valvec[spl->spl_left[j]].key,
-                             PointerGetDatum(&result));
-               if (result)
-               {
-                   spl->spl_idgrp[spl->spl_left[j]] = curid;
-                   len++;
-               }
-           }
-           spl->spl_ngrp[curid] = len + 1;
-           curid++;
-       }
-   }
+       is_splitted = gistplacetopage(state, giststate );
 
-   return curid;
-}
+       /* pop page from stack */
+       state->stack = state->stack->parent;
+       state->pathlen--;
+       state->path++;
+   
+       /* stack is void */
+       if ( ! state->stack )
+           break;
 
-/*
- * Insert equivalent tuples to left or right page with minimum
- * penalty
- */
-static void
-gistadjsubkey(Relation r,
-             IndexTuple *itup, /* contains compressed entry */
-             int *len,
-             GIST_SPLITVEC *v,
-             GISTSTATE *giststate)
-{
-   int         curlen;
-   OffsetNumber *curwpos;
-   GISTENTRY   entry,
-               identry[INDEX_MAX_KEYS],
-              *ev0p,
-              *ev1p;
-   float       lpenalty,
-               rpenalty;
-   GistEntryVector *evec;
-   int         datumsize;
-   bool        isnull[INDEX_MAX_KEYS];
-   int         i,
-               j;
 
-   /* clear vectors */
-   curlen = v->spl_nleft;
-   curwpos = v->spl_left;
-   for (i = 0; i < v->spl_nleft; i++)
-   {
-       if (v->spl_idgrp[v->spl_left[i]] == 0)
+       /* child did not split */
+       if (!is_splitted)
        {
-           *curwpos = v->spl_left[i];
-           curwpos++;
-       }
-       else
-           curlen--;
-   }
-   v->spl_nleft = curlen;
+           /* parent's tuple */
+           iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
+           oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
+           newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
+   
+           if (!newtup) /* not need to update key */
+               break;
 
-   curlen = v->spl_nright;
-   curwpos = v->spl_right;
-   for (i = 0; i < v->spl_nright; i++)
-   {
-       if (v->spl_idgrp[v->spl_right[i]] == 0)
-       {
-           *curwpos = v->spl_right[i];
-           curwpos++;
+           state->itup[0] = newtup;    
        }
-       else
-           curlen--;
+   
+           /*
+            * This node's key has been modified, either because a child
+            * split occurred or because we needed to adjust our key for
+            * an insert in a child node. Therefore, remove the old
+            * version of this node's key.
+            */
+       gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
+       PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
+       if ( !state->r->rd_istemp ) 
+           state->stack->todelete = true;
+               
+       /*
+        * if child was splitted, new key for child will be inserted in
+        * the end list of child, so we must say to any scans that page is
+        * changed beginning from 'child' offset
+        */
+       if (is_splitted)
+           gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum);
+   } /* while */
+
+   /* release all buffers */
+   while( state->stack ) {
+       if ( state->xlog_mode ) 
+           LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(state->stack->buffer);
+       state->stack = state->stack->parent;
    }
-   v->spl_nright = curlen;
 
-   evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
-   evec->n = 2;
-   ev0p = &(evec->vector[0]);
-   ev1p = &(evec->vector[1]);
+   /* say to xlog that insert is completed */
+   if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
+       gistxlogInsertComplete  xlrec;
+       XLogRecData     rdata;
+           
+       xlrec.node = state->r->rd_node;
+       xlrec.key = state->key;
+           
+       rdata.buffer = InvalidBuffer;
+       rdata.data   = (char *) &xlrec;
+       rdata.len    = sizeof( gistxlogInsertComplete );
+       rdata.next   = NULL;
 
-   /* add equivalent tuple */
-   for (i = 0; i < *len; i++)
-   {
-       Datum       datum;
+       START_CRIT_SECTION();
 
-       if (v->spl_idgrp[i + 1] == 0)   /* already inserted */
-           continue;
-       gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
-                         identry, isnull);
+       XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
 
-       v->spl_ngrp[v->spl_idgrp[i + 1]]--;
-       if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
-           (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
-       {
-           /* force last in group */
-           rpenalty = 1.0;
-           lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
-       }
-       else
-       {
-           /* where? */
-           for (j = 1; j < r->rd_att->natts; j++)
-           {
-               gistentryinit(entry, v->spl_lattr[j], r, NULL,
-                          (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
-               gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
-                           &identry[j], isnull[j], &lpenalty);
-
-               gistentryinit(entry, v->spl_rattr[j], r, NULL,
-                          (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
-               gistpenalty(giststate, j, &entry, v->spl_risnull[j],
-                           &identry[j], isnull[j], &rpenalty);
-
-               if (lpenalty != rpenalty)
-                   break;
-           }
-       }
-
-       /*
-        * add
-        * XXX: refactor this to avoid duplicating code
-        */
-       if (lpenalty < rpenalty)
-       {
-           v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
-           v->spl_left[v->spl_nleft] = i + 1;
-           v->spl_nleft++;
-           for (j = 1; j < r->rd_att->natts; j++)
-           {
-               if (isnull[j] && v->spl_lisnull[j])
-               {
-                   v->spl_lattr[j] = (Datum) 0;
-                   v->spl_lattrsize[j] = 0;
-               }
-               else
-               {
-                   FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
-                          isnull[j], identry[j].key, identry[j].bytes);
-
-                   datum = FunctionCall2(&giststate->unionFn[j],
-                                         PointerGetDatum(evec),
-                                         PointerGetDatum(&datumsize));
-
-                   v->spl_lattr[j] = datum;
-                   v->spl_lattrsize[j] = datumsize;
-                   v->spl_lisnull[j] = false;
-               }
-           }
-       }
-       else
-       {
-           v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
-           v->spl_right[v->spl_nright] = i + 1;
-           v->spl_nright++;
-           for (j = 1; j < r->rd_att->natts; j++)
-           {
-               if (isnull[j] && v->spl_risnull[j])
-               {
-                   v->spl_rattr[j] = (Datum) 0;
-                   v->spl_rattrsize[j] = 0;
-               }
-               else
-               {
-                   FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
-                          isnull[j], identry[j].key, identry[j].bytes);
-
-                   datum = FunctionCall2(&giststate->unionFn[j],
-                                         PointerGetDatum(evec),
-                                         PointerGetDatum(&datumsize));
-
-                   v->spl_rattr[j] = datum;
-                   v->spl_rattrsize[j] = datumsize;
-                   v->spl_risnull[j] = false;
-               }
-           }
-       }
+       END_CRIT_SECTION();
    }
 }
 
@@ -1105,6 +674,7 @@ gistSplit(Relation r,
          Buffer buffer,
          IndexTuple *itup,     /* contains compressed entry */
          int *len,
+         PageLayout    **dist,
          GISTSTATE *giststate)
 {
    Page        p;
@@ -1225,29 +795,57 @@ gistSplit(Relation r,
    /* write on disk (may need another split) */
    if (gistnospace(right, rvectup, v.spl_nright))
    {
+       int i;
+       PageLayout *d, *origd=*dist;
+   
        nlen = v.spl_nright;
-       newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate);
+       newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
+       /* XLOG stuff */
+       d=*dist;
+       /* translate offsetnumbers to our */
+       while( d && d!=origd ) {
+           for(i=0;iblock.num;i++)
+               d->list[i] = v.spl_right[ d->list[i]-1 ]; 
+           d=d->next;
+       }
        ReleaseBuffer(rightbuf);
    }
    else
    {
        OffsetNumber l;
 
-       l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
-       WriteBuffer(rightbuf);
-
+       l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
+       /* XLOG stuff */
+       ROTATEDIST(*dist);
+       (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
+       (*dist)->block.num = v.spl_nright;
+       (*dist)->list = v.spl_right;
+       (*dist)->buffer = rightbuf;
        nlen = 1;
        newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
        newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
-       ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
+       ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
    }
 
    if (gistnospace(left, lvectup, v.spl_nleft))
    {
        int         llen = v.spl_nleft;
        IndexTuple *lntup;
-
-       lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate);
+       int i;
+       PageLayout *d, *origd=*dist;
+
+       lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
+
+       /* XLOG stuff */
+       d=*dist;
+       /* translate offsetnumbers to our */
+       while( d && d!=origd ) {
+           for(i=0;iblock.num;i++)
+               d->list[i] = v.spl_left[ d->list[i]-1 ]; 
+           d=d->next;
+       }
+       
        ReleaseBuffer(leftbuf);
 
        newtup = gistjoinvector(newtup, &nlen, lntup, llen);
@@ -1256,151 +854,76 @@ gistSplit(Relation r,
    {
        OffsetNumber l;
 
-       l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
+       l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
        if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
            PageRestoreTempPage(left, p);
 
-       WriteBuffer(leftbuf);
-
+       /* XLOG stuff */
+       ROTATEDIST(*dist);
+       (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
+       (*dist)->block.num = v.spl_nleft;
+       (*dist)->list = v.spl_left;
+       (*dist)->buffer = leftbuf;
        nlen += 1;
        newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
        newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
-       ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
+       ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
    }
 
    *len = nlen;
    return newtup;
 }
 
-static void
-gistnewroot(Relation r, IndexTuple *itup, int len)
-{
-   Buffer      b;
-   Page        p;
-
-   b = ReadBuffer(r, GIST_ROOT_BLKNO);
-   GISTInitBuffer(b, 0);
-   p = BufferGetPage(b);
-
-   gistwritebuffer(r, p, itup, len, FirstOffsetNumber);
-   WriteBuffer(b);
-}
-
-static void
-GISTInitBuffer(Buffer b, uint32 f)
+void
+gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
 {
-   GISTPageOpaque opaque;
+   Buffer      buffer;
    Page        page;
-   Size        pageSize;
-
-   pageSize = BufferGetPageSize(b);
-   page = BufferGetPage(b);
-   PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
-
-   opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
-   opaque->flags = f;
-}
-
-/*
- * find entry with lowest penalty
- */
-static OffsetNumber
-gistchoose(Relation r, Page p, IndexTuple it,  /* it has compressed entry */
-          GISTSTATE *giststate)
-{
-   OffsetNumber maxoff;
-   OffsetNumber i;
-   OffsetNumber which;
-   float       sum_grow,
-               which_grow[INDEX_MAX_KEYS];
-   GISTENTRY   entry,
-               identry[INDEX_MAX_KEYS];
-   bool        isnull[INDEX_MAX_KEYS];
-
-   maxoff = PageGetMaxOffsetNumber(p);
-   *which_grow = -1.0;
-   which = -1;
-   sum_grow = 1;
-   gistDeCompressAtt(giststate, r,
-                     it, NULL, (OffsetNumber) 0,
-                     identry, isnull);
-
-   for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
-   {
-       int         j;
-       IndexTuple  itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
-       sum_grow = 0;
-       for (j = 0; j < r->rd_att->natts; j++)
-       {
-           Datum       datum;
-           float       usize;
-           bool        IsNull;
-
-           datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
-           gistdentryinit(giststate, j, &entry, datum, r, p, i,
-                          ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
-                          FALSE, IsNull);
-           gistpenalty(giststate, j, &entry, IsNull,
-                       &identry[j], isnull[j], &usize);
-
-           if (which_grow[j] < 0 || usize < which_grow[j])
-           {
-               which = i;
-               which_grow[j] = usize;
-               if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
-                   which_grow[j + 1] = -1;
-               sum_grow += which_grow[j];
-           }
-           else if (which_grow[j] == usize)
-               sum_grow += usize;
-           else
-           {
-               sum_grow = 1;
-               break;
-           }
-       }
+   buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
+   GISTInitBuffer(buffer, 0);
+   page = BufferGetPage(buffer);
+
+   gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+   if ( !xlog_mode && !r->rd_istemp ) {
+       gistxlogEntryUpdate xlrec;
+       XLogRecPtr      recptr;
+       XLogRecData     *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
+       int i;
+           
+       xlrec.node = r->rd_node;
+       xlrec.blkno = GIST_ROOT_BLKNO;
+       xlrec.todeleteoffnum = InvalidOffsetNumber;
+       xlrec.key = *key;
+       xlrec.pathlen=0;
+           
+       rdata[0].buffer = InvalidBuffer;
+       rdata[0].data   = (char *) &xlrec;
+       rdata[0].len    = sizeof( gistxlogEntryUpdate );
+       rdata[0].next   = NULL;
+
+       for(i=1; i<=len; i++) {
+           rdata[i].buffer = InvalidBuffer;
+           rdata[i].data   = (char*)(itup[i-1]);
+           rdata[i].len    = IndexTupleSize(itup[i-1]);
+           rdata[i].next   = NULL;
+           rdata[i-1].next = &(rdata[i]);
+       }   
+           
+       START_CRIT_SECTION();
+
+       recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
+       PageSetLSN(page, recptr);
+       PageSetTLI(page, ThisTimeLineID);
+
+       END_CRIT_SECTION();
    }
-
-   return which;
+   if ( xlog_mode ) 
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   WriteBuffer(buffer);
 }
 
-/*
- * Retail deletion of a single tuple.
- *
- * NB: this is no longer called externally, but is still needed by
- * gistlayerinsert().  That dependency will have to be fixed if GIST
- * is ever going to allow concurrent insertions.
- */
-static void
-gistdelete(Relation r, ItemPointer tid)
-{
-   BlockNumber blkno;
-   OffsetNumber offnum;
-   Buffer      buf;
-   Page        page;
-
-   /*
-    * Since GIST is not marked "amconcurrent" in pg_am, caller should
-    * have acquired exclusive lock on index relation.  We need no locking
-    * here.
-    */
-
-   blkno = ItemPointerGetBlockNumber(tid);
-   offnum = ItemPointerGetOffsetNumber(tid);
-
-   /* adjust any scans that will be affected by this deletion */
-   /* NB: this works only for scans in *this* backend! */
-   gistadjscans(r, GISTOP_DEL, blkno, offnum);
-
-   /* delete the index tuple */
-   buf = ReadBuffer(r, blkno);
-   page = BufferGetPage(buf);
-
-   PageIndexTupleDelete(page, offnum);
-
-   WriteBuffer(buf);
-}
 
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
@@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS)
            page = BufferGetPage(buf);
 
            PageIndexTupleDelete(page, offnum);
+           if ( !rel->rd_istemp ) {
+               gistxlogEntryUpdate xlrec;
+               XLogRecPtr      recptr;
+               XLogRecData     rdata;
+           
+               xlrec.node = rel->rd_node;
+               xlrec.blkno = blkno;
+               xlrec.todeleteoffnum = offnum;
+               xlrec.pathlen=0;
+               ItemPointerSetInvalid( &(xlrec.key) );
+           
+               rdata.buffer = InvalidBuffer;
+               rdata.data   = (char *) &xlrec;
+               rdata.len    = sizeof( gistxlogEntryUpdate );
+               rdata.next   = NULL;
+
+               START_CRIT_SECTION();
+
+               recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
+               PageSetLSN(page, recptr);
+               PageSetTLI(page, ThisTimeLineID);
+
+               END_CRIT_SECTION();
+           }
 
            WriteBuffer(buf);
 
@@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate)
    /* no work */
 }
 
-#ifdef GIST_PAGEADDITEM
-/*
- * Given an IndexTuple to be inserted on a page, this routine replaces
- * the key with another key, which may involve generating a new IndexTuple
- * if the sizes don't match or if the null status changes.
- *
- * XXX this only works for a single-column index tuple!
- */
-static IndexTuple
-gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
-{
-   bool        IsNull;
-   Datum       datum = index_getattr(t, 1, r->rd_att, &IsNull);
-
-   /*
-    * If new entry fits in index tuple, copy it in.  To avoid worrying
-    * about null-value bitmask, pass it off to the general
-    * index_form_tuple routine if either the previous or new value is
-    * NULL.
-    */
-   if (!IsNull && DatumGetPointer(entry.key) != NULL &&
-       (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull))
-   {
-       memcpy(DatumGetPointer(datum),
-              DatumGetPointer(entry.key),
-              entry.bytes);
-       /* clear out old size */
-       t->t_info &= ~INDEX_SIZE_MASK;
-       /* or in new size */
-       t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
-
-       return t;
-   }
-   else
-   {
-       /* generate a new index tuple for the compressed entry */
-       TupleDesc   tupDesc = r->rd_att;
-       IndexTuple  newtup;
-       bool        isnull;
-
-       isnull = (DatumGetPointer(entry.key) == NULL);
-       newtup = index_form_tuple(tupDesc, &(entry.key), &isnull);
-       newtup->t_tid = t->t_tid;
-       return newtup;
-   }
-}
-#endif
-
-/*
- * initialize a GiST entry with a decompressed version of key
- */
-void
-gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
-              Datum k, Relation r, Page pg, OffsetNumber o,
-              int b, bool l, bool isNull)
-{
-   if (b && !isNull)
-   {
-       GISTENTRY  *dep;
-
-       gistentryinit(*e, k, r, pg, o, b, l);
-       dep = (GISTENTRY *)
-           DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
-                                         PointerGetDatum(e)));
-       /* decompressFn may just return the given pointer */
-       if (dep != e)
-           gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
-                         dep->bytes, dep->leafkey);
-   }
-   else
-       gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-
-/*
- * initialize a GiST entry with a compressed version of key
- */
-static void
-gistcentryinit(GISTSTATE *giststate, int nkey,
-              GISTENTRY *e, Datum k, Relation r,
-              Page pg, OffsetNumber o, int b, bool l, bool isNull)
-{
-   if (!isNull)
-   {
-       GISTENTRY  *cep;
-
-       gistentryinit(*e, k, r, pg, o, b, l);
-       cep = (GISTENTRY *)
-           DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
-                                         PointerGetDatum(e)));
-       /* compressFn may just return the given pointer */
-       if (cep != e)
-           gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
-                         cep->bytes, cep->leafkey);
-   }
-   else
-       gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
-}
-
-static IndexTuple
-gistFormTuple(GISTSTATE *giststate, Relation r,
-             Datum attdata[], int datumsize[], bool isnull[])
-{
-   GISTENTRY   centry[INDEX_MAX_KEYS];
-   Datum       compatt[INDEX_MAX_KEYS];
-   int         i;
-
-   for (i = 0; i < r->rd_att->natts; i++)
-   {
-       if (isnull[i])
-           compatt[i] = (Datum) 0;
-       else
-       {
-           gistcentryinit(giststate, i, ¢ry[i], attdata[i],
-                          NULL, NULL, (OffsetNumber) 0,
-                          datumsize[i], FALSE, FALSE);
-           compatt[i] = centry[i].key;
-       }
-   }
-
-   return index_form_tuple(giststate->tupdesc, compatt, isnull);
-}
-
-static void
-gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
-                 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
-{
-   int         i;
-
-   for (i = 0; i < r->rd_att->natts; i++)
-   {
-       Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
-       gistdentryinit(giststate, i, &attdata[i],
-                      datum, r, p, o,
-                      ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
-                      FALSE, isnull[i]);
-   }
-}
-
-static void
-gistpenalty(GISTSTATE *giststate, int attno,
-           GISTENTRY *key1, bool isNull1,
-           GISTENTRY *key2, bool isNull2, float *penalty)
-{
-   if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
-       *penalty = 0.0;
-   else
-       FunctionCall3(&giststate->penaltyFn[attno],
-                     PointerGetDatum(key1),
-                     PointerGetDatum(key2),
-                     PointerGetDatum(penalty));
-}
-
 #ifdef GISTDEBUG
 static void
 gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
@@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
 }
 #endif   /* defined GISTDEBUG */
 
-void
-gist_redo(XLogRecPtr lsn, XLogRecord *record)
-{
-   elog(PANIC, "gist_redo: unimplemented");
-}
-
-void
-gist_desc(char *buf, uint8 xl_info, char *rec)
-{
-}
index e5f77b67926261e4f6c6cfc03123e1aa5995f159..5b9a94471b10816aeebc07c54ab601d924d2c3b3 100644 (file)
@@ -8,14 +8,14 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.47 2005/05/17 03:34:18 neilc Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include "access/gist_private.h"
 #include "access/itup.h"
+#include "access/gist_private.h"
 #include "executor/execdebug.h"
 #include "utils/memutils.h"
 
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
new file mode 100644 (file)
index 0000000..44391f9
--- /dev/null
@@ -0,0 +1,785 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistutil.c
+ *   utilities routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "miscadmin.h"
+
+/* group flags ( in gistadjsubkey ) */
+#define LEFT_ADDED      0x01
+#define RIGHT_ADDED 0x02
+#define BOTH_ADDED      ( LEFT_ADDED | RIGHT_ADDED )
+
+
+/*
+ * This defines is only for shorter code, used in gistgetadjusted
+ * and gistadjsubkey only
+ */
+#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb)  do { \
+   if (isnullkey) {                                              \
+       gistentryinit((evp), rkey, r, NULL,                       \
+                     (OffsetNumber) 0, rkeyb, FALSE);            \
+   } else {                                                      \
+       gistentryinit((evp), okey, r, NULL,                       \
+                     (OffsetNumber) 0, okeyb, FALSE);            \
+   }                                                             \
+} while(0)
+
+#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
+   FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b);     \
+   FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b);     \
+} while(0);
+
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+           GISTENTRY *key1, bool isNull1,
+           GISTENTRY *key2, bool isNull2, float *penalty);
+
+/*
+ * Write itup vector to page, has no control of free space
+ */
+OffsetNumber
+gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+               int len, OffsetNumber off)
+{
+   OffsetNumber l = InvalidOffsetNumber;
+   int         i;
+
+   for (i = 0; i < len; i++)
+   {
+       l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
+                       off, LP_USED);
+       if (l == InvalidOffsetNumber)
+           elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"",
+                RelationGetRelationName(r));
+       off++;
+   }
+   return l;
+}
+
+/*
+ * Check space for itup vector on page
+ */
+bool
+gistnospace(Page page, IndexTuple *itvec, int len)
+{
+   unsigned int size = 0;
+   int         i;
+
+   for (i = 0; i < len; i++)
+       size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
+
+   return (PageGetFreeSpace(page) < size);
+}
+
+/*
+ * Read buffer into itup vector
+ */
+IndexTuple *
+gistextractbuffer(Buffer buffer, int *len /* out */ )
+{
+   OffsetNumber i,
+               maxoff;
+   IndexTuple *itvec;
+   Page        p = (Page) BufferGetPage(buffer);
+
+   maxoff = PageGetMaxOffsetNumber(p);
+   *len = maxoff;
+   itvec = palloc(sizeof(IndexTuple) * maxoff);
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+   return itvec;
+}
+
+/*
+ * join two vectors into one
+ */
+IndexTuple *
+gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
+{
+   itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
+   memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
+   *len += addlen;
+   return itvec;
+}
+
+/*
+ * Return an IndexTuple containing the result of applying the "union"
+ * method to the specified IndexTuple vector.
+ */
+IndexTuple
+gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
+{
+   Datum       attr[INDEX_MAX_KEYS];
+   bool        isnull[INDEX_MAX_KEYS];
+   GistEntryVector *evec;
+   int         i;
+   GISTENTRY   centry[INDEX_MAX_KEYS];
+
+   evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+   for (i = 0; i < r->rd_att->natts; i++)
+   {
+       Datum       datum;
+       int         j;
+       int         real_len;
+
+       real_len = 0;
+       for (j = 0; j < len; j++)
+       {
+           bool        IsNull;
+           datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
+           if (IsNull)
+               continue;
+
+           gistdentryinit(giststate, i,
+                          &(evec->vector[real_len]),
+                          datum,
+                          NULL, NULL, (OffsetNumber) 0,
+                          ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+                          FALSE, IsNull);
+           real_len++;
+       }
+
+       /* If this tuple vector was all NULLs, the union is NULL */
+       if (real_len == 0)
+       {
+           attr[i] = (Datum) 0;
+           isnull[i] = TRUE;
+       }
+       else
+       {
+           int datumsize;
+
+           if (real_len == 1)
+           {
+               evec->n = 2;
+               gistentryinit(evec->vector[1],
+                             evec->vector[0].key, r, NULL,
+                             (OffsetNumber) 0, evec->vector[0].bytes, FALSE);
+           }
+           else
+               evec->n = real_len;
+
+           /* Compress the result of the union and store in attr array */
+           datum = FunctionCall2(&giststate->unionFn[i],
+                                 PointerGetDatum(evec),
+                                 PointerGetDatum(&datumsize));
+
+           gistcentryinit(giststate, i, ¢ry[i], datum,
+                          NULL, NULL, (OffsetNumber) 0,
+                          datumsize, FALSE, FALSE);
+           isnull[i] = FALSE;
+           attr[i] = centry[i].key;
+       }
+   }
+
+   return index_form_tuple(giststate->tupdesc, attr, isnull);
+}
+
+
+/*
+ * Forms union of oldtup and addtup, if union == oldtup then return NULL
+ */
+IndexTuple
+gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
+{
+   GistEntryVector *evec;
+   bool        neednew = false;
+   bool        isnull[INDEX_MAX_KEYS];
+   Datum       attr[INDEX_MAX_KEYS];
+   GISTENTRY   centry[INDEX_MAX_KEYS],
+               oldatt[INDEX_MAX_KEYS],
+               addatt[INDEX_MAX_KEYS],
+              *ev0p,
+              *ev1p;
+   bool        oldisnull[INDEX_MAX_KEYS],
+               addisnull[INDEX_MAX_KEYS];
+   IndexTuple  newtup = NULL;
+   int         i;
+
+   evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+   evec->n = 2;
+   ev0p = &(evec->vector[0]);
+   ev1p = &(evec->vector[1]);
+
+   gistDeCompressAtt(giststate, r, oldtup, NULL,
+                     (OffsetNumber) 0, oldatt, oldisnull);
+
+   gistDeCompressAtt(giststate, r, addtup, NULL,
+                     (OffsetNumber) 0, addatt, addisnull);
+
+   for (i = 0; i < r->rd_att->natts; i++)
+   {
+       if (oldisnull[i] && addisnull[i])
+       {
+           attr[i] = (Datum) 0;
+           isnull[i] = TRUE;
+       }
+       else
+       {
+           Datum       datum;
+           int         datumsize;
+
+           FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
+                  addisnull[i], addatt[i].key, addatt[i].bytes);
+
+           datum = FunctionCall2(&giststate->unionFn[i],
+                                 PointerGetDatum(evec),
+                                 PointerGetDatum(&datumsize));
+
+           if (oldisnull[i] || addisnull[i])
+           {
+               if (oldisnull[i])
+                   neednew = true;
+           }
+           else
+           {
+               bool    result;
+
+               FunctionCall3(&giststate->equalFn[i],
+                             ev0p->key,
+                             datum,
+                             PointerGetDatum(&result));
+
+               if (!result)
+                   neednew = true;
+           }
+
+           gistcentryinit(giststate, i, ¢ry[i], datum,
+                          NULL, NULL, (OffsetNumber) 0,
+                          datumsize, FALSE, FALSE);
+
+           attr[i] = centry[i].key;
+           isnull[i] = FALSE;
+       }
+   }
+
+   if (neednew)
+   {
+       /* need to update key */
+       newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
+       newtup->t_tid = oldtup->t_tid;
+   }
+
+   return newtup;
+}
+
+void
+gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+   int lr;
+
+   for (lr = 0; lr < 2; lr++)
+   {
+       OffsetNumber *entries;
+       int         i;
+       Datum      *attr;
+       int         len,
+                   *attrsize;
+       bool       *isnull;
+       GistEntryVector *evec;
+
+       if (lr)
+       {
+           attrsize = spl->spl_lattrsize;
+           attr = spl->spl_lattr;
+           len = spl->spl_nleft;
+           entries = spl->spl_left;
+           isnull = spl->spl_lisnull;
+       }
+       else
+       {
+           attrsize = spl->spl_rattrsize;
+           attr = spl->spl_rattr;
+           len = spl->spl_nright;
+           entries = spl->spl_right;
+           isnull = spl->spl_risnull;
+       }
+
+       evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+
+       for (i = 1; i < r->rd_att->natts; i++)
+       {
+           int         j;
+           Datum       datum;
+           int         datumsize;
+           int         real_len;
+
+           real_len = 0;
+           for (j = 0; j < len; j++)
+           {
+               bool        IsNull;
+
+               if (spl->spl_idgrp[entries[j]])
+                   continue;
+               datum = index_getattr(itvec[entries[j] - 1], i + 1,
+                                     giststate->tupdesc, &IsNull);
+               if (IsNull)
+                   continue;
+               gistdentryinit(giststate, i,
+                              &(evec->vector[real_len]),
+                              datum,
+                              NULL, NULL, (OffsetNumber) 0,
+                              ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
+                              FALSE, IsNull);
+               real_len++;
+
+           }
+
+           if (real_len == 0)
+           {
+               datum = (Datum) 0;
+               datumsize = 0;
+               isnull[i] = true;
+           }
+           else
+           {
+               /*
+                * evec->vector[0].bytes may be not defined, so form union
+                * with itself
+                */
+               if (real_len == 1)
+               {
+                   evec->n = 2;
+                   memcpy(&(evec->vector[1]), &(evec->vector[0]),
+                          sizeof(GISTENTRY));
+               }
+               else
+                   evec->n = real_len;
+               datum = FunctionCall2(&giststate->unionFn[i],
+                                     PointerGetDatum(evec),
+                                     PointerGetDatum(&datumsize));
+               isnull[i] = false;
+           }
+
+           attr[i] = datum;
+           attrsize[i] = datumsize;
+       }
+   }
+}
+
+/*
+ * find group in vector with equal value
+ */
+int
+gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
+{
+   int         i;
+   int         curid = 1;
+
+   /*
+    * first key is always not null (see gistinsert), so we may not check
+    * for nulls
+    */
+   for (i = 0; i < spl->spl_nleft; i++)
+   {
+       int j;
+       int len;
+       bool result;
+
+       if (spl->spl_idgrp[spl->spl_left[i]])
+           continue;
+       len = 0;
+       /* find all equal value in right part */
+       for (j = 0; j < spl->spl_nright; j++)
+       {
+           if (spl->spl_idgrp[spl->spl_right[j]])
+               continue;
+           FunctionCall3(&giststate->equalFn[0],
+                         valvec[spl->spl_left[i]].key,
+                         valvec[spl->spl_right[j]].key,
+                         PointerGetDatum(&result));
+           if (result)
+           {
+               spl->spl_idgrp[spl->spl_right[j]] = curid;
+               len++;
+           }
+       }
+       /* find all other equal value in left part */
+       if (len)
+       {
+           /* add current val to list of equal values */
+           spl->spl_idgrp[spl->spl_left[i]] = curid;
+           /* searching .. */
+           for (j = i + 1; j < spl->spl_nleft; j++)
+           {
+               if (spl->spl_idgrp[spl->spl_left[j]])
+                   continue;
+               FunctionCall3(&giststate->equalFn[0],
+                             valvec[spl->spl_left[i]].key,
+                             valvec[spl->spl_left[j]].key,
+                             PointerGetDatum(&result));
+               if (result)
+               {
+                   spl->spl_idgrp[spl->spl_left[j]] = curid;
+                   len++;
+               }
+           }
+           spl->spl_ngrp[curid] = len + 1;
+           curid++;
+       }
+   }
+
+   return curid;
+}
+
+/*
+ * Insert equivalent tuples to left or right page with minimum
+ * penalty
+ */
+void
+gistadjsubkey(Relation r,
+             IndexTuple *itup, /* contains compressed entry */
+             int *len,
+             GIST_SPLITVEC *v,
+             GISTSTATE *giststate)
+{
+   int         curlen;
+   OffsetNumber *curwpos;
+   GISTENTRY   entry,
+               identry[INDEX_MAX_KEYS],
+              *ev0p,
+              *ev1p;
+   float       lpenalty,
+               rpenalty;
+   GistEntryVector *evec;
+   int         datumsize;
+   bool        isnull[INDEX_MAX_KEYS];
+   int         i,
+               j;
+
+   /* clear vectors */
+   curlen = v->spl_nleft;
+   curwpos = v->spl_left;
+   for (i = 0; i < v->spl_nleft; i++)
+   {
+       if (v->spl_idgrp[v->spl_left[i]] == 0)
+       {
+           *curwpos = v->spl_left[i];
+           curwpos++;
+       }
+       else
+           curlen--;
+   }
+   v->spl_nleft = curlen;
+
+   curlen = v->spl_nright;
+   curwpos = v->spl_right;
+   for (i = 0; i < v->spl_nright; i++)
+   {
+       if (v->spl_idgrp[v->spl_right[i]] == 0)
+       {
+           *curwpos = v->spl_right[i];
+           curwpos++;
+       }
+       else
+           curlen--;
+   }
+   v->spl_nright = curlen;
+
+   evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
+   evec->n = 2;
+   ev0p = &(evec->vector[0]);
+   ev1p = &(evec->vector[1]);
+
+   /* add equivalent tuple */
+   for (i = 0; i < *len; i++)
+   {
+       Datum       datum;
+
+       if (v->spl_idgrp[i + 1] == 0)   /* already inserted */
+           continue;
+       gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
+                         identry, isnull);
+
+       v->spl_ngrp[v->spl_idgrp[i + 1]]--;
+       if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
+           (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
+       {
+           /* force last in group */
+           rpenalty = 1.0;
+           lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
+       }
+       else
+       {
+           /* where? */
+           for (j = 1; j < r->rd_att->natts; j++)
+           {
+               gistentryinit(entry, v->spl_lattr[j], r, NULL,
+                          (OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
+               gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
+                           &identry[j], isnull[j], &lpenalty);
+
+               gistentryinit(entry, v->spl_rattr[j], r, NULL,
+                          (OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
+               gistpenalty(giststate, j, &entry, v->spl_risnull[j],
+                           &identry[j], isnull[j], &rpenalty);
+
+               if (lpenalty != rpenalty)
+                   break;
+           }
+       }
+
+       /*
+        * add
+        * XXX: refactor this to avoid duplicating code
+        */
+       if (lpenalty < rpenalty)
+       {
+           v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
+           v->spl_left[v->spl_nleft] = i + 1;
+           v->spl_nleft++;
+           for (j = 1; j < r->rd_att->natts; j++)
+           {
+               if (isnull[j] && v->spl_lisnull[j])
+               {
+                   v->spl_lattr[j] = (Datum) 0;
+                   v->spl_lattrsize[j] = 0;
+               }
+               else
+               {
+                   FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
+                          isnull[j], identry[j].key, identry[j].bytes);
+
+                   datum = FunctionCall2(&giststate->unionFn[j],
+                                         PointerGetDatum(evec),
+                                         PointerGetDatum(&datumsize));
+
+                   v->spl_lattr[j] = datum;
+                   v->spl_lattrsize[j] = datumsize;
+                   v->spl_lisnull[j] = false;
+               }
+           }
+       }
+       else
+       {
+           v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
+           v->spl_right[v->spl_nright] = i + 1;
+           v->spl_nright++;
+           for (j = 1; j < r->rd_att->natts; j++)
+           {
+               if (isnull[j] && v->spl_risnull[j])
+               {
+                   v->spl_rattr[j] = (Datum) 0;
+                   v->spl_rattrsize[j] = 0;
+               }
+               else
+               {
+                   FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
+                          isnull[j], identry[j].key, identry[j].bytes);
+
+                   datum = FunctionCall2(&giststate->unionFn[j],
+                                         PointerGetDatum(evec),
+                                         PointerGetDatum(&datumsize));
+
+                   v->spl_rattr[j] = datum;
+                   v->spl_rattrsize[j] = datumsize;
+                   v->spl_risnull[j] = false;
+               }
+           }
+       }
+   }
+}
+
+/*
+ * find entry with lowest penalty
+ */
+OffsetNumber
+gistchoose(Relation r, Page p, IndexTuple it,  /* it has compressed entry */
+          GISTSTATE *giststate)
+{
+   OffsetNumber maxoff;
+   OffsetNumber i;
+   OffsetNumber which;
+   float       sum_grow,
+               which_grow[INDEX_MAX_KEYS];
+   GISTENTRY   entry,
+               identry[INDEX_MAX_KEYS];
+   bool        isnull[INDEX_MAX_KEYS];
+
+   maxoff = PageGetMaxOffsetNumber(p);
+   *which_grow = -1.0;
+   which = -1;
+   sum_grow = 1;
+   gistDeCompressAtt(giststate, r,
+                     it, NULL, (OffsetNumber) 0,
+                     identry, isnull);
+
+   for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
+   {
+       int         j;
+       IndexTuple  itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+
+       sum_grow = 0;
+       for (j = 0; j < r->rd_att->natts; j++)
+       {
+           Datum       datum;
+           float       usize;
+           bool        IsNull;
+
+           datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
+           gistdentryinit(giststate, j, &entry, datum, r, p, i,
+                          ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
+                          FALSE, IsNull);
+           gistpenalty(giststate, j, &entry, IsNull,
+                       &identry[j], isnull[j], &usize);
+
+           if (which_grow[j] < 0 || usize < which_grow[j])
+           {
+               which = i;
+               which_grow[j] = usize;
+               if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
+                   which_grow[j + 1] = -1;
+               sum_grow += which_grow[j];
+           }
+           else if (which_grow[j] == usize)
+               sum_grow += usize;
+           else
+           {
+               sum_grow = 1;
+               break;
+           }
+       }
+   }
+
+   return which;
+}
+
+/*
+ * initialize a GiST entry with a decompressed version of key
+ */
+void
+gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+              Datum k, Relation r, Page pg, OffsetNumber o,
+              int b, bool l, bool isNull)
+{
+   if (b && !isNull)
+   {
+       GISTENTRY  *dep;
+
+       gistentryinit(*e, k, r, pg, o, b, l);
+       dep = (GISTENTRY *)
+           DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
+                                         PointerGetDatum(e)));
+       /* decompressFn may just return the given pointer */
+       if (dep != e)
+           gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
+                         dep->bytes, dep->leafkey);
+   }
+   else
+       gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+
+/*
+ * initialize a GiST entry with a compressed version of key
+ */
+void
+gistcentryinit(GISTSTATE *giststate, int nkey,
+              GISTENTRY *e, Datum k, Relation r,
+              Page pg, OffsetNumber o, int b, bool l, bool isNull)
+{
+   if (!isNull)
+   {
+       GISTENTRY  *cep;
+
+       gistentryinit(*e, k, r, pg, o, b, l);
+       cep = (GISTENTRY *)
+           DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
+                                         PointerGetDatum(e)));
+       /* compressFn may just return the given pointer */
+       if (cep != e)
+           gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
+                         cep->bytes, cep->leafkey);
+   }
+   else
+       gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
+}
+
+IndexTuple
+gistFormTuple(GISTSTATE *giststate, Relation r,
+             Datum attdata[], int datumsize[], bool isnull[])
+{
+   GISTENTRY   centry[INDEX_MAX_KEYS];
+   Datum       compatt[INDEX_MAX_KEYS];
+   int         i;
+
+   for (i = 0; i < r->rd_att->natts; i++)
+   {
+       if (isnull[i])
+           compatt[i] = (Datum) 0;
+       else
+       {
+           gistcentryinit(giststate, i, ¢ry[i], attdata[i],
+                          NULL, NULL, (OffsetNumber) 0,
+                          datumsize[i], FALSE, FALSE);
+           compatt[i] = centry[i].key;
+       }
+   }
+
+   return index_form_tuple(giststate->tupdesc, compatt, isnull);
+}
+
+void
+gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
+                 OffsetNumber o, GISTENTRY *attdata, bool *isnull)
+{
+   int         i;
+
+   for (i = 0; i < r->rd_att->natts; i++)
+   {
+       Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
+       gistdentryinit(giststate, i, &attdata[i],
+                      datum, r, p, o,
+                      ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
+                      FALSE, isnull[i]);
+   }
+}
+
+static void
+gistpenalty(GISTSTATE *giststate, int attno,
+           GISTENTRY *key1, bool isNull1,
+           GISTENTRY *key2, bool isNull2, float *penalty)
+{
+   if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
+       *penalty = 0.0;
+   else
+       FunctionCall3(&giststate->penaltyFn[attno],
+                     PointerGetDatum(key1),
+                     PointerGetDatum(key2),
+                     PointerGetDatum(penalty));
+}
+
+void
+GISTInitBuffer(Buffer b, uint32 f)
+{
+   GISTPageOpaque opaque;
+   Page            page;
+   Size            pageSize;
+
+   pageSize = BufferGetPageSize(b);
+   page = BufferGetPage(b);
+   PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
+
+   opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+   opaque->flags = f;
+}
+
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
new file mode 100644 (file)
index 0000000..b99ab24
--- /dev/null
@@ -0,0 +1,628 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistxlog.c
+ *   WAL replay logic for GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *           $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+typedef struct {
+   gistxlogEntryUpdate *data;
+   int         len;
+   IndexTuple      *itup;
+   BlockNumber     *path;
+} EntryUpdateRecord;
+
+typedef struct {
+   gistxlogPage    *header;
+   OffsetNumber    *offnum;
+
+   /* to work with */
+   Page    page;
+   Buffer  buffer;
+   bool    is_ok;
+} NewPage;
+
+typedef struct {
+   gistxlogPageSplit   *data;
+   NewPage         *page;
+   IndexTuple      *itup;
+   BlockNumber     *path;
+} PageSplitRecord;
+
+/* track for incomplete inserts, idea was taken from nbtxlog.c */
+
+typedef struct gistIncompleteInsert {
+   RelFileNode node;
+   ItemPointerData key;
+   int     lenblk;
+   BlockNumber *blkno;
+   int     pathlen;
+   BlockNumber *path;
+} gistIncompleteInsert;
+
+
+MemoryContext opCtx;
+MemoryContext insertCtx;
+static List *incomplete_inserts;
+
+
+#define ItemPointerEQ( a, b )  \
+   ( \
+   ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+   ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
+        )
+
+static void
+pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+       BlockNumber *blkno, int lenblk,
+       BlockNumber *path,  int pathlen,
+       PageSplitRecord *xlinfo /* to extract blkno info */ ) {
+   MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
+   gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
+
+   ninsert->node = node;
+   ninsert->key  = key;
+
+   if ( lenblk && blkno ) {    
+       ninsert->lenblk = lenblk;
+       ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+       memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
+   } else {
+       int i;
+
+       Assert( xlinfo );
+       ninsert->lenblk = xlinfo->data->npage;
+       ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
+       for(i=0;ilenblk;i++)
+           ninsert->blkno[i] = xlinfo->page[i].header->blkno;
+   }
+   Assert( ninsert->lenblk>0 );
+   
+   if ( path && ninsert->pathlen ) {
+       ninsert->pathlen = pathlen;
+       ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
+       memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
+   } else { 
+       ninsert->pathlen = 0;
+       ninsert->path = NULL;
+   }
+
+   incomplete_inserts = lappend(incomplete_inserts, ninsert);
+   MemoryContextSwitchTo(oldCxt);
+}
+
+static void
+forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
+   ListCell   *l;
+
+   foreach(l, incomplete_inserts) {
+       gistIncompleteInsert    *insert = (gistIncompleteInsert*) lfirst(l);
+
+       if (  RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
+           
+           /* found */
+           if ( insert->path ) pfree( insert->path );
+           pfree( insert->blkno );
+           incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+           pfree( insert );
+           break;
+       } 
+   }
+}
+
+static void
+decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
+   char *begin = XLogRecGetData(record), *ptr;
+   int i=0, addpath=0;
+
+   decoded->data = (gistxlogEntryUpdate*)begin;
+
+   if ( decoded->data->pathlen ) {
+       addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+       decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+   } else 
+       decoded->path = NULL;
+
+   decoded->len=0;
+   ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+   while( ptr - begin < record->xl_len ) {
+       decoded->len++;
+       ptr += IndexTupleSize( (IndexTuple)ptr );
+   }  
+
+   decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
+   
+   ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
+   while( ptr - begin < record->xl_len ) {
+       decoded->itup[i] = (IndexTuple)ptr;
+       ptr += IndexTupleSize( decoded->itup[i] );
+       i++;
+   }
+}
+
+
+static void
+gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
+   EntryUpdateRecord   xlrec;
+   Relation        reln;
+   Buffer          buffer;
+   Page            page;
+
+   decodeEntryUpdateRecord( &xlrec, record );
+
+   reln = XLogOpenRelation(xlrec.data->node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
+   if (!BufferIsValid(buffer))
+       elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+   page = (Page) BufferGetPage(buffer);
+
+   if ( isnewroot ) {
+       if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
+           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+           ReleaseBuffer(buffer);
+           return;
+       }
+   } else { 
+       if ( PageIsNew((PageHeader) page) )
+           elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+       if (XLByteLE(lsn, PageGetLSN(page))) {
+           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+           ReleaseBuffer(buffer);
+           return;
+       }
+   }
+
+   if ( isnewroot )
+       GISTInitBuffer(buffer, 0);
+   else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) 
+       PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+   /* add tuples */
+   if ( xlrec.len > 0 ) {
+                OffsetNumber off = (PageIsEmpty(page)) ?  
+                        FirstOffsetNumber
+                        :
+                        OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+       gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+   }
+
+   PageSetLSN(page, lsn);
+   PageSetTLI(page, ThisTimeLineID);
+   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   WriteBuffer(buffer);
+
+   if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+       if ( incomplete_inserts != NIL )
+           forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+       if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
+           pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+               &(xlrec.data->blkno), 1,
+               xlrec.path, xlrec.data->pathlen,
+               NULL);
+   }
+}
+
+static void
+decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
+   char *begin = XLogRecGetData(record), *ptr;
+   int i=0, addpath = 0;
+
+   decoded->data = (gistxlogPageSplit*)begin;
+   decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
+   decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
+
+   if ( decoded->data->pathlen ) {
+       addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+       decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+   } else 
+       decoded->path = NULL;
+
+   ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
+   for(i=0;idata->nitup;i++) {
+       Assert( ptr - begin < record->xl_len );
+       decoded->itup[i] = (IndexTuple)ptr;
+       ptr += IndexTupleSize( decoded->itup[i] );
+   }
+
+   for(i=0;idata->npage;i++) {
+       Assert( ptr - begin < record->xl_len );
+       decoded->page[i].header = (gistxlogPage*)ptr;
+       ptr += sizeof(gistxlogPage);
+
+       Assert( ptr - begin < record->xl_len );
+       decoded->page[i].offnum = (OffsetNumber*)ptr;
+       ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num );
+   }
+}
+   
+static void
+gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
+   PageSplitRecord xlrec;
+   Relation        reln;
+   Buffer          buffer;
+   Page            page;
+   int         i, len=0;
+   IndexTuple  *itup, *institup;
+   GISTPageOpaque opaque;
+   bool release=true;
+
+   decodePageSplitRecord( &xlrec, record );
+
+   reln = XLogOpenRelation(xlrec.data->node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
+   if (!BufferIsValid(buffer))
+       elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
+   page = (Page) BufferGetPage(buffer);
+   if (PageIsNew((PageHeader) page))
+       elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
+
+   if (XLByteLE(lsn, PageGetLSN(page))) {
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buffer);
+       return;
+   }
+   
+   if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
+       PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+
+   itup = gistextractbuffer(buffer, &len);
+   itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
+   institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
+        opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+
+   for(i=0;inpage;i++) {
+       int j;
+       NewPage *newpage = xlrec.page + i; 
+
+       /* prepare itup vector */
+       for(j=0;jheader->num;j++)
+           institup[j] = itup[ newpage->offnum[j] - 1 ];
+
+       if ( newpage->header->blkno == xlrec.data->origblkno ) {
+           /* IncrBufferRefCount(buffer); */
+           newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData));
+           newpage->buffer = buffer;
+           newpage->is_ok=false; 
+       } else {
+           newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno);
+           if (!BufferIsValid(newpage->buffer))
+               elog(PANIC, "gistRedoPageSplitRecord: lost page");
+           newpage->page = (Page) BufferGetPage(newpage->buffer);
+           if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+               newpage->is_ok=true;
+               continue; /* good page */
+           } else {
+               newpage->is_ok=false;
+               GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF);
+           }
+       }
+       gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber);
+   }
+
+   for(i=0;inpage;i++) {
+       NewPage *newpage = xlrec.page + i;
+
+       if ( newpage->is_ok )
+           continue;
+
+       if ( newpage->header->blkno == xlrec.data->origblkno ) { 
+           PageRestoreTempPage(newpage->page, page);
+           release = false;
+       }
+
+       PageSetLSN(newpage->page, lsn);
+       PageSetTLI(newpage->page, ThisTimeLineID);
+       LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(newpage->buffer);   
+   }
+
+   if ( release ) {
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buffer);
+   }
+
+   if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
+       if ( incomplete_inserts != NIL )
+           forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
+
+       pushIncompleteInsert(xlrec.data->node, xlrec.data->key, 
+               NULL, 0,
+               xlrec.path, xlrec.data->pathlen,
+               &xlrec);
+   }
+}
+
+static void
+gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
+   RelFileNode *node = (RelFileNode*)XLogRecGetData(record);
+   Relation        reln;
+   Buffer          buffer;
+   Page            page;
+
+   reln = XLogOpenRelation(*node);
+   if (!RelationIsValid(reln))
+       return;
+   buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
+   if (!BufferIsValid(buffer))
+       elog(PANIC, "gistRedoCreateIndex: block unfound");
+   page = (Page) BufferGetPage(buffer);
+
+   if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buffer);
+       return;
+   }
+
+   GISTInitBuffer(buffer, F_LEAF);
+
+   PageSetLSN(page, lsn);
+   PageSetTLI(page, ThisTimeLineID);
+   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   WriteBuffer(buffer);    
+}
+
+void
+gist_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+   uint8           info = record->xl_info & ~XLR_INFO_MASK;
+
+   MemoryContext oldCxt;
+   oldCxt = MemoryContextSwitchTo(opCtx);
+   switch (info) {
+       case    XLOG_GIST_ENTRY_UPDATE:
+       case    XLOG_GIST_ENTRY_DELETE:
+           gistRedoEntryUpdateRecord(lsn, record,false);
+           break;
+       case    XLOG_GIST_NEW_ROOT:
+           gistRedoEntryUpdateRecord(lsn, record,true);
+           break;
+       case    XLOG_GIST_PAGE_SPLIT:
+           gistRedoPageSplitRecord(lsn, record);   
+           break;
+       case    XLOG_GIST_CREATE_INDEX:
+           gistRedoCreateIndex(lsn, record);
+           break;
+       case    XLOG_GIST_INSERT_COMPLETE:
+           forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, 
+               ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+           break;
+       default:
+           elog(PANIC, "gist_redo: unknown op code %u", info);
+   }
+
+   MemoryContextSwitchTo(oldCxt);
+   MemoryContextReset(opCtx);
+}
+
+static void
+out_target(char *buf, RelFileNode node, ItemPointerData key)
+{
+        sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
+                 node.spcNode, node.dbNode, node.relNode,
+                        ItemPointerGetBlockNumber(&key),
+                        ItemPointerGetOffsetNumber(&key));
+}
+
+static void
+out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
+   out_target(buf, xlrec->node, xlrec->key);
+   sprintf(buf + strlen(buf), "; block number %u; update offset %u;", 
+       xlrec->blkno, xlrec->todeleteoffnum);
+}
+
+static void
+out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
+   strcat(buf, "page_split: ");
+   out_target(buf, xlrec->node, xlrec->key);
+   sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", 
+       xlrec->origblkno, xlrec->todeleteoffnum,
+       xlrec->nitup, xlrec->npage);
+}
+
+void
+gist_desc(char *buf, uint8 xl_info, char *rec)
+{
+   uint8           info = xl_info & ~XLR_INFO_MASK;
+
+   switch (info) {
+       case    XLOG_GIST_ENTRY_UPDATE:
+           strcat(buf, "entry_update: ");
+           out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+           break;
+       case    XLOG_GIST_ENTRY_DELETE:
+           strcat(buf, "entry_delete: ");
+           out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
+           break;
+       case    XLOG_GIST_NEW_ROOT:
+           strcat(buf, "new_root: ");
+           out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
+           break;
+       case    XLOG_GIST_PAGE_SPLIT:
+           out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
+           break;
+       case    XLOG_GIST_CREATE_INDEX:
+           sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u", 
+               ((RelFileNode*)rec)->spcNode, 
+               ((RelFileNode*)rec)->dbNode, 
+               ((RelFileNode*)rec)->relNode);
+           break;
+       case    XLOG_GIST_INSERT_COMPLETE:
+           strcat(buf, "insert_complete: ");
+           out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); 
+       default:
+           elog(PANIC, "gist_desc: unknown op code %u", info);
+   }
+}
+
+
+#ifdef GIST_INCOMPLETE_INSERT 
+static void
+gistContinueInsert(gistIncompleteInsert *insert) {
+   GISTSTATE   giststate;
+   GISTInsertState state;
+   int i;
+   MemoryContext oldCxt;
+   oldCxt = MemoryContextSwitchTo(opCtx);
+   
+   state.r = XLogOpenRelation(insert->node);
+   if (!RelationIsValid(state.r))
+       return;
+
+   initGISTstate(&giststate, state.r);
+
+   state.needInsertComplete=false;
+   ItemPointerSetInvalid( &(state.key) );
+   state.path=NULL;
+   state.pathlen=0;
+   state.xlog_mode = true;
+
+   /* form union tuples */
+   state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
+   state.ituplen = insert->lenblk; 
+   for(i=0;ilenblk;i++) {
+       int len=0;
+       IndexTuple *itup;
+       Buffer  buffer;
+       Page    page;
+
+       buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
+       if (!BufferIsValid(buffer))
+           elog(PANIC, "gistContinueInsert: block unfound");
+       page = (Page) BufferGetPage(buffer);
+       if ( PageIsNew((PageHeader)page) )
+           elog(PANIC, "gistContinueInsert: uninitialized page");
+
+       itup = gistextractbuffer(buffer, &len);
+       state.itup[i] = gistunion(state.r, itup, len, &giststate);
+
+       ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
+       
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buffer);
+   }
+
+   if ( insert->pathlen==0 ) { 
+       /*it  was split root, so we should only make new root*/
+       gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
+       MemoryContextSwitchTo(oldCxt);
+       MemoryContextReset(opCtx);
+       return;
+   }
+
+   /* form stack */
+   state.stack=NULL;
+   for(i=0;ipathlen;i++) {
+       int j,len=0;
+       IndexTuple *itup;
+       GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
+
+       top->blkno = insert->path[i];
+       top->buffer = XLogReadBuffer(false, state.r, top->blkno);
+       if (!BufferIsValid(top->buffer))
+           elog(PANIC, "gistContinueInsert: block unfound");
+       top->page = (Page) BufferGetPage(top->buffer);
+       if ( PageIsNew((PageHeader)(top->page)) )
+           elog(PANIC, "gistContinueInsert: uninitialized page");
+
+       top->todelete = false;  
+
+       /* find childoffnum */
+       itup = gistextractbuffer(top->buffer, &len);
+       top->childoffnum=InvalidOffsetNumber;
+       for(j=0;jchildoffnum==InvalidOffsetNumber;j++) {
+           BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); 
+           
+           if ( i==0 ) {
+               int k; 
+               for(k=0;klenblk;k++)
+                   if ( insert->blkno[k] == blkno ) {
+                       top->childoffnum = j+1;
+                       break;
+                   }
+           } else if ( insert->path[i-1]==blkno )
+                   top->childoffnum = j+1;
+       }
+
+       if ( top->childoffnum==InvalidOffsetNumber ) {
+           elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
+           return;
+       }
+
+       if ( i==0 ) 
+           PageIndexTupleDelete(top->page, top->childoffnum);
+           
+       /* install item on right place in stack */
+       top->parent=NULL;
+       if ( state.stack ) {
+           GISTInsertStack *ptr = state.stack;
+           while( ptr->parent )
+               ptr = ptr->parent;
+           ptr->parent=top;
+       } else
+           state.stack = top;
+   }
+
+   /* Good. Now we can continue insert */
+
+   gistmakedeal(&state, &giststate);
+
+   MemoryContextSwitchTo(oldCxt);
+   MemoryContextReset(opCtx);
+}
+#endif
+
+void
+gist_xlog_startup(void) {
+   incomplete_inserts=NIL;
+   insertCtx = AllocSetContextCreate(CurrentMemoryContext,
+       "GiST insert in xlog  temporary context",   
+                                 ALLOCSET_DEFAULT_MINSIZE,
+                                 ALLOCSET_DEFAULT_INITSIZE,
+                                 ALLOCSET_DEFAULT_MAXSIZE);
+   opCtx = createTempGistContext();
+}
+
+void
+gist_xlog_cleanup(void) {
+   ListCell   *l;
+
+   foreach(l, incomplete_inserts) {
+       gistIncompleteInsert    *insert = (gistIncompleteInsert*) lfirst(l);
+       char buf[1024];
+
+       *buf='\0';
+       out_target(buf, insert->node, insert->key);
+       elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
+#ifdef GIST_INCOMPLETE_INSERT 
+       gistContinueInsert(insert);
+#endif
+   }
+   MemoryContextDelete(opCtx);
+   MemoryContextDelete(insertCtx); 
+}
+
index 5fe442fd80d05cb9dce964d7b32c13b0446cb939..3b8bcb131d0be70ea21c4cf290710ff7ff300c02 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Resource managers definition
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.19 2005/06/08 15:50:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.20 2005/06/14 11:45:14 teodor Exp $
  */
 #include "postgres.h"
 
@@ -37,6 +37,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
    {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
    {"Hash", hash_redo, hash_desc, NULL, NULL},
    {"Rtree", rtree_redo, rtree_desc, NULL, NULL},
-   {"Gist", gist_redo, gist_desc, NULL, NULL},
+   {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
    {"Sequence", seq_redo, seq_desc, NULL, NULL}
 };
index db255a7b6f548f60670996352e3869cf07d8c6eb..479f221176b00f9fa7fd8e6367bc5dbee017d013 100644 (file)
@@ -7,15 +7,17 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.2 2005/06/06 17:01:24 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef GIST_PRIVATE_H
 #define GIST_PRIVATE_H
 
+#include "access/itup.h"
 #include "access/gist.h"
 #include "access/xlog.h"
+#include "access/xlogdefs.h"
 #include "fmgr.h"
 
 /*
@@ -66,6 +68,42 @@ typedef struct GISTScanOpaqueData
 
 typedef GISTScanOpaqueData *GISTScanOpaque;
 
+/*
+ * GISTInsertStack used for locking buffers and transfer arguments during
+ * insertion
+ */
+
+typedef struct GISTInsertStack {
+   /* current page */
+   BlockNumber blkno;   
+   Buffer      buffer;
+   Page        page;
+   
+   /* child's offset */
+   OffsetNumber    childoffnum;
+
+   /* pointer to parent */
+   struct GISTInsertStack  *parent;
+
+   bool todelete;
+} GISTInsertStack;
+
+typedef struct {
+   Relation    r;
+   IndexTuple      *itup; /* in/out, points to compressed entry */
+   int             ituplen; /* length of itup */
+   GISTInsertStack *stack;
+   bool needInsertComplete;
+   bool xlog_mode;
+
+   /* pointer to heap tuple */
+   ItemPointerData key;
+
+   /* path to stroe in XLog */
+   BlockNumber *path;
+   int         pathlen; 
+} GISTInsertState;
+
 /*
  * When we're doing a scan and updating a tree at the same time, the
  * updates may affect the scan.  We use the flags entry of the scan's
@@ -89,6 +127,72 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
 #define GISTOP_DEL     0
 #define GISTOP_SPLIT   1
 
+#define ATTSIZE(datum, tupdesc, i, isnull) \
+        ( \
+                (isnull) ? 0 : \
+                   att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
+        ) 
+
+/* XLog stuff */
+#define    XLOG_GIST_ENTRY_UPDATE  0x00
+#define    XLOG_GIST_ENTRY_DELETE  0x10
+#define XLOG_GIST_NEW_ROOT 0x20
+
+typedef struct gistxlogEntryUpdate {
+   RelFileNode node;
+   BlockNumber blkno;
+
+   /* if todeleteoffnum!=InvalidOffsetNumber then delete it. */ 
+   OffsetNumber    todeleteoffnum;
+   uint16      pathlen;
+
+   /* 
+    * It used to identify compliteness of insert.
+         * Sets to leaf itup 
+         */ 
+   ItemPointerData key;
+
+   /* follow:
+    * 1. path to root (BlockNumber) 
+    * 2. tuples to insert
+         */ 
+} gistxlogEntryUpdate;
+
+#define XLOG_GIST_PAGE_SPLIT   0x30
+
+typedef struct gistxlogPageSplit {
+   RelFileNode node;
+   BlockNumber origblkno; /*splitted page*/
+   OffsetNumber    todeleteoffnum;
+   uint16      pathlen;
+   int     npage;
+   int     nitup;
+
+   /* see comments on gistxlogEntryUpdate */
+   ItemPointerData key;
+   /* follow:
+    * 1. path to root (BlockNumber) 
+    * 2. tuples to insert
+    * 3. gistxlogPage and array of OffsetNumber per page
+         */ 
+} gistxlogPageSplit;
+
+typedef struct gistxlogPage {
+   BlockNumber blkno;
+   int     num;
+} gistxlogPage;    
+
+
+#define XLOG_GIST_INSERT_COMPLETE  0x40
+
+typedef struct gistxlogInsertComplete {
+   RelFileNode node;
+   ItemPointerData key;
+} gistxlogInsertComplete;
+
+#define XLOG_GIST_CREATE_INDEX 0x50
+
 /* gist.c */
 extern Datum gistbuild(PG_FUNCTION_ARGS);
 extern Datum gistinsert(PG_FUNCTION_ARGS);
@@ -96,15 +200,57 @@ extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
 extern MemoryContext createTempGistContext(void);
 extern void initGISTstate(GISTSTATE *giststate, Relation index);
 extern void freeGISTstate(GISTSTATE *giststate);
-extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
-              Datum k, Relation r, Page pg, OffsetNumber o,
-              int b, bool l, bool isNull);
+extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode);
+extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 
+/* gistxlog.c */
 extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void gist_desc(char *buf, uint8 xl_info, char *rec);
+extern void gist_xlog_startup(void);
+extern void gist_xlog_cleanup(void);
 
 /* gistget.c */
 extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
+/* gistutil.c */
+extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+                                int len, OffsetNumber off);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len);
+extern IndexTuple * gistextractbuffer(Buffer buffer, int *len /* out */ );
+extern IndexTuple * gistjoinvector(
+                           IndexTuple *itvec, int *len,
+                           IndexTuple *additvec, int addlen);
+extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
+                  int len, GISTSTATE *giststate);
+extern IndexTuple gistgetadjusted(Relation r,
+                                IndexTuple oldtup,
+                                IndexTuple addtup,
+                                GISTSTATE *giststate);
+extern int gistfindgroup(GISTSTATE *giststate,
+                          GISTENTRY *valvec, GIST_SPLITVEC *spl);
+extern void gistadjsubkey(Relation r,
+                          IndexTuple *itup, int *len,
+                          GIST_SPLITVEC *v,
+                          GISTSTATE *giststate);
+extern IndexTuple gistFormTuple(GISTSTATE *giststate,
+                        Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+extern OffsetNumber gistchoose(Relation r, Page p,
+                   IndexTuple it,
+                   GISTSTATE *giststate);
+extern void gistcentryinit(GISTSTATE *giststate, int nkey,
+                           GISTENTRY *e, Datum k, 
+                           Relation r, Page pg,
+                           OffsetNumber o, int b, bool l, bool isNull);
+extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
+                              IndexTuple tuple, Page p, OffsetNumber o,
+                              GISTENTRY *attdata, bool *isnull);
+extern void gistunionsubkey(Relation r, GISTSTATE *giststate, 
+                            IndexTuple *itvec, GIST_SPLITVEC *spl);
+extern void GISTInitBuffer(Buffer b, uint32 f);
+extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+              Datum k, Relation r, Page pg, OffsetNumber o,
+              int b, bool l, bool isNull);
+
 #endif /* GIST_PRIVATE_H */