Remove thread-emulation support from pgbench.
author Heikki Linnakangas
Fri, 3 Jul 2015 08:48:54 +0000 (11:48 +0300)
committer Heikki Linnakangas
Fri, 3 Jul 2015 08:51:36 +0000 (11:51 +0300)
You can no longer use pgbench with multiple threads when compiled without
--enable-thread-safety. That's an acceptable limitation these days; it
still works fine with -j1, and all modern platforms support threads anyway.
This makes future maintenance and development of the code easier.

Fabien Coelho

src/bin/pgbench/pgbench.c

index 59e70b6f375bb561966fbf68879f77485d070e6b..95be62cbbbbc85637bcd942dfa8e90df65d301b4 100644 (file)
@@ -70,20 +70,8 @@ static int   pthread_join(pthread_t th, void **thread_return);
 /* Use platform-dependent pthread capability */
 #include <pthread.h>
 #else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t              pg_pthread_t
-#define pthread_attr_t         pg_pthread_attr_t
-#define pthread_create         pg_pthread_create
-#define pthread_join           pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
 #endif
 
 
@@ -210,8 +198,6 @@ typedef struct
    PGconn     *con;            /* connection handle to DB */
    int         id;             /* client No. */
    int         state;          /* state No. */
-   int         cnt;            /* xacts count */
-   int         ecnt;           /* error count */
    int         listen;         /* 0 indicates that an async query has been
                                 * sent */
    int         sleeping;       /* 1 indicates that the client is napping */
@@ -221,15 +207,19 @@ typedef struct
    int64       txn_scheduled;  /* scheduled start time of transaction (usec) */
    instr_time  txn_begin;      /* used for measuring schedule lag times */
    instr_time  stmt_begin;     /* used for measuring statement latencies */
-   int64       txn_latencies;  /* cumulated latencies */
-   int64       txn_sqlats;     /* cumulated square latencies */
    bool        is_throttled;   /* whether transaction throttling is done */
    int         use_file;       /* index in sql_files for this client */
    bool        prepared[MAX_FILES];
+
+   /* per client collected stats */
+   int         cnt;            /* xacts count */
+   int         ecnt;           /* error count */
+   int64       txn_latencies;  /* cumulated latencies */
+   int64       txn_sqlats;     /* cumulated square latencies */
 } CState;
 
 /*
- * Thread state and result
+ * Thread state
  */
 typedef struct
 {
@@ -242,6 +232,9 @@ typedef struct
    int        *exec_count;     /* number of cmd executions (per Command) */
    unsigned short random_state[3];     /* separate randomness for each thread */
    int64       throttle_trigger;       /* previous/next throttling (us) */
+
+   /* per thread collected stats */
+   instr_time  conn_time;
    int64       throttle_lag;   /* total transaction lag behind throttling */
    int64       throttle_lag_max;       /* max transaction lag */
    int64       throttle_latency_skipped;       /* lagging transactions
@@ -251,18 +244,6 @@ typedef struct
 
 #define INVALID_THREAD     ((pthread_t) 0)
 
-typedef struct
-{
-   instr_time  conn_time;
-   int64       xacts;
-   int64       latencies;
-   int64       sqlats;
-   int64       throttle_lag;
-   int64       throttle_lag_max;
-   int64       throttle_latency_skipped;
-   int64       latency_late;
-} TResult;
-
 /*
  * queries read from files
  */
@@ -2926,6 +2907,13 @@ main(int argc, char **argv)
                    fprintf(stderr, "invalid number of threads: %d\n", nthreads);
                    exit(1);
                }
+#ifndef ENABLE_THREAD_SAFETY
+               if (nthreads != 1)
+               {
+                   fprintf(stderr, "threads are not supported on this platform, use -j1\n");
+                   exit(1);
+               }
+#endif   /* !ENABLE_THREAD_SAFETY */
                break;
            case 'C':
                benchmarking_option_set = true;
@@ -3194,22 +3182,6 @@ main(int argc, char **argv)
        exit(1);
    }
 
-   /*
-    * is_latencies only works with multiple threads in thread-based
-    * implementations, not fork-based ones, because it supposes that the
-    * parent can see changes made to the per-thread execution stats by child
-    * threads.  It seems useful enough to accept despite this limitation, but
-    * perhaps we should FIXME someday (by passing the stats data back up
-    * through the parent-to-child pipes).
-    */
-#ifndef ENABLE_THREAD_SAFETY
-   if (is_latencies && nthreads > 1)
-   {
-       fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
-       exit(1);
-   }
-#endif
-
    /*
     * save main process id in the global variable because process id will be
     * changed after fork.
@@ -3414,6 +3386,7 @@ main(int argc, char **argv)
        setalarm(duration);
 
    /* start threads */
+#ifdef ENABLE_THREAD_SAFETY
    for (i = 0; i < nthreads; i++)
    {
        TState     *thread = &threads[i];
@@ -3436,32 +3409,43 @@ main(int argc, char **argv)
            thread->thread = INVALID_THREAD;
        }
    }
+#else
+   INSTR_TIME_SET_CURRENT(threads[0].start_time);
+   threads[0].thread = INVALID_THREAD;
+#endif   /* ENABLE_THREAD_SAFETY */
 
    /* wait for threads and accumulate results */
    INSTR_TIME_SET_ZERO(conn_total_time);
    for (i = 0; i < nthreads; i++)
    {
-       void       *ret = NULL;
+       TState     *thread = &threads[i];
+       int         j;
 
+#ifdef ENABLE_THREAD_SAFETY
        if (threads[i].thread == INVALID_THREAD)
-           ret = threadRun(&threads[i]);
+           /* actually run this thread directly in the main thread */
+           (void) threadRun(thread);
        else
-           pthread_join(threads[i].thread, &ret);
+           /* wait for other threads. should check that 0 is returned? */
+           pthread_join(thread->thread, NULL);
+#else
+       (void) threadRun(thread);
+#endif   /* ENABLE_THREAD_SAFETY */
 
-       if (ret != NULL)
-       {
-           TResult    *r = (TResult *) ret;
+       /* thread level stats */
+       throttle_lag += thread->throttle_lag;
+       throttle_latency_skipped = threads->throttle_latency_skipped;
+       latency_late = thread->latency_late;
+       if (throttle_lag_max > thread->throttle_lag_max)
+           throttle_lag_max = thread->throttle_lag_max;
+       INSTR_TIME_ADD(conn_total_time, thread->conn_time);
 
-           total_xacts += r->xacts;
-           total_latencies += r->latencies;
-           total_sqlats += r->sqlats;
-           throttle_lag += r->throttle_lag;
-           throttle_latency_skipped += r->throttle_latency_skipped;
-           latency_late += r->latency_late;
-           if (r->throttle_lag_max > throttle_lag_max)
-               throttle_lag_max = r->throttle_lag_max;
-           INSTR_TIME_ADD(conn_total_time, r->conn_time);
-           free(ret);
+       /* client-level stats */
+       for (j = 0; j < thread->nstate; j++)
+       {
+           total_xacts += thread->state[j].cnt;
+           total_latencies += thread->state[i].txn_latencies;
+           total_sqlats += thread->state[i].txn_sqlats;
        }
    }
    disconnect_all(state, nclients);
@@ -3491,7 +3475,6 @@ threadRun(void *arg)
 {
    TState     *thread = (TState *) arg;
    CState     *state = thread->state;
-   TResult    *result;
    FILE       *logfile = NULL; /* per-thread log file */
    instr_time  start,
                end;
@@ -3522,9 +3505,7 @@ threadRun(void *arg)
    thread->throttle_lag = 0;
    thread->throttle_lag_max = 0;
 
-   result = pg_malloc(sizeof(TResult));
-
-   INSTR_TIME_SET_ZERO(result->conn_time);
+   INSTR_TIME_SET_ZERO(thread->conn_time);
 
    /* open log file if requested */
    if (use_log)
@@ -3555,8 +3536,8 @@ threadRun(void *arg)
    }
 
    /* time after thread and connections set up */
-   INSTR_TIME_SET_CURRENT(result->conn_time);
-   INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+   INSTR_TIME_SET_CURRENT(thread->conn_time);
+   INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
    agg_vals_init(&aggs, thread->start_time);
 
@@ -3568,7 +3549,7 @@ threadRun(void *arg)
        int         prev_ecnt = st->ecnt;
 
        st->use_file = getrand(thread, 0, num_files - 1);
-       if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+       if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
            remains--;          /* I've aborted */
 
        if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3650,11 +3631,7 @@ threadRun(void *arg)
        }
 
        /* also wake up to print the next progress report on time */
-       if (progress && min_usec > 0
-#if !defined(PTHREAD_FORK_EMULATION)
-           && thread->tid == 0
-#endif   /* !PTHREAD_FORK_EMULATION */
-           )
+       if (progress && min_usec > 0)
        {
            /* get current time if needed */
            if (now_usec == 0)
@@ -3710,7 +3687,7 @@ threadRun(void *arg)
            if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
                            || commands[st->state]->type == META_COMMAND))
            {
-               if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+               if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
                    remains--;  /* I've aborted */
            }
 
@@ -3723,76 +3700,6 @@ threadRun(void *arg)
            }
        }
 
-#ifdef PTHREAD_FORK_EMULATION
-       /* each process reports its own progression */
-       if (progress)
-       {
-           instr_time  now_time;
-           int64       now;
-
-           INSTR_TIME_SET_CURRENT(now_time);
-           now = INSTR_TIME_GET_MICROSEC(now_time);
-           if (now >= next_report)
-           {
-               /* generate and show report */
-               int64       count = 0,
-                           lats = 0,
-                           sqlats = 0,
-                           skipped = 0;
-               int64       lags = thread->throttle_lag;
-               int64       run = now - last_report;
-               double      tps,
-                           total_run,
-                           latency,
-                           sqlat,
-                           stdev,
-                           lag;
-
-               for (i = 0; i < nstate; i++)
-               {
-                   count += state[i].cnt;
-                   lats += state[i].txn_latencies;
-                   sqlats += state[i].txn_sqlats;
-               }
-
-               total_run = (now - thread_start) / 1000000.0;
-               tps = 1000000.0 * (count - last_count) / run;
-               latency = 0.001 * (lats - last_lats) / (count - last_count);
-               sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
-               stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-               lag = 0.001 * (lags - last_lags) / (count - last_count);
-               skipped = thread->throttle_latency_skipped - last_skipped;
-
-               fprintf(stderr,
-                       "progress %d: %.1f s, %.1f tps, "
-                       "lat %.3f ms stddev %.3f",
-                       thread->tid, total_run, tps, latency, stdev);
-               if (throttle_delay)
-               {
-                   fprintf(stderr, ", lag %.3f ms", lag);
-                   if (latency_limit)
-                       fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
-               }
-               fprintf(stderr, "\n");
-
-               last_count = count;
-               last_lats = lats;
-               last_sqlats = sqlats;
-               last_lags = lags;
-               last_report = now;
-               last_skipped = thread->throttle_latency_skipped;
-
-               /*
-                * Ensure that the next report is in the future, in case
-                * pgbench/postgres got stuck somewhere.
-                */
-               do
-               {
-                   next_report += (int64) progress *1000000;
-               } while (now >= next_report);
-           }
-       }
-#else
        /* progress report by thread 0 for all threads */
        if (progress && thread->tid == 0)
        {
@@ -3817,6 +3724,17 @@ threadRun(void *arg)
                            lag,
                            stdev;
 
+               /*
+                * Add up the statistics of all threads.
+                *
+                * XXX: No locking. There is no guarantee that we get an
+                * atomic snapshot of the transaction count and latencies, so
+                * these figures can well be off by a small amount. The
+                * progress report's purpose is to give a quick overview of
+                * how the test is going, so that shouldn't matter too much.
+                * (If a read from a 64-bit integer is not atomic, you might
+                * get a "torn" read and completely bogus latencies though!)
+                */
                for (i = 0; i < progress_nclients; i++)
                {
                    count += state[i].cnt;
@@ -3864,31 +3782,16 @@ threadRun(void *arg)
                } while (now >= next_report);
            }
        }
-#endif   /* PTHREAD_FORK_EMULATION */
    }
 
 done:
    INSTR_TIME_SET_CURRENT(start);
    disconnect_all(state, nstate);
-   result->xacts = 0;
-   result->latencies = 0;
-   result->sqlats = 0;
-   for (i = 0; i < nstate; i++)
-   {
-       result->xacts += state[i].cnt;
-       result->latencies += state[i].txn_latencies;
-       result->sqlats += state[i].txn_sqlats;
-   }
-   result->throttle_lag = thread->throttle_lag;
-   result->throttle_lag_max = thread->throttle_lag_max;
-   result->throttle_latency_skipped = thread->throttle_latency_skipped;
-   result->latency_late = thread->latency_late;
-
    INSTR_TIME_SET_CURRENT(end);
-   INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+   INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
    if (logfile)
        fclose(logfile);
-   return result;
+   return NULL;
 }
 
 /*
@@ -3910,90 +3813,6 @@ setalarm(int seconds)
    alarm(seconds);
 }
 
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
-   pid_t       pid;
-   int         pipes[2];
-}  fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
-              pthread_attr_t *attr,
-              void *(*start_routine) (void *),
-              void *arg)
-{
-   fork_pthread *th;
-   void       *ret;
-   int         rc;
-
-   th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
-   if (pipe(th->pipes) < 0)
-   {
-       free(th);
-       return errno;
-   }
-
-   th->pid = fork();
-   if (th->pid == -1)          /* error */
-   {
-       free(th);
-       return errno;
-   }
-   if (th->pid != 0)           /* in parent process */
-   {
-       close(th->pipes[1]);
-       *thread = th;
-       return 0;
-   }
-
-   /* in child process */
-   close(th->pipes[0]);
-
-   /* set alarm again because the child does not inherit timers */
-   if (duration > 0)
-       setalarm(duration);
-
-   ret = start_routine(arg);
-   rc = write(th->pipes[1], ret, sizeof(TResult));
-   (void) rc;
-   close(th->pipes[1]);
-   free(th);
-   exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-   int         status;
-
-   while (waitpid(th->pid, &status, 0) != th->pid)
-   {
-       if (errno != EINTR)
-           return errno;
-   }
-
-   if (thread_return != NULL)
-   {
-       /* assume result is TResult */
-       *thread_return = pg_malloc(sizeof(TResult));
-       if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
-       {
-           free(*thread_return);
-           *thread_return = NULL;
-       }
-   }
-   close(th->pipes[0]);
-
-   free(th);
-   return 0;
-}
-#endif
 #else                          /* WIN32 */
 
 static VOID CALLBACK