Simplify gistSplit() and refactor some related code.
author: Teodor Sigaev
Fri, 19 May 2006 16:15:17 +0000 (16:15 +0000)
committer: Teodor Sigaev
Fri, 19 May 2006 16:15:17 +0000 (16:15 +0000)
src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index d207b7ecfa7f72e7c1205e5c23b8978536b412a5..cb10cbc35bd5cfceb1b55cd861d95be4990051c9 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -936,31 +936,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
        gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
 }
 
-static void
-gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
-{
-   int         i;
-
-   for (i = 0; i < len; i++)
-       arr[i] = reasloffset[arr[i]];
-}
-
-static IndexTupleData *
-gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
-   char *ptr, *ret = palloc(BLCKSZ);
-   int i;
-
-   ptr = ret;
-   for (i = 0; i < veclen; i++) {
-       memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
-       ptr += IndexTupleSize(vec[i]);
-   }
-
-   *memlen = ptr - ret;
-   Assert( *memlen < BLCKSZ );
-   return (IndexTupleData*)ret;
-}
-
 /*
  * gistSplit -- split a page in the tree.
  */
@@ -975,100 +950,70 @@ gistSplit(Relation r,
               *rvectup;
    GIST_SPLITVEC v;
    GistEntryVector *entryvec;
-   int         i,
-               fakeoffset;
-   OffsetNumber *realoffset;
-   IndexTuple *cleaneditup = itup;
-   int         lencleaneditup = len;
+   int         i;
+   OffsetNumber offInvTuples[ MaxOffsetNumber ];
+   int          nOffInvTuples = 0;
    SplitedPageLayout   *res = NULL;
 
    /* generate the item array */
-   realoffset = palloc((len + 1) * sizeof(OffsetNumber));
    entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
    entryvec->n = len + 1;
 
-   fakeoffset = FirstOffsetNumber;
    for (i = 1; i <= len; i++)
    {
        Datum       datum;
        bool        IsNull;
 
        if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
-       {
-           entryvec->n--;
            /* remember position of invalid tuple */
-           realoffset[entryvec->n] = i;
-           continue;
-       }
+           offInvTuples[ nOffInvTuples++ ] = i;            
+
+       if ( nOffInvTuples > 0 )
+           /* we can safely skip decompressing the other keys, because
+              we will do special processing, but we still need to
+              keep scanning to find any other invalid tuples */
+           continue;   
 
        datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
-       gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
+       gistdentryinit(giststate, 0, &(entryvec->vector[i]),
                       datum, r, page, i,
                       ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
                       FALSE, IsNull);
-       realoffset[fakeoffset] = i;
-       fakeoffset++;
    }
 
    /*
-    * if it was invalid tuple then we need special processing. If it's
-    * possible, we move all invalid tuples on right page. We should remember,
-    * that union with invalid tuples is a invalid tuple.
+    * If there were invalid tuples then we need special processing.
+    * We move all invalid tuples to the right page.
+    *
+    * If there is not enough room on the left page, gistSplit will be called
+    * one more time for the left page.
+    *
+    * Normally we never execute this code, but after crash replay it's
+    * possible to get 'invalid' tuples (the probability is low)
     */
-   if (entryvec->n != len + 1)
+   if (nOffInvTuples > 0)
    {
-       lencleaneditup = entryvec->n - 1;
-       cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
-       for (i = 1; i < entryvec->n; i++)
-           cleaneditup[i - 1] = itup[realoffset[i] - 1];
-
-       if (!gistfitpage(cleaneditup, lencleaneditup))
-       {
-           /* no space on left to put all good tuples, so picksplit */
-           gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
-           v.spl_leftvalid = true;
-           v.spl_rightvalid = false;
-           gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-           gistToRealOffset(v.spl_right, v.spl_nright, realoffset);
-       }
-       else
-       {
-           /* we can try to store all valid tuples on one page */
-           v.spl_right = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-           v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
-
-           if (lencleaneditup == 0)
-           {
-               /* all tuples are invalid, so moves half of its to right */
-               v.spl_leftvalid = v.spl_rightvalid = false;
-               v.spl_nright = 0;
-               v.spl_nleft = 0;
-               for (i = 1; i <= len; i++)
-                   if (i - 1 < len / 2)
-                       v.spl_left[v.spl_nleft++] = i;
-                   else
-                       v.spl_right[v.spl_nright++] = i;
-           }
-           else
-           {
-               /*
-                * we will not call gistUserPicksplit, just put good tuples on
-                * left and invalid on right
-                */
-               v.spl_nleft = lencleaneditup;
-               v.spl_nright = 0;
-               for (i = 1; i < entryvec->n; i++)
-                   v.spl_left[i - 1] = i;
-               gistToRealOffset(v.spl_left, v.spl_nleft, realoffset);
-               v.spl_lattr[0] = v.spl_ldatum = (Datum) 0;
-               v.spl_rattr[0] = v.spl_rdatum = (Datum) 0;
-               v.spl_lisnull[0] = true;
-               v.spl_risnull[0] = true;
-               gistunionsubkey(r, giststate, itup, &v, true);
-               v.spl_leftvalid = true;
-               v.spl_rightvalid = false;
-           }
-       }
+       GistSplitVec    gsvp;
+               
+       v.spl_right = offInvTuples;
+       v.spl_nright = nOffInvTuples;
+       v.spl_rightvalid = false;
+
+       v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
+       v.spl_nleft = 0;
+       for(i = 1; i <= len; i++) 
+           if ( !GistTupleIsInvalid(itup[i - 1]) )
+               v.spl_left[ v.spl_nleft++ ] = i;
+       v.spl_leftvalid = true;
+       
+       gsvp.idgrp = NULL;
+       gsvp.attrsize = v.spl_lattrsize;
+       gsvp.attr = v.spl_lattr;
+       gsvp.len = v.spl_nleft;
+       gsvp.entries = v.spl_left;
+       gsvp.isnull = v.spl_lisnull;
+
+       gistunionsubkeyvec(giststate, itup, &gsvp, true);
    }
    else
    {
@@ -1088,12 +1033,6 @@ gistSplit(Relation r,
    for (i = 0; i < v.spl_nright; i++)
        rvectup[i] = itup[v.spl_right[i] - 1];
 
-   /* place invalid tuples on right page if itsn't done yet */
-   for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
-   {
-       rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
-   }
-
    /* finalyze splitting (may need another split) */
    if (!gistfitpage(rvectup, v.spl_nright))
    {
index ca5a9d652d22a2b8e133fb483b6591da93da9d6b..92798a27d300285ac5d7be31c7d7763e136f77a2 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *         $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
+ *         $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -139,6 +139,30 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
    return itvec;
 }
 
+/*
+ * Copy the tuples of vec into one contiguous palloc'd buffer (*memlen = total size)
+ */
+
+IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+   char *ptr, *ret;
+   int i;
+
+   *memlen=0;
+                   
+   for (i = 0; i < veclen; i++)
+       *memlen += IndexTupleSize(vec[i]);
+
+   ptr = ret = palloc(*memlen);
+
+   for (i = 0; i < veclen; i++) { 
+       memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+       ptr += IndexTupleSize(vec[i]);
+   }
+
+   return (IndexTupleData*)ret;
+}
+
 /*
  * Return an IndexTuple containing the result of applying the "union"
  * method to the specified IndexTuple vector.
@@ -313,100 +337,101 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
    return newtup;
 }
 
-void
-gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall)
-{
-   int         lr;
+void 
+gistunionsubkeyvec(GISTSTATE *giststate,  IndexTuple *itvec, 
+                           GistSplitVec *gsvp, bool isall) {
+   int         i;
+   GistEntryVector *evec;
 
-   for (lr = 0; lr < 2; lr++)
-   {
-       OffsetNumber *entries;
-       int         i;
-       Datum      *attr;
-       int         len,
-                  *attrsize;
-       bool       *isnull;
-       GistEntryVector *evec;
-
-       if (lr)
-       {
-           attrsize = spl->spl_lattrsize;
-           attr = spl->spl_lattr;
-           len = spl->spl_nleft;
-           entries = spl->spl_left;
-           isnull = spl->spl_lisnull;
-       }
-       else
-       {
-           attrsize = spl->spl_rattrsize;
-           attr = spl->spl_rattr;
-           len = spl->spl_nright;
-           entries = spl->spl_right;
-           isnull = spl->spl_risnull;
-       }
+   evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
 
-       evec = palloc(((len < 2) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
+   for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
+   {
+       int         j;
+       Datum       datum;
+       int         datumsize;
+       int         real_len;
 
-       for (i = (isall) ? 0 : 1; i < r->rd_att->natts; i++)
+       real_len = 0;
+       for (j = 0; j < gsvp->len; j++)
        {
-           int         j;
-           Datum       datum;
-           int         datumsize;
-           int         real_len;
+           bool        IsNull;
 
-           real_len = 0;
-           for (j = 0; j < len; j++)
-           {
-               bool        IsNull;
+           if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
+               continue;
 
-               if (spl->spl_idgrp[entries[j]])
-                   continue;
-               datum = index_getattr(itvec[entries[j] - 1], i + 1,
+           datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
                                      giststate->tupdesc, &IsNull);
-               if (IsNull)
-                   continue;
-               gistdentryinit(giststate, i,
-                              &(evec->vector[real_len]),
-                              datum,
-                              NULL, NULL, (OffsetNumber) 0,
+           if (IsNull)
+               continue;
+           gistdentryinit(giststate, i,
+                          &(evec->vector[real_len]),
+                          datum,
+                          NULL, NULL, (OffsetNumber) 0,
                           ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
-                              FALSE, IsNull);
-               real_len++;
+                          FALSE, IsNull);
+           real_len++;
 
-           }
+       }
 
-           if (real_len == 0)
+       if (real_len == 0)
+       {
+           datum = (Datum) 0;
+           datumsize = 0;
+           gsvp->isnull[i] = true;
+       }
+       else
+       {
+           /*
+            * evec->vector[0].bytes may be not defined, so form union
+            * with itself
+            */
+           if (real_len == 1)
            {
-               datum = (Datum) 0;
-               datumsize = 0;
-               isnull[i] = true;
+               evec->n = 2;
+               memcpy(&(evec->vector[1]), &(evec->vector[0]),
+                      sizeof(GISTENTRY));
            }
            else
-           {
-               /*
-                * evec->vector[0].bytes may be not defined, so form union
-                * with itself
-                */
-               if (real_len == 1)
-               {
-                   evec->n = 2;
-                   memcpy(&(evec->vector[1]), &(evec->vector[0]),
-                          sizeof(GISTENTRY));
-               }
-               else
-                   evec->n = real_len;
-               datum = FunctionCall2(&giststate->unionFn[i],
-                                     PointerGetDatum(evec),
-                                     PointerGetDatum(&datumsize));
-               isnull[i] = false;
-           }
-
-           attr[i] = datum;
-           attrsize[i] = datumsize;
+               evec->n = real_len;
+           datum = FunctionCall2(&giststate->unionFn[i],
+                                 PointerGetDatum(evec),
+                                 PointerGetDatum(&datumsize));
+           gsvp->isnull[i] = false;
        }
+
+       gsvp->attr[i] = datum;
+       gsvp->attrsize[i] = datumsize;
    }
 }
 
+/*
+ * Form the subkey unions (columns after the first) for both sides after user picksplit on the first column
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
+{
+   GistSplitVec    gsvp;
+
+   gsvp.idgrp = spl->spl_idgrp;
+
+   gsvp.attrsize = spl->spl_lattrsize;
+   gsvp.attr = spl->spl_lattr;
+   gsvp.len = spl->spl_nleft;
+   gsvp.entries = spl->spl_left;
+   gsvp.isnull = spl->spl_lisnull;
+
+   gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+
+   gsvp.attrsize = spl->spl_rattrsize;
+   gsvp.attr = spl->spl_rattr;
+   gsvp.len = spl->spl_nright;
+   gsvp.entries = spl->spl_right;
+   gsvp.isnull = spl->spl_risnull;
+
+   gistunionsubkeyvec(giststate, itvec, &gsvp, false);
+}
+
 /*
  * find group in vector with equal value
  */
@@ -840,7 +865,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
     * if index is multikey, then we must to try get smaller bounding box for
     * subkey(s)
     */
-   if (r->rd_att->natts > 1)
+   if (giststate->tupdesc->natts > 1)
    {
        int         MaxGrpId;
 
@@ -851,7 +876,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
        MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
 
        /* form union of sub keys for each page (l,p) */
-       gistunionsubkey(r, giststate, itup, v, false);
+       gistunionsubkey(giststate, itup, v);
 
        /*
         * if possible, we insert equivalent tuples with control by penalty
index 1126727cd97bec7d3c5947e8a8e730739116beca..aef2056a34d41a12d982289f3ca41b5a01ea837c 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.19 2006/05/19 16:15:17 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -557,28 +557,16 @@ gistMakePageLayout(Buffer *buffers, int nbuffers) {
 
    while( nbuffers-- > 0 ) {
        Page page = BufferGetPage( buffers[ nbuffers ] );
-       IndexTuple  idxtup;
-       OffsetNumber    i;
-       char *ptr;
+       IndexTuple* vec;
+       int veclen;
 
        resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
 
        resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
        resptr->block.num = PageGetMaxOffsetNumber( page );
 
-       for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-           idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-           resptr->lenlist += IndexTupleSize(idxtup);
-       }
-
-       resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
-       ptr = (char*)(resptr->list);
-
-       for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
-           idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
-           memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
-           ptr += IndexTupleSize(idxtup);
-       }
+       vec = gistextractpage( page, &veclen ); 
+       resptr->list = gistfillitupvec( vec, veclen, &(resptr->lenlist) );
 
        resptr->next = res;
        res = resptr;
index a866277fe9f462df637c9ad55b1096027513c2cb..f08d49dbf90df21c77d4dd74131407970b97d70a 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -283,6 +283,8 @@ extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
               IndexTuple *itvec, int *len,
               IndexTuple *additvec, int addlen);
+extern IndexTupleData* gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
+
 extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
          int len, GISTSTATE *giststate);
 extern IndexTuple gistgetadjusted(Relation r,
@@ -308,8 +310,19 @@ extern void gistcentryinit(GISTSTATE *giststate, int nkey,
 extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
                  IndexTuple tuple, Page p, OffsetNumber o,
                  GISTENTRY *attdata, bool *isnull);
-extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
-               IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
+
+typedef struct {
+   int     *attrsize;
+   Datum   *attr;
+   int     len;
+   OffsetNumber *entries;
+   bool    *isnull;
+   int     *idgrp;
+} GistSplitVec;
+
+extern void gistunionsubkeyvec(GISTSTATE *giststate, 
+   IndexTuple *itvec, GistSplitVec *gsvp,  bool isall);
+
 extern void GISTInitBuffer(Buffer b, uint32 f);
 extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
               Datum k, Relation r, Page pg, OffsetNumber o,