Modify parallel pg_restore to track pending and ready items by means of
authorTom Lane
Fri, 7 Aug 2009 22:48:34 +0000 (22:48 +0000)
committerTom Lane
Fri, 7 Aug 2009 22:48:34 +0000 (22:48 +0000)
two new lists, rather than repeatedly rescanning the main TOC list.
This avoids a potential O(N^2) slowdown, although you'd need a *lot*
of tables to make that really significant; and it might simplify future
improvements in the scheduling algorithm by making the set of ready
items more easily inspectable.  The original thought that it would
in itself result in a more efficient job dispatch order doesn't seem
to have been borne out in testing, but it seems worth doing anyway.

src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h

index 2ff282faa8c19a5432ba6fe07f5f3c9fde61d8c9..20bd3eb7eacd1b0b24228024b69a64dc212e5329 100644 (file)
@@ -15,7 +15,7 @@
  *
  *
  * IDENTIFICATION
- *     $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $
+ *     $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #define thandle HANDLE
 #endif
 
+/* Arguments needed for a worker child */
 typedef struct _restore_args
 {
    ArchiveHandle *AH;
    TocEntry   *te;
 } RestoreArgs;
 
+/* State for each parallel activity slot */
 typedef struct _parallel_slot
 {
    thandle     child_id;
@@ -117,11 +119,15 @@ static thandle spawn_restore(RestoreArgs *args);
 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
 static bool work_in_progress(ParallelSlot *slots, int n_slots);
 static int get_next_slot(ParallelSlot *slots, int n_slots);
+static void par_list_header_init(TocEntry *l);
+static void par_list_append(TocEntry *l, TocEntry *te);
+static void par_list_remove(TocEntry *te);
 static TocEntry *get_next_work_item(ArchiveHandle *AH,
-                  TocEntry **first_unprocessed,
+                  TocEntry *ready_list,
                   ParallelSlot *slots, int n_slots);
 static parallel_restore_result parallel_restore(RestoreArgs *args);
-static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+              thandle worker, int status,
               ParallelSlot *slots, int n_slots);
 static void fix_dependencies(ArchiveHandle *AH);
 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
@@ -129,7 +135,8 @@ static void repoint_table_dependencies(ArchiveHandle *AH,
                           DumpId tableId, DumpId tableDataId);
 static void identify_locking_dependencies(TocEntry *te,
                              TocEntry **tocsByDumpId);
-static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
+static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+                               TocEntry *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
@@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
    ParallelSlot *slots;
    int         work_status;
    int         next_slot;
-   TocEntry   *first_unprocessed = AH->toc->next;
+   TocEntry    pending_list;
+   TocEntry    ready_list;
    TocEntry   *next_work_item;
    thandle     ret_child;
    TocEntry   *te;
@@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
     * faster in a single connection because we avoid all the connection and
     * setup overhead.
     */
-   while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
-                                               NULL, 0)) != NULL)
+   for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
    {
        if (next_work_item->section == SECTION_DATA ||
            next_work_item->section == SECTION_POST_DATA)
@@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
        (void) restore_toc_entry(AH, next_work_item, ropt, false);
 
-       next_work_item->restored = true;
-       reduce_dependencies(AH, next_work_item);
+       /* there should be no touch of ready_list here, so pass NULL */
+       reduce_dependencies(AH, next_work_item, NULL);
    }
 
    /*
@@ -3128,6 +3135,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
    AH->currTablespace = NULL;
    AH->currWithOids = -1;
 
+   /*
+    * Initialize the lists of pending and ready items.  After this setup,
+    * the pending list is everything that needs to be done but is blocked
+    * by one or more dependencies, while the ready list contains items that
+    * have no remaining dependencies.  Note: we don't yet filter out entries
+    * that aren't going to be restored.  They might participate in
+    * dependency chains connecting entries that should be restored, so we
+    * treat them as live until we actually process them.
+    */
+   par_list_header_init(&pending_list);
+   par_list_header_init(&ready_list);
+   for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+   {
+       if (next_work_item->depCount > 0)
+           par_list_append(&pending_list, next_work_item);
+       else
+           par_list_append(&ready_list, next_work_item);
+   }
+
    /*
     * main parent loop
     *
@@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
    ahlog(AH, 1, "entering main parallel loop\n");
 
-   while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
+   while ((next_work_item = get_next_work_item(AH, &ready_list,
                                                slots, n_slots)) != NULL ||
           work_in_progress(slots, n_slots))
    {
@@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
                      next_work_item->dumpId,
                      next_work_item->desc, next_work_item->tag);
 
-               next_work_item->restored = true;
-               reduce_dependencies(AH, next_work_item);
+               par_list_remove(next_work_item);
+               reduce_dependencies(AH, next_work_item, &ready_list);
 
                continue;
            }
@@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
                      next_work_item->dumpId,
                      next_work_item->desc, next_work_item->tag);
 
-               next_work_item->restored = true;
+               par_list_remove(next_work_item);
 
                /* this memory is dealloced in mark_work_done() */
                args = malloc(sizeof(RestoreArgs));
@@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
        if (WIFEXITED(work_status))
        {
-           mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
+           mark_work_done(AH, &ready_list,
+                          ret_child, WEXITSTATUS(work_status),
                           slots, n_slots);
        }
        else
@@ -3222,14 +3249,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
     * dependencies, or some other pathological condition. If so, do it in the
     * single parent connection.
     */
-   for (te = AH->toc->next; te != AH->toc; te = te->next)
+   for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
    {
-       if (!te->restored)
-       {
-           ahlog(AH, 1, "processing missed item %d %s %s\n",
-                 te->dumpId, te->desc, te->tag);
-           (void) restore_toc_entry(AH, te, ropt, false);
-       }
+       ahlog(AH, 1, "processing missed item %d %s %s\n",
+             te->dumpId, te->desc, te->tag);
+       (void) restore_toc_entry(AH, te, ropt, false);
    }
 
    /* The ACLs will be handled back in RestoreArchive. */
@@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
 }
 
 
+/*
+ * Initialize the header of a parallel-processing list.
+ *
+ * These are circular lists with a dummy TocEntry as header, just like the
+ * main TOC list; but we use separate list links so that an entry can be in
+ * the main TOC list as well as in a parallel-processing list.
+ */
+static void
+par_list_header_init(TocEntry *l)
+{
+   l->par_prev = l->par_next = l;
+}
+
+/* Append te to the end of the parallel-processing list headed by l */
+static void
+par_list_append(TocEntry *l, TocEntry *te)
+{
+   te->par_prev = l->par_prev;
+   l->par_prev->par_next = te;
+   l->par_prev = te;
+   te->par_next = l;
+}
+
+/* Remove te from whatever parallel-processing list it's in */
+static void
+par_list_remove(TocEntry *te)
+{
+   te->par_prev->par_next = te->par_next;
+   te->par_next->par_prev = te->par_prev;
+   te->par_prev = NULL;
+   te->par_next = NULL;
+}
+
 
 /*
  * Find the next work item (if any) that is capable of being run now.
  *
  * To qualify, the item must have no remaining dependencies
- * and no requirement for locks that is incompatible with
- * items currently running.
+ * and no requirements for locks that are incompatible with
+ * items currently running.  Items in the ready_list are known to have
+ * no remaining dependencies, but we have to check for lock conflicts.
  *
- * first_unprocessed is state data that tracks the location of the first
- * TocEntry that's not marked 'restored'.  This avoids O(N^2) search time
- * with long TOC lists.  (Even though the constant is pretty small, it'd
- * get us eventually.)
+ * Note that the returned item has *not* been removed from ready_list.
+ * The caller must do that after successfully dispatching the item.
  *
  * pref_non_data is for an alternative selection algorithm that gives
  * preference to non-data items if there is already a data load running.
  * It is currently disabled.
  */
 static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
+get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
                   ParallelSlot *slots, int n_slots)
 {
    bool        pref_non_data = false;  /* or get from AH->ropt */
@@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
    }
 
    /*
-    * Advance first_unprocessed if possible.
-    */
-   for (te = *first_unprocessed; te != AH->toc; te = te->next)
-   {
-       if (!te->restored)
-           break;
-   }
-   *first_unprocessed = te;
-
-   /*
-    * Search from first_unprocessed until we find an available item.
+    * Search the ready_list until we find a suitable item.
     */
-   for (; te != AH->toc; te = te->next)
+   for (te = ready_list->par_next; te != ready_list; te = te->par_next)
    {
        bool        conflicts = false;
 
-       /* Ignore if already done or still waiting on dependencies */
-       if (te->restored || te->depCount > 0)
-           continue;
-
        /*
         * Check to see if the item would need exclusive lock on something
         * that a currently running item also needs lock on, or vice versa. If
@@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args)
  * update status, and reduce the dependency count of any dependent items.
  */
 static void
-mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+              thandle worker, int status,
               ParallelSlot *slots, int n_slots)
 {
    TocEntry   *te = NULL;
@@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status,
        die_horribly(AH, modulename, "worker process failed: exit code %d\n",
                     status);
 
-   reduce_dependencies(AH, te);
+   reduce_dependencies(AH, te, ready_list);
 }
 
 
@@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH)
     * indexes the TOC entries by dump ID, rather than searching the TOC list
     * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
     *
-    * Also, initialize the depCount fields.
+    * Also, initialize the depCount fields, and make sure all the TOC items
+    * are marked as not being in any parallel-processing list.
     */
    tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
    for (te = AH->toc->next; te != AH->toc; te = te->next)
    {
        tocsByDumpId[te->dumpId - 1] = te;
        te->depCount = te->nDeps;
+       te->par_prev = NULL;
+       te->par_next = NULL;
    }
 
    /*
@@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
 
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
- * it, thereby possibly making them ready-to-run.
+ * it, thereby possibly making them ready-to-run.  Any pending item that
+ * becomes ready should be moved to the ready list.
  */
 static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
 {
    DumpId      target = te->dumpId;
    int         i;
@@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
        for (i = 0; i < te->nDeps; i++)
        {
            if (te->dependencies[i] == target)
+           {
                te->depCount--;
+               if (te->depCount == 0 && te->par_prev != NULL)
+               {
+                   /* It must be in the pending list, so remove it ... */
+                   par_list_remove(te);
+                   /* ... and add to ready_list */
+                   par_list_append(ready_list, te);
+               }
+           }
        }
    }
 }
index 710dec019ada98fa1085bbee8b27efafd7fc97f8..fa6db407f2b003195dad53ddd893eedf72831eb4 100644 (file)
@@ -17,7 +17,7 @@
  *
  *
  * IDENTIFICATION
- *     $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.81 2009/08/04 21:56:09 tgl Exp $
+ *     $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.82 2009/08/07 22:48:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -314,7 +314,8 @@ typedef struct _tocEntry
    void       *formatData;     /* TOC Entry data specific to file format */
 
    /* working state (needed only for parallel restore) */
-   bool        restored;       /* item is in progress or done */
+   struct _tocEntry *par_prev; /* list links for pending/ready items; */
+   struct _tocEntry *par_next; /* these are NULL if not in either list */
    bool        created;        /* set for DATA member if TABLE was created */
    int         depCount;       /* number of dependencies not yet restored */
    DumpId     *lockDeps;       /* dumpIds of objects this one needs lock on */