Sync 9.5 version of access/transam/parallel.c with HEAD.
authorTom Lane
Tue, 2 Aug 2016 20:09:09 +0000 (16:09 -0400)
committerTom Lane
Tue, 2 Aug 2016 20:09:09 +0000 (16:09 -0400)
This back-patches commit a5fe473ad (notably, marking ParallelMessagePending
as volatile, which is not particularly optional).  I also back-patched some
previous cosmetic changes to remove unnecessary diffs between the two
branches.  I'm unsure how much of this code is actually reachable in 9.5,
but to the extent that it is reachable, it needs to be maintained, and
minimizing cross-branch diffs will make that easier.

src/backend/access/transam/parallel.c
src/include/access/parallel.h

index 1e0eb10c860b1cc10a3b5ba05cfa4e29039ae38a..9324e52fd3c842db040370b4f4607c9690a52c25 100644 (file)
@@ -14,9 +14,9 @@
 
 #include "postgres.h"
 
+#include "access/parallel.h"
 #include "access/xact.h"
 #include "access/xlog.h"
-#include "access/parallel.h"
 #include "commands/async.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -33,6 +33,7 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 
+
 /*
  * We don't want to waste a lot of memory on an error queue which, most of
  * the time, will process only a handful of small messages.  However, it is
@@ -90,7 +91,7 @@ typedef struct FixedParallelState
 int            ParallelWorkerNumber = -1;
 
 /* Is there a parallel message pending which we need to receive? */
-bool       ParallelMessagePending = false;
+volatile bool ParallelMessagePending = false;
 
 /* Are we initializing a parallel worker? */
 bool       InitializingParallelWorker = false;
@@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState;
 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
 /* Private functions. */
-static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
 
+
 /*
  * Establish a new parallel context.  This should be done after entering
  * parallel mode, and (unless there is an error) the context should be
@@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name,
 
 /*
  * Establish the dynamic shared memory segment for a parallel context and
- * copied state and other bookkeeping information that will need by parallel
- * workers into it.
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
  */
 void
 InitializeParallelDSM(ParallelContext *pcxt)
@@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
                         PARALLEL_ERROR_QUEUE_SIZE,
                         "parallel error queue size not buffer-aligned");
        shm_toc_estimate_chunk(&pcxt->estimator,
-                              PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+                              mul_size(PARALLEL_ERROR_QUEUE_SIZE,
+                                       pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
        /* Estimate how much we'll need for extension entrypoint info. */
@@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
     * parallelism than to fail outright.
     */
    segsize = shm_toc_estimate(&pcxt->estimator);
-   if (pcxt->nworkers != 0)
+   if (pcxt->nworkers > 0)
        pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
    if (pcxt->seg != NULL)
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
@@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
         */
        error_queue_space =
            shm_toc_allocate(pcxt->toc,
-                            PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+                            mul_size(PARALLEL_ERROR_QUEUE_SIZE,
+                                     pcxt->nworkers));
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
@@ -603,17 +607,17 @@ ParallelContextActive(void)
 
 /*
  * Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * Note: this is called within a signal handler!  All we can do is set
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * HandleParallelMessages().
  */
 void
 HandleParallelMessageInterrupt(void)
 {
-   int         save_errno = errno;
-
    InterruptPending = true;
    ParallelMessagePending = true;
    SetLatch(MyLatch);
-
-   errno = save_errno;
 }
 
 /*
@@ -664,11 +668,8 @@ HandleParallelMessages(void)
                }
                else
                    ereport(ERROR,
-                           (errcode(ERRCODE_INTERNAL_ERROR),   /* XXX: wrong errcode? */
-                            errmsg("lost connection to parallel worker")));
-
-               /* This might make the error queue go away. */
-               CHECK_FOR_INTERRUPTS();
+                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                          errmsg("lost connection to parallel worker")));
            }
        }
    }
@@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                errctx.previous = pcxt->error_context_stack;
                error_context_stack = &errctx;
 
-               /* Parse ErrorReponse or NoticeResponse. */
+               /* Parse ErrorResponse or NoticeResponse. */
                pq_parse_errornotice(msg, &edata);
 
                /* Death of a worker isn't enough justification for suicide. */
@@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
        default:
            {
-               elog(ERROR, "unknown message type: %c (%d bytes)",
+               elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                     msgtype, msg->len);
            }
    }
@@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg)
    if (toc == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-              errmsg("invalid magic number in dynamic shared memory segment")));
+          errmsg("invalid magic number in dynamic shared memory segment")));
 
    /* Look up fixed parallel state. */
    fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
index 44f0616cb86547fbe96ca5e164a9fe51ac00f1b0..4ac46fce57ac6df5a309bbd40df18307ea5e3779 100644 (file)
@@ -19,7 +19,6 @@
 #include "postmaster/bgworker.h"
 #include "storage/shm_mq.h"
 #include "storage/shm_toc.h"
-#include "utils/elog.h"
 
 typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
 
@@ -46,7 +45,7 @@ typedef struct ParallelContext
    ParallelWorkerInfo *worker;
 } ParallelContext;
 
-extern bool ParallelMessagePending;
+extern volatile bool ParallelMessagePending;
 extern int ParallelWorkerNumber;
 extern bool InitializingParallelWorker;
 
@@ -54,16 +53,16 @@ extern bool InitializingParallelWorker;
 
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
-extern void InitializeParallelDSM(ParallelContext *);
-extern void LaunchParallelWorkers(ParallelContext *);
-extern void WaitForParallelWorkersToFinish(ParallelContext *);
-extern void DestroyParallelContext(ParallelContext *);
+extern void InitializeParallelDSM(ParallelContext *pcxt);
+extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
+extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
 
 extern void HandleParallelMessageInterrupt(void);
 extern void HandleParallelMessages(void);
 extern void AtEOXact_Parallel(bool isCommit);
 extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
-extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
+extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
 
 #endif   /* PARALLEL_H */