Clean up and document btree code for ordering keys. Neat stuff,
authorTom Lane
Tue, 25 Jul 2000 04:47:59 +0000 (04:47 +0000)
committerTom Lane
Tue, 25 Jul 2000 04:47:59 +0000 (04:47 +0000)
actually, but who could understand it with no comments?  Fix bug
while at it: _bt_orderkeys would try to invoke comparisons on
NULL inputs, given the right sort of redundant quals.

src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtutils.c
src/include/access/nbtree.h

index 49aec3b23d9b05788b9b9302d41df90c94fa11ed..8b0804d28ebbfb33ab539e41bf88cfd95d4669cc 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.61 2000/07/21 06:42:32 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.62 2000/07/25 04:47:58 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -374,7 +374,7 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
    BTItem      btitem;
    IndexTuple  itup;
    BTScanOpaque so;
-   Size        keysok;
+   bool        continuescan;
 
    rel = scan->relation;
    so = (BTScanOpaque) scan->opaque;
@@ -396,16 +396,14 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
        btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
        itup = &btitem->bti_itup;
 
-       if (_bt_checkkeys(scan, itup, &keysok))
+       if (_bt_checkkeys(scan, itup, dir, &continuescan))
        {
            /* tuple passes all scan key conditions, so return it */
-           Assert(keysok == so->numberOfKeys);
            return FormRetrieveIndexResult(current, &(itup->t_tid));
        }
 
        /* This tuple doesn't pass, but there might be more that do */
-   } while (keysok >= so->numberOfFirstKeys ||
-            (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)));
+   } while (continuescan);
 
    /* No more items, so close down the current-item info */
    ItemPointerSetInvalid(current);
@@ -442,11 +440,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    RegProcedure proc;
    int32       result;
    BTScanOpaque so;
-   Size        keysok;
-   bool        strategyCheck;
-   ScanKey     scankeys = 0;
+   bool        continuescan;
+   ScanKey     scankeys = NULL;
    int         keysCount = 0;
-   int        *nKeyIs = 0;
+   int        *nKeyIs = NULL;
    int         i,
                j;
    StrategyNumber strat_total;
@@ -455,71 +452,74 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    so = (BTScanOpaque) scan->opaque;
 
    /*
-    * Order the keys in the qualification and be sure that the scan
-    * exploits the tree order.
+    * Order the scan keys in our canonical fashion and eliminate any
+    * redundant keys.
     */
-   so->numberOfFirstKeys = 0;  /* may be changed by _bt_orderkeys */
-   so->qual_ok = 1;            /* may be changed by _bt_orderkeys */
-   scan->scanFromEnd = false;
-   strategyCheck = false;
-   if (so->numberOfKeys > 0)
-   {
-       _bt_orderkeys(rel, so);
+   _bt_orderkeys(rel, so);
 
-       if (so->qual_ok)
-           strategyCheck = true;
-   }
+   /*
+    * Quit now if _bt_orderkeys() discovered that the scan keys can
+    * never be satisfied (eg, x == 1 AND x > 2).
+    */
+   if (! so->qual_ok)
+       return (RetrieveIndexResult) NULL;
+
+   /*
+    * Examine the scan keys to discover where we need to start the scan.
+    */
+   scan->scanFromEnd = false;
    strat_total = BTEqualStrategyNumber;
-   if (strategyCheck)
+   if (so->numberOfKeys > 0)
    {
-       AttrNumber  attno;
-
        nKeyIs = (int *) palloc(so->numberOfKeys * sizeof(int));
        for (i = 0; i < so->numberOfKeys; i++)
        {
-           attno = so->keyData[i].sk_attno;
-           if (attno == keysCount)
+           AttrNumber  attno = so->keyData[i].sk_attno;
+
+           /* ignore keys for already-determined attrs */
+           if (attno <= keysCount)
                continue;
+           /* if we didn't find a boundary for the preceding attr, quit */
            if (attno > keysCount + 1)
                break;
            strat = _bt_getstrat(rel, attno,
                                 so->keyData[i].sk_procedure);
+           /*
+            * Can we use this key as a starting boundary for this attr?
+            *
+            * We can use multiple keys if they look like, say, = >= =
+            * but we have to stop after accepting a > or < boundary.
+            */
            if (strat == strat_total ||
                strat == BTEqualStrategyNumber)
            {
                nKeyIs[keysCount++] = i;
-               continue;
            }
-           if (ScanDirectionIsBackward(dir) &&
-               (strat == BTLessStrategyNumber ||
-                strat == BTLessEqualStrategyNumber))
+           else if (ScanDirectionIsBackward(dir) &&
+                    (strat == BTLessStrategyNumber ||
+                     strat == BTLessEqualStrategyNumber))
            {
                nKeyIs[keysCount++] = i;
                strat_total = strat;
                if (strat == BTLessStrategyNumber)
                    break;
-               continue;
            }
-           if (ScanDirectionIsForward(dir) &&
-               (strat == BTGreaterStrategyNumber ||
-                strat == BTGreaterEqualStrategyNumber))
+           else if (ScanDirectionIsForward(dir) &&
+                    (strat == BTGreaterStrategyNumber ||
+                     strat == BTGreaterEqualStrategyNumber))
            {
                nKeyIs[keysCount++] = i;
                strat_total = strat;
                if (strat == BTGreaterStrategyNumber)
                    break;
-               continue;
            }
        }
-       if (!keysCount)
+       if (keysCount == 0)
            scan->scanFromEnd = true;
    }
    else
        scan->scanFromEnd = true;
 
-   if (so->qual_ok == 0)
-       return (RetrieveIndexResult) NULL;
-
    /* if we just need to walk down one edge of the tree, do that */
    if (scan->scanFromEnd)
    {
@@ -529,16 +529,14 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    }
 
    /*
-    * Okay, we want something more complicated.  What we'll do is use the
-    * first item in the scan key passed in (which has been correctly
-    * ordered to take advantage of index ordering) to position ourselves
-    * at the right place in the scan.
+    * We want to start the scan somewhere within the index.  Set up a
+    * scankey we can use to search for the correct starting point.
     */
    scankeys = (ScanKey) palloc(keysCount * sizeof(ScanKeyData));
    for (i = 0; i < keysCount; i++)
    {
        j = nKeyIs[i];
-       /* _bt_orderkeys disallows it, but it's place to add some code latter */
+       /* _bt_orderkeys disallows it, but it's place to add some code later */
        if (so->keyData[j].sk_flags & SK_ISNULL)
        {
            pfree(nKeyIs);
@@ -578,6 +576,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    blkno = BufferGetBlockNumber(buf);
    page = BufferGetPage(buf);
 
+   /* position to the precise item on the page */
    offnum = _bt_binsrch(rel, buf, keysCount, scankeys);
 
    ItemPointerSet(current, blkno, offnum);
@@ -595,7 +594,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     * call _bt_endpoint() to set up a scan starting at that index endpoint,
     * as appropriate for the desired scan type.
     *
-    * it's yet other place to add some code latter for is(not)null ...
+    * it's yet other place to add some code later for is(not)null ...
     *----------
     */
 
@@ -737,13 +736,12 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    itup = &btitem->bti_itup;
 
    /* is the first item actually acceptable? */
-   if (_bt_checkkeys(scan, itup, &keysok))
+   if (_bt_checkkeys(scan, itup, dir, &continuescan))
    {
        /* yes, return it */
        res = FormRetrieveIndexResult(current, &(itup->t_tid));
    }
-   else if (keysok >= so->numberOfFirstKeys ||
-            (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
+   else if (continuescan)
    {
        /* no, but there might be another one that is */
        res = _bt_next(scan, dir);
@@ -906,7 +904,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
    IndexTuple  itup;
    BTScanOpaque so;
    RetrieveIndexResult res;
-   Size        keysok;
+   bool        continuescan;
 
    rel = scan->relation;
    current = &(scan->currentItemData);
@@ -1012,13 +1010,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
    itup = &(btitem->bti_itup);
 
    /* see if we picked a winner */
-   if (_bt_checkkeys(scan, itup, &keysok))
+   if (_bt_checkkeys(scan, itup, dir, &continuescan))
    {
        /* yes, return it */
        res = FormRetrieveIndexResult(current, &(itup->t_tid));
    }
-   else if (keysok >= so->numberOfFirstKeys ||
-            (keysok == ((Size) -1) && ScanDirectionIsBackward(dir)))
+   else if (continuescan)
    {
        /* no, but there might be another one that is */
        res = _bt_next(scan, dir);
index eb77bfd8daecf472a8ef4d21cf7b68aab75fe3f0..7bf527dd44fa1e0587c4c9e5d69f9c73d02538dc 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.39 2000/07/21 19:21:00 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.40 2000/07/25 04:47:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -145,88 +145,148 @@ _bt_formitem(IndexTuple itup)
    return btitem;
 }
 
-/*
+/*----------
  * _bt_orderkeys() -- Put keys in a sensible order for conjunctive quals.
  *
- *     The order of the keys in the qual match the ordering imposed by
- *     the index.  This routine only needs to be called if there is
- *     more than one qual clause using this index.
+ * After this routine runs, the scan keys are ordered by index attribute
+ * (all quals for attr 1, then all for attr 2, etc) and within each attr
+ * the keys are ordered by constraint type: ">", ">=", "=", "<=", "<".
+ * Furthermore, redundant keys are eliminated: we keep only the tightest
+ * >/>= bound and the tightest 
+ * that's the only one returned.  (So, we return either a single = key,
+ * or one or two boundary-condition keys for each attr.)
+ *
+ * As a byproduct of this work, we can detect contradictory quals such
+ * as "x = 1 AND x > 2".  If we see that, we return so->quals_ok = FALSE,
+ * indicating the scan need not be run at all since no tuples can match.
+ *
+ * Another byproduct is to determine how many quals must be satisfied to
+ * continue the scan.  _bt_checkkeys uses this.  For example, if the quals
+ * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
+ * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
+ * But once we reach tuples like (1,4,z) we can stop scanning because no
+ * later tuples could match.  This is reflected by setting 
+ * so->numberOfRequiredKeys to the number of leading keys that must be
+ * matched to continue the scan.  numberOfRequiredKeys is equal to the
+ * number of leading "=" keys plus the key(s) for the first non "="
+ * attribute, which can be seen to be correct by considering the above
+ * example.
+ *
+ * The initial ordering of the keys is expected to be by attribute already
+ * (see group_clauses_by_indexkey() in indxpath.c).  The task here is to
+ * standardize the appearance of multiple keys for the same attribute.
+ *
+ * XXX this routine is one of many places that fail to handle SK_COMMUTE
+ * scankeys properly.  Currently, the planner is careful never to generate
+ * any indexquals that would require SK_COMMUTE to be set.  Someday we ought
+ * to try to fix this, though it's not real critical as long as indexable
+ * operators all have commutators...
+ *
+ * Note: this routine invokes comparison operators via OidFunctionCallN,
+ * ie, without caching function lookups.  No point in trying to be smarter,
+ * since these comparisons are executed only when the user expresses a
+ * hokey qualification, and happen only once per scan anyway.
+ *----------
  */
 void
 _bt_orderkeys(Relation relation, BTScanOpaque so)
 {
-   ScanKey     xform;
-   ScanKeyData *cur;
+   ScanKeyData xform[BTMaxStrategyNumber];
+   bool        init[BTMaxStrategyNumber];
+   uint16      numberOfKeys = so->numberOfKeys;
+   ScanKey     key;
+   ScanKey     cur;
    StrategyMap map;
-   int         nbytes;
    Datum       test;
    int         i,
                j;
-   int         init[BTMaxStrategyNumber + 1];
-   ScanKey     key;
-   uint16      numberOfKeys = so->numberOfKeys;
-   uint16      new_numberOfKeys = 0;
-   AttrNumber  attno = 1;
-   bool        equalStrategyEnd,
-               underEqualStrategy;
+   AttrNumber  attno;
+   uint16      new_numberOfKeys;
+   bool        allEqualSoFar;
+
+   so->qual_ok = true;
+   so->numberOfRequiredKeys = 0;
 
    if (numberOfKeys < 1)
-       return;
+       return;                 /* done if qual-less scan */
 
    key = so->keyData;
-
    cur = &key[0];
+   /* check input keys are correctly ordered */
    if (cur->sk_attno != 1)
        elog(ERROR, "_bt_orderkeys: key(s) for attribute 1 missed");
 
+   /* We can short-circuit most of the work if there's just one key */
    if (numberOfKeys == 1)
    {
 
        /*
         * We don't use indices for 'A is null' and 'A is not null'
-        * currently and 'A < = > <> NULL' is non-sense' - so qual is not
-        * Ok.      - vadim 03/21/97
+        * currently and 'A < = > <> NULL' will always fail - so qual is
+        * not Ok if comparison value is NULL.      - vadim 03/21/97
         */
        if (cur->sk_flags & SK_ISNULL)
-           so->qual_ok = 0;
-       so->numberOfFirstKeys = 1;
+           so->qual_ok = false;
+       so->numberOfRequiredKeys = 1;
        return;
    }
 
-   /* get space for the modified array of keys */
-   nbytes = BTMaxStrategyNumber * sizeof(ScanKeyData);
-   xform = (ScanKey) palloc(nbytes);
-
-   MemSet(xform, 0, nbytes);
+   /*
+    * Otherwise, do the full set of pushups.
+    */
+   new_numberOfKeys = 0;
+   allEqualSoFar = true;
+
+   /*
+    * Initialize for processing of keys for attr 1.
+    *
+    * xform[i] holds a copy of the current scan key of strategy type i+1,
+    * if any; init[i] is TRUE if we have found such a key for this attr.
+    */
+   attno = 1;
    map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
                                      BTMaxStrategyNumber,
                                      attno);
-   for (j = 0; j <= BTMaxStrategyNumber; j++)
-       init[j] = 0;
-
-   equalStrategyEnd = false;
-   underEqualStrategy = true;
-   /* check each key passed in */
-   for (i = 0;;)
+   MemSet(xform, 0, sizeof(xform)); /* not really necessary */
+   MemSet(init, 0, sizeof(init));
+
+   /*
+    * Loop iterates from 0 to numberOfKeys inclusive; we use the last
+    * pass to handle after-last-key processing.  Actual exit from the
+    * loop is at the "break" statement below.
+    */
+   for (i = 0; ; cur++, i++)
    {
        if (i < numberOfKeys)
-           cur = &key[i];
-
-       if (cur->sk_flags & SK_ISNULL)  /* see comments above */
-           so->qual_ok = 0;
+       {
+           /* See comments above: any NULL implies cannot match qual */
+           if (cur->sk_flags & SK_ISNULL)
+           {
+               so->qual_ok = false;
+               /* Quit processing so we don't try to invoke comparison
+                * routines on NULLs.
+                */
+               return;
+           }
+       }
 
+       /*
+        * If we are at the end of the keys for a particular attr,
+        * finish up processing and emit the cleaned-up keys.
+        */
        if (i == numberOfKeys || cur->sk_attno != attno)
        {
-           if (cur->sk_attno != attno + 1 && i < numberOfKeys)
+           bool        priorAllEqualSoFar = allEqualSoFar;
+
+           /* check input keys are correctly ordered */
+           if (i < numberOfKeys && cur->sk_attno != attno + 1)
                elog(ERROR, "_bt_orderkeys: key(s) for attribute %d missed",
                     attno + 1);
 
-           underEqualStrategy = (!equalStrategyEnd);
-
            /*
             * If = has been specified, no other key will be used. In case
-            * of key < 2 && key == 1 and so on we have to set qual_ok to
-            * 0
+            * of key > 2 && key == 1 and so on we have to set qual_ok to
+            * false before discarding the other keys.
             */
            if (init[BTEqualStrategyNumber - 1])
            {
@@ -236,187 +296,222 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
                eq = &xform[BTEqualStrategyNumber - 1];
                for (j = BTMaxStrategyNumber; --j >= 0;)
                {
-                   if (j == (BTEqualStrategyNumber - 1) || init[j] == 0)
+                   if (! init[j] ||
+                       j == (BTEqualStrategyNumber - 1))
                        continue;
                    chk = &xform[j];
                    test = OidFunctionCall2(chk->sk_procedure,
-                                           eq->sk_argument, chk->sk_argument);
+                                           eq->sk_argument,
+                                           chk->sk_argument);
                    if (!DatumGetBool(test))
-                       so->qual_ok = 0;
+                       so->qual_ok = false;
                }
-               init[BTLessStrategyNumber - 1] = 0;
-               init[BTLessEqualStrategyNumber - 1] = 0;
-               init[BTGreaterEqualStrategyNumber - 1] = 0;
-               init[BTGreaterStrategyNumber - 1] = 0;
+               init[BTLessStrategyNumber - 1] = false;
+               init[BTLessEqualStrategyNumber - 1] = false;
+               init[BTGreaterEqualStrategyNumber - 1] = false;
+               init[BTGreaterStrategyNumber - 1] = false;
            }
            else
-               equalStrategyEnd = true;
+           {
+               /*
+                * No "=" for this key, so we're done with required keys
+                */
+               allEqualSoFar = false;
+           }
 
-           /* only one of <, <= */
+           /* keep only one of <, <= */
            if (init[BTLessStrategyNumber - 1]
                && init[BTLessEqualStrategyNumber - 1])
            {
-               ScanKeyData *lt,
-                          *le;
+               ScanKeyData *lt = &xform[BTLessStrategyNumber - 1];
+               ScanKeyData *le = &xform[BTLessEqualStrategyNumber - 1];
 
-               lt = &xform[BTLessStrategyNumber - 1];
-               le = &xform[BTLessEqualStrategyNumber - 1];
-
-               /*
-                * DO NOT use the cached function stuff here -- this is
-                * key ordering, happens only when the user expresses a
-                * hokey qualification, and gets executed only once,
-                * anyway.  The transform maps are hard-coded, and can't
-                * be initialized in the correct way.
-                */
                test = OidFunctionCall2(le->sk_procedure,
-                                       lt->sk_argument, le->sk_argument);
+                                       lt->sk_argument,
+                                       le->sk_argument);
                if (DatumGetBool(test))
-                   init[BTLessEqualStrategyNumber - 1] = 0;
+                   init[BTLessEqualStrategyNumber - 1] = false;
                else
-                   init[BTLessStrategyNumber - 1] = 0;
+                   init[BTLessStrategyNumber - 1] = false;
            }
 
-           /* only one of >, >= */
+           /* keep only one of >, >= */
            if (init[BTGreaterStrategyNumber - 1]
                && init[BTGreaterEqualStrategyNumber - 1])
            {
-               ScanKeyData *gt,
-                          *ge;
-
-               gt = &xform[BTGreaterStrategyNumber - 1];
-               ge = &xform[BTGreaterEqualStrategyNumber - 1];
+               ScanKeyData *gt = &xform[BTGreaterStrategyNumber - 1];
+               ScanKeyData *ge = &xform[BTGreaterEqualStrategyNumber - 1];
 
-               /* see note above on function cache */
                test = OidFunctionCall2(ge->sk_procedure,
-                                       gt->sk_argument, ge->sk_argument);
+                                       gt->sk_argument,
+                                       ge->sk_argument);
                if (DatumGetBool(test))
-                   init[BTGreaterEqualStrategyNumber - 1] = 0;
+                   init[BTGreaterEqualStrategyNumber - 1] = false;
                else
-                   init[BTGreaterStrategyNumber - 1] = 0;
+                   init[BTGreaterStrategyNumber - 1] = false;
            }
 
-           /* okay, reorder and count */
+           /*
+            * Emit the cleaned-up keys back into the key[] array in the
+            * correct order.  Note we are overwriting our input here!
+            * It's OK because (a) xform[] is a physical copy of the keys
+            * we want, (b) we cannot emit more keys than we input, so
+            * we won't overwrite as-yet-unprocessed keys.
+            */
            for (j = BTMaxStrategyNumber; --j >= 0;)
+           {
                if (init[j])
-                   key[new_numberOfKeys++] = xform[j];
+                   memcpy(&key[new_numberOfKeys++], &xform[j],
+                          sizeof(ScanKeyData));
+           }
 
-           if (underEqualStrategy)
-               so->numberOfFirstKeys = new_numberOfKeys;
+           /*
+            * If all attrs before this one had "=", include these keys
+            * into the required-keys count.
+            */
+           if (priorAllEqualSoFar)
+               so->numberOfRequiredKeys = new_numberOfKeys;
 
+           /*
+            * Exit loop here if done.
+            */
            if (i == numberOfKeys)
                break;
 
-           /* initialization for new attno */
+           /* Re-initialize for new attno */
            attno = cur->sk_attno;
-           MemSet(xform, 0, nbytes);
            map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
                                              BTMaxStrategyNumber,
                                              attno);
-           /* haven't looked at any strategies yet */
-           for (j = 0; j <= BTMaxStrategyNumber; j++)
-               init[j] = 0;
+           MemSet(xform, 0, sizeof(xform)); /* not really necessary */
+           MemSet(init, 0, sizeof(init));
        }
 
+       /*
+        * OK, figure out which strategy this key corresponds to
+        */
        for (j = BTMaxStrategyNumber; --j >= 0;)
        {
            if (cur->sk_procedure == map->entry[j].sk_procedure)
                break;
        }
+       if (j < 0)
+           elog(ERROR, "_bt_orderkeys: unable to identify operator %u",
+                cur->sk_procedure);
 
        /* have we seen one of these before? */
        if (init[j])
        {
-           /* yup, use the appropriate value */
+           /* yup, keep the more restrictive value */
            test = FunctionCall2(&cur->sk_func,
-                                cur->sk_argument, xform[j].sk_argument);
+                                cur->sk_argument,
+                                xform[j].sk_argument);
            if (DatumGetBool(test))
                xform[j].sk_argument = cur->sk_argument;
            else if (j == (BTEqualStrategyNumber - 1))
-               so->qual_ok = 0;/* key == a && key == b, but a != b */
+               so->qual_ok = false; /* key == a && key == b, but a != b */
        }
        else
        {
-           /* nope, use this value */
-           memmove(&xform[j], cur, sizeof(*cur));
-           init[j] = 1;
+           /* nope, so remember this scankey */
+           memcpy(&xform[j], cur, sizeof(ScanKeyData));
+           init[j] = true;
        }
-
-       i++;
    }
 
    so->numberOfKeys = new_numberOfKeys;
-
-   pfree(xform);
 }
 
 /*
- * Test whether an indextuple satisfies all the scankey conditions
- *
- * If not ("false" return), the number of conditions satisfied is
- * returned in *keysok.  Given proper ordering of the scankey conditions,
- * we can use this to determine whether it's worth continuing the scan.
- * See _bt_orderkeys().
+ * Test whether an indextuple satisfies all the scankey conditions.
  *
- * HACK: *keysok == (Size) -1 means we stopped evaluating because we found
- * a NULL value in the index tuple.  It's not quite clear to me why this
- * case has to be treated specially --- tgl 7/00.
+ * If the tuple fails to pass the qual, we also determine whether there's
+ * any need to continue the scan beyond this tuple, and set *continuescan
+ * accordingly.  See comments for _bt_orderkeys(), above, about how this is
+ * done.
  */
 bool
-_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok)
+_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
+             ScanDirection dir, bool *continuescan)
 {
    BTScanOpaque so = (BTScanOpaque) scan->opaque;
    Size        keysz = so->numberOfKeys;
    TupleDesc   tupdesc;
    ScanKey     key;
-   Datum       datum;
-   bool        isNull;
-   Datum       test;
+   Size        keysok;
+
+   *continuescan = true;
 
-   *keysok = 0;
+   /* If no keys, always scan the whole index */
    if (keysz == 0)
        return true;
 
-   key = so->keyData;
    tupdesc = RelationGetDescr(scan->relation);
+   key = so->keyData;
+   keysok = 0;
 
    IncrIndexProcessed();
 
    while (keysz > 0)
    {
+       Datum       datum;
+       bool        isNull;
+       Datum       test;
+
        datum = index_getattr(tuple,
-                             key[0].sk_attno,
+                             key->sk_attno,
                              tupdesc,
                              &isNull);
 
        /* btree doesn't support 'A is null' clauses, yet */
-       if (key[0].sk_flags & SK_ISNULL)
-           return false;
-       if (isNull)
+       if (key->sk_flags & SK_ISNULL)
        {
-           if (*keysok < so->numberOfFirstKeys)
-               *keysok = -1;
+           /* we shouldn't get here, really; see _bt_orderkeys() */
+           *continuescan = false;
            return false;
        }
 
-       if (key[0].sk_flags & SK_COMMUTE)
+       if (isNull)
        {
-           test = FunctionCall2(&key[0].sk_func,
-                                key[0].sk_argument, datum);
+           /*
+            * Since NULLs are sorted after non-NULLs, we know we have
+            * reached the upper limit of the range of values for this
+            * index attr.  On a forward scan, we can stop if this qual
+            * is one of the "must match" subset.  On a backward scan,
+            * however, we should keep going.
+            */
+           if (keysok < so->numberOfRequiredKeys &&
+               ScanDirectionIsForward(dir))
+               *continuescan = false;
+           /*
+            * In any case, this indextuple doesn't match the qual.
+            */
+           return false;
        }
+
+       if (key->sk_flags & SK_COMMUTE)
+           test = FunctionCall2(&key->sk_func,
+                                key->sk_argument, datum);
        else
-       {
-           test = FunctionCall2(&key[0].sk_func,
-                                datum, key[0].sk_argument);
-       }
+           test = FunctionCall2(&key->sk_func,
+                                datum, key->sk_argument);
 
-       if (DatumGetBool(test) == !!(key[0].sk_flags & SK_NEGATE))
+       if (DatumGetBool(test) == !!(key->sk_flags & SK_NEGATE))
+       {
+           /*
+            * Tuple fails this qual.  If it's a required qual, then
+            * we can conclude no further tuples will pass, either.
+            */
+           if (keysok < so->numberOfRequiredKeys)
+               *continuescan = false;
            return false;
+       }
 
-       (*keysok)++;
+       keysok++;
        key++;
        keysz--;
    }
 
+   /* If we get here, the tuple passes all quals. */
    return true;
 }
index 3f8eebc3b3614d3e58bafa201110c7f11a33e4a6..d84cfa5ded94b8496856d8a1f253f2df741ed4ae 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: nbtree.h,v 1.39 2000/07/21 06:42:35 tgl Exp $
+ * $Id: nbtree.h,v 1.40 2000/07/25 04:47:57 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -47,13 +47,12 @@ typedef struct BTPageOpaqueData
 typedef BTPageOpaqueData *BTPageOpaque;
 
 /*
- * ScanOpaqueData is used to remember which buffers we're currently
- * examining in the scan.  We keep these buffers locked and pinned
- * and recorded in the opaque entry of the scan in order to avoid
- * doing a ReadBuffer() for every tuple in the index.  This avoids
- * semop() calls, which are expensive.
+ * BTScanOpaqueData is used to remember which buffers we're currently
+ * examining in the scan.  We keep these buffers pinned (but not locked,
+ * see nbtree.c) and recorded in the opaque entry of the scan to avoid
+ * doing a ReadBuffer() for every tuple in the index.
  *
- * And it's used to remember actual scankey info (we need in it
+ * And it's used to remember actual scankey info (we need it
  * if some scankeys evaled at runtime).
  *
  * curHeapIptr & mrkHeapIptr are heap iptr-s from current/marked
@@ -69,11 +68,12 @@ typedef struct BTScanOpaqueData
    Buffer      btso_mrkbuf;
    ItemPointerData curHeapIptr;
    ItemPointerData mrkHeapIptr;
-   uint16      qual_ok;        /* 0 for quals like key == 1 && key > 2 */
-   uint16      numberOfKeys;   /* number of keys */
-   uint16      numberOfFirstKeys;      /* number of keys for 1st
-                                        * attribute */
-   ScanKey     keyData;        /* key descriptor */
+   /* these fields are set by _bt_orderkeys(), which see for more info: */
+   bool        qual_ok;        /* false if qual can never be satisfied */
+   uint16      numberOfKeys;   /* number of scan keys */
+   uint16      numberOfRequiredKeys; /* number of keys that must be matched
+                                      * to continue the scan */
+   ScanKey     keyData;        /* array of scan keys */
 } BTScanOpaqueData;
 
 typedef BTScanOpaqueData *BTScanOpaque;
@@ -276,7 +276,8 @@ extern ScanKey _bt_mkscankey_nodata(Relation rel);
 extern void _bt_freeskey(ScanKey skey);
 extern void _bt_freestack(BTStack stack);
 extern void _bt_orderkeys(Relation relation, BTScanOpaque so);
-extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, Size *keysok);
+extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
+                         ScanDirection dir, bool *continuescan);
 extern BTItem _bt_formitem(IndexTuple itup);
 
 /*