pgbench: Synchronize client threads.
authorThomas Munro
Wed, 10 Mar 2021 03:17:34 +0000 (16:17 +1300)
committerThomas Munro
Wed, 10 Mar 2021 04:44:04 +0000 (17:44 +1300)
Wait until all pgbench threads are connected before benchmarking begins.
This fixes a problem where some connections could take a very long time
to be established because of lock contention from earlier connections,
making results unstable and bogus with high connection counts.

Author: Andres Freund 
Author: Fabien COELHO 
Reviewed-by: Marina Polyakova
Reviewed-by: Kyotaro Horiguchi
Reviewed-by: Hayato Kuroda
Reviewed-by: David Rowley
Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de

src/bin/pgbench/pgbench.c

index b556d3f6b49c91c4359e56f891842f5bb4a87271..c0d2a124a92744eb8b2d52185658a6a771595852 100644 (file)
@@ -126,9 +126,16 @@ typedef struct socket_set
 #define THREAD_JOIN(handle) \
    (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
    GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
+#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
+#define THREAD_BARRIER_INIT(barrier, n) \
+   (InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
+#define THREAD_BARRIER_WAIT(barrier) \
+   EnterSynchronizationBarrier((barrier), \
+                               SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
+#define THREAD_BARRIER_DESTROY(barrier)
 #elif defined(ENABLE_THREAD_SAFETY)
 /* Use POSIX threads */
-#include 
+#include "port/pg_pthread.h"
 #define THREAD_T pthread_t
 #define THREAD_FUNC_RETURN_TYPE void *
 #define THREAD_FUNC_RETURN return NULL
@@ -136,11 +143,20 @@ typedef struct socket_set
    pthread_create((handle), NULL, (function), (arg))
 #define THREAD_JOIN(handle) \
    pthread_join((handle), NULL)
+#define THREAD_BARRIER_T pthread_barrier_t
+#define THREAD_BARRIER_INIT(barrier, n) \
+   pthread_barrier_init((barrier), NULL, (n))
+#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
+#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
 #else
 /* No threads implementation, use none (-j 1) */
 #define THREAD_T void *
 #define THREAD_FUNC_RETURN_TYPE void *
 #define THREAD_FUNC_RETURN return NULL
+#define THREAD_BARRIER_T int
+#define THREAD_BARRIER_INIT(barrier, n) (*(barrier) = 0)
+#define THREAD_BARRIER_WAIT(barrier)
+#define THREAD_BARRIER_DESTROY(barrier)
 #endif
 
 
@@ -326,6 +342,9 @@ typedef struct RandomState
 /* Various random sequences are initialized from this one. */
 static RandomState base_random_sequence;
 
+/* Synchronization barrier for start and connection */
+static THREAD_BARRIER_T barrier;
+
 /*
  * Connection state machine states.
  */
@@ -6121,6 +6140,10 @@ main(int argc, char **argv)
    if (duration > 0)
        setalarm(duration);
 
+   errno = THREAD_BARRIER_INIT(&barrier, nthreads);
+   if (errno != 0)
+       pg_log_fatal("could not initialize barrier: %m");
+
 #ifdef ENABLE_THREAD_SAFETY
    /* start all threads but thread 0 which is executed directly later */
    for (i = 1; i < nthreads; i++)
@@ -6191,6 +6214,8 @@ main(int argc, char **argv)
    printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
                 bench_start - start_time, latency_late);
 
+   THREAD_BARRIER_DESTROY(&barrier);
+
    if (exit_code != 0)
        pg_log_fatal("Run was aborted; the above results are incomplete.");
 
@@ -6237,6 +6262,8 @@ threadRun(void *arg)
        state[i].state = CSTATE_CHOOSE_SCRIPT;
 
    /* READY */
+   THREAD_BARRIER_WAIT(&barrier);
+
    thread_start = pg_time_now();
    thread->started_time = thread_start;
    last_report = thread_start;
@@ -6249,7 +6276,18 @@ threadRun(void *arg)
        for (int i = 0; i < nstate; i++)
        {
            if ((state[i].con = doConnect()) == NULL)
+           {
+               /*
+                * On connection failure, we meet the barrier here in place of
+                * GO before proceeding to the "done" path which will cleanup,
+                * so as to avoid locking the process.
+                *
+                * It is unclear whether it is worth doing anything rather than
+                * coldly exiting with an error message.
+                */
+               THREAD_BARRIER_WAIT(&barrier);
                goto done;
+           }
        }
 
        /* compute connection delay */
@@ -6261,6 +6299,8 @@ threadRun(void *arg)
        thread->conn_duration = 0;
    }
 
+   /* GO */
+   THREAD_BARRIER_WAIT(&barrier);
 
    start = pg_time_now();
    thread->bench_start = start;