Add --rate option.
authorTatsuo Ishii <ishii@postgresql.org>
Mon, 22 Jul 2013 23:40:22 +0000 (08:40 +0900)
committerTatsuo Ishii <ishii@postgresql.org>
Mon, 22 Jul 2013 23:40:22 +0000 (08:40 +0900)
This controls the target transaction rate to certain tps, rather than
maximum. Patch contributed by Fabien COELHO, reviewed by Greg Smith,
and slight editing by me.

contrib/pgbench/pgbench.c
doc/src/sgml/pgbench.sgml

index 2ad8f0bb5b48411e2f8c5b5be76708f4a98768ac..ad8e272c9109c755a67f32f6318b745a17fe4d10 100644 (file)
@@ -136,6 +136,12 @@ int            unlogged_tables = 0;
  */
 double     sample_rate = 0.0;
 
+/*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64      throttle_delay = 0;
+
 /*
  * tablespace selection
  */
@@ -202,11 +208,13 @@ typedef struct
    int         listen;         /* 0 indicates that an async query has been
                                 * sent */
    int         sleeping;       /* 1 indicates that the client is napping */
+   bool        throttling;     /* whether nap is for throttling */
    int64       until;          /* napping until (usec) */
    Variable   *variables;      /* array of variable definitions */
    int         nvariables;
    instr_time  txn_begin;      /* used for measuring transaction latencies */
    instr_time  stmt_begin;     /* used for measuring statement latencies */
+   bool        is_throttled;   /* whether transaction throttling is done */
    int         use_file;       /* index in sql_files for this client */
    bool        prepared[MAX_FILES];
 } CState;
@@ -224,6 +232,9 @@ typedef struct
    instr_time *exec_elapsed;   /* time spent executing cmds (per Command) */
    int        *exec_count;     /* number of cmd executions (per Command) */
    unsigned short random_state[3];     /* separate randomness for each thread */
+   int64       throttle_trigger;   /* previous/next throttling (us) */
+   int64       throttle_lag;       /* total transaction lag behind throttling */
+   int64       throttle_lag_max;   /* max transaction lag */
 } TState;
 
 #define INVALID_THREAD     ((pthread_t) 0)
@@ -232,6 +243,8 @@ typedef struct
 {
    instr_time  conn_time;
    int         xacts;
+   int64       throttle_lag;
+   int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -356,6 +369,7 @@ usage(void)
           "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
           "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
           "  -r, --report-latencies   report average latency per command\n"
+          "  -R, --rate=SPEC          target rate in transactions per second\n"
           "  -s, --scale=NUM          report this scale factor in output\n"
           "  -S, --select-only        perform SELECT-only transactions\n"
           "  -t, --transactions       number of transactions each client runs "
@@ -898,17 +912,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
    PGresult   *res;
    Command   **commands;
+   bool        trans_needs_throttle = false;
 
 top:
    commands = sql_files[st->use_file];
 
+   /*
+    * Handle throttling once per transaction by sleeping.  It is simpler
+    * to do this here rather than at the end, because so much complicated
+    * logic happens below when statements finish.
+    */
+   if (throttle_delay && ! st->is_throttled)
+   {
+       /*
+        * Use inverse transform sampling to randomly generate a delay, such
+        * that the series of delays will approximate a Poisson distribution
+        * centered on the throttle_delay time.
+                 *
+                 * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
+        *
+        * If transactions are too slow or a given wait is shorter than
+        * a transaction, the next transaction will start right away.
+        */
+       int64 wait = (int64)
+           throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+       thread->throttle_trigger += wait;
+
+       st->until = thread->throttle_trigger;
+       st->sleeping = 1;
+       st->throttling = true;
+       st->is_throttled = true;
+       if (debug)
+           fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+                   st->id, wait);
+   }
+
    if (st->sleeping)
    {                           /* are we sleeping? */
        instr_time  now;
+       int64 now_us;
 
        INSTR_TIME_SET_CURRENT(now);
-       if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+       now_us = INSTR_TIME_GET_MICROSEC(now);
+       if (st->until <= now_us)
+       {
            st->sleeping = 0;   /* Done sleeping, go ahead with next command */
+           if (st->throttling)
+           {
+               /* Measure lag of throttled transaction relative to target */
+               int64 lag = now_us - st->until;
+               thread->throttle_lag += lag;
+               if (lag > thread->throttle_lag_max)
+                   thread->throttle_lag_max = lag;
+               st->throttling = false;
+           }
+       }
        else
            return true;        /* Still sleeping, nothing to do here */
    }
@@ -1095,6 +1154,15 @@ top:
            st->state = 0;
            st->use_file = (int) getrand(thread, 0, num_files - 1);
            commands = sql_files[st->use_file];
+           st->is_throttled = false;
+           /*
+            * No transaction is underway anymore, which means there is nothing
+            * to listen to right now.  When throttling rate limits are active,
+            * a sleep will happen next, as the next transaction starts.  And
+            * then in any case the next SQL command will set listen back to 1.
+            */
+           st->listen = 0;
+           trans_needs_throttle = (throttle_delay>0);
        }
    }
 
@@ -1113,6 +1181,16 @@ top:
        INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
    }
 
+   /*
+         * This ensures that a throttling delay is inserted before proceeding
+         * with sql commands, after the first transaction. The first transaction
+         * throttling is performed when first entering doCustom.
+    */
+   if (trans_needs_throttle) {
+       trans_needs_throttle = false;
+       goto top;
+   }
+
    /* Record transaction start time if logging is enabled */
    if (logfile && st->state == 0)
        INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2017,7 +2095,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
             TState *threads, int nthreads,
-            instr_time total_time, instr_time conn_total_time)
+            instr_time total_time, instr_time conn_total_time,
+            int64 throttle_lag, int64 throttle_lag_max)
 {
    double      time_include,
                tps_include,
@@ -2055,6 +2134,19 @@ printResults(int ttype, int normal_xacts, int nclients,
        printf("number of transactions actually processed: %d\n",
               normal_xacts);
    }
+
+   if (throttle_delay)
+   {
+       /*
+        * Report average transaction lag under rate limit throttling.  This
+        * is the delay between scheduled and actual start times for the
+        * transaction.  The measured lag may be caused by thread/client load,
+        * the database load, or the Poisson throttling process.
+        */
+       printf("average rate limit schedule lag: %.3f ms (max %.3f ms)\n",
+              0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+   }
+
    printf("tps = %f (including connections establishing)\n", tps_include);
    printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2140,6 +2232,7 @@ main(int argc, char **argv)
        {"unlogged-tables", no_argument, &unlogged_tables, 1},
        {"sampling-rate", required_argument, NULL, 4},
        {"aggregate-interval", required_argument, NULL, 5},
+       {"rate", required_argument, NULL, 'R'},
        {NULL, 0, NULL, 0}
    };
 
@@ -2162,6 +2255,8 @@ main(int argc, char **argv)
    instr_time  total_time;
    instr_time  conn_total_time;
    int         total_xacts;
+   int64       throttle_lag = 0;
+   int64       throttle_lag_max = 0;
 
    int         i;
 
@@ -2206,7 +2301,7 @@ main(int argc, char **argv)
    state = (CState *) pg_malloc(sizeof(CState));
    memset(state, 0, sizeof(CState));
 
-   while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
+   while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
    {
        switch (c)
        {
@@ -2371,6 +2466,19 @@ main(int argc, char **argv)
                    exit(1);
                }
                break;
+           case 'R':
+           {
+               /* get a double from the beginning of option value */
+               double throttle_value = atof(optarg);
+               if (throttle_value <= 0.0)
+               {
+                   fprintf(stderr, "invalid rate limit: %s\n", optarg);
+                   exit(1);
+               }
+               /* Invert rate limit into a time offset */
+               throttle_delay = (int64) (1000000.0 / throttle_value);
+           }
+               break;
            case 0:
                /* This covers long options which take no argument. */
                break;
@@ -2408,6 +2516,9 @@ main(int argc, char **argv)
        }
    }
 
+    /* compute a per thread delay */
+   throttle_delay *= nthreads;
+
    if (argc > optind)
        dbName = argv[optind];
    else
@@ -2721,6 +2832,9 @@ main(int argc, char **argv)
            TResult    *r = (TResult *) ret;
 
            total_xacts += r->xacts;
+           throttle_lag += r->throttle_lag;
+           if (r->throttle_lag_max > throttle_lag_max)
+               throttle_lag_max = r->throttle_lag_max;
            INSTR_TIME_ADD(conn_total_time, r->conn_time);
            free(ret);
        }
@@ -2731,7 +2845,7 @@ main(int argc, char **argv)
    INSTR_TIME_SET_CURRENT(total_time);
    INSTR_TIME_SUBTRACT(total_time, start_time);
    printResults(ttype, total_xacts, nclients, threads, nthreads,
-                total_time, conn_total_time);
+                total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
    return 0;
 }
@@ -2756,6 +2870,17 @@ threadRun(void *arg)
 
    AggVals     aggs;
 
+   /*
+    * Initialize throttling rate target for all of the thread's clients.  It
+    * might be a little more accurate to reset thread->start_time here too.
+    * The possible drift seems too small relative to typical throttle delay
+    * times to worry about it.
+    */
+   INSTR_TIME_SET_CURRENT(start);
+   thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+   thread->throttle_lag = 0;
+   thread->throttle_lag_max = 0;
+
    result = pg_malloc(sizeof(TResult));
 
    INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2831,25 +2956,38 @@ threadRun(void *arg)
            Command   **commands = sql_files[st->use_file];
            int         sock;
 
-           if (st->sleeping)
+           if (st->con == NULL)
            {
-               int         this_usec;
-
-               if (min_usec == INT64_MAX)
+               continue;
+           }
+           else if (st->sleeping)
+           {
+               if (st->throttling && timer_exceeded)
                {
-                   instr_time  now;
-
-                   INSTR_TIME_SET_CURRENT(now);
-                   now_usec = INSTR_TIME_GET_MICROSEC(now);
+                   /* interrupt client which has not started a transaction */
+                   remains--;
+                   st->sleeping = 0;
+                   st->throttling = false;
+                   PQfinish(st->con);
+                   st->con = NULL;
+                   continue;
                }
+               else /* just a nap from the script */
+               {
+                   int         this_usec;
 
-               this_usec = st->until - now_usec;
-               if (min_usec > this_usec)
-                   min_usec = this_usec;
-           }
-           else if (st->con == NULL)
-           {
-               continue;
+                   if (min_usec == INT64_MAX)
+                   {
+                       instr_time  now;
+
+                       INSTR_TIME_SET_CURRENT(now);
+                       now_usec = INSTR_TIME_GET_MICROSEC(now);
+                   }
+
+                   this_usec = st->until - now_usec;
+                   if (min_usec > this_usec)
+                       min_usec = this_usec;
+               }
            }
            else if (commands[st->state]->type == META_COMMAND)
            {
@@ -2986,6 +3124,8 @@ done:
    result->xacts = 0;
    for (i = 0; i < nstate; i++)
        result->xacts += state[i].cnt;
+   result->throttle_lag = thread->throttle_lag;
+   result->throttle_lag_max = thread->throttle_lag_max;
    INSTR_TIME_SET_CURRENT(end);
    INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
    if (logfile)
index 62555e11a25d3fc44b485872688f2f8e36a816ee..49a79b194efd6d70dbdaeb7e03c6d3c9080631b5 100644 (file)
@@ -409,7 +409,7 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
 
      <varlistentry>
       <term><option>-P</option> <replaceable>sec</></term>
-      <term><option>--progress=</option> <replaceable>sec</></term>
+      <term><option>--progress=</option><replaceable>sec</></term>
       <listitem>
        <para>
         Show progress report every <literal>sec</> seconds.
@@ -417,6 +417,52 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate=</option><replaceable>rate</></term>
+      <listitem>
+       <para>
+        Execute transactions targeting the specified rate instead of running
+        as fast as possible (the default).  The rate is given in transactions
+        per second.  If the targeted rate is above the maximum possible rate,
+        the rate limit won't impact the results.
+       </para>
+       <para>
+        The rate is targeted by starting transactions along a
+        Poisson-distributed schedule time line.  The expected finish time
+        schedule moves forward based on when the client first started, not
+        when the previous transaction ended.  That approach means that when
+        transactions go past their original scheduled end time, it is
+        possible for later ones to catch up again.
+       </para>
+       <para>        
+        When throttling is active, the average and maximum transaction
+        schedule lag time are reported in ms.  This is the delay between
+        the original scheduled transaction time and the actual transaction
+        start times.  The schedule lag shows whether a transaction was
+        started on time or late.  Once a client starts running behind its
+        schedule, every following transaction can continue to be penalized
+        for schedule lag.  If faster transactions are able to catch up, it's
+        possible for them to get back on schedule again.  The lag measurement
+        of every transaction is shown when pgbench is run with debugging
+        output.
+       </para>
+       <para>
+        High rate limit schedule lag values, that is lag values that are large
+        compared to the actual transaction latency, indicate that something is
+        amiss in the throttling process.  High schedule lag can highlight a subtle
+        problem there even if the target rate limit is met in the end.  One
+        possible cause of schedule lag is insufficient pgbench threads to
+        handle all of the clients.  To improve that, consider reducing the
+        number of clients, increasing the number of threads in pgbench, or
+        running pgbench on a separate host.  Another possibility is that the
+        database is not keeping up with the load at some point.  When that
+        happens, you will have to reduce the expected transaction rate to
+        lower schedule lag.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <term><option>--scale=</option><replaceable>scale_factor</></term>