Forget to add new file :((
authorTeodor Sigaev
Wed, 28 Jun 2006 12:08:35 +0000 (12:08 +0000)
committerTeodor Sigaev
Wed, 28 Jun 2006 12:08:35 +0000 (12:08 +0000)
src/backend/access/gist/gistsplit.c [new file with mode: 0644]

diff --git a/src/backend/access/gist/gistsplit.c b/src/backend/access/gist/gistsplit.c
new file mode 100644 (file)
index 0000000..ce5a69c
--- /dev/null
@@ -0,0 +1,538 @@
+/*-------------------------------------------------------------------------
+ *
+ * gistsplit.c
+ *   Split page algorithm
+ *
+ *
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.1 2006/06/28 12:08:35 teodor Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "access/heapam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+
+typedef struct {
+    Datum   *attr;
+   int     len;
+   OffsetNumber *entries;
+   bool    *isnull;
+   bool        *equiv;
+} GistSplitUnion;
+
+
+/*
+ * Forms unions of subkeys after page split, but
+ * uses only tuples aren't in groups of equalent tuples
+ */
+static void 
+gistunionsubkeyvec(GISTSTATE *giststate,  IndexTuple *itvec, 
+                           GistSplitUnion *gsvp, int startkey) {
+   IndexTuple  *cleanedItVec;
+   int         i, cleanedLen=0;
+
+   cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len);
+
+   for(i=0;ilen;i++) {
+       if ( gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
+           continue;
+
+       cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
+   }
+
+    gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, 
+       gsvp->attr, gsvp->isnull);
+
+   pfree( cleanedItVec );
+}
+
+/*
+ * unions subkeys for after user picksplit over attno-1 column
+ */
+static void
+gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno)
+{
+   GistSplitUnion  gsvp;
+
+   gsvp.equiv = spl->spl_equiv;
+
+   gsvp.attr = spl->spl_lattr;
+   gsvp.len = spl->splitVector.spl_nleft;
+   gsvp.entries = spl->splitVector.spl_left;
+   gsvp.isnull = spl->spl_lisnull;
+
+   gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
+
+   gsvp.attr = spl->spl_rattr;
+   gsvp.len = spl->splitVector.spl_nright;
+   gsvp.entries = spl->splitVector.spl_right;
+   gsvp.isnull = spl->spl_risnull;
+
+   gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
+}
+
+/*
+ * find group in vector with equivalent value 
+ */
+static int
+gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
+{
+   int         i;
+   GISTENTRY   entry;
+   int len=0;
+
+   /*
+    * attno key is always not null (see gistSplitByKey), so we may not check for
+    * nulls
+    */
+   gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
+   for (i = 0; i < spl->splitVector.spl_nleft; i++) {
+       float penalty = gistpenalty(giststate, attno, &entry, false,
+                                   &valvec[spl->splitVector.spl_left[i]], false);
+       if ( penalty == 0.0 ) {
+           spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
+           len++;
+       }
+   }
+
+   gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
+   for (i = 0; i < spl->splitVector.spl_nright; i++) {
+       float penalty = gistpenalty(giststate, attno, &entry, false,
+                                   &valvec[spl->splitVector.spl_right[i]], false);
+       if ( penalty == 0.0 ) {
+           spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
+           len++;
+       }
+   }
+
+   return len;
+}
+
+static void
+cleanupOffsets( OffsetNumber *a, int *len, bool *equiv, int *LenEquiv ) {
+   int curlen,i;
+   OffsetNumber    *curwpos;
+
+   curlen = *len;
+   curwpos = a;
+   for (i = 0; i < *len; i++) {
+       if ( equiv[ a[i] ] == FALSE ) {
+           *curwpos = a[i];
+           curwpos++;
+       } else {
+           /* corner case: we shouldn't make void array */
+           if ( curlen==1 ) {
+               equiv[ a[i] ] = FALSE; /* mark item as non-equivalent */
+               i--; /* redo the same */
+               *LenEquiv -= 1;
+               continue;
+           } else
+               curlen--;
+       }
+   }
+
+   *len = curlen;
+}
+
+static void
+placeOne( Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno ) {
+   GISTENTRY   identry[INDEX_MAX_KEYS];
+   bool        isnull[INDEX_MAX_KEYS];
+   bool    toLeft = true;
+
+   gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
+
+   for(;attnotupdesc->natts;attno++) {
+       float lpenalty, rpenalty;
+       GISTENTRY   entry;
+
+       gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0,  FALSE);
+       lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry+attno, isnull[ attno ]); 
+       gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0,  FALSE);
+       rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry+attno, isnull[ attno ]);
+
+       if ( lpenalty != rpenalty ) {
+           if ( lpenalty > rpenalty )
+               toLeft = false;
+           break;
+       }
+   }
+
+   if ( toLeft ) 
+       v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = off;
+   else
+       v->splitVector.spl_right[ v->splitVector.spl_nright++ ] = off;
+}
+
+#define SWAPVAR( s, d, t ) \
+do {   \
+   (t) = (s); \
+   (s) = (d); \
+   (d) = (t); \
+} while(0)
+
+/*
+ * adjust left and right unions according to splits by previous 
+ * split by firsts columns. This function is called only in case 
+ * when pickSplit doesn't support subspplit.
+ */
+
+static void
+supportSecondarySplit( Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR ) {
+   bool    leaveOnLeft = true, tmpBool;
+   GISTENTRY entryL, entryR, entrySL, entrySR; 
+   
+   gistentryinit(entryL, oldL, r, NULL, 0,  FALSE);
+   gistentryinit(entryR, oldR, r, NULL, 0,  FALSE);
+   gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0,  FALSE);
+   gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0,  FALSE);
+
+   if ( sv->spl_ldatum_exists && sv->spl_rdatum_exists ) {
+       float penalty1, penalty2;
+
+       penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
+                  gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
+       penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
+                  gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
+
+       if ( penalty1 > penalty2 )
+           leaveOnLeft = false;
+
+   } else {
+       GISTENTRY   *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
+       float penalty1, penalty2;
+
+       /*
+        * there is only one previously defined union,
+        * so we just choose swap or not by lowest penalty
+        */
+
+       penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
+       penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
+
+       if ( penalty1 < penalty2 ) 
+           leaveOnLeft = ( sv->spl_ldatum_exists ) ? true : false;
+       else
+           leaveOnLeft = ( sv->spl_rdatum_exists ) ? true : false;
+   }
+
+   if ( leaveOnLeft == false ) {
+       /*
+        * swap left and right 
+        */
+       OffsetNumber    *off, noff;
+       Datum           datum;
+
+       SWAPVAR( sv->spl_left, sv->spl_right, off );
+       SWAPVAR( sv->spl_nleft, sv->spl_nright, noff );
+       SWAPVAR( sv->spl_ldatum, sv->spl_rdatum, datum );
+       gistentryinit(entrySL, sv->spl_ldatum , r, NULL, 0,  FALSE);
+       gistentryinit(entrySR, sv->spl_rdatum , r, NULL, 0,  FALSE);
+   }
+
+   if ( sv->spl_ldatum_exists ) 
+       gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
+                       &sv->spl_ldatum, &tmpBool);
+
+   if ( sv->spl_rdatum_exists ) 
+       gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
+                       &sv->spl_rdatum, &tmpBool);
+
+   sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
+}
+
+/*
+ * Calls user picksplit method for attno columns to split vector to
+ * two vectors. May use attno+n columns data to
+ * get better split.
+ * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
+ * so caller may find better split
+ * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved 
+ */
+static bool
+gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
+                 IndexTuple *itup, int len, GISTSTATE *giststate)
+{
+   GIST_SPLITVEC *sv = &v->splitVector;
+   /*
+    * now let the user-defined picksplit function set up the split vector; in
+    * entryvec have no null value!!
+    */
+
+   sv->spl_ldatum_exists = ( v->spl_lisnull[ attno ] ) ? false : true;
+   sv->spl_rdatum_exists = ( v->spl_risnull[ attno ] ) ? false : true;
+   sv->spl_ldatum = v->spl_lattr[attno];
+   sv->spl_rdatum = v->spl_rattr[attno];
+
+   FunctionCall2(&giststate->picksplitFn[attno],
+                 PointerGetDatum(entryvec),
+                 PointerGetDatum(sv));
+
+   /* compatibility with old code */
+   if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
+       sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
+   if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
+       sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
+
+   if( sv->spl_ldatum_exists || sv->spl_rdatum_exists ) { 
+       elog(LOG,"PickSplit method of %d columns of index '%s' doesn't support secondary split",
+           attno + 1, RelationGetRelationName(r) );
+
+       supportSecondarySplit( r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno] ); 
+   }
+
+   v->spl_lattr[attno] = sv->spl_ldatum;
+   v->spl_rattr[attno] = sv->spl_rdatum;
+   v->spl_lisnull[attno] = false;
+   v->spl_risnull[attno] = false;
+
+   /*
+    * if index is multikey, then we must to try get smaller bounding box for
+    * subkey(s)
+    */
+   v->spl_equiv = NULL;
+
+   if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts)
+   {
+       if ( gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum) ) {
+           /*
+            * Left and right key's unions are equial, so
+            * we can get better split by following columns. Note,
+            * unions for attno columns are already done.
+            */
+
+           return true;
+       } else {
+           int         LenEquiv;
+
+           v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n+1));
+
+           LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
+
+           /*
+            * if possible, we should distribute equivalent tuples 
+            */
+           if (LenEquiv == 0 ) { 
+               gistunionsubkey(giststate, itup, v, attno + 1);
+           } else {
+               cleanupOffsets( sv->spl_left,  &sv->spl_nleft, v->spl_equiv, &LenEquiv );
+               cleanupOffsets( sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv );
+
+               gistunionsubkey(giststate, itup, v, attno + 1);
+               if (LenEquiv == 1 ) {
+                   /*
+                    * In case with one tuple we just choose left-right 
+                    * by penalty. It's simplify user-defined pickSplit
+                    */
+                   OffsetNumber toMove = InvalidOffsetNumber;
+
+                   for(toMove=FirstOffsetNumber;toMoven;toMove++) 
+                       if ( v->spl_equiv[ toMove ] )
+                           break;
+                   Assert( toMove < entryvec->n );
+           
+                   placeOne( r, giststate, v, itup[ toMove-1 ], toMove, attno+1 );
+                   /* redo gistunionsubkey(): it will not degradate performance,
+                    * because it's very rarely */
+                   v->spl_equiv = NULL;
+                   gistunionsubkey(giststate, itup, v, attno + 1);
+
+                   return false;
+               } else if ( LenEquiv > 1 )
+                   return true;
+           }
+       }
+   }
+
+   return false;
+}
+
+/*
+ * simple split page 
+ */
+static void
+gistSplitHalf(GIST_SPLITVEC *v, int len) {
+   int i;
+
+   v->spl_nright   = v->spl_nleft      = 0;
+   v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+   v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+   for(i = 1; i <= len; i++)
+       if ( i
+           v->spl_right[ v->spl_nright++ ] = i;
+       else
+           v->spl_left[ v->spl_nleft++ ] = i;
+}
+
+/*
+ * if it was invalid tuple then we need special processing.
+ * We move all invalid tuples on right page. 
+ *
+ * if there is no place on left page, gistSplit will be called one more 
+ * time for left page.
+ *
+ * Normally, we never exec this code, but after crash replay it's possible
+ * to get 'invalid' tuples (probability is low enough)
+ */
+static void
+gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) {
+   int i;
+   static OffsetNumber offInvTuples[ MaxOffsetNumber ];
+   int          nOffInvTuples = 0;
+
+   for (i = 1; i <= len; i++)
+       if ( GistTupleIsInvalid(itup[i - 1]) )
+           offInvTuples[ nOffInvTuples++ ] = i;
+
+   if ( nOffInvTuples == len ) {
+       /* corner case, all tuples are invalid */
+       v->spl_rightvalid= v->spl_leftvalid     = false;
+       gistSplitHalf( &v->splitVector, len );
+   } else {
+       GistSplitUnion    gsvp;
+               
+       v->splitVector.spl_right = offInvTuples;
+       v->splitVector.spl_nright = nOffInvTuples;
+       v->spl_rightvalid = false;
+
+       v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+       v->splitVector.spl_nleft = 0;
+       for(i = 1; i <= len; i++) 
+           if ( !GistTupleIsInvalid(itup[i - 1]) )
+               v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i;
+       v->spl_leftvalid = true;
+       
+       gsvp.equiv = NULL;
+       gsvp.attr = v->spl_lattr;
+       gsvp.len = v->splitVector.spl_nleft;
+       gsvp.entries = v->splitVector.spl_left;
+       gsvp.isnull = v->spl_lisnull;
+
+       gistunionsubkeyvec(giststate, itup, &gsvp, 0);
+   }
+}
+
+/*
+ * trys to split page by attno key, in a case of null
+ * values move its to separate page. 
+ */
+void
+gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate, 
+       GistSplitVector *v, GistEntryVector *entryvec, int attno) {
+   int i;
+   static OffsetNumber offNullTuples[ MaxOffsetNumber ];
+   int          nOffNullTuples = 0;
+
+   for (i = 1; i <= len; i++) {
+       Datum       datum;
+       bool        IsNull;
+
+       if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
+           gistSplitByInvalid(giststate, v, itup, len);
+           return;
+       }
+
+       datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
+       gistdentryinit(giststate, attno, &(entryvec->vector[i]),
+                      datum, r, page, i,
+                      FALSE, IsNull);
+       if ( IsNull )
+           offNullTuples[ nOffNullTuples++ ] = i;
+   }
+
+   v->spl_leftvalid = v->spl_rightvalid = true;
+
+   if ( nOffNullTuples == len ) {
+       /* 
+        * Corner case: All keys in attno column are null, we should try to
+        * split by keys in next column. It all keys in all columns
+        * are NULL just split page half by half
+        */
+       v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
+
+       if ( attno+1 == r->rd_att->natts ) 
+           gistSplitHalf( &v->splitVector, len );
+       else 
+           gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+   } else if ( nOffNullTuples > 0 ) {
+       int j=0;
+       
+       /* 
+        * We don't want to mix NULLs and not-NULLs keys
+        * on one page, so move nulls to right page
+        */
+       v->splitVector.spl_right = offNullTuples;
+       v->splitVector.spl_nright = nOffNullTuples;
+       v->spl_risnull[attno] = TRUE;
+
+       v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
+       v->splitVector.spl_nleft = 0;
+       for(i = 1; i <= len; i++) 
+           if ( jsplitVector.spl_nright && offNullTuples[j] == i ) 
+               j++;
+           else
+               v->splitVector.spl_left[ v->splitVector.spl_nleft++ ] = i;
+
+       v->spl_equiv = NULL;
+       gistunionsubkey(giststate, itup, v, attno);
+   } else {
+       /*
+        * all keys are not-null
+        */
+       entryvec->n = len+1;
+
+       if ( gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno+1 != r->rd_att->natts ) {
+           /*
+            * Splitting on attno column is not optimized: there is a tuples which can be freely
+            * left or right page, we will try to split page by 
+            * following columns
+            */
+           if ( v->spl_equiv == NULL ) {
+               /* simple case: left and right keys for attno column are equial */
+               gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
+           } else {
+               /* we should clean up vector from already distributed tuples */
+               IndexTuple  *newitup =  (IndexTuple*)palloc((len + 1) * sizeof(IndexTuple));
+               OffsetNumber    *map = (OffsetNumber*)palloc((len + 1) * sizeof(IndexTuple));
+               int newlen = 0;
+               GIST_SPLITVEC backupSplit = v->splitVector;
+
+               for(i=0; i
+                   if ( v->spl_equiv[i+1] ) {
+                       map[ newlen ] = i+1;
+                       newitup[ newlen++ ] = itup[i];
+                   }
+
+               Assert( newlen>0 );
+
+               backupSplit.spl_left = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len);
+               memcpy( backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber)*v->splitVector.spl_nleft);
+               backupSplit.spl_right = (OffsetNumber*)palloc(sizeof(OffsetNumber)*len);
+               memcpy( backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber)*v->splitVector.spl_nright);
+
+               gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno+1);
+
+               /* merge result of subsplit */
+               for(i=0;isplitVector.spl_nleft;i++)
+                   backupSplit.spl_left[ backupSplit.spl_nleft++ ] = map[ v->splitVector.spl_left[i]-1 ];
+               for(i=0;isplitVector.spl_nright;i++)
+                   backupSplit.spl_right[ backupSplit.spl_nright++ ] = map[ v->splitVector.spl_right[i]-1 ];
+
+               v->splitVector = backupSplit;
+               /* reunion left and right datums */
+               gistunionsubkey(giststate, itup, v, attno);
+           }
+       } 
+   }
+}