Rewrite GiST support code for rangetypes.
authorTom Lane
Mon, 5 Mar 2012 03:50:06 +0000 (22:50 -0500)
committerTom Lane
Mon, 5 Mar 2012 03:50:06 +0000 (22:50 -0500)
This patch installs significantly smarter penalty and picksplit functions
for ranges, making GiST indexes for them smaller and faster to search.

There is no on-disk format change, so no catversion bump, but you'd need
to REINDEX to get the benefits for any existing index.

Alexander Korotkov, reviewed by Jeff Davis

src/backend/utils/adt/rangetypes_gist.c

index 4267dc8cb613b4ba29e3027bd9c5170664fc47dc..87f71e6812c3587eb854f6f5e1d7cd052b27f279 100644 (file)
 #define RANGESTRAT_CONTAINS_ELEM       16
 #define RANGESTRAT_EQ                  18
 
-/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
-#define rangeCopy(r) \
-   ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
-                                            false, -1)))
+/*
+ * Range class properties used to segregate different classes of ranges in
+ * GiST.  Each unique combination of properties is a class.  CLS_EMPTY cannot
+ * be combined with anything else.
+ */
+#define CLS_NORMAL         0   /* Ordinary finite range (no bits set) */
+#define CLS_LOWER_INF      1   /* Lower bound is infinity */
+#define CLS_UPPER_INF      2   /* Upper bound is infinity */
+#define CLS_CONTAIN_EMPTY  4   /* Contains underlying empty ranges */
+#define CLS_EMPTY          8   /* Special class for empty ranges */
+
+#define CLS_COUNT          9   /* # of classes; includes all combinations of
+                                * properties. CLS_EMPTY doesn't combine with
+                                * anything else, so it's only 2^3 + 1. */
+
+/*
+ * Minimum accepted ratio of split for items of the same class.  If the items
+ * are of different classes, we will separate along those lines regardless of
+ * the ratio.
+ */
+#define LIMIT_RATIO  0.3
+
+/* Constants for fixed penalty values */
+#define INFINITE_BOUND_PENALTY  2.0
+#define CONTAIN_EMPTY_PENALTY  1.0
+#define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
 
 /*
- * Auxiliary structure for picksplit method.
+ * Per-item data for range_gist_single_sorting_split.
  */
 typedef struct
 {
-   int         index;          /* original index in entryvec->vector[] */
-   RangeType  *data;           /* range value to sort */
-   TypeCacheEntry *typcache;   /* range type's info */
-}  PickSplitSortItem;
+   int                 index;
+   RangeBound          bound;
+} SingleBoundSortItem;
+
+/* place on left or right side of split? */
+typedef enum
+{
+   SPLIT_LEFT = 0,             /* makes initialization to SPLIT_LEFT easier */
+   SPLIT_RIGHT
+} SplitLR;
+
+/*
+ * Context for range_gist_consider_split.
+ */
+typedef struct
+{
+   TypeCacheEntry *typcache;   /* typcache for range type */
+   bool        has_subtype_diff;   /* does it have subtype_diff? */
+   int         entries_count;  /* total number of entries being split */
+
+   /* Information about currently selected split follows */
+
+   bool        first;          /* true if no split was selected yet */
+
+   RangeBound  *left_upper;    /* upper bound of left interval */
+   RangeBound  *right_lower;   /* lower bound of right interval */
+
+   float4      ratio;          /* split ratio */
+   float4      overlap;        /* overlap between left and right predicate */
+   int         common_left;    /* # common entries destined for each side */
+   int         common_right;
+} ConsiderSplitContext;
+
+/*
+ * Bounds extracted from a non-empty range, for use in
+ * range_gist_double_sorting_split.
+ */
+typedef struct
+{
+   RangeBound  lower;
+   RangeBound  upper;
+} NonEmptyRange;
+
+/*
+ * Represents information about an entry that can be placed in either group
+ * without affecting overlap over selected axis ("common entry").
+ */
+typedef struct
+{
+   /* Index of entry in the initial array */
+   int         index;
+   /* Delta between closeness of range to each of the two groups */
+   double      delta;
+} CommonEntry;
+
+/* Helper macros to place an entry in the left or right group during split */
+/* Note direct access to variables v, typcache, left_range, right_range */
+#define PLACE_LEFT(range, off)                 \
+   do {                                        \
+       if (v->spl_nleft > 0)                   \
+           left_range = range_super_union(typcache, left_range, range); \
+       else                                    \
+           left_range = (range);               \
+       v->spl_left[v->spl_nleft++] = (off);    \
+   } while(0)
+
+#define PLACE_RIGHT(range, off)                    \
+   do {                                        \
+       if (v->spl_nright > 0)                  \
+           right_range = range_super_union(typcache, right_range, range); \
+       else                                    \
+           right_range = (range);              \
+       v->spl_right[v->spl_nright++] = (off);  \
+   } while(0)
+
+/* Copy a RangeType datum (hardwires typbyval and typlen for ranges...) */
+#define rangeCopy(r) \
+   ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
+                                            false, -1)))
 
 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType * r1,
                  RangeType * r2);
@@ -57,7 +154,30 @@ static bool range_gist_consistent_int(FmgrInfo *flinfo,
 static bool range_gist_consistent_leaf(FmgrInfo *flinfo,
                           StrategyNumber strategy, RangeType *key,
                           Datum query);
-static int sort_item_cmp(const void *a, const void *b);
+static void range_gist_fallback_split(TypeCacheEntry *typcache,
+                                     GistEntryVector *entryvec,
+                                     GIST_SPLITVEC *v);
+static void range_gist_class_split(TypeCacheEntry *typcache,
+                                  GistEntryVector *entryvec,
+                                  GIST_SPLITVEC *v,
+                                  SplitLR *classes_groups);
+static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
+                                           GistEntryVector *entryvec,
+                                           GIST_SPLITVEC *v,
+                                           bool use_upper_bound);
+static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
+                                           GistEntryVector *entryvec,
+                                           GIST_SPLITVEC *v);
+static void range_gist_consider_split(ConsiderSplitContext *context,
+                         RangeBound *right_lower, int min_left_count,
+                         RangeBound *left_upper, int max_left_count);
+static int get_gist_range_class(RangeType *range);
+static int single_bound_cmp(const void *a, const void *b, void *arg);
+static int interval_cmp_lower(const void *a, const void *b, void *arg);
+static int interval_cmp_upper(const void *a, const void *b, void *arg);
+static int common_entry_cmp(const void *i1, const void *i2);
+static float8 call_subtype_diff(TypeCacheEntry *typcache,
+                               Datum val1, Datum val2);
 
 
 /* GiST query consistency check */
@@ -122,7 +242,16 @@ range_gist_decompress(PG_FUNCTION_ARGS)
    PG_RETURN_POINTER(entry);
 }
 
-/* page split penalty function */
+/*
+ * GiST page split penalty function.
+ *
+ * The penalty function has the following goals (in order from most to least
+ * important):
+ * - Keep normal ranges separate
+ * - Avoid broadening the class of the original predicate
+ * - Avoid broadening (as determined by subtype_diff) the original predicate
+ * - Favor adding ranges to narrower original predicates
+ */
 Datum
 range_gist_penalty(PG_FUNCTION_ARGS)
 {
@@ -132,118 +261,253 @@ range_gist_penalty(PG_FUNCTION_ARGS)
    RangeType  *orig = DatumGetRangeType(origentry->key);
    RangeType  *new = DatumGetRangeType(newentry->key);
    TypeCacheEntry *typcache;
-   RangeType  *s_union;
-   FmgrInfo   *subtype_diff;
-   RangeBound  lower1,
-               lower2;
-   RangeBound  upper1,
-               upper2;
-   bool        empty1,
-               empty2;
-   float8      lower_diff,
-               upper_diff;
+   bool        has_subtype_diff;
+   RangeBound  orig_lower,
+               new_lower,
+               orig_upper,
+               new_upper;
+   bool        orig_empty,
+               new_empty;
 
    if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
        elog(ERROR, "range types do not match");
 
    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
 
-   subtype_diff = &typcache->rng_subdiff_finfo;
+   has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
 
-   /*
-    * If new is or contains empty, and orig doesn't, apply infinite penalty.
-    * We really don't want to pollute an empty-free subtree with empties.
-    */
-   if (RangeIsOrContainsEmpty(new) && !RangeIsOrContainsEmpty(orig))
-   {
-       *penalty = get_float4_infinity();
-       PG_RETURN_POINTER(penalty);
-   }
+   range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
+   range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
 
    /*
-    * We want to compare the size of "orig" to size of "orig union new".
-    * The penalty will be the sum of the reduction in the lower bound plus
-    * the increase in the upper bound.
+    * Distinct branches for handling distinct classes of ranges.  Note
+    * that penalty values only need to be commensurate within the same
+    * class of new range.
     */
-   s_union = range_super_union(typcache, orig, new);
-
-   range_deserialize(typcache, orig, &lower1, &upper1, &empty1);
-   range_deserialize(typcache, s_union, &lower2, &upper2, &empty2);
-
-   /* handle cases where orig is empty */
-   if (empty1 && empty2)
+   if (new_empty)
    {
-       *penalty = 0;
-       PG_RETURN_POINTER(penalty);
+       /* Handle insertion of empty range */
+       if (orig_empty)
+       {
+           /*
+            * The best case is to insert it to empty original
+            * range.  Insertion here means no broadening of original range.
+            * Also original range is the most narrow.
+            */
+           *penalty = 0.0;
+       }
+       else if (RangeIsOrContainsEmpty(orig))
+       {
+           /*
+            * The second case is to insert empty range into range which
+            * contains at least one underlying empty range.  There is still
+            * no broadening of original range, but original range is not as
+            * narrow as possible.
+            */
+           *penalty = CONTAIN_EMPTY_PENALTY;
+       }
+       else if (orig_lower.infinite && orig_upper.infinite)
+       {
+           /*
+            * Original range requires broadening.  (-inf; +inf) is most far
+            * from normal range in this case.
+            */
+           *penalty = 2 * CONTAIN_EMPTY_PENALTY;
+       }
+       else if (orig_lower.infinite || orig_upper.infinite)
+       {
+           /*
+            * (-inf, x) or (x, +inf) original ranges are closer to normal
+            * ranges, so it's worse to mix it with empty ranges.
+            */
+           *penalty = 3 * CONTAIN_EMPTY_PENALTY;
+       }
+       else
+       {
+           /*
+            * The least preferred case is broadening of normal range.
+            */
+           *penalty = 4 * CONTAIN_EMPTY_PENALTY;
+       }
    }
-   else if (empty1)
+   else if (new_lower.infinite && new_upper.infinite)
    {
-       /* infinite penalty for pushing non-empty into all-empty subtree */
-       *penalty = get_float4_infinity();
-       PG_RETURN_POINTER(penalty);
-   }
-
-   /* if orig isn't empty, s_union can't be either */
-   Assert(!empty2);
-
-   /* similarly, if orig's lower bound is infinite, s_union's must be too */
-   Assert(lower2.infinite || !lower1.infinite);
+       /* Handle insertion of (-inf, +inf) range */
+       if (orig_lower.infinite && orig_upper.infinite)
+       {
+           /*
+            * Best case is inserting to (-inf, +inf) original range.
+            */
+           *penalty = 0.0;
+       }
+       else if (orig_lower.infinite || orig_upper.infinite)
+       {
+           /*
+            * When original range is (-inf, x) or (x, +inf) it requires
+            * broadening of original range (extension of one bound to
+            * infinity).
+            */
+           *penalty = INFINITE_BOUND_PENALTY;
+       }
+       else
+       {
+           /*
+            * Insertion to normal original range is least preferred.
+            */
+           *penalty = 2 * INFINITE_BOUND_PENALTY;
+       }
 
-   if (lower2.infinite && lower1.infinite)
-       lower_diff = 0;
-   else if (lower2.infinite)
-       lower_diff = get_float8_infinity();
-   else if (OidIsValid(subtype_diff->fn_oid))
-   {
-       lower_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
-                                                     typcache->rng_collation,
-                                                     lower1.val,
-                                                     lower2.val));
-       /* orig's lower bound must be >= s_union's */
-       if (lower_diff < 0)
-           lower_diff = 0;     /* subtype_diff is broken */
+       if (RangeIsOrContainsEmpty(orig))
+       {
+           /*
+            * Original range is narrower when it doesn't contain empty ranges.
+            * Add additional penalty otherwise.
+            */
+           *penalty += CONTAIN_EMPTY_PENALTY;
+       }
    }
-   else
+   else if (new_lower.infinite)
    {
-       /* only know whether there is a difference or not */
-       lower_diff = range_cmp_bounds(typcache, &lower1, &lower2) > 0 ? 1 : 0;
+       /* Handle insertion of (-inf, x) range */
+       if (!orig_empty && orig_lower.infinite)
+       {
+           if (orig_upper.infinite)
+           {
+               /*
+                * (-inf, +inf) range won't be extended by insertion of
+                * (-inf, x) range. It's a less desirable case than insertion
+                * to (-inf, y) original range without extension, because in
+                * that case original range is narrower. But we can't express
+                * that in single float value.
+                */
+               *penalty = 0.0;
+           }
+           else
+           {
+               if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
+               {
+                   /*
+                    * Get extension of original range using subtype_diff.
+                    * Use constant if subtype_diff unavailable.
+                    */
+                   if (has_subtype_diff)
+                       *penalty = call_subtype_diff(typcache,
+                                                    new_upper.val,
+                                                    orig_upper.val);
+                   else
+                       *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
+               }
+               else
+               {
+                   /* No extension of original range */
+                   *penalty = 0.0;
+               }
+           }
+       }
+       else
+       {
+           /*
+            * If lower bound of original range is not -inf, then extension
+            * of it is infinity.
+            */
+           *penalty = get_float4_infinity();
+       }
    }
-
-   /* similarly, if orig's upper bound is infinite, s_union's must be too */
-   Assert(upper2.infinite || !upper1.infinite);
-
-   if (upper2.infinite && upper1.infinite)
-       upper_diff = 0;
-   else if (upper2.infinite)
-       upper_diff = get_float8_infinity();
-   else if (OidIsValid(subtype_diff->fn_oid))
+   else if (new_upper.infinite)
    {
-       upper_diff = DatumGetFloat8(FunctionCall2Coll(subtype_diff,
-                                                     typcache->rng_collation,
-                                                     upper2.val,
-                                                     upper1.val));
-       /* orig's upper bound must be <= s_union's */
-       if (upper_diff < 0)
-           upper_diff = 0;     /* subtype_diff is broken */
+       /* Handle insertion of (x, +inf) range */
+       if (!orig_empty && orig_upper.infinite)
+       {
+           if (orig_lower.infinite)
+           {
+               /*
+                * (-inf, +inf) range won't be extended by insertion of
+                * (x, +inf) range. It's a less desirable case than insertion
+                * to (y, +inf) original range without extension, because in
+                * that case original range is narrower. But we can't express
+                * that in single float value.
+                */
+               *penalty = 0.0;
+           }
+           else
+           {
+               if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
+               {
+                   /*
+                    * Get extension of original range using subtype_diff.
+                    * Use constant if subtype_diff unavailable.
+                    */
+                   if (has_subtype_diff)
+                       *penalty = call_subtype_diff(typcache,
+                                                    orig_lower.val,
+                                                    new_lower.val);
+                   else
+                       *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
+               }
+               else
+               {
+                   /* No extension of original range */
+                   *penalty = 0.0;
+               }
+           }
+       }
+       else
+       {
+           /*
+            * If upper bound of original range is not +inf, then extension
+            * of it is infinity.
+            */
+           *penalty = get_float4_infinity();
+       }
    }
    else
    {
-       /* only know whether there is a difference or not */
-       upper_diff = range_cmp_bounds(typcache, &upper2, &upper1) > 0 ? 1 : 0;
+       /* Handle insertion of normal (non-empty, non-infinite) range */
+       if (orig_empty || orig_lower.infinite || orig_upper.infinite)
+       {
+           /*
+            * Avoid mixing normal ranges with infinite and empty ranges.
+            */
+           *penalty = get_float4_infinity();
+       }
+       else
+       {
+           /*
+            * Calculate extension of original range by calling subtype_diff.
+            * Use constant if subtype_diff unavailable.
+            */
+           float8      diff = 0.0;
+
+           if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
+           {
+               if (has_subtype_diff)
+                   diff += call_subtype_diff(typcache,
+                                             orig_lower.val,
+                                             new_lower.val);
+               else
+                   diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
+           }
+           if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
+           {
+               if (has_subtype_diff)
+                   diff += call_subtype_diff(typcache,
+                                             new_upper.val,
+                                             orig_upper.val);
+               else
+                   diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
+           }
+           *penalty = diff;
+       }
    }
 
-   Assert(lower_diff >= 0 && upper_diff >= 0);
-
-   *penalty = (float) (lower_diff + upper_diff);
    PG_RETURN_POINTER(penalty);
 }
 
 /*
  * The GiST PickSplit method for ranges
  *
- * Algorithm based on sorting.  Incoming array of ranges is sorted using
- * sort_item_cmp function.  After that first half of ranges goes to the left
- * output, and the second half of ranges goes to the right output.
+ * Primarily, we try to segregate ranges of different classes.  If splitting
+ * ranges of the same class, use the appropriate split method for that class.
  */
 Datum
 range_gist_picksplit(PG_FUNCTION_ARGS)
@@ -253,73 +517,149 @@ range_gist_picksplit(PG_FUNCTION_ARGS)
    TypeCacheEntry *typcache;
    OffsetNumber i;
    RangeType  *pred_left;
-   RangeType  *pred_right;
-   PickSplitSortItem *sortItems;
    int         nbytes;
-   OffsetNumber split_idx;
-   OffsetNumber *left;
-   OffsetNumber *right;
    OffsetNumber maxoff;
+   int         count_in_classes[CLS_COUNT];
+   int         j;
+   int         non_empty_classes_count = 0;
+   int         biggest_class = -1;
+   int         biggest_class_count = 0;
+   int         total_count;
 
    /* use first item to look up range type's info */
    pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
    typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
 
-   /* allocate result and work arrays */
    maxoff = entryvec->n - 1;
    nbytes = (maxoff + 1) * sizeof(OffsetNumber);
    v->spl_left = (OffsetNumber *) palloc(nbytes);
    v->spl_right = (OffsetNumber *) palloc(nbytes);
-   sortItems = (PickSplitSortItem *) palloc(maxoff * sizeof(PickSplitSortItem));
 
    /*
-    * Prepare auxiliary array and sort the values.
+    * Get count distribution of range classes.
     */
+   memset(count_in_classes, 0, sizeof(count_in_classes));
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
-       sortItems[i - 1].index = i;
-       sortItems[i - 1].data = DatumGetRangeType(entryvec->vector[i].key);
-       sortItems[i - 1].typcache = typcache;
-   }
-   qsort(sortItems, maxoff, sizeof(PickSplitSortItem), sort_item_cmp);
-
-   split_idx = maxoff / 2;
+       RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
 
-   left = v->spl_left;
-   v->spl_nleft = 0;
-   right = v->spl_right;
-   v->spl_nright = 0;
+       count_in_classes[get_gist_range_class(range)]++;
+   }
 
    /*
-    * First half of items goes to the left output.
+    * Count non-empty classes and find biggest class.
     */
-   pred_left = sortItems[0].data;
-   *left++ = sortItems[0].index;
-   v->spl_nleft++;
-   for (i = 1; i < split_idx; i++)
+   total_count = maxoff;
+   for (j = 0; j < CLS_COUNT; j++)
    {
-       pred_left = range_super_union(typcache, pred_left, sortItems[i].data);
-       *left++ = sortItems[i].index;
-       v->spl_nleft++;
+       if (count_in_classes[j] > 0)
+       {
+           if (count_in_classes[j] > biggest_class_count)
+           {
+               biggest_class_count = count_in_classes[j];
+               biggest_class = j;
+           }
+           non_empty_classes_count++;
+       }
    }
 
-   /*
-    * Second half of items goes to the right output.
-    */
-   pred_right = sortItems[split_idx].data;
-   *right++ = sortItems[split_idx].index;
-   v->spl_nright++;
-   for (i = split_idx + 1; i < maxoff; i++)
+   Assert(non_empty_classes_count > 0);
+
+   if (non_empty_classes_count == 1)
    {
-       pred_right = range_super_union(typcache, pred_right, sortItems[i].data);
-       *right++ = sortItems[i].index;
-       v->spl_nright++;
+       /* One non-empty class, so split inside class */
+       if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
+       {
+           /* double sorting split for normal ranges */
+           range_gist_double_sorting_split(typcache, entryvec, v);
+       }
+       else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
+       {
+           /* upper bound sorting split for (-inf, x) ranges */
+           range_gist_single_sorting_split(typcache, entryvec, v, true);
+       }
+       else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
+       {
+           /* lower bound sorting split for (x, +inf) ranges */
+           range_gist_single_sorting_split(typcache, entryvec, v, false);
+       }
+       else
+       {
+           /* trivial split for all (-inf, +inf) or all empty ranges */
+           range_gist_fallback_split(typcache, entryvec, v);
+       }
    }
+   else
+   {
+       /*
+        * Class based split.
+        *
+        * To which side of the split should each class go?  Initialize them
+        * all to go to the left side.
+        */
+       SplitLR classes_groups[CLS_COUNT];
 
-   *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
+       memset(classes_groups, 0, sizeof(classes_groups));
 
-   v->spl_ldatum = RangeTypeGetDatum(pred_left);
-   v->spl_rdatum = RangeTypeGetDatum(pred_right);
+       if (count_in_classes[CLS_NORMAL] > 0)
+       {
+           /* separate normal ranges if any */
+           classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
+       }
+       else
+       {
+           /*----------
+            * Try to split classes in one of two ways:
+            *  1) containing infinities - not containing infinities
+            *  2) containing empty - not containing empty
+            *
+            * Select the way which balances the ranges between left and right
+            * the best. If split in these ways is not possible, there are at
+            * most 3 classes, so just separate biggest class.
+            *----------
+            */
+           int infCount, nonInfCount;
+           int emptyCount, nonEmptyCount;
+
+           nonInfCount =
+               count_in_classes[CLS_NORMAL] +
+               count_in_classes[CLS_CONTAIN_EMPTY] +
+               count_in_classes[CLS_EMPTY];
+           infCount = total_count - nonInfCount;
+
+           nonEmptyCount =
+               count_in_classes[CLS_NORMAL]    +
+               count_in_classes[CLS_LOWER_INF] +
+               count_in_classes[CLS_UPPER_INF] +
+               count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
+           emptyCount = total_count - nonEmptyCount;
+
+           if (infCount > 0 && nonInfCount > 0 &&
+               (Abs(infCount - nonInfCount) <=
+                Abs(emptyCount - nonEmptyCount)))
+           {
+               classes_groups[CLS_NORMAL]        = SPLIT_RIGHT;
+               classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
+               classes_groups[CLS_EMPTY]         = SPLIT_RIGHT;
+           }
+           else if (emptyCount > 0 && nonEmptyCount > 0)
+           {
+               classes_groups[CLS_NORMAL]                    = SPLIT_RIGHT;
+               classes_groups[CLS_LOWER_INF]                 = SPLIT_RIGHT;
+               classes_groups[CLS_UPPER_INF]                 = SPLIT_RIGHT;
+               classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
+           }
+           else
+           {
+               /*
+                * Either total_count == emptyCount or total_count == infCount.
+                */
+               classes_groups[biggest_class] = SPLIT_RIGHT;
+           }
+       }
+
+       range_gist_class_split(typcache, entryvec, v, classes_groups);
+   }
 
    PG_RETURN_POINTER(v);
 }
@@ -611,78 +951,649 @@ range_gist_consistent_leaf(FmgrInfo *flinfo, StrategyNumber strategy,
 }
 
 /*
- * Compare function for PickSplitSortItem. This is actually the
- * interesting part of the picksplit algorithm.
+ * Trivial split: half of entries will be placed on one page
+ * and the other half on the other page.
+ */
+static void
+range_gist_fallback_split(TypeCacheEntry *typcache,
+                         GistEntryVector *entryvec,
+                         GIST_SPLITVEC *v)
+{
+   RangeType    *left_range = NULL;
+   RangeType    *right_range = NULL;
+   OffsetNumber i, maxoff, split_idx;
+
+   maxoff = entryvec->n - 1;
+   /* Split entries before this to left page, after to right: */
+   split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
+
+   v->spl_nleft = 0;
+   v->spl_nright = 0;
+   for (i = FirstOffsetNumber; i <= maxoff; i++)
+   {
+       RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+
+       if (i < split_idx)
+           PLACE_LEFT(range, i);
+       else
+           PLACE_RIGHT(range, i);
+   }
+
+   v->spl_ldatum = RangeTypeGetDatum(left_range);
+   v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Split based on classes of ranges.
  *
- * We want to separate out empty ranges, bounded ranges, and unbounded
- * ranges. We assume that "contains" and "overlaps" are the most
- * important queries, so empty ranges will rarely match and unbounded
- * ranges frequently will. Bounded ranges should be in the middle.
+ * See get_gist_range_class for class definitions.
+ * classes_groups is an array of length CLS_COUNT indicating the side of the
+ * split to which each class should go.
+ */
+static void
+range_gist_class_split(TypeCacheEntry *typcache,
+                      GistEntryVector *entryvec,
+                      GIST_SPLITVEC *v,
+                      SplitLR *classes_groups)
+{
+   RangeType           *left_range = NULL;
+   RangeType           *right_range = NULL;
+   OffsetNumber        i, maxoff;
+
+   maxoff = entryvec->n - 1;
+
+   v->spl_nleft = 0;
+   v->spl_nright = 0;
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+   {
+       RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+       int     class;
+
+       /* Get class of range */
+       class = get_gist_range_class(range);
+
+       /* Place range to appropriate page */
+       if (classes_groups[class] == SPLIT_LEFT)
+           PLACE_LEFT(range, i);
+       else
+       {
+           Assert(classes_groups[class] == SPLIT_RIGHT);
+           PLACE_RIGHT(range, i);
+       }
+   }
+
+   v->spl_ldatum = RangeTypeGetDatum(left_range);
+   v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Sorting based split. First half of entries according to the sort will be
+ * placed to one page, and second half of entries will be placed to other
+ * page. use_upper_bound parameter indicates whether to use upper or lower
+ * bound for sorting.
+ */
+static void
+range_gist_single_sorting_split(TypeCacheEntry *typcache,
+                               GistEntryVector *entryvec,
+                               GIST_SPLITVEC *v,
+                               bool use_upper_bound)
+{
+   SingleBoundSortItem *sortItems;
+   RangeType           *left_range = NULL;
+   RangeType           *right_range = NULL;
+   OffsetNumber        i, maxoff, split_idx;
+
+   maxoff = entryvec->n - 1;
+
+   sortItems = (SingleBoundSortItem *)
+       palloc(maxoff * sizeof(SingleBoundSortItem));
+
+   /*
+    * Prepare auxiliary array and sort the values.
+    */
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+   {
+       RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+       RangeBound bound2;
+       bool empty;
+
+       sortItems[i - 1].index = i;
+       /* Put appropriate bound into array */
+       if (use_upper_bound)
+           range_deserialize(typcache, range, &bound2,
+                             &sortItems[i - 1].bound, &empty);
+       else
+           range_deserialize(typcache, range, &sortItems[i - 1].bound,
+                             &bound2, &empty);
+       Assert(!empty);
+   }
+
+   qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
+             single_bound_cmp, typcache);
+
+   split_idx = maxoff / 2;
+
+   v->spl_nleft = 0;
+   v->spl_nright = 0;
+
+   for (i = 0; i < maxoff; i++)
+   {
+       int     idx = sortItems[i].index;
+       RangeType *range = DatumGetRangeType(entryvec->vector[idx].key);
+
+       if (i < split_idx)
+           PLACE_LEFT(range, idx);
+       else
+           PLACE_RIGHT(range, idx);
+   }
+
+   v->spl_ldatum = RangeTypeGetDatum(left_range);
+   v->spl_rdatum = RangeTypeGetDatum(right_range);
+}
+
+/*
+ * Double sorting split algorithm.
+ *
+ * The algorithm considers dividing ranges into two groups. The first (left)
+ * group contains general left bound. The second (right) group contains
+ * general right bound. The challenge is to find upper bound of left group
+ * and lower bound of right group so that overlap of groups is minimal and
+ * ratio of distribution is acceptable. Algorithm finds for each lower bound of
+ * right group minimal upper bound of left group, and for each upper bound of
+ * left group maximal lower bound of right group. For each found pair
+ * range_gist_consider_split considers replacement of currently selected
+ * split with the new one.
+ *
+ * After that, all the entries are divided into three groups:
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed to either group without affecting
+ *   amount of overlap.
  *
- * Empty ranges we push all the way to the left, then bounded ranges
- * (sorted on lower bound, then upper), then ranges with no lower
- * bound, then ranges with no upper bound; and finally, ranges with no
- * upper or lower bound all the way to the right.
+ * The common ranges are distributed by difference of distance from lower
+ * bound of common range to lower bound of right group and distance from upper
+ * bound of common range to upper bound of left group.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree",
+ * A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
  */
-static int
-sort_item_cmp(const void *a, const void *b)
+static void
+range_gist_double_sorting_split(TypeCacheEntry *typcache,
+                               GistEntryVector *entryvec,
+                               GIST_SPLITVEC *v)
 {
-   PickSplitSortItem *i1 = (PickSplitSortItem *) a;
-   PickSplitSortItem *i2 = (PickSplitSortItem *) b;
-   RangeType  *r1 = i1->data;
-   RangeType  *r2 = i2->data;
-   TypeCacheEntry *typcache = i1->typcache;
-   RangeBound  lower1,
-               lower2;
-   RangeBound  upper1,
-               upper2;
-   bool        empty1,
-               empty2;
-   int         cmp;
+   ConsiderSplitContext context;
+   OffsetNumber i, maxoff;
+   RangeType   *range,
+               *left_range = NULL,
+               *right_range = NULL;
+   int          common_entries_count;
+   NonEmptyRange *by_lower,
+                 *by_upper;
+   CommonEntry *common_entries;
+   int          nentries, i1, i2;
+   RangeBound  *right_lower, *left_upper;
+
+   memset(&context, 0, sizeof(ConsiderSplitContext));
+   context.typcache = typcache;
+   context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
 
-   range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
-   range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
+   maxoff = entryvec->n - 1;
+   nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
+   context.first = true;
+
+   /* Allocate arrays for sorted range bounds */
+   by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
+   by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
+
+   /* Fill arrays of bounds */
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+   {
+       RangeType *range = DatumGetRangeType(entryvec->vector[i].key);
+       bool empty;
+
+       range_deserialize(typcache, range,
+                         &by_lower[i - FirstOffsetNumber].lower,
+                         &by_lower[i - FirstOffsetNumber].upper,
+                         &empty);
+       Assert(!empty);
+   }
+
+   /*
+    * Make two arrays of range bounds: one sorted by lower bound and another
+    * sorted by upper bound.
+    */
+   memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
+   qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
+             interval_cmp_lower, typcache);
+   qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
+             interval_cmp_upper, typcache);
+
+   /*----------
+    * The goal is to form a left and right range, so that every entry
+    * range is contained by either left or right interval (or both).
+    *
+    * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
+    *
+    * 0 1 2 3 4
+    * +-+
+    *   +---+
+    *     +-+
+    *     +---+
+    *
+    * The left and right ranges are of the form (0,a) and (b,4).
+    * We first consider splits where b is the lower bound of an entry.
+    * We iterate through all entries, and for each b, calculate the
+    * smallest possible a. Then we consider splits where a is the
+    * upper bound of an entry, and for each a, calculate the greatest
+    * possible b.
+    *
+    * In the above example, the first loop would consider splits:
+    * b=0: (0,1)-(0,4)
+    * b=1: (0,1)-(1,4)
+    * b=2: (0,3)-(2,4)
+    *
+    * And the second loop:
+    * a=1: (0,1)-(1,4)
+    * a=3: (0,3)-(2,4)
+    * a=4: (0,4)-(2,4)
+    *----------
+    */
+
+   /*
+    * Iterate over lower bound of right group, finding smallest possible
+    * upper bound of left group.
+    */
+   i1 = 0;
+   i2 = 0;
+   right_lower = &by_lower[i1].lower;
+   left_upper  = &by_upper[i2].lower;
+   while (true)
+   {
+       /*
+        * Find next lower bound of right group.
+        */
+       while (i1 < nentries &&
+              range_cmp_bounds(typcache, right_lower,
+                               &by_lower[i1].lower) == 0)
+       {
+           if (range_cmp_bounds(typcache, &by_lower[i1].upper,
+                                left_upper) > 0)
+               left_upper = &by_lower[i1].upper;
+           i1++;
+       }
+       if (i1 >= nentries)
+           break;
+       right_lower = &by_lower[i1].lower;
 
-   if (empty1 || empty2)
+       /*
+        * Find count of ranges which anyway should be placed to the
+        * left group.
+        */
+       while (i2 < nentries &&
+              range_cmp_bounds(typcache, &by_upper[i2].upper,
+                               left_upper) <= 0)
+           i2++;
+
+       /*
+        * Consider found split to see if it's better than what we had.
+        */
+       range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
+   }
+
+   /*
+    * Iterate over upper bound of left group finding greatest possible
+    * lower bound of right group.
+    */
+   i1 = nentries - 1;
+   i2 = nentries - 1;
+   right_lower = &by_lower[i1].upper;
+   left_upper  = &by_upper[i2].upper;
+   while (true)
+   {
+       /*
+        * Find next upper bound of left group.
+        */
+       while (i2 >= 0 &&
+              range_cmp_bounds(typcache, left_upper,
+                               &by_upper[i2].upper) == 0)
+       {
+           if (range_cmp_bounds(typcache, &by_upper[i2].lower,
+                                right_lower) < 0)
+               right_lower = &by_upper[i2].lower;
+           i2--;
+       }
+       if (i2 < 0)
+           break;
+       left_upper = &by_upper[i2].upper;
+
+       /*
+        * Find count of intervals which anyway should be placed to the
+        * right group.
+        */
+       while (i1 >= 0 &&
+              range_cmp_bounds(typcache, &by_lower[i1].lower,
+                               right_lower) >= 0)
+           i1--;
+
+       /*
+        * Consider found split to see if it's better than what we had.
+        */
+       range_gist_consider_split(&context, right_lower, i1 + 1,
+                                 left_upper, i2 + 1);
+   }
+
+   /*
+    * If we failed to find any acceptable splits, use trivial split.
+    */
+   if (context.first)
+   {
+       range_gist_fallback_split(typcache, entryvec, v);
+       return;
+   }
+
+   /*
+    * Ok, we have now selected bounds of the groups. Now we have to distribute
+    * entries themselves. At first we distribute entries which can be placed
+    * unambiguously and collect "common entries" to array.
+    */
+
+   /* Allocate vectors for results */
+   v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+   v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+   v->spl_nleft = 0;
+   v->spl_nright = 0;
+
+   /*
+    * Allocate an array for "common entries" - entries which can be placed to
+    * either group without affecting overlap along selected axis.
+    */
+   common_entries_count = 0;
+   common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+   /*
+    * Distribute entries which can be distributed unambiguously, and collect
+    * common entries.
+    */
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
-       if (empty1 && empty2)
-           return 0;
-       else if (empty1)
-           return -1;
-       else if (empty2)
-           return 1;
+       RangeBound  lower,
+                   upper;
+       bool        empty;
+
+       /*
+        * Get upper and lower bounds along selected axis.
+        */
+       range = DatumGetRangeType(entryvec->vector[i].key);
+
+       range_deserialize(typcache, range, &lower, &upper, &empty);
+
+       if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
+       {
+           /* Fits in the left group */
+           if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
+           {
+               /* Fits also in the right group, so "common entry" */
+               common_entries[common_entries_count].index = i;
+               if (context.has_subtype_diff)
+               {
+                   /*
+                    * delta = (lower - context.right_lower) -
+                    * (context.left_upper - upper)
+                    */
+                   common_entries[common_entries_count].delta =
+                       call_subtype_diff(typcache,
+                                         lower.val,
+                                         context.right_lower->val) -
+                       call_subtype_diff(typcache,
+                                         context.left_upper->val,
+                                         upper.val);
+               }
+               else
+               {
+                   /* Without subtype_diff, take all deltas as zero */
+                   common_entries[common_entries_count].delta = 0;
+               }
+               common_entries_count++;
+           }
+           else
+           {
+               /* Doesn't fit to the right group, so join to the left group */
+               PLACE_LEFT(range, i);
+           }
+       }
        else
-           Assert(false);
+       {
+           /*
+            * Each entry should fit on either left or right group. Since this
+            * entry didn't fit in the left group, it better fit in the right
+            * group.
+            */
+           Assert(range_cmp_bounds(typcache, &lower,
+                                   context.right_lower) >= 0);
+           PLACE_RIGHT(range, i);
+       }
+   }
+
+   /*
+    * Distribute "common entries", if any.
+    */
+   if (common_entries_count > 0)
+   {
+       /*
+        * Sort "common entries" by calculated deltas in order to distribute
+        * the most ambiguous entries first.
+        */
+       qsort(common_entries, common_entries_count, sizeof(CommonEntry),
+             common_entry_cmp);
+
+       /*
+        * Distribute "common entries" between groups according to sorting.
+        */
+       for (i = 0; i < common_entries_count; i++)
+       {
+           int     idx = common_entries[i].index;
+
+           range = DatumGetRangeType(entryvec->vector[idx].key);
+
+           /*
+            * Check if we have to place this entry in either group to achieve
+            * LIMIT_RATIO.
+            */
+           if (i < context.common_left)
+               PLACE_LEFT(range, idx);
+           else
+               PLACE_RIGHT(range, idx);
+       }
    }
 
+   v->spl_ldatum = PointerGetDatum(left_range);
+   v->spl_rdatum = PointerGetDatum(right_range);
+}
+
+/*
+ * Consider replacement of currently selected split with a better one
+ * during range_gist_double_sorting_split.
+ */
+static void
+range_gist_consider_split(ConsiderSplitContext *context,
+                         RangeBound *right_lower, int min_left_count,
+                         RangeBound *left_upper, int max_left_count)
+{
+   int         left_count,
+               right_count;
+   float4      ratio,
+               overlap;
+
+   /*
+    * Calculate entries distribution ratio assuming most uniform distribution
+    * of common entries.
+    */
+   if (min_left_count >= (context->entries_count + 1) / 2)
+       left_count = min_left_count;
+   else if (max_left_count <= context->entries_count / 2)
+       left_count = max_left_count;
+   else
+       left_count = context->entries_count / 2;
+   right_count = context->entries_count - left_count;
+
    /*
-    * If both lower or both upper bounds are infinite, we sort by ascending
-    * range size. That means that if both upper bounds are infinite, we sort
-    * by the lower bound _descending_. That creates a slightly odd total
-    * order, but keeps the pages with very unselective predicates grouped
-    * more closely together on the right.
+    * Ratio of split: quotient between size of smaller group and total
+    * entries count.  This is necessarily 0.5 or less; if it's less than
+    * LIMIT_RATIO then we will never accept the new split.
     */
-   if (lower1.infinite || upper1.infinite ||
-       lower2.infinite || upper2.infinite)
+   ratio = ((float4) Min(left_count, right_count)) /
+       ((float4) context->entries_count);
+
+   if (ratio > LIMIT_RATIO)
    {
-       if (lower1.infinite && lower2.infinite)
-           return range_cmp_bounds(typcache, &upper1, &upper2);
-       else if (lower1.infinite)
-           return -1;
-       else if (lower2.infinite)
-           return 1;
-       else if (upper1.infinite && upper2.infinite)
-           return -(range_cmp_bounds(typcache, &lower1, &lower2));
-       else if (upper1.infinite)
-           return 1;
-       else if (upper2.infinite)
-           return -1;
+       bool        selectthis = false;
+
+       /*
+        * The ratio is acceptable, so compare current split with previously
+        * selected one. We search for minimal overlap (allowing negative
+        * values) and minimal ratio secondarily.  If subtype_diff is
+        * available, it's used for overlap measure.  Without subtype_diff we
+        * use number of "common entries" as an overlap measure.
+        */
+       if (context->has_subtype_diff)
+           overlap = call_subtype_diff(context->typcache,
+                                       left_upper->val,
+                                       right_lower->val);
+       else
+           overlap = max_left_count - min_left_count;
+
+       /* If there is no previous selection, select this split */
+       if (context->first)
+           selectthis = true;
        else
-           Assert(false);
+       {
+           /*
+            * Choose the new split if it has a smaller overlap, or same
+            * overlap but better ratio.
+            */
+           if (overlap < context->overlap ||
+               (overlap == context->overlap && ratio > context->ratio))
+               selectthis = true;
+       }
+
+       if (selectthis)
+       {
+           /* save information about selected split */
+           context->first = false;
+           context->ratio = ratio;
+           context->overlap = overlap;
+           context->right_lower = right_lower;
+           context->left_upper = left_upper;
+           context->common_left = max_left_count - left_count;
+           context->common_right = left_count - min_left_count;
+       }
+   }
+}
+
+/*
+ * Find class number for range.
+ *
+ * The class number is a valid combination of the properties of the
+ * range.  Note: the highest possible number is 8, because CLS_EMPTY
+ * can't be combined with anything else.
+ */
+static int
+get_gist_range_class(RangeType *range)
+{
+   int         classNumber;
+   char        flags;
+
+   flags = range_get_flags(range);
+   if (flags & RANGE_EMPTY)
+   {
+       classNumber = CLS_EMPTY;
    }
+   else
+   {
+       classNumber = 0;
+       if (flags & RANGE_LB_INF)
+           classNumber |= CLS_LOWER_INF;
+       if (flags & RANGE_UB_INF)
+           classNumber |= CLS_UPPER_INF;
+       if (flags & RANGE_CONTAIN_EMPTY)
+           classNumber |= CLS_CONTAIN_EMPTY;
+   }
+   return classNumber;
+}
+
+/*
+ * Comparison function for range_gist_single_sorting_split.
+ */
+static int
+single_bound_cmp(const void *a, const void *b, void *arg)
+{
+   SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
+   SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
+   TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
+
+   return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
+}
+
+/*
+ * Compare NonEmptyRanges by lower bound.
+ */
+static int
+interval_cmp_lower(const void *a, const void *b, void *arg)
+{
+   NonEmptyRange *i1 = (NonEmptyRange *) a;
+   NonEmptyRange *i2 = (NonEmptyRange *) b;
+   TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
+
+   return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
+}
+
+/*
+ * Compare NonEmptyRanges by upper bound.
+ */
+static int
+interval_cmp_upper(const void *a, const void *b, void *arg)
+{
+   NonEmptyRange *i1 = (NonEmptyRange *) a;
+   NonEmptyRange *i2 = (NonEmptyRange *) b;
+   TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
+
+   return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
+}
 
-   if ((cmp = range_cmp_bounds(typcache, &lower1, &lower2)) != 0)
-       return cmp;
+/*
+ * Compare CommonEntrys by their deltas.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+   double      delta1 = ((CommonEntry *) i1)->delta;
+   double      delta2 = ((CommonEntry *) i2)->delta;
+
+   if (delta1 < delta2)
+       return -1;
+   else if (delta1 > delta2)
+       return 1;
+   else
+       return 0;
+}
 
-   return range_cmp_bounds(typcache, &upper1, &upper2);
+/*
+ * Convenience function to invoke type-specific subtype_diff function.
+ * Caller must have already checked that there is one for the range type.
+ */
+static float8
+call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
+{
+   float8      value;
+
+   value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
+                                            typcache->rng_collation,
+                                            val1, val2));
+   /* Cope with buggy subtype_diff function by returning zero */
+   if (value >= 0.0)
+       return value;
+   return 0.0;
 }