Provide a DSA area for all parallel queries.
authorRobert Haas
Mon, 19 Dec 2016 21:47:15 +0000 (16:47 -0500)
committerRobert Haas
Mon, 19 Dec 2016 22:11:46 +0000 (17:11 -0500)
This will allow future parallel query code to dynamically allocate
storage shared by all participants.

Thomas Munro, with assorted changes by me.

doc/src/sgml/monitoring.sgml
src/backend/executor/execParallel.c
src/include/executor/execParallel.h
src/include/nodes/execnodes.h
src/include/storage/lwlock.h

index 02bc8feca7740da121b7399e0eaab9286a7931c7..1545f03656c94f76ec2cb5c4b703300f8da88666 100644 (file)
@@ -818,7 +818,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
       
        
-        7">LWLock
+        8">LWLock
         ShmemIndexLock
         Waiting to find or allocate space in shared memory.
        
@@ -1069,6 +1069,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          predicate_lock_manager
          Waiting to add or examine predicate lock information.
         
+        
+         parallel_query_dsa
+         Waiting for parallel query dynamic shared memory allocation lock.
+        
         
          Lock
          relation
index f9c85989d82b285e50d3282fdd676f41161e2ec4..8a6f844e352b52c3b2023b418719c4d0b469f9ff 100644 (file)
@@ -34,6 +34,7 @@
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 
@@ -47,6 +48,7 @@
 #define PARALLEL_KEY_BUFFER_USAGE      UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_TUPLE_QUEUE       UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_DSA               UINT64CONST(0xE000000000000006)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE      65536
 
@@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
    int         param_len;
    int         instrumentation_len = 0;
    int         instrument_offset = 0;
+   Size        dsa_minsize = dsa_minimum_size();
 
    /* Allocate object for return value. */
    pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
 
+   /* Estimate space for DSA area. */
+   shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+
    /* Everyone's had a chance to ask for space, so now create the DSM. */
    InitializeParallelDSM(pcxt);
 
@@ -466,6 +473,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        pei->instrumentation = instrumentation;
    }
 
+   /*
+    * Create a DSA area that can be used by the leader and all workers.
+    * (However, if we failed to create a DSM and are using private memory
+    * instead, then skip this.)
+    */
+   if (pcxt->seg != NULL)
+   {
+       char       *area_space;
+
+       area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+       shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
+       pei->area = dsa_create_in_place(area_space, dsa_minsize,
+                                       LWTRANCHE_PARALLEL_QUERY_DSA,
+                                       "parallel_query_dsa",
+                                       pcxt->seg);
+   }
+
+   /*
+    * Make the area available to executor nodes running in the leader.  See
+    * also ParallelQueryMain which makes it available to workers.
+    */
+   estate->es_query_dsa = pei->area;
+
    /*
     * Give parallel-aware nodes a chance to initialize their shared data.
     * This also initializes the elements of instrumentation->ps_instrument,
@@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+   if (pei->area != NULL)
+   {
+       dsa_detach(pei->area);
+       pei->area = NULL;
+   }
    if (pei->pcxt != NULL)
    {
        DestroyParallelContext(pei->pcxt);
@@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    QueryDesc  *queryDesc;
    SharedExecutorInstrumentation *instrumentation;
    int         instrument_options = 0;
+   void       *area_space;
+   dsa_area   *area;
 
    /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    receiver = ExecParallelGetReceiver(seg, toc);
@@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    /* Prepare to track buffer usage during query execution. */
    InstrStartParallelQuery();
 
-   /* Start up the executor, have it run the plan, and then shut it down. */
+   /* Attach to the dynamic shared memory area. */
+   area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
+   area = dsa_attach_in_place(area_space, seg);
+
+   /* Start up the executor */
    ExecutorStart(queryDesc, 0);
+
+   /* Special executor initialization steps for parallel workers */
+   queryDesc->planstate->state->es_query_dsa = area;
    ExecParallelInitializeWorker(queryDesc->planstate, toc);
+
+   /* Run the plan */
    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+   /* Shut down the executor */
    ExecutorFinish(queryDesc);
 
    /* Report buffer usage during parallel execution. */
@@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
    ExecutorEnd(queryDesc);
 
    /* Cleanup. */
+   dsa_detach(area);
    FreeQueryDesc(queryDesc);
    (*receiver->rDestroy) (receiver);
 }
index f4c6d37a119289ee832d49fd93579fb2e5913bb4..4bbee691a7f33f64c3c8d69fea8d186b1ca2521f 100644 (file)
@@ -17,6 +17,7 @@
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "nodes/plannodes.h"
+#include "utils/dsa.h"
 
 typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
    BufferUsage *buffer_usage;
    SharedExecutorInstrumentation *instrumentation;
    shm_mq_handle **tqueue;
+   dsa_area   *area;
    bool        finished;
 } ParallelExecutorInfo;
 
index 703604ab9d746f4f1b2010c3187bcc6d052fe5e8..5c3b8683f5b6dd123fec24e042b6e41f3d17fb6c 100644 (file)
@@ -427,6 +427,9 @@ typedef struct EState
    HeapTuple  *es_epqTuple;    /* array of EPQ substitute tuples */
    bool       *es_epqTupleSet; /* true if EPQ tuple is provided */
    bool       *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+   /* The per-query shared memory area to use for parallel execution. */
+   struct dsa_area   *es_query_dsa;
 } EState;
 
 
index db1c687e21e8398bb201666b833c5eb6991d82e5..3ca4db0a7231e0333a5762a22e1d7916599aebd7 100644 (file)
@@ -210,6 +210,7 @@ typedef enum BuiltinTrancheIds
    LWTRANCHE_BUFFER_MAPPING,
    LWTRANCHE_LOCK_MANAGER,
    LWTRANCHE_PREDICATE_LOCK_MANAGER,
+   LWTRANCHE_PARALLEL_QUERY_DSA,
    LWTRANCHE_FIRST_USER_DEFINED
 }  BuiltinTrancheIds;