Make sequential scans parallel-aware.
author Robert Haas
Wed, 11 Nov 2015 13:57:52 +0000 (08:57 -0500)
committer Robert Haas
Wed, 11 Nov 2015 13:57:52 +0000 (08:57 -0500)
In addition, this patch fills in a number of missing bits and pieces in
the parallel infrastructure.  Paths and plans now have a parallel_aware
flag indicating whether whatever parallel-aware logic they have should
be engaged.  It is believed that we will need this flag for a number of
path/plan types, not just sequential scans, which is why the flag is
generic rather than part of the SeqScan structures specifically.
Also, execParallel.c now gives parallel nodes a chance to initialize
their PlanState nodes from the DSM during parallel worker startup.

Amit Kapila, with a fair amount of adjustment by me.  Review of previous
patch versions by Haribabu Kommi and others.

18 files changed:
src/backend/commands/explain.c
src/backend/executor/execAmi.c
src/backend/executor/execParallel.c
src/backend/executor/nodeSeqscan.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/util/pathnode.c
src/include/executor/nodeSeqscan.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/include/nodes/relation.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h

index 7fb8a1458dfa427d87b296a6f5d8768d8aac5170..183d3d9bcb77af298986812f84bc4502214cb563 100644 (file)
@@ -984,6 +984,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
            appendStringInfoString(es->str, "->  ");
            es->indent += 2;
        }
+       if (plan->parallel_aware)
+           appendStringInfoString(es->str, "Parallel ");
        appendStringInfoString(es->str, pname);
        es->indent++;
    }
@@ -1000,6 +1002,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
            ExplainPropertyText("Subplan Name", plan_name, es);
        if (custom_name)
            ExplainPropertyText("Custom Plan Provider", custom_name, es);
+       if (plan->parallel_aware)
+           ExplainPropertyText("Parallel Aware", "true", es);
    }
 
    switch (nodeTag(plan))
index 163650cecd1cb748af692a7dfa74b20f0fad013c..b969fc080374860108aec808f3afef50e888a10f 100644 (file)
@@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node)
    if (node == NULL)
        return false;
 
+   /*
+    * Parallel-aware nodes return a subset of the tuples in each worker,
+    * and in general we can't expect to have enough bookkeeping state to
+    * know which ones we returned in this worker as opposed to some other
+    * worker.
+    */
+   if (node->parallel_aware)
+       return false;
+
    switch (nodeTag(node))
    {
        case T_Result:
index 99a9de3cdc397fde4dab1623cdb968508ff87445..eae13c5647752c79a42eff96406ed50b014edae5 100644 (file)
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeSeqscan.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/planmain.h"
@@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
    /* Count this node. */
    e->nnodes++;
 
-   /*
-    * XXX. Call estimators for parallel-aware nodes here, when we have
-    * some.
-    */
+   /* Call estimators for parallel-aware nodes. */
+   switch (nodeTag(planstate))
+   {
+       case T_SeqScanState:
+           ExecSeqScanEstimate((SeqScanState *) planstate,
+                               e->pcxt);
+           break;
+       default:
+           break;
+   }
 
    return planstate_tree_walker(planstate, ExecParallelEstimate, e);
 }
@@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate,
    /* Count this node. */
    d->nnodes++;
 
-   /*
-    * XXX. Call initializers for parallel-aware plan nodes, when we have
-    * some.
-    */
+   /* Call initializers for parallel-aware plan nodes. */
+   switch (nodeTag(planstate))
+   {
+       case T_SeqScanState:
+           ExecSeqScanInitializeDSM((SeqScanState *) planstate,
+                                    d->pcxt);
+           break;
+       default:
+           break;
+   }
 
    return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
 }
@@ -574,6 +587,30 @@ ExecParallelReportInstrumentation(PlanState *planstate,
                                 instrumentation);
 }
 
+/*
+ * Initialize the PlanState and its descendants from shared memory.  Must run
+ * after ExecutorStart() has allocated and initialized the PlanState tree.
+ * Returns false for NULL, else the result of planstate_tree_walker().
+ */
+static bool
+ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
+{
+   if (planstate == NULL)
+       return false;
+
+   /* Dispatch on node tag; only T_SeqScanState has an initializer here. */
+   switch (nodeTag(planstate))
+   {
+       case T_SeqScanState:
+           ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
+           break;
+       default:
+           break;
+   }
+
+   return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
+}
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
@@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 
    /* Start up the executor, have it run the plan, and then shut it down. */
    ExecutorStart(queryDesc, 0);
+   ExecParallelInitializeWorker(queryDesc->planstate, toc);
    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
    ExecutorFinish(queryDesc);
 
index 3cb81fccc30c6378f99c6dcf0c92d01efbfdf101..b858f2f3af89fbad0702f7ce9430ab0ebdd6a547 100644 (file)
  *     ExecInitSeqScan         creates and initializes a seqscan node.
  *     ExecEndSeqScan          releases any storage allocated.
  *     ExecReScanSeqScan       rescans the relation
+ *
+ *     ExecSeqScanEstimate     estimates DSM space needed for parallel scan
+ *     ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ *     ExecSeqScanInitializeWorker attach to DSM info in parallel worker
  */
 #include "postgres.h"
 
@@ -53,10 +57,22 @@ SeqNext(SeqScanState *node)
    /*
     * get information from the estate and scan state
     */
-   scandesc = node->ss_currentScanDesc;
-   estate = node->ps.state;
+   scandesc = node->ss.ss_currentScanDesc;
+   estate = node->ss.ps.state;
    direction = estate->es_direction;
-   slot = node->ss_ScanTupleSlot;
+   slot = node->ss.ss_ScanTupleSlot;
+
+   if (scandesc == NULL)
+   {
+       /*
+        * We reach here if the scan is not parallel, or if we're executing
+        * a scan that was intended to be parallel serially.
+        */
+       scandesc = heap_beginscan(node->ss.ss_currentRelation,
+                                 estate->es_snapshot,
+                                 0, NULL);
+       node->ss.ss_currentScanDesc = scandesc;
+   }
 
    /*
     * get the next tuple from the table
@@ -123,27 +139,19 @@ static void
 InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 {
    Relation    currentRelation;
-   HeapScanDesc currentScanDesc;
 
    /*
     * get the relation object id from the relid'th entry in the range table,
     * open that relation and acquire appropriate lock on it.
     */
    currentRelation = ExecOpenScanRelation(estate,
-                                     ((SeqScan *) node->ps.plan)->scanrelid,
+                                     ((SeqScan *) node->ss.ps.plan)->scanrelid,
                                           eflags);
 
-   /* initialize a heapscan */
-   currentScanDesc = heap_beginscan(currentRelation,
-                                    estate->es_snapshot,
-                                    0,
-                                    NULL);
-
-   node->ss_currentRelation = currentRelation;
-   node->ss_currentScanDesc = currentScanDesc;
+   node->ss.ss_currentRelation = currentRelation;
 
    /* and report the scan tuple slot's rowtype */
-   ExecAssignScanType(node, RelationGetDescr(currentRelation));
+   ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
 
@@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
     * create state structure
     */
    scanstate = makeNode(SeqScanState);
-   scanstate->ps.plan = (Plan *) node;
-   scanstate->ps.state = estate;
+   scanstate->ss.ps.plan = (Plan *) node;
+   scanstate->ss.ps.state = estate;
 
    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
-   ExecAssignExprContext(estate, &scanstate->ps);
+   ExecAssignExprContext(estate, &scanstate->ss.ps);
 
    /*
     * initialize child expressions
     */
-   scanstate->ps.targetlist = (List *)
+   scanstate->ss.ps.targetlist = (List *)
        ExecInitExpr((Expr *) node->plan.targetlist,
                     (PlanState *) scanstate);
-   scanstate->ps.qual = (List *)
+   scanstate->ss.ps.qual = (List *)
        ExecInitExpr((Expr *) node->plan.qual,
                     (PlanState *) scanstate);
 
    /*
     * tuple table initialization
     */
-   ExecInitResultTupleSlot(estate, &scanstate->ps);
-   ExecInitScanTupleSlot(estate, scanstate);
+   ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
+   ExecInitScanTupleSlot(estate, &scanstate->ss);
 
    /*
     * initialize scan relation
     */
    InitScanRelation(scanstate, estate, eflags);
 
-   scanstate->ps.ps_TupFromTlist = false;
+   scanstate->ss.ps.ps_TupFromTlist = false;
 
    /*
     * Initialize result tuple type and projection info.
     */
-   ExecAssignResultTypeFromTL(&scanstate->ps);
-   ExecAssignScanProjectionInfo(scanstate);
+   ExecAssignResultTypeFromTL(&scanstate->ss.ps);
+   ExecAssignScanProjectionInfo(&scanstate->ss);
 
    return scanstate;
 }
@@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node)
    /*
     * get information from node
     */
-   relation = node->ss_currentRelation;
-   scanDesc = node->ss_currentScanDesc;
+   relation = node->ss.ss_currentRelation;
+   scanDesc = node->ss.ss_currentScanDesc;
 
    /*
     * Free the exprcontext
     */
-   ExecFreeExprContext(&node->ps);
+   ExecFreeExprContext(&node->ss.ps);
 
    /*
     * clean out the tuple table
     */
-   ExecClearTuple(node->ps.ps_ResultTupleSlot);
-   ExecClearTuple(node->ss_ScanTupleSlot);
+   ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+   ExecClearTuple(node->ss.ss_ScanTupleSlot);
 
    /*
     * close heap scan
     */
-   heap_endscan(scanDesc);
+   if (scanDesc != NULL)
+       heap_endscan(scanDesc);
 
    /*
     * close the heap relation.
@@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node)
 {
    HeapScanDesc scan;
 
-   scan = node->ss_currentScanDesc;
+   scan = node->ss.ss_currentScanDesc;
 
-   heap_rescan(scan,           /* scan desc */
-               NULL);          /* new scan keys */
+   if (scan != NULL)
+       heap_rescan(scan,           /* scan desc */
+                   NULL);          /* new scan keys */
 
    ExecScanReScan((ScanState *) node);
 }
+
+/* ----------------------------------------------------------------
+ *                     Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *     ExecSeqScanEstimate
+ *
+ *     Saves the scan descriptor size in node->pscan_len and adds one
+ *     chunk of that size plus one key to pcxt->estimator.
+ */
+void
+ExecSeqScanEstimate(SeqScanState *node,
+                   ParallelContext *pcxt)
+{
+   EState     *estate = node->ss.ps.state;
+
+   node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot);
+   shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecSeqScanInitializeDSM
+ *
+ *     Allocates pscan_len bytes from pcxt->toc, initializes them as the
+ *     parallel scan descriptor, inserts it under plan_node_id, begins scan.
+ */
+void
+ExecSeqScanInitializeDSM(SeqScanState *node,
+                        ParallelContext *pcxt)
+{
+   EState     *estate = node->ss.ps.state;
+   ParallelHeapScanDesc    pscan;
+
+   pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+   heap_parallelscan_initialize(pscan,
+                                node->ss.ss_currentRelation,
+                                estate->es_snapshot);
+   shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+   node->ss.ss_currentScanDesc =
+       heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecSeqScanInitializeWorker
+ *
+ *     Looks up the scan descriptor in the TOC by plan_node_id and
+ *     stores the scan begun on it in ss_currentScanDesc.
+ */
+void
+ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
+{
+   ParallelHeapScanDesc    pscan;
+
+   pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+   node->ss.ss_currentScanDesc =
+       heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}
index c176ff978ea306f9542caab8e2618213436fd42b..26264cbfab49e0ba52ab7c1370585d9a64ef81b8 100644 (file)
@@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
    COPY_SCALAR_FIELD(total_cost);
    COPY_SCALAR_FIELD(plan_rows);
    COPY_SCALAR_FIELD(plan_width);
+   COPY_SCALAR_FIELD(parallel_aware);
    COPY_SCALAR_FIELD(plan_node_id);
    COPY_NODE_FIELD(targetlist);
    COPY_NODE_FIELD(qual);
index 3d3a7744b528e5aa5cfd1e68e4682df8fced0e69..ab2fdc434e0203f82f1887d9a005ea4467e41c82 100644 (file)
@@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
    WRITE_FLOAT_FIELD(total_cost, "%.2f");
    WRITE_FLOAT_FIELD(plan_rows, "%.0f");
    WRITE_INT_FIELD(plan_width);
+   WRITE_BOOL_FIELD(parallel_aware);
    WRITE_INT_FIELD(plan_node_id);
    WRITE_NODE_FIELD(targetlist);
    WRITE_NODE_FIELD(qual);
@@ -1585,6 +1586,7 @@ _outPathInfo(StringInfo str, const Path *node)
        _outBitmapset(str, node->param_info->ppi_req_outer);
    else
        _outBitmapset(str, NULL);
+   WRITE_BOOL_FIELD(parallel_aware);
    WRITE_FLOAT_FIELD(rows, "%.0f");
    WRITE_FLOAT_FIELD(startup_cost, "%.2f");
    WRITE_FLOAT_FIELD(total_cost, "%.2f");
index 94ba6dc0b9b1efa7c3286fb052c7dfb05c31fa3e..5e258c939f2c484d7f1579478da175a62cd65c9e 100644 (file)
@@ -1412,6 +1412,7 @@ ReadCommonPlan(Plan *local_node)
    READ_FLOAT_FIELD(total_cost);
    READ_FLOAT_FIELD(plan_rows);
    READ_INT_FIELD(plan_width);
+   READ_BOOL_FIELD(parallel_aware);
    READ_INT_FIELD(plan_node_id);
    READ_NODE_FIELD(targetlist);
    READ_NODE_FIELD(qual);
index 8fc1cfd15f5330a44c537eec49e5eecc93dac27f..47de4eeba8c1fef77cf154b263eb6616cc442622 100644 (file)
@@ -475,7 +475,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
    required_outer = rel->lateral_relids;
 
    /* Consider sequential scan */
-   add_path(rel, create_seqscan_path(root, rel, required_outer));
+   add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
 
    /* Consider index scans */
    create_index_paths(root, rel);
index 1b61fd9d4eaa7206e3b6a1b46dfa022b2a2c16d6..e604992f73429b8ebb2e8371f7a2af9076346973 100644 (file)
@@ -181,10 +181,13 @@ clamp_row_est(double nrows)
  *
  * 'baserel' is the relation to be scanned
  * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ * 'nworkers' is the number of workers among which the work will be
+ *         distributed if the scan is a parallel scan
  */
 void
 cost_seqscan(Path *path, PlannerInfo *root,
-            RelOptInfo *baserel, ParamPathInfo *param_info)
+            RelOptInfo *baserel, ParamPathInfo *param_info,
+            int nworkers)
 {
    Cost        startup_cost = 0;
    Cost        run_cost = 0;
@@ -222,6 +225,16 @@ cost_seqscan(Path *path, PlannerInfo *root,
    cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
    run_cost += cpu_per_tuple * baserel->tuples;
 
+   /*
+    * Primitive parallel cost model.  Assume the leader will do half as much
+    * work as a regular worker, because it will also need to read the tuples
+    * returned by the workers when they percolate up to the gather node.
+    * This is almost certainly not exactly the right way to model this, so
+    * this will probably need to be changed at some point...
+    */
+   if (nworkers > 0)
+       run_cost = run_cost / (nworkers + 0.5);
+
    path->startup_cost = startup_cost;
    path->total_cost = startup_cost + run_cost;
 }
index e70a337328ebfef7b17299aeac2475b6c44f7b27..411b36c418ed2cf599b2c556a0534855ea30bf99 100644 (file)
@@ -101,7 +101,7 @@ static List *fix_indexorderby_references(PlannerInfo *root, IndexPath *index_pat
 static Node *fix_indexqual_operand(Node *node, IndexOptInfo *index, int indexcol);
 static List *get_switched_clauses(List *clauses, Relids outerrelids);
 static List *order_qual_clauses(PlannerInfo *root, List *clauses);
-static void copy_path_costsize(Plan *dest, Path *src);
+static void copy_generic_path_info(Plan *dest, Path *src);
 static void copy_plan_costsize(Plan *dest, Plan *src);
 static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
 static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
@@ -779,7 +779,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
     * prepare_sort_from_pathkeys on it before we do so on the individual
     * child plans, to make cross-checking the sort info easier.
     */
-   copy_path_costsize(plan, (Path *) best_path);
+   copy_generic_path_info(plan, (Path *) best_path);
    plan->targetlist = tlist;
    plan->qual = NIL;
    plan->lefttree = NULL;
@@ -901,7 +901,7 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path)
 
    plan = make_material(subplan);
 
-   copy_path_costsize(&plan->plan, (Path *) best_path);
+   copy_generic_path_info(&plan->plan, (Path *) best_path);
 
    return plan;
 }
@@ -1129,7 +1129,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
                              best_path->single_copy,
                              subplan);
 
-   copy_path_costsize(&gather_plan->plan, &best_path->path);
+   copy_generic_path_info(&gather_plan->plan, &best_path->path);
 
    /* use parallel mode for parallel plans. */
    root->glob->parallelModeNeeded = true;
@@ -1178,7 +1178,7 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path,
                             scan_clauses,
                             scan_relid);
 
-   copy_path_costsize(&scan_plan->plan, best_path);
+   copy_generic_path_info(&scan_plan->plan, best_path);
 
    return scan_plan;
 }
@@ -1224,7 +1224,7 @@ create_samplescan_plan(PlannerInfo *root, Path *best_path,
                                scan_relid,
                                tsc);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -1422,7 +1422,7 @@ create_indexscan_plan(PlannerInfo *root,
                                            indexorderbyops,
                                            best_path->indexscandir);
 
-   copy_path_costsize(&scan_plan->plan, &best_path->path);
+   copy_generic_path_info(&scan_plan->plan, &best_path->path);
 
    return scan_plan;
 }
@@ -1538,7 +1538,7 @@ create_bitmap_scan_plan(PlannerInfo *root,
                                     bitmapqualorig,
                                     baserelid);
 
-   copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+   copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
    return scan_plan;
 }
@@ -1795,7 +1795,7 @@ create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
                             scan_relid,
                             tidquals);
 
-   copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+   copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
    return scan_plan;
 }
@@ -1836,7 +1836,7 @@ create_subqueryscan_plan(PlannerInfo *root, Path *best_path,
                                  scan_relid,
                                  best_path->parent->subplan);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -1879,7 +1879,7 @@ create_functionscan_plan(PlannerInfo *root, Path *best_path,
    scan_plan = make_functionscan(tlist, scan_clauses, scan_relid,
                                  functions, rte->funcordinality);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -1923,7 +1923,7 @@ create_valuesscan_plan(PlannerInfo *root, Path *best_path,
    scan_plan = make_valuesscan(tlist, scan_clauses, scan_relid,
                                values_lists);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -2016,7 +2016,7 @@ create_ctescan_plan(PlannerInfo *root, Path *best_path,
    scan_plan = make_ctescan(tlist, scan_clauses, scan_relid,
                             plan_id, cte_param_id);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -2076,7 +2076,7 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path,
    scan_plan = make_worktablescan(tlist, scan_clauses, scan_relid,
                                   cteroot->wt_param_id);
 
-   copy_path_costsize(&scan_plan->scan.plan, best_path);
+   copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
    return scan_plan;
 }
@@ -2132,7 +2132,7 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path,
                                                tlist, scan_clauses);
 
    /* Copy cost data from Path to Plan; no need to make FDW do this */
-   copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+   copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
    /* Copy foreign server OID; likewise, no need to make FDW do this */
    scan_plan->fs_server = rel->serverid;
@@ -2238,7 +2238,7 @@ create_customscan_plan(PlannerInfo *root, CustomPath *best_path,
     * Copy cost data from Path to Plan; no need to make custom-plan providers
     * do this
     */
-   copy_path_costsize(&cplan->scan.plan, &best_path->path);
+   copy_generic_path_info(&cplan->scan.plan, &best_path->path);
 
    /* Likewise, copy the relids that are represented by this custom scan */
    cplan->custom_relids = best_path->path.parent->relids;
@@ -2355,7 +2355,7 @@ create_nestloop_plan(PlannerInfo *root,
                              inner_plan,
                              best_path->jointype);
 
-   copy_path_costsize(&join_plan->join.plan, &best_path->path);
+   copy_generic_path_info(&join_plan->join.plan, &best_path->path);
 
    return join_plan;
 }
@@ -2650,7 +2650,7 @@ create_mergejoin_plan(PlannerInfo *root,
                               best_path->jpath.jointype);
 
    /* Costs of sort and material steps are included in path cost already */
-   copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path);
+   copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
    return join_plan;
 }
@@ -2775,7 +2775,7 @@ create_hashjoin_plan(PlannerInfo *root,
                              (Plan *) hash_plan,
                              best_path->jpath.jointype);
 
-   copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path);
+   copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
    return join_plan;
 }
@@ -3411,9 +3411,11 @@ order_qual_clauses(PlannerInfo *root, List *clauses)
 /*
  * Copy cost and size info from a Path node to the Plan node created from it.
  * The executor usually won't use this info, but it's needed by EXPLAIN.
+ *
+ * Also copy the parallel-aware flag, which the executor will use.
  */
 static void
-copy_path_costsize(Plan *dest, Path *src)
+copy_generic_path_info(Plan *dest, Path *src)
 {
    if (src)
    {
@@ -3421,6 +3423,7 @@ copy_path_costsize(Plan *dest, Path *src)
        dest->total_cost = src->total_cost;
        dest->plan_rows = src->rows;
        dest->plan_width = src->parent->width;
+       dest->parallel_aware = src->parallel_aware;
    }
    else
    {
@@ -3428,6 +3431,7 @@ copy_path_costsize(Plan *dest, Path *src)
        dest->total_cost = 0;
        dest->plan_rows = 0;
        dest->plan_width = 0;
+       dest->parallel_aware = false;
    }
 }
 
index 536b55e4930557fc08ae93f7fc130aa44873ad8f..fa1ab3a46c3614daa46f49e6b6920113c9409847 100644 (file)
@@ -4690,7 +4690,7 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
    comparisonCost = 2.0 * (indexExprCost.startup + indexExprCost.per_tuple);
 
    /* Estimate the cost of seq scan + sort */
-   seqScanPath = create_seqscan_path(root, rel, NULL);
+   seqScanPath = create_seqscan_path(root, rel, NULL, 0);
    cost_sort(&seqScanAndSortPath, root, NIL,
              seqScanPath->total_cost, rel->tuples, rel->width,
              comparisonCost, maintenance_work_mem, -1.0);
index 1895a6894a37081b6c8278314cb42af113b8fcf0..09c32445462d497b5542a3cba0d752ab2aa7989d 100644 (file)
@@ -696,7 +696,8 @@ add_path_precheck(RelOptInfo *parent_rel,
  *   pathnode.
  */
 Path *
-create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
+create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
+                   Relids required_outer, int nworkers)
 {
    Path       *pathnode = makeNode(Path);
 
@@ -704,9 +705,10 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = nworkers > 0 ? true : false;
    pathnode->pathkeys = NIL;   /* seqscan has unordered result */
 
-   cost_seqscan(pathnode, root, rel, pathnode->param_info);
+   cost_seqscan(pathnode, root, rel, pathnode->param_info, nworkers);
 
    return pathnode;
 }
@@ -724,6 +726,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = NIL;   /* samplescan has unordered result */
 
    cost_samplescan(pathnode, root, rel, pathnode->param_info);
@@ -777,6 +780,7 @@ create_index_path(PlannerInfo *root,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = pathkeys;
 
    /* Convert clauses to indexquals the executor can handle */
@@ -822,6 +826,7 @@ create_bitmap_heap_path(PlannerInfo *root,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* always unordered */
 
    pathnode->bitmapqual = bitmapqual;
@@ -847,6 +852,7 @@ create_bitmap_and_path(PlannerInfo *root,
    pathnode->path.pathtype = T_BitmapAnd;
    pathnode->path.parent = rel;
    pathnode->path.param_info = NULL;   /* not used in bitmap trees */
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* always unordered */
 
    pathnode->bitmapquals = bitmapquals;
@@ -871,6 +877,7 @@ create_bitmap_or_path(PlannerInfo *root,
    pathnode->path.pathtype = T_BitmapOr;
    pathnode->path.parent = rel;
    pathnode->path.param_info = NULL;   /* not used in bitmap trees */
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* always unordered */
 
    pathnode->bitmapquals = bitmapquals;
@@ -895,6 +902,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* always unordered */
 
    pathnode->tidquals = tidquals;
@@ -922,6 +930,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_appendrel_parampathinfo(rel,
                                                            required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* result is always considered
                                         * unsorted */
    pathnode->subpaths = subpaths;
@@ -975,6 +984,7 @@ create_merge_append_path(PlannerInfo *root,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_appendrel_parampathinfo(rel,
                                                            required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = pathkeys;
    pathnode->subpaths = subpaths;
 
@@ -1049,6 +1059,7 @@ create_result_path(List *quals)
    pathnode->path.pathtype = T_Result;
    pathnode->path.parent = NULL;
    pathnode->path.param_info = NULL;   /* there are no other rels... */
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;
    pathnode->quals = quals;
 
@@ -1082,6 +1093,7 @@ create_material_path(RelOptInfo *rel, Path *subpath)
    pathnode->path.pathtype = T_Material;
    pathnode->path.parent = rel;
    pathnode->path.param_info = subpath->param_info;
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = subpath->pathkeys;
 
    pathnode->subpath = subpath;
@@ -1142,6 +1154,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
    pathnode->path.pathtype = T_Unique;
    pathnode->path.parent = rel;
    pathnode->path.param_info = subpath->param_info;
+   pathnode->path.parallel_aware = false;
 
    /*
     * Assume the output is unsorted, since we don't necessarily have pathkeys
@@ -1323,6 +1336,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = NIL;      /* Gather has unordered result */
 
    pathnode->subpath = subpath;
@@ -1378,6 +1392,7 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = pathkeys;
 
    cost_subqueryscan(pathnode, root, rel, pathnode->param_info);
@@ -1400,6 +1415,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = pathkeys;
 
    cost_functionscan(pathnode, root, rel, pathnode->param_info);
@@ -1422,6 +1438,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = NIL;   /* result is always unordered */
 
    cost_valuesscan(pathnode, root, rel, pathnode->param_info);
@@ -1443,6 +1460,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = NIL;   /* XXX for now, result is always unordered */
 
    cost_ctescan(pathnode, root, rel, pathnode->param_info);
@@ -1465,6 +1483,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->parent = rel;
    pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                     required_outer);
+   pathnode->parallel_aware = false;
    pathnode->pathkeys = NIL;   /* result is always unordered */
 
    /* Cost is the same as for a regular CTE scan */
@@ -1496,6 +1515,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->path.parent = rel;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
+   pathnode->path.parallel_aware = false;
    pathnode->path.rows = rows;
    pathnode->path.startup_cost = startup_cost;
    pathnode->path.total_cost = total_cost;
@@ -1630,6 +1650,7 @@ create_nestloop_path(PlannerInfo *root,
                                  sjinfo,
                                  required_outer,
                                  &restrict_clauses);
+   pathnode->path.parallel_aware = false;
    pathnode->path.pathkeys = pathkeys;
    pathnode->jointype = jointype;
    pathnode->outerjoinpath = outer_path;
@@ -1687,6 +1708,7 @@ create_mergejoin_path(PlannerInfo *root,
                                  sjinfo,
                                  required_outer,
                                  &restrict_clauses);
+   pathnode->jpath.path.parallel_aware = false;
    pathnode->jpath.path.pathkeys = pathkeys;
    pathnode->jpath.jointype = jointype;
    pathnode->jpath.outerjoinpath = outer_path;
@@ -1743,6 +1765,7 @@ create_hashjoin_path(PlannerInfo *root,
                                  sjinfo,
                                  required_outer,
                                  &restrict_clauses);
+   pathnode->jpath.path.parallel_aware = false;
 
    /*
     * A hashjoin never has pathkeys, since its output ordering is
@@ -1798,7 +1821,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
    switch (path->pathtype)
    {
        case T_SeqScan:
-           return create_seqscan_path(root, rel, required_outer);
+           return create_seqscan_path(root, rel, required_outer, 0);
        case T_SampleScan:
            return (Path *) create_samplescan_path(root, rel, required_outer);
        case T_IndexScan:
index 39d12a62fcd17560fc538f27ca0bd16dc39f7aba..f8f9299b6524442654709562f3332ea995b7b502 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef NODESEQSCAN_H
 #define NODESEQSCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
@@ -21,4 +22,9 @@ extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
+/* parallel scan support */
+extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
+
 #endif   /* NODESEQSCAN_H */
index 58ec889b2f02fefd122633cc6aa587900bd4958d..eb3591a663f5d316242f9fa669c582bf76c1f339 100644 (file)
@@ -1248,11 +1248,15 @@ typedef struct ScanState
    TupleTableSlot *ss_ScanTupleSlot;
 } ScanState;
 
-/*
- * SeqScan uses a bare ScanState as its state node, since it needs
- * no additional fields.
+/* ----------------
+ *  SeqScanState information
+ * ----------------
  */
-typedef ScanState SeqScanState;
+typedef struct SeqScanState
+{
+   ScanState   ss;             /* its first field is NodeTag */
+   Size        pscan_len;      /* size of parallel heap scan descriptor */
+} SeqScanState;
 
 /* ----------------
  *  SampleScanState information
index 6b28c8e28f4ddd846fc59f2af5966c4e0b41277e..292219db51f541de86b352077ccdb75185778ed1 100644 (file)
@@ -108,6 +108,11 @@ typedef struct Plan
    double      plan_rows;      /* number of rows plan is expected to emit */
    int         plan_width;     /* average row width in bytes */
 
+   /*
+    * information needed for parallel query
+    */
+   bool        parallel_aware; /* engage parallel-aware logic? */
+
    /*
     * Common structural data for all Plan types.
     */
index 6cf2e24ce7d30e06cb759d32f64962723ef28dd5..d7406cc614988b46d674de321e9d0f3faad091b9 100644 (file)
@@ -753,6 +753,7 @@ typedef struct Path
 
    RelOptInfo *parent;         /* the relation this path can build */
    ParamPathInfo *param_info;  /* parameterization info, or NULL if none */
+   bool        parallel_aware; /* engage parallel-aware logic? */
 
    /* estimated size/costs for path (see costsize.c for more info) */
    double      rows;           /* estimated number of result tuples */
index 25a730362a845bbe74b6cb1a2f3ae271daf4f2e0..ac21a3a181016163745e4d85292c54d6f8ab7c66 100644 (file)
@@ -72,7 +72,7 @@ extern double clamp_row_est(double nrows);
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
                    double index_pages, PlannerInfo *root);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
-            ParamPathInfo *param_info);
+            ParamPathInfo *param_info, int nworkers);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
                ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
index 7a4940c7d20bf77a7a21f79bdda096c67d1e8340..f28b4e2b06330ac4e07360900e8849dd328f97c6 100644 (file)
@@ -31,7 +31,7 @@ extern bool add_path_precheck(RelOptInfo *parent_rel,
                  List *pathkeys, Relids required_outer);
 
 extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-                   Relids required_outer);
+                   Relids required_outer, int nworkers);
 extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel,
                       Relids required_outer);
 extern IndexPath *create_index_path(PlannerInfo *root,