Fix for backward cursors with ORDER BY.
authorVadim B. Mikheev
Wed, 15 Oct 1997 06:36:36 +0000 (06:36 +0000)
committerVadim B. Mikheev
Wed, 15 Oct 1997 06:36:36 +0000 (06:36 +0000)
src/backend/utils/sort/psort.c
src/include/utils/psort.h

index 56f9f61defce36c9db61394b542d39afbc248b71..5af863b6e01de53b82948fbb8ea647cb2b93e581 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.25 1997/09/26 20:05:47 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.26 1997/10/15 06:36:08 vadim Exp $
  *
  * NOTES
  *     Sorts the first relation into the second relation.
@@ -80,7 +80,12 @@ static int _psort_cmp (HeapTuple *ltup, HeapTuple *rtup);
 
 #define TEMPDIR "./"
 
-static long shortzero = 0;     /* used to delimit runs */
+/* 
+ * tlenzero used to delimit runs; both vars below must have
+ * the same size as HeapTuple->t_len
+ */
+static unsigned int tlenzero = 0;
+static unsigned int tlendummy;
 
 static TupleDesc   PsortTupDesc;
 static ScanKey     PsortKeys;      /* used by _psort_cmp */
@@ -150,6 +155,7 @@ psort_begin(Sort * node, int nkeys, ScanKey key)
    PS(node)->tupcount = 0;
 
    PS(node)->using_tape_files = false;
+   PS(node)->all_fetched = false;
    PS(node)->psort_grab_file = NULL;
    PS(node)->memtuples = NULL;
 
@@ -219,21 +225,24 @@ inittapes(Sort * node)
  *     GETTUP          - reads the tuple
  *
  *     Note:
- *             LEN field must be a short; FP is a stream
+ *             LEN field must be as HeapTuple->t_len; FP is a stream
  */
 
 
 #define PUTTUP(NODE, TUP, FP) do {\
    ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len; \
-   fwrite((char *)TUP, (TUP)->t_len, 1, FP);} while (0)
-#define ENDRUN(FP)     fwrite((char *)&shortzero, sizeof (shortzero), 1, FP)
-#define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (shortzero), 1, FP)
+   fwrite((char *)TUP, (TUP)->t_len, 1, FP); \
+   fwrite((char *)&((TUP)->t_len), sizeof (tlendummy), 1, FP); \
+   } while (0)
+#define ENDRUN(FP)     fwrite((char *)&tlenzero, sizeof (tlenzero), 1, FP)
+#define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (tlenzero), 1, FP)
 #define ALLOCTUP(LEN)  ((HeapTuple)palloc((unsigned)LEN))
 #define GETTUP(NODE, TUP, LEN, FP) do {\
    IncrProcessed(); \
-   ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (shortzero); \
-   fread((char *)(TUP) + sizeof (shortzero), (LEN) - sizeof (shortzero), 1, FP);} \
-                               while (0)
+   ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (tlenzero); \
+   fread((char *)(TUP) + sizeof (tlenzero), (LEN) - sizeof (tlenzero), 1, FP); \
+   fread((char *)&tlendummy, sizeof (tlendummy), 1, FP); \
+   } while (0)
 #define SETTUPLEN(TUP, LEN)        (TUP)->t_len = LEN
 
  /*
@@ -629,11 +638,11 @@ merge(Sort * node, struct tape * dest)
    register struct tape *lasttp;       /* (TAPE[P]) */
    register struct tape *tp;
    struct leftist *tuples;
-   FILE       *destfile;
-   int         times;          /* runs left to merge */
-   int         outdummy;       /* complete dummy runs */
-   short       fromtape;
-   long        tuplen;
+   FILE           *destfile;
+   int             times;          /* runs left to merge */
+   int             outdummy;       /* complete dummy runs */
+   short           fromtape;
+   unsigned int    tuplen;
 
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
@@ -767,16 +776,20 @@ dumptuples(FILE * file, Sort * node)
 HeapTuple
 psort_grabtuple(Sort * node, bool * should_free)
 {
-   register HeapTuple tup;
-   long        tuplen;
+   register HeapTuple  tup;
 
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
 
    if (PS(node)->using_tape_files == true)
    {
-       if (!feof(PS(node)->psort_grab_file))
+       unsigned int    tuplen;
+       
+       *should_free = true;
+       if (ScanDirectionIsForward (node->plan.state->es_direction))
        {
+           if (PS(node)->all_fetched)
+               return NULL;
            if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
            {
                tup = (HeapTuple) palloc((unsigned) tuplen);
@@ -784,25 +797,99 @@ psort_grabtuple(Sort * node, bool * should_free)
                GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
 
                /* Update current merged sort file position */
-               PS(node)->psort_current += tuplen;
-               *should_free = true;
+               PS(node)->psort_current += tuplen + sizeof (tlendummy);
                return tup;
            }
            else
+           {
+               PS(node)->all_fetched = true;
                return NULL;
+           }
        }
-       else
+       /* Backward */
+       if (PS(node)->psort_current <= sizeof (tlendummy))
            return NULL;
+       /* 
+        * if all tuples are fetched already then we return last tuple, 
+        * else - tuple before last returned.
+        */
+       if (PS(node)->all_fetched)
+       {
+           /* psort_current is pointing to the zero tuplen at the end of file */
+           fseek(PS(node)->psort_grab_file, 
+                   PS(node)->psort_current - sizeof (tlendummy), SEEK_SET);
+           GETLEN(tuplen, PS(node)->psort_grab_file);
+           if (PS(node)->psort_current < tuplen)
+               elog (FATAL, "psort_grabtuple: too big last tuple len in backward scan");
+           PS(node)->all_fetched = false;
+       }
+       else
+       {
+           /* move to position of end tlen of prev tuple */
+           PS(node)->psort_current -= sizeof (tlendummy);
+           fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
+           GETLEN(tuplen, PS(node)->psort_grab_file);  /* get tlen of prev tuple */
+           if (tuplen == 0)
+               elog (FATAL, "psort_grabtuple: tuplen is 0 in backward scan");
+           if (PS(node)->psort_current <= tuplen + sizeof (tlendummy))
+           {   /* prev tuple should be first one */
+               if (PS(node)->psort_current != tuplen)
+                   elog (FATAL, "psort_grabtuple: first tuple expected in backward scan");
+               PS(node)->psort_current = 0;
+               fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
+               return NULL;
+           }
+           /* 
+            * Get position of prev tuple. This tuple becomes current tuple
+            * now and we have to return previous one.
+            */
+           PS(node)->psort_current -= tuplen;
+           /* move to position of end tlen of prev tuple */
+           fseek(PS(node)->psort_grab_file, 
+                   PS(node)->psort_current - sizeof (tlendummy), SEEK_SET);
+           GETLEN(tuplen, PS(node)->psort_grab_file);
+           if (PS(node)->psort_current < tuplen + sizeof (tlendummy))
+               elog (FATAL, "psort_grabtuple: too big tuple len in backward scan");
+       }
+       /* 
+        * move to prev (or last) tuple start position + sizeof(t_len) 
+        */
+       fseek(PS(node)->psort_grab_file,
+               PS(node)->psort_current - tuplen, SEEK_SET);
+       tup = (HeapTuple) palloc((unsigned) tuplen);
+       SETTUPLEN(tup, tuplen);
+       GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
+       return tup;     /* file position is equal to psort_current */
    }
    else
    {
-       if (PS(node)->psort_current < PS(node)->tupcount)
+       *should_free = false;
+       if (ScanDirectionIsForward (node->plan.state->es_direction))
        {
-           *should_free = false;
-           return (PS(node)->memtuples[PS(node)->psort_current++]);
+           if (PS(node)->psort_current < PS(node)->tupcount)
+               return (PS(node)->memtuples[PS(node)->psort_current++]);
+           else
+           {
+               PS(node)->all_fetched = true;
+               return NULL;
+           }
        }
-       else
+       /* Backward */
+       if (PS(node)->psort_current <= 0)
            return NULL;
+       /* 
+        * if all tuples are fetched already then we return last tuple, 
+        * else - tuple before last returned.
+        */
+       if (PS(node)->all_fetched)      
+           PS(node)->all_fetched = false;
+       else
+       {
+           PS(node)->psort_current--;          /* last returned tuple */
+           if (PS(node)->psort_current <= 0)
+               return NULL;
+       }
+       return (PS(node)->memtuples[PS(node)->psort_current - 1]);
    }
 }
 
index 2e4060ef56025db9259110c47ffcb37d3b65dd61..e5c57681c10897d7c37a9c88315fdce112421f34 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: psort.h,v 1.13 1997/09/18 14:42:35 vadim Exp $
+ * $Id: psort.h,v 1.14 1997/10/15 06:36:36 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -62,6 +62,7 @@ typedef struct Psortstate
    long        psort_current;  /* could be file offset, or array index */
    long        psort_saved;    /* could be file offset, or array index */
    bool        using_tape_files;
+   bool        all_fetched;    /* this is for cursors */
 
    HeapTuple  *memtuples;
 } Psortstate;