Multi-threaded version of pgbench contributed by ITAGAKI Takahiro,
author Tatsuo Ishii
Mon, 3 Aug 2009 15:18:14 +0000 (15:18 +0000)
committer Tatsuo Ishii
Mon, 3 Aug 2009 15:18:14 +0000 (15:18 +0000)
reviewed by Greg Smith and Josh Williams.

Following is the proposal from ITAGAKI Takahiro:

Pgbench is a well-known tool for measuring postgres performance, but nowadays
it does not work well because it cannot use multiple CPUs. The postgres
server, on the other hand, can use multiple CPUs very well, so the bottleneck
of the workload is *in pgbench*.

Multi-threading would be a solution. The attached patch adds a -j
(number of jobs) option to pgbench. If the value N is greater than 1,
pgbench runs with N threads. Connections are divided equally among
the threads (e.g. -c64 -j4 => 4 threads with 16 connections each). It can
run on POSIX platforms with pthreads and on Windows with win32 threads.

Here are the results of multi-threaded pgbench runs on Fedora 11 with an Intel
Core i7 (8 logical cores = 4 physical cores * HT). -j8 (8 threads) was
the best; its tps is 4.5 times that of -j1, which is the traditional
(single-threaded) result.

$ pgbench -i -s10
$ pgbench -n -S -c64 -j1   =>  tps = 11600.158593
$ pgbench -n -S -c64 -j2   =>  tps = 17947.100954
$ pgbench -n -S -c64 -j4   =>  tps = 26571.124001
$ pgbench -n -S -c64 -j8   =>  tps = 52725.470403
$ pgbench -n -S -c64 -j16  =>  tps = 38976.675319
$ pgbench -n -S -c64 -j32  =>  tps = 28998.499601
$ pgbench -n -S -c64 -j64  =>  tps = 26701.877815

Is it acceptable to use pthreads in a contrib module?
If so, I will add the patch to the next commitfest.

contrib/pgbench/pgbench.c
doc/src/sgml/pgbench.sgml

index 7ede6954aa16393ce7cdea4f5a5f75d1aee30772..0c3704a2ff14dad5749ef2e96b8eafa7aec2c32a 100644 (file)
@@ -4,7 +4,7 @@
  * A simple benchmark program for PostgreSQL
  * Originally written by Tatsuo Ishii and enhanced by many contributors.
  *
- * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $
+ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $
  * Copyright (c) 2000-2009, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -35,6 +35,7 @@
 
 #include "libpq-fe.h"
 #include "pqsignal.h"
+#include "portability/instr_time.h"
 
 #include 
 
 #include       /* for getrlimit */
 #endif
 
+#ifndef INT64_MAX
+#define INT64_MAX  INT64CONST(0x7FFFFFFFFFFFFFFF)
+#endif
+
+/*
+ * Multi-platform pthread implementations
+ */
+
+#ifdef WIN32
+/* Use native win32 threads on Windows */
+typedef struct win32_pthread   *pthread_t;
+typedef int                        pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#elif defined(ENABLE_THREAD_SAFETY)
+/* Use platform-dependent pthread */
+#include 
+
+#else
+
+#include 
+/* Use emulation with fork. Rename pthread idendifiers to avoid conflictions */
+#define pthread_t              pg_pthread_t
+#define pthread_attr_t         pg_pthread_attr_t
+#define pthread_create         pg_pthread_create
+#define pthread_join           pg_pthread_join
+typedef struct fork_pthread       *pthread_t;
+typedef int                        pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#endif
+
 extern char *optarg;
 extern int optind;
 
@@ -74,7 +109,6 @@ extern int   optind;
 
 #define DEFAULT_NXACTS 10      /* default nxacts */
 
-int            nclients = 1;       /* default number of simulated clients */
 int            nxacts = 0;         /* number of transactions per client */
 int            duration = 0;       /* duration in seconds */
 
@@ -102,8 +136,6 @@ FILE       *LOGFILE = NULL;
 
 bool       use_log;            /* log transaction latencies to a file */
 
-int            remains;            /* number of remaining clients */
-
 int            is_connect;         /* establish connection  for each transaction */
 
 char      *pghost = "";
@@ -138,14 +170,33 @@ typedef struct
    int         listen;         /* 0 indicates that an async query has been
                                 * sent */
    int         sleeping;       /* 1 indicates that the client is napping */
-   struct timeval until;       /* napping until */
+   int64       until;          /* napping until (usec) */
    Variable   *variables;      /* array of variable definitions */
    int         nvariables;
-   struct timeval txn_begin;   /* used for measuring latencies */
+   instr_time  txn_begin;      /* used for measuring latencies */
    int         use_file;       /* index in sql_files for this client */
    bool        prepared[MAX_FILES];
 } CState;
 
+/*
+ * Thread state and result
+ */
+typedef struct
+{
+   pthread_t       thread;     /* thread handle */
+   CState         *state;      /* array of CState */
+   int             nstate;     /* length of state */
+   instr_time      start_time; /* thread start time */
+} TState;
+
+#define INVALID_THREAD     ((pthread_t) 0)
+
+typedef struct
+{
+   instr_time      conn_time;
+   int             xacts;
+} TResult;
+
 /*
  * queries read from files
  */
@@ -171,8 +222,9 @@ typedef struct
    char       *argv[MAX_ARGS]; /* command list */
 } Command;
 
-Command   **sql_files[MAX_FILES];      /* SQL script files */
-int            num_files;          /* number of script files */
+static Command   **sql_files[MAX_FILES];   /* SQL script files */
+static int         num_files;              /* number of script files */
+static int         debug = 0;              /* debug flag */
 
 /* default scenario */
 static char *tpc_b = {
@@ -215,44 +267,9 @@ static char *select_only = {
    "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
 };
 
-/* Connection overhead time */
-static struct timeval conn_total_time = {0, 0};
-
 /* Function prototypes */
 static void setalarm(int seconds);
-
-
-/* Calculate total time */
-static void
-addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
-   int         sec = t1->tv_sec + t2->tv_sec;
-   int         usec = t1->tv_usec + t2->tv_usec;
-
-   if (usec >= 1000000)
-   {
-       usec -= 1000000;
-       sec++;
-   }
-   result->tv_sec = sec;
-   result->tv_usec = usec;
-}
-
-/* Calculate time difference */
-static void
-diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
-   int         sec = t1->tv_sec - t2->tv_sec;
-   int         usec = t1->tv_usec - t2->tv_usec;
-
-   if (usec < 0)
-   {
-       usec += 1000000;
-       sec--;
-   }
-   result->tv_sec = sec;
-   result->tv_usec = usec;
-}
+static void* threadRun(void *arg);
 
 static void
 usage(const char *progname)
@@ -270,6 +287,7 @@ usage(const char *progname)
           "  -D VARNAME=VALUE\n"
           "               define variable for use by custom script\n"
           "  -f FILENAME  read transaction script from FILENAME\n"
+          "  -j NUM       number of threads (default: 1)\n"
           "  -l           write transaction times to log file\n"
           "  -M {simple|extended|prepared}\n"
           "               protocol for submitting queries to server (default: simple)\n"
@@ -379,29 +397,6 @@ discard_response(CState *state)
    } while (res);
 }
 
-/* check to see if the SQL result was good */
-static int
-check(CState *state, PGresult *res, int n)
-{
-   CState     *st = &state[n];
-
-   switch (PQresultStatus(res))
-   {
-       case PGRES_COMMAND_OK:
-       case PGRES_TUPLES_OK:
-           /* OK */
-           break;
-       default:
-           fprintf(stderr, "Client %d aborted in state %d: %s",
-                   n, st->state, PQerrorMessage(st->con));
-           remains--;          /* I've aborted */
-           PQfinish(st->con);
-           st->con = NULL;
-           return (-1);
-   }
-   return (0);                 /* OK */
-}
-
 static int
 compareVariables(const void *v1, const void *v2)
 {
@@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state)
    sprintf(buffer, "P%d_%d", file, state);
 }
 
-static void
-doCustom(CState *state, int n, int debug)
+static bool
+clientDone(CState *st, bool ok)
+{
+   (void) ok;  /* unused */
+
+   if (st->con != NULL)
+   {
+       PQfinish(st->con);
+       st->con = NULL;
+   }
+   return false;   /* always false */
+}
+
+/* return false iff client should be disconnected */
+static bool
+doCustom(CState *st, instr_time *conn_time)
 {
    PGresult   *res;
-   CState     *st = &state[n];
    Command   **commands;
 
 top:
@@ -610,16 +618,13 @@ top:
 
    if (st->sleeping)
    {                           /* are we sleeping? */
-       int         usec;
-       struct timeval now;
+       instr_time  now;
 
-       gettimeofday(&now, NULL);
-       usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
-           st->until.tv_usec - now.tv_usec;
-       if (usec <= 0)
+       INSTR_TIME_SET_CURRENT(now);
+       if (st->until <= INSTR_TIME_GET_MICROSEC(now))
            st->sleeping = 0;   /* Done sleeping, go ahead with next command */
        else
-           return;             /* Still sleeping, nothing to do here */
+           return true;        /* Still sleeping, nothing to do here */
    }
 
    if (st->listen)
@@ -627,17 +632,14 @@ top:
        if (commands[st->state]->type == SQL_COMMAND)
        {
            if (debug)
-               fprintf(stderr, "client %d receiving\n", n);
+               fprintf(stderr, "client %d receiving\n", st->id);
            if (!PQconsumeInput(st->con))
            {                   /* there's something wrong */
-               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
-               remains--;      /* I've aborted */
-               PQfinish(st->con);
-               st->con = NULL;
-               return;
+               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
+               return clientDone(st, false);
            }
            if (PQisBusy(st->con))
-               return;         /* don't have the whole result yet */
+               return true;    /* don't have the whole result yet */
        }
 
        /*
@@ -645,25 +647,35 @@ top:
         */
        if (use_log && commands[st->state + 1] == NULL)
        {
-           double      diff;
-           struct timeval now;
-
-           gettimeofday(&now, NULL);
-           diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
-               (int) (now.tv_usec - st->txn_begin.tv_usec);
-
-           fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
-                   st->id, st->cnt, diff, st->use_file,
-                   (long) now.tv_sec, (long) now.tv_usec);
+           instr_time  diff;
+           double      sec;
+           double      msec;
+           double      usec;
+
+           INSTR_TIME_SET_CURRENT(diff);
+           INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+           sec = INSTR_TIME_GET_DOUBLE(diff);
+           msec = INSTR_TIME_GET_MILLISEC(diff);
+           usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+
+           fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
+                   st->id, st->cnt, usec, st->use_file,
+                   sec, usec - sec * 1000.0);
        }
 
        if (commands[st->state]->type == SQL_COMMAND)
        {
            res = PQgetResult(st->con);
-           if (check(state, res, n))
+           switch (PQresultStatus(res))
            {
-               PQclear(res);
-               return;
+               case PGRES_COMMAND_OK:
+               case PGRES_TUPLES_OK:
+                   break;  /* OK */
+               default:
+                   fprintf(stderr, "Client %d aborted in state %d: %s",
+                       st->id, st->state, PQerrorMessage(st->con));
+                   PQclear(res);
+                   return clientDone(st, false);
            }
            PQclear(res);
            discard_response(st);
@@ -679,15 +691,7 @@ top:
 
            ++st->cnt;
            if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
-           {
-               remains--;      /* I've done */
-               if (st->con != NULL)
-               {
-                   PQfinish(st->con);
-                   st->con = NULL;
-               }
-               return;
-           }
+               return clientDone(st, true);    /* exit success */
        }
 
        /* increment state counter */
@@ -702,27 +706,20 @@ top:
 
    if (st->con == NULL)
    {
-       struct timeval t1,
-                   t2,
-                   t3;
+       instr_time  start, end;
 
-       gettimeofday(&t1, NULL);
+       INSTR_TIME_SET_CURRENT(start);
        if ((st->con = doConnect()) == NULL)
        {
-           fprintf(stderr, "Client %d aborted in establishing connection.\n",
-                   n);
-           remains--;          /* I've aborted */
-           PQfinish(st->con);
-           st->con = NULL;
-           return;
+           fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
+           return clientDone(st, false);
        }
-       gettimeofday(&t2, NULL);
-       diffTime(&t2, &t1, &t3);
-       addTime(&conn_total_time, &t3, &conn_total_time);
+       INSTR_TIME_SET_CURRENT(end);
+       INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
    }
 
    if (use_log && st->state == 0)
-       gettimeofday(&(st->txn_begin), NULL);
+       INSTR_TIME_SET_CURRENT(st->txn_begin);
 
    if (commands[st->state]->type == SQL_COMMAND)
    {
@@ -738,11 +735,11 @@ top:
            {
                fprintf(stderr, "out of memory\n");
                st->ecnt++;
-               return;
+               return true;
            }
 
            if (debug)
-               fprintf(stderr, "client %d sending %s\n", n, sql);
+               fprintf(stderr, "client %d sending %s\n", st->id, sql);
            r = PQsendQuery(st->con, sql);
            free(sql);
        }
@@ -754,7 +751,7 @@ top:
            getQueryParams(st, command, params);
 
            if (debug)
-               fprintf(stderr, "client %d sending %s\n", n, sql);
+               fprintf(stderr, "client %d sending %s\n", st->id, sql);
            r = PQsendQueryParams(st->con, sql, command->argc - 1,
                                  NULL, params, NULL, NULL, 0);
        }
@@ -788,7 +785,7 @@ top:
            preparedStatementName(name, st->use_file, st->state);
 
            if (debug)
-               fprintf(stderr, "client %d sending %s\n", n, name);
+               fprintf(stderr, "client %d sending %s\n", st->id, name);
            r = PQsendQueryPrepared(st->con, name, command->argc - 1,
                                    params, NULL, NULL, 0);
        }
@@ -798,7 +795,7 @@ top:
        if (r == 0)
        {
            if (debug)
-               fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
+               fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
            st->ecnt++;
        }
        else
@@ -812,7 +809,7 @@ top:
 
        if (debug)
        {
-           fprintf(stderr, "client %d executing \\%s", n, argv[0]);
+           fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
            for (i = 1; i < argc; i++)
                fprintf(stderr, " %s", argv[i]);
            fprintf(stderr, "\n");
@@ -831,7 +828,7 @@ top:
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                    st->ecnt++;
-                   return;
+                   return true;
                }
                min = atoi(var);
            }
@@ -853,7 +850,7 @@ top:
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
                    st->ecnt++;
-                   return;
+                   return true;
                }
                max = atoi(var);
            }
@@ -864,7 +861,7 @@ top:
            {
                fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
                st->ecnt++;
-               return;
+               return true;
            }
 
 #ifdef DEBUG
@@ -876,7 +873,7 @@ top:
            {
                fprintf(stderr, "%s: out of memory\n", argv[0]);
                st->ecnt++;
-               return;
+               return true;
            }
 
            st->listen = 1;
@@ -894,7 +891,7 @@ top:
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                    st->ecnt++;
-                   return;
+                   return true;
                }
                ope1 = atoi(var);
            }
@@ -911,7 +908,7 @@ top:
                    {
                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
                        st->ecnt++;
-                       return;
+                       return true;
                    }
                    ope2 = atoi(var);
                }
@@ -930,7 +927,7 @@ top:
                    {
                        fprintf(stderr, "%s: division by zero\n", argv[0]);
                        st->ecnt++;
-                       return;
+                       return true;
                    }
                    snprintf(res, sizeof(res), "%d", ope1 / ope2);
                }
@@ -938,7 +935,7 @@ top:
                {
                    fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
                    st->ecnt++;
-                   return;
+                   return true;
                }
            }
 
@@ -946,7 +943,7 @@ top:
            {
                fprintf(stderr, "%s: out of memory\n", argv[0]);
                st->ecnt++;
-               return;
+               return true;
            }
 
            st->listen = 1;
@@ -955,7 +952,7 @@ top:
        {
            char       *var;
            int         usec;
-           struct timeval now;
+           instr_time now;
 
            if (*argv[1] == ':')
            {
@@ -963,7 +960,7 @@ top:
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
                    st->ecnt++;
-                   return;
+                   return true;
                }
                usec = atoi(var);
            }
@@ -980,9 +977,8 @@ top:
            else
                usec *= 1000000;
 
-           gettimeofday(&now, NULL);
-           st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
-           st->until.tv_usec = (now.tv_usec + usec) % 1000000;
+           INSTR_TIME_SET_CURRENT(now);
+           st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
            st->sleeping = 1;
 
            st->listen = 1;
@@ -990,18 +986,23 @@ top:
 
        goto top;
    }
+
+   return true;
 }
 
 /* discard connections */
 static void
-disconnect_all(CState *state)
+disconnect_all(CState *state, int length)
 {
    int         i;
 
-   for (i = 0; i < nclients; i++)
+   for (i = 0; i < length; i++)
    {
        if (state[i].con)
+       {
            PQfinish(state[i].con);
+           state[i].con = NULL;
+       }
    }
 }
 
@@ -1267,6 +1268,24 @@ process_commands(char *buf)
                return NULL;
            }
 
+           /*
+            * Split argument into number and unit for "sleep 1ms" or so.
+            * We don't have to terminate the number argument with null
+            * because it will parsed with atoi, that ignores trailing
+            * non-digit characters.
+            */
+           if (my_commands->argv[1][0] != ':')
+           {
+               char    *c = my_commands->argv[1];
+               while (isdigit(*c)) { c++; }
+               if (*c)
+               {
+                   my_commands->argv[2] = c;
+                   if (my_commands->argc < 3)
+                       my_commands->argc = 3;
+               }
+           }
+
            if (my_commands->argc >= 3)
            {
                if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
@@ -1453,25 +1472,18 @@ process_builtin(char *tb)
 
 /* print out results */
 static void
-printResults(
-            int ttype, CState *state,
-            struct timeval * start_time, struct timeval * end_time)
+printResults(int ttype, int normal_xacts, int nclients, int nthreads,
+            instr_time total_time, instr_time conn_total_time)
 {
-   double      t1,
-               t2;
-   int         i;
-   int         normal_xacts = 0;
+   double      time_include,
+               tps_include,
+               tps_exclude;
    char       *s;
 
-   for (i = 0; i < nclients; i++)
-       normal_xacts += state[i].cnt;
-
-   t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
-   t1 = normal_xacts * 1000000.0 / t1;
-
-   t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
-       (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
-   t2 = normal_xacts * 1000000.0 / t2;
+   time_include = INSTR_TIME_GET_DOUBLE(total_time);
+   tps_include = normal_xacts / time_include;
+   tps_exclude = normal_xacts / (time_include -
+       (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
 
    if (ttype == 0)
        s = "TPC-B (sort of)";
@@ -1486,6 +1498,7 @@ printResults(
    printf("scaling factor: %d\n", scale);
    printf("query mode: %s\n", QUERYMODE[querymode]);
    printf("number of clients: %d\n", nclients);
+   printf("number of threads: %d\n", nthreads);
    if (duration <= 0)
    {
        printf("number of transactions per client: %d\n", nxacts);
@@ -1498,8 +1511,8 @@ printResults(
        printf("number of transactions actually processed: %d\n",
               normal_xacts);
    }
-   printf("tps = %f (including connections establishing)\n", t1);
-   printf("tps = %f (excluding connections establishing)\n", t2);
+   printf("tps = %f (including connections establishing)\n", tps_include);
+   printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 }
 
 
@@ -1507,29 +1520,26 @@ int
 main(int argc, char **argv)
 {
    int         c;
+   int         nclients = 1;       /* default number of simulated clients */
+   int         nthreads = 1;       /* default number of threads */
    int         is_init_mode = 0;       /* initialize mode? */
    int         is_no_vacuum = 0;       /* no vacuum at all before testing? */
    int         do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
-   int         debug = 0;      /* debug flag */
    int         ttype = 0;      /* transaction type. 0: TPC-B, 1: SELECT only,
                                 * 2: skip update of branches and tellers */
    char       *filename = NULL;
    bool        scale_given = false;
 
    CState     *state;          /* status of clients */
+   TState     *threads;        /* array of thread */
 
-   struct timeval start_time;  /* start up time */
-   struct timeval end_time;    /* end time */
+   instr_time  start_time;     /* start up time */
+   instr_time  total_time;
+   instr_time  conn_total_time;
+   int         total_xacts;
 
    int         i;
 
-   fd_set      input_mask;
-   int         nsocks;         /* return from select(2) */
-   int         maxsock;        /* max socket number to be waited */
-   struct timeval now;
-   struct timeval timeout;
-   int         min_usec;
-
 #ifdef HAVE_GETRLIMIT
    struct rlimit rlim;
 #endif
@@ -1579,7 +1589,7 @@ main(int argc, char **argv)
 
    memset(state, 0, sizeof(*state));
 
-   while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
+   while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
    {
        switch (c)
        {
@@ -1632,6 +1642,14 @@ main(int argc, char **argv)
                }
 #endif   /* HAVE_GETRLIMIT */
                break;
+           case 'j':   /* jobs */
+               nthreads = atoi(optarg);
+               if (nthreads <= 0)
+               {
+                   fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+                   exit(1);
+               }
+               break;
            case 'C':
                is_connect = 1;
                break;
@@ -1752,7 +1770,11 @@ main(int argc, char **argv)
    if (nxacts <= 0 && duration <= 0)
        nxacts = DEFAULT_NXACTS;
 
-   remains = nclients;
+   if (nclients % nthreads != 0)
+   {
+       fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
+       exit(1);
+   }
 
    if (nclients > 1)
    {
@@ -1770,6 +1792,7 @@ main(int argc, char **argv)
        {
            int         j;
 
+           state[i].id = i;
            for (j = 0; j < state[0].nvariables; j++)
            {
                if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
@@ -1879,33 +1902,8 @@ main(int argc, char **argv)
    PQfinish(con);
 
    /* set random seed */
-   gettimeofday(&start_time, NULL);
-   srandom((unsigned int) start_time.tv_usec);
-
-   /* get start up time */
-   gettimeofday(&start_time, NULL);
-
-   /* set alarm if duration is specified. */
-   if (duration > 0)
-       setalarm(duration);
-
-   if (is_connect == 0)
-   {
-       struct timeval t,
-                   now;
-
-       /* make connections to the database */
-       for (i = 0; i < nclients; i++)
-       {
-           state[i].id = i;
-           if ((state[i].con = doConnect()) == NULL)
-               exit(1);
-       }
-       /* time after connections set up */
-       gettimeofday(&now, NULL);
-       diffTime(&now, &start_time, &t);
-       addTime(&conn_total_time, &t, &conn_total_time);
-   }
+   INSTR_TIME_SET_CURRENT(start_time);
+   srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
 
    /* process bultin SQL scripts */
    switch (ttype)
@@ -1929,140 +1927,227 @@ main(int argc, char **argv)
            break;
    }
 
+   /* get start up time */
+   INSTR_TIME_SET_CURRENT(start_time);
+
+   /* set alarm if duration is specified. */
+   if (duration > 0)
+       setalarm(duration);
+
+   /* start threads */
+   threads = (TState *) malloc(sizeof(TState) * nthreads);
+   for (i = 0; i < nthreads; i++)
+   {
+       threads[i].state = &state[nclients / nthreads * i];
+       threads[i].nstate = nclients / nthreads;
+       INSTR_TIME_SET_CURRENT(threads[i].start_time);
+
+       /* the first thread (i = 0) is executed by main thread */
+       if (i > 0)
+       {
+           int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+           if (err != 0 || threads[i].thread == INVALID_THREAD)
+           {
+               fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+               exit(1);
+           }
+       }
+       else
+       {
+           threads[i].thread = INVALID_THREAD;
+       }
+   }
+
+   /* wait for threads and accumulate results */
+   total_xacts = 0;
+   INSTR_TIME_SET_ZERO(conn_total_time);
+   for (i = 0; i < nthreads; i++)
+   {
+       void *ret = NULL;
+
+       if (threads[i].thread == INVALID_THREAD)
+           ret = threadRun(&threads[i]);
+       else
+           pthread_join(threads[i].thread, &ret);
+
+       if (ret != NULL)
+       {
+           TResult *r = (TResult *) ret;
+           total_xacts += r->xacts;
+           INSTR_TIME_ADD(conn_total_time, r->conn_time);
+           free(ret);
+       }
+   }
+   disconnect_all(state, nclients);
+
+   /* get end time */
+   INSTR_TIME_SET_CURRENT(total_time);
+   INSTR_TIME_SUBTRACT(total_time, start_time);
+   printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+   if (LOGFILE)
+       fclose(LOGFILE);
+
+   return 0;
+}
+
+static void *
+threadRun(void *arg)
+{
+   TState     *thread = (TState *) arg;
+   CState     *state = thread->state;
+   TResult    *result;
+   instr_time  start, end;
+   int         nstate = thread->nstate;
+   int         remains = nstate;   /* number of remaining clients */
+   int         i;
+
+   result = malloc(sizeof(TResult));
+   INSTR_TIME_SET_ZERO(result->conn_time);
+
+   if (is_connect == 0)
+   {
+       /* make connections to the database */
+       for (i = 0; i < nstate; i++)
+       {
+           if ((state[i].con = doConnect()) == NULL)
+               goto done;
+       }
+   }
+
+   /* time after thread and connections set up */
+   INSTR_TIME_SET_CURRENT(result->conn_time);
+   INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+
    /* send start up queries in async manner */
-   for (i = 0; i < nclients; i++)
+   for (i = 0; i < nstate; i++)
    {
-       Command   **commands = sql_files[state[i].use_file];
-       int         prev_ecnt = state[i].ecnt;
+       CState     *st = &state[i];
+       Command   **commands = sql_files[st->use_file];
+       int         prev_ecnt = st->ecnt;
 
-       state[i].use_file = getrand(0, num_files - 1);
-       doCustom(state, i, debug);
+       st->use_file = getrand(0, num_files - 1);
+       if (!doCustom(st, &result->conn_time))
+           remains--;      /* I've aborted */
 
-       if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+       if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
        {
-           fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
+           fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
            remains--;          /* I've aborted */
-           PQfinish(state[i].con);
-           state[i].con = NULL;
+           PQfinish(st->con);
+           st->con = NULL;
        }
    }
 
-   for (;;)
+   while (remains > 0)
    {
-       if (remains <= 0)
-       {                       /* all done ? */
-           disconnect_all(state);
-           /* get end time */
-           gettimeofday(&end_time, NULL);
-           printResults(ttype, state, &start_time, &end_time);
-           if (LOGFILE)
-               fclose(LOGFILE);
-           exit(0);
-       }
+       fd_set          input_mask;
+       int             maxsock;        /* max socket number to be waited */
+       int64           now_usec = 0;
+       int64           min_usec;
 
        FD_ZERO(&input_mask);
 
        maxsock = -1;
-       min_usec = -1;
-       for (i = 0; i < nclients; i++)
+       min_usec = INT64_MAX;
+       for (i = 0; i < nstate; i++)
        {
-           Command   **commands = sql_files[state[i].use_file];
+           CState     *st = &state[i];
+           Command   **commands = sql_files[st->use_file];
+           int         sock;
 
-           if (state[i].sleeping)
+           if (st->sleeping)
            {
                int         this_usec;
-               int         sock = PQsocket(state[i].con);
 
-               if (min_usec < 0)
+               if (min_usec == INT64_MAX)
                {
-                   gettimeofday(&now, NULL);
-                   min_usec = 0;
+                   instr_time  now;
+                   INSTR_TIME_SET_CURRENT(now);
+                   now_usec = INSTR_TIME_GET_MICROSEC(now);
                }
 
-               this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
-                   state[i].until.tv_usec - now.tv_usec;
-
-               if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
+               this_usec = st->until - now_usec;
+               if (min_usec > this_usec)
                    min_usec = this_usec;
-
-               FD_SET      (sock, &input_mask);
-
-               if (maxsock < sock)
-                   maxsock = sock;
            }
-           else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
+           else if (st->con == NULL)
            {
-               int         sock = PQsocket(state[i].con);
-
-               if (sock < 0)
-               {
-                   disconnect_all(state);
-                   exit(1);
-               }
-               FD_SET      (sock, &input_mask);
+               continue;
+           }
+           else if (commands[st->state]->type == META_COMMAND)
+           {
+               min_usec = 0;   /* the connection is ready to run */
+               break;
+           }
 
-               if (maxsock < sock)
-                   maxsock = sock;
+           sock = PQsocket(st->con);
+           if (sock < 0)
+           {
+               fprintf(stderr, "bad socket: %s\n", strerror(errno));
+               goto done;
            }
+
+           FD_SET(sock, &input_mask);
+           if (maxsock < sock)
+               maxsock = sock;
        }
 
-       if (maxsock != -1)
+       if (min_usec > 0 && maxsock != -1)
        {
-           if (min_usec >= 0)
+           int     nsocks;         /* return from select(2) */
+
+           if (min_usec != INT64_MAX)
            {
+               struct timeval  timeout;
                timeout.tv_sec = min_usec / 1000000;
                timeout.tv_usec = min_usec % 1000000;
-
-               nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
-                               (fd_set *) NULL, &timeout);
+               nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
            }
            else
-               nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
-                               (fd_set *) NULL, (struct timeval *) NULL);
+               nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
            if (nsocks < 0)
            {
                if (errno == EINTR)
                    continue;
                /* must be something wrong */
-               disconnect_all(state);
                fprintf(stderr, "select failed: %s\n", strerror(errno));
-               exit(1);
-           }
-#ifdef NOT_USED
-           else if (nsocks == 0)
-           {                   /* timeout */
-               fprintf(stderr, "select timeout\n");
-               for (i = 0; i < nclients; i++)
-               {
-                   fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
-                           i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
-               }
-               exit(0);
+               goto done;
            }
-#endif
        }
 
        /* ok, backend returns reply */
-       for (i = 0; i < nclients; i++)
+       for (i = 0; i < nstate; i++)
        {
-           Command   **commands = sql_files[state[i].use_file];
-           int         prev_ecnt = state[i].ecnt;
+           CState     *st = &state[i];
+           Command   **commands = sql_files[st->use_file];
+           int         prev_ecnt = st->ecnt;
 
-           if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
-                         || commands[state[i].state]->type == META_COMMAND))
+           if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
+                         || commands[st->state]->type == META_COMMAND))
            {
-               doCustom(state, i, debug);
+               if (!doCustom(st, &result->conn_time))
+                   remains--;      /* I've aborted */
            }
 
-           if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+           if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
            {
-               fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
+               fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
                remains--;      /* I've aborted */
-               PQfinish(state[i].con);
-               state[i].con = NULL;
+               PQfinish(st->con);
+               st->con = NULL;
            }
        }
    }
+
+done:
+   INSTR_TIME_SET_CURRENT(start);
+   disconnect_all(state, nstate);
+   result->xacts = 0;
+   for (i = 0; i < nstate; i++)
+       result->xacts += state[i].cnt;
+   INSTR_TIME_SET_CURRENT(end);
+   INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+   return result;
 }
 
 
@@ -2084,6 +2169,87 @@ setalarm(int seconds)
    pqsignal(SIGALRM, handle_sig_alarm);
    alarm(seconds);
 }
+
+#ifndef ENABLE_THREAD_SAFETY
+
+/*
+ * implements pthread using fork.
+ */
+
+/* Handle for a "thread" that is emulated by a forked child process. */
+typedef struct fork_pthread
+{
+   pid_t   pid;            /* value returned by fork() */
+   int     pipes[2];       /* child writes its result to [1], parent reads [0] */
+} fork_pthread;
+
+/*
+ * Emulate pthread_create() by forking a child process.
+ *
+ * The child runs start_routine(arg), writes sizeof(TResult) bytes of the
+ * returned buffer to a pipe and exits.  The parent keeps the read end of
+ * that pipe in the handle stored in *thread.  attr is ignored.
+ *
+ * Returns 0 in the parent on success, or an errno value on failure.  The
+ * child never returns from this function.
+ */
+static int
+pthread_create(pthread_t *thread,
+              pthread_attr_t *attr,
+              void * (*start_routine)(void *),
+              void *arg)
+{
+   fork_pthread   *th;
+   void           *ret;
+   int             save_errno;
+   int             status;
+
+   th = (fork_pthread *) malloc(sizeof(fork_pthread));
+   if (th == NULL)
+       return ENOMEM;
+
+   if (pipe(th->pipes) < 0)
+   {
+       save_errno = errno;
+       free(th);
+       return save_errno;
+   }
+
+   th->pid = fork();
+   if (th->pid == -1)  /* error */
+   {
+       /* save errno first: the cleanup calls below may overwrite it */
+       save_errno = errno;
+       close(th->pipes[0]);
+       close(th->pipes[1]);
+       free(th);
+       return save_errno;
+   }
+   if (th->pid != 0)   /* parent process */
+   {
+       close(th->pipes[1]);
+       *thread = th;
+       return 0;
+   }
+
+   /* child process */
+   close(th->pipes[0]);
+
+   /* set alarm again because the child does not inherit timers */
+   if (duration > 0)
+       setalarm(duration);
+
+   ret = start_routine(arg);
+
+   /* report failure via the exit status if no complete result was written */
+   status = 1;
+   if (ret != NULL &&
+       write(th->pipes[1], ret, sizeof(TResult)) == (ssize_t) sizeof(TResult))
+       status = 0;
+   close(th->pipes[1]);
+   free(th);
+   exit(status);
+}
+
+/*
+ * Emulate pthread_join() for a child started by the fork-based
+ * pthread_create().
+ *
+ * Waits for the child to exit.  If thread_return is not NULL, a TResult is
+ * then read from the pipe into a malloc'ed buffer stored in *thread_return;
+ * *thread_return is set to NULL when a complete result cannot be obtained.
+ * On success the pipe is closed and the handle is freed.
+ *
+ * Returns 0 on success or an errno value.  When an error is returned the
+ * handle is left untouched.
+ */
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+   int     status;
+
+   if (th == NULL)
+       return errno = EINVAL;
+
+   while (waitpid(th->pid, &status, 0) != th->pid)
+   {
+       if (errno != EINTR)
+           return errno;
+   }
+
+   if (thread_return != NULL)
+   {
+       /* assume result is TResult */
+       *thread_return = malloc(sizeof(TResult));
+       if (*thread_return != NULL)
+       {
+           ssize_t     nread;
+
+           /* retry if a signal interrupts the read */
+           do
+           {
+               nread = read(th->pipes[0], *thread_return, sizeof(TResult));
+           } while (nread < 0 && errno == EINTR);
+
+           if (nread != (ssize_t) sizeof(TResult))
+           {
+               free(*thread_return);
+               *thread_return = NULL;
+           }
+       }
+   }
+   close(th->pipes[0]);
+
+   free(th);
+   return 0;
+}
+
+#endif
+
 #else                          /* WIN32 */
 
 static VOID CALLBACK
@@ -2110,4 +2276,70 @@ setalarm(int seconds)
    }
 }
 
+/* partial pthread implementation for Windows */
+
+/* Handle for a thread created through the Windows pthread emulation. */
+typedef struct win32_pthread
+{
+   HANDLE      handle;     /* thread handle obtained from _beginthreadex() */
+   void       *(*routine)(void *);    /* function the thread runs */
+   void       *arg;        /* argument passed to routine */
+   void       *result;     /* value returned by routine */
+} win32_pthread;
+
+/*
+ * Thread entry point handed to _beginthreadex(): calls the routine stored in
+ * the handle with its argument and saves the returned pointer in the handle.
+ */
+static unsigned __stdcall
+win32_pthread_run(void *arg)
+{
+   win32_pthread *self = (win32_pthread *) arg;
+   void       *value;
+
+   value = self->routine(self->arg);
+   self->result = value;
+
+   return 0;
+}
+
+/*
+ * Minimal pthread_create() for Windows, built on _beginthreadex().
+ *
+ * Allocates a handle, starts start_routine(arg) in a new thread and stores
+ * the handle in *thread.  attr is ignored.
+ *
+ * Returns 0 on success or an errno value on failure.
+ */
+static int
+pthread_create(pthread_t *thread,
+              pthread_attr_t *attr,
+              void * (*start_routine)(void *),
+              void *arg)
+{
+   int             save_errno;
+   win32_pthread   *th;
+
+   th = (win32_pthread *) malloc(sizeof(win32_pthread));
+   if (th == NULL)
+       return ENOMEM;
+
+   th->routine = start_routine;
+   th->arg = arg;
+   th->result = NULL;
+
+   th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+   if (th->handle == NULL)
+   {
+       /* save errno before free() has a chance to change it */
+       save_errno = errno;
+       free(th);
+       return save_errno;
+   }
+
+   *thread = th;
+   return 0;
+}
+
+/*
+ * Minimal pthread_join() for Windows.
+ *
+ * Rejects a NULL handle with EINVAL.  Otherwise waits for the thread to
+ * finish, hands its result back through thread_return when that is not
+ * NULL, then closes the thread handle and frees the emulation handle.
+ *
+ * Returns 0 on success or an errno value.
+ */
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+   if (th == NULL || th->handle == NULL)
+   {
+       errno = EINVAL;
+       return EINVAL;
+   }
+
+   if (WaitForSingleObject(th->handle, INFINITE) == WAIT_OBJECT_0)
+   {
+       if (thread_return != NULL)
+           *thread_return = th->result;
+
+       CloseHandle(th->handle);
+       free(th);
+       return 0;
+   }
+
+   /* the wait failed: translate the Windows error code into errno */
+   _dosmaperr(GetLastError());
+   return errno;
+}
+
 #endif   /* WIN32 */
index 5c30e8499f5f80277eb095356ad23a98e7fd0368..c34f7acbbb9c644a32cb1011b11b5478e8da21f0 100644 (file)
@@ -1,4 +1,4 @@
-
+
 
 
  pgbench
@@ -171,6 +171,14 @@ pgbench  options  dbname
        sessions.  Default is 1.
       
      
+     
+      -j threads
+      
+       Number of worker threads. Clients are divided equally among these
+       threads and executed within them. The number of clients must be a
+       multiple of the number of threads. Default is 1.
+      
+     
      
       -t transactions