Replace not-very-bright implementation of topological sort with a better
authorTom Lane
Sat, 6 Dec 2003 22:55:11 +0000 (22:55 +0000)
committerTom Lane
Sat, 6 Dec 2003 22:55:11 +0000 (22:55 +0000)
one (use a priority heap to keep track of items ready to output, instead
of searching the input array each time).  This brings the runtime of
pg_dump back to about what it was in 7.4.

src/bin/pg_dump/pg_dump_sort.c

index 12be66dadf7dce57432e6e221ec7db1af883c390..7db36b0d2569e3599cf4841d8085d5bf85ed9245 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.1 2003/12/06 03:00:16 tgl Exp $
+ *   $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.2 2003/12/06 22:55:11 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,8 @@ static bool TopoSort(DumpableObject **objs,
                     int numObjs,
                     DumpableObject **ordering,
                     int *nOrdering);
+static void addHeapElement(int val, int *heap, int heapLength);
+static int removeHeapElement(int *heap, int heapLength);
 static bool findLoop(DumpableObject *obj,
                     int depth,
                     DumpableObject **ordering,
@@ -122,14 +124,13 @@ sortDumpableObjects(DumpableObject **objs, int numObjs)
  * partial ordering.)  Minimize rearrangement of the list not needed to
  * achieve the partial ordering.
  *
- * This is a lot simpler and slower than, for example, the topological sort
- * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
- * try to minimize the damage to the existing order.
+ * The input is the list of numObjs objects in objs[].  This list is not
+ * modified.
  *
  * Returns TRUE if able to build an ordering that satisfies all the
  * constraints, FALSE if not (there are contradictory constraints).
  *
- * On success (TRUE result), ordering[] is filled with an array of
+ * On success (TRUE result), ordering[] is filled with a sorted array of
  * DumpableObject pointers, of length equal to the input list length.
  *
  * On failure (FALSE result), ordering[] is filled with an array of
@@ -146,36 +147,60 @@ TopoSort(DumpableObject **objs,
         int *nOrdering)                /* output argument */
 {
    DumpId      maxDumpId = getMaxDumpId();
-   bool        result = true;
-   DumpableObject  **topoItems;
-   DumpableObject   *obj;
+   int        *pendingHeap;
    int        *beforeConstraints;
+   int        *idMap;
+   DumpableObject   *obj;
+   int         heapLength;
    int         i,
                j,
-               k,
-               last;
+               k;
 
-   /* First, create work array with the dump items in their current order */
-   topoItems = (DumpableObject **) malloc(numObjs * sizeof(DumpableObject *));
-   if (topoItems == NULL)
-       exit_horribly(NULL, modulename, "out of memory\n");
-   memcpy(topoItems, objs, numObjs * sizeof(DumpableObject *));
+   /*
+    * This is basically the same algorithm shown for topological sorting in
+    * Knuth's Volume 1.  However, we would like to minimize unnecessary
+    * rearrangement of the input ordering; that is, when we have a choice
+    * of which item to output next, we always want to take the one highest
+    * in the original list.  Therefore, instead of maintaining an unordered
+    * linked list of items-ready-to-output as Knuth does, we maintain a heap
+    * of their item numbers, which we can use as a priority queue.  This
+    * turns the algorithm from O(N) to O(N log N) because each insertion or
+    * removal of a heap item takes O(log N) time.  However, that's still
+    * plenty fast enough for this application.
+    */
 
    *nOrdering = numObjs;   /* for success return */
 
+   /* Eliminate the null case */
+   if (numObjs <= 0)
+       return true;
+
+   /* Create workspace for the above-described heap */
+   pendingHeap = (int *) malloc(numObjs * sizeof(int));
+   if (pendingHeap == NULL)
+       exit_horribly(NULL, modulename, "out of memory\n");
+
    /*
-    * Scan the constraints, and for each item in the array, generate a
+    * Scan the constraints, and for each item in the input, generate a
     * count of the number of constraints that say it must be before
     * something else. The count for the item with dumpId j is
-    * stored in beforeConstraints[j].
+    * stored in beforeConstraints[j].  We also make a map showing the
+    * input-order index of the item with dumpId j.
     */
    beforeConstraints = (int *) malloc((maxDumpId + 1) * sizeof(int));
    if (beforeConstraints == NULL)
        exit_horribly(NULL, modulename, "out of memory\n");
    memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int));
+   idMap = (int *) malloc((maxDumpId + 1) * sizeof(int));
+   if (idMap == NULL)
+       exit_horribly(NULL, modulename, "out of memory\n");
    for (i = 0; i < numObjs; i++)
    {
-       obj = topoItems[i];
+       obj = objs[i];
+       j = obj->dumpId;
+       if (j <= 0 || j > maxDumpId)
+           exit_horribly(NULL, modulename, "invalid dumpId %d\n", j);
+       idMap[j] = i;
        for (j = 0; j < obj->nDeps; j++)
        {
            k = obj->dependencies[j];
@@ -185,63 +210,153 @@ TopoSort(DumpableObject **objs,
        }
    }
 
+   /*
+    * Now initialize the heap of items-ready-to-output by filling it with
+    * the indexes of items that already have beforeConstraints[id] == 0.
+    *
+    * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each
+    * j in the range 1..heapLength-1 (note we are using 0-based subscripts
+    * here, while the discussion in Knuth assumes 1-based subscripts).
+    * So, if we simply enter the indexes into pendingHeap[] in decreasing
+    * order, we a-fortiori have the heap invariant satisfied at completion
+    * of this loop, and don't need to do any sift-up comparisons.
+    */
+   heapLength = 0;
+   for (i = numObjs; --i >= 0; )
+   {
+       if (beforeConstraints[objs[i]->dumpId] == 0)
+           pendingHeap[heapLength++] = i;
+   }
+
    /*--------------------
-    * Now scan the topoItems array backwards.  At each step, output the
-    * last item that has no remaining before-constraints, and decrease
-    * the beforeConstraints count of each of the items it was constrained
-    * against.
-    * i = index of ordering[] entry we want to output this time
-    * j = search index for topoItems[]
+    * Now emit objects, working backwards in the output list.  At each step,
+    * we use the priority heap to select the last item that has no remaining
+    * before-constraints.  We remove that item from the heap, output it to
+    * ordering[], and decrease the beforeConstraints count of each of the
+    * items it was constrained against.  Whenever an item's beforeConstraints
+    * count is thereby decreased to zero, we insert it into the priority heap
+    * to show that it is a candidate to output.  We are done when the heap
+    * becomes empty; if we have output every element then we succeeded,
+    * otherwise we failed.
+    * i = number of ordering[] entries left to output
+    * j = objs[] index of item we are outputting
     * k = temp for scanning constraint list for item j
-    * last = last non-null index in topoItems (avoid redundant searches)
     *--------------------
     */
-   last = numObjs - 1;
-   for (i = numObjs; --i >= 0;)
+   i = numObjs;
+   while (heapLength > 0)
    {
-       /* Find next candidate to output */
-       while (topoItems[last] == NULL)
-           last--;
-       for (j = last; j >= 0; j--)
-       {
-           obj = topoItems[j];
-           if (obj != NULL && beforeConstraints[obj->dumpId] == 0)
-               break;
-       }
-       /* If no available candidate, topological sort fails */
-       if (j < 0)
-       {
-           result = false;
-           break;
-       }
-       /* Output candidate, and mark it done by zeroing topoItems[] entry */
-       ordering[i] = obj = topoItems[j];
-       topoItems[j] = NULL;
+       /* Select object to output by removing largest heap member */
+       j = removeHeapElement(pendingHeap, heapLength--);
+       obj = objs[j];
+       /* Output candidate to ordering[] */
+       ordering[--i] = obj;
        /* Update beforeConstraints counts of its predecessors */
        for (k = 0; k < obj->nDeps; k++)
-           beforeConstraints[obj->dependencies[k]]--;
+       {
+           int     id = obj->dependencies[k];
+
+           if ((--beforeConstraints[id]) == 0)
+               addHeapElement(idMap[id], pendingHeap, heapLength++);
+       }
    }
 
    /*
-    * If we failed, report one of the circular constraint sets
+    * If we failed, report one of the circular constraint sets.  We do
+    * this by scanning beforeConstraints[] to locate the items that have
+    * not yet been output, and for each one, trying to trace a constraint
+    * loop leading back to it.  (There may be items that depend on items
+    * involved in a loop, but aren't themselves part of the loop, so not
+    * every nonzero beforeConstraints entry is necessarily a useful
+    * starting point.  We keep trying till we find a loop.)
     */
-   if (!result)
+   if (i != 0)
    {
-       for (j = last; j >= 0; j--)
+       for (j = 1; j <= maxDumpId; j++)
        {
-           ordering[0] = obj = topoItems[j];
-           if (obj && findLoop(obj, 1, ordering, nOrdering))
-               break;
+           if (beforeConstraints[j] != 0)
+           {
+               ordering[0] = obj = objs[idMap[j]];
+               if (findLoop(obj, 1, ordering, nOrdering))
+                   break;
+           }
        }
-       if (j < 0)
+       if (j > maxDumpId)
            exit_horribly(NULL, modulename,
                          "could not find dependency loop\n");
    }
 
    /* Done */
-   free(topoItems);
+   free(pendingHeap);
    free(beforeConstraints);
+   free(idMap);
+
+   return (i == 0);
+}
+
+/*
+ * Add an item to a heap (priority queue)
+ *
+ * heapLength is the current heap size; caller is responsible for increasing
+ * its value after the call.  There must be sufficient storage at *heap.
+ */
+static void
+addHeapElement(int val, int *heap, int heapLength)
+{
+   int         j;
 
+   /*
+    * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
+    * is using 1-based array indexes, not 0-based.
+    */
+   j = heapLength;
+   while (j > 0)
+   {
+       int         i = (j - 1) >> 1;
+
+       if (val <= heap[i])
+           break;
+       heap[j] = heap[i];
+       j = i;
+   }
+   heap[j] = val;
+}
+
+/*
+ * Remove the largest item present in a heap (priority queue)
+ *
+ * heapLength is the current heap size; caller is responsible for decreasing
+ * its value after the call.
+ *
+ * We remove and return heap[0], which is always the largest element of
+ * the heap, and then "sift up" to maintain the heap invariant.
+ */
+static int
+removeHeapElement(int *heap, int heapLength)
+{
+   int         result = heap[0];
+   int         val;
+   int         i;
+
+   if (--heapLength <= 0)
+       return result;
+   val = heap[heapLength];     /* value that must be reinserted */
+   i = 0;                      /* i is where the "hole" is */
+   for (;;)
+   {
+       int         j = 2 * i + 1;
+
+       if (j >= heapLength)
+           break;
+       if (j + 1 < heapLength &&
+           heap[j] < heap[j + 1])
+           j++;
+       if (val >= heap[j])
+           break;
+       heap[i] = heap[j];
+       i = j;
+   }
+   heap[i] = val;
    return result;
 }