Add --latency-limit option to pgbench.
authorHeikki Linnakangas
Mon, 13 Oct 2014 17:25:56 +0000 (20:25 +0300)
committerHeikki Linnakangas
Mon, 13 Oct 2014 17:50:24 +0000 (20:50 +0300)
This allows transactions that take longer than specified limit to be counted
separately. With --rate, transactions that are already late by the time we
get to execute them are skipped altogether. Using --latency-limit with
--rate allows you to "catch up" more quickly, if there's a hickup in the
server causing a lot of transactions to stall momentarily.

Fabien COELHO, reviewed by Rukh Meski and heavily refactored by me.

contrib/pgbench/pgbench.c
doc/src/sgml/pgbench.sgml

index e9431ee7860c9429ecbf0ec53656ff1846dbd9b6..3453a1f7a6b2901ffa92692096123e3a5b2f84f8 100644 (file)
@@ -140,6 +140,14 @@ double     sample_rate = 0.0;
  */
 int64      throttle_delay = 0;
 
+/*
+ * Transactions which take longer than this limit (in usec) are counted as
+ * late, and reported as such, although they are completed anyway. When
+ * throttling is enabled, execution time slots that are more than this late
+ * are skipped altogether, and counted separately.
+ */
+int64      latency_limit = 0;
+
 /*
  * tablespace selection
  */
@@ -238,6 +246,8 @@ typedef struct
    int64       throttle_trigger;       /* previous/next throttling (us) */
    int64       throttle_lag;   /* total transaction lag behind throttling */
    int64       throttle_lag_max;       /* max transaction lag */
+   int64       throttle_latency_skipped; /* lagging transactions skipped */
+   int64       latency_late;   /* late transactions */
 } TState;
 
 #define INVALID_THREAD     ((pthread_t) 0)
@@ -250,6 +260,8 @@ typedef struct
    int64       sqlats;
    int64       throttle_lag;
    int64       throttle_lag_max;
+   int64       throttle_latency_skipped;
+   int64       latency_late;
 } TResult;
 
 /*
@@ -284,6 +296,8 @@ typedef struct
 
    long        start_time;     /* when does the interval start */
    int         cnt;            /* number of transactions */
+   int         skipped;        /* number of transactions skipped under
+                                * --rate and --latency-limit */
 
    double      min_latency;    /* min/max latencies */
    double      max_latency;
@@ -348,7 +362,7 @@ static void setalarm(int seconds);
 static void *threadRun(void *arg);
 
 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
-     AggVals *agg);
+     AggVals *agg, bool skipped);
 
 static void
 usage(void)
@@ -375,6 +389,8 @@ usage(void)
         "  -f, --file=FILENAME      read transaction script from FILENAME\n"
           "  -j, --jobs=NUM           number of threads (default: 1)\n"
           "  -l, --log                write transaction times to log file\n"
+          "  -L, --latency-limit=NUM  count transactions lasting more than NUM ms\n"
+          "                           as late.\n"
           "  -M, --protocol=simple|extended|prepared\n"
           "                           protocol for submitting queries (default: simple)\n"
           "  -n, --no-vacuum          do not run VACUUM before tests\n"
@@ -994,7 +1010,9 @@ void
 agg_vals_init(AggVals *aggs, instr_time start)
 {
    /* basic counters */
-   aggs->cnt = 0;              /* number of transactions */
+   aggs->cnt = 0;              /* number of transactions (includes skipped) */
+   aggs->skipped = 0;          /* xacts skipped under --rate --latency-limit */
+
    aggs->sum_latency = 0;      /* SUM(latency) */
    aggs->sum2_latency = 0;             /* SUM(latency*latency) */
 
@@ -1050,8 +1068,34 @@ top:
        int64       wait = getPoissonRand(thread, throttle_delay);
 
        thread->throttle_trigger += wait;
-
        st->txn_scheduled = thread->throttle_trigger;
+
+       /*
+        * If this --latency-limit is used, and this slot is already late so
+        * that the transaction will miss the latency limit even if it
+        * completed immediately, we skip this time slot and iterate till the
+        * next slot that isn't late yet.
+        */
+       if (latency_limit)
+       {
+           int64       now_us;
+
+           if (INSTR_TIME_IS_ZERO(now))
+               INSTR_TIME_SET_CURRENT(now);
+           now_us = INSTR_TIME_GET_MICROSEC(now);
+           while (thread->throttle_trigger < now_us - latency_limit)
+           {
+               thread->throttle_latency_skipped++;
+
+               if (logfile)
+                   doLog(thread, st, logfile, &now, agg, true);
+
+               wait = getPoissonRand(thread, throttle_delay);
+               thread->throttle_trigger += wait;
+               st->txn_scheduled = thread->throttle_trigger;
+           }
+       }
+
        st->sleeping = 1;
        st->throttling = true;
        st->is_throttled = true;
@@ -1119,12 +1163,13 @@ top:
        if (commands[st->state + 1] == NULL)
        {
            /* only calculate latency if an option is used that needs it */
-           if (progress || throttle_delay)
+           if (progress || throttle_delay || latency_limit)
            {
                int64       latency;
 
                if (INSTR_TIME_IS_ZERO(now))
                    INSTR_TIME_SET_CURRENT(now);
+
                latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
 
                st->txn_latencies += latency;
@@ -1137,11 +1182,15 @@ top:
                 * transactions, overflow would take 256 hours.
                 */
                st->txn_sqlats += latency * latency;
+
+               /* record over the limit transactions if needed. */
+               if (latency_limit && latency > latency_limit)
+                   thread->latency_late++;
            }
 
            /* record the time it took in the log */
            if (logfile)
-               doLog(thread, st, logfile, &now, agg);
+               doLog(thread, st, logfile, &now, agg, false);
        }
 
        if (commands[st->state]->type == SQL_COMMAND)
@@ -1227,7 +1276,7 @@ top:
    }
 
    /* Record transaction start time under logging, progress or throttling */
-   if ((logfile || progress || throttle_delay) && st->state == 0)
+   if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
    {
        INSTR_TIME_SET_CURRENT(st->txn_begin);
 
@@ -1605,7 +1654,8 @@ top:
  * print log entry after completing one transaction.
  */
 static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
+     bool skipped)
 {
    double      lag;
    double      latency;
@@ -1622,7 +1672,10 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
        INSTR_TIME_SET_CURRENT(*now);
 
    latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
-   lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
+   if (skipped)
+       lag = latency;
+   else
+       lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
 
    /* should we aggregate the results or not? */
    if (agg_interval > 0)
@@ -1634,26 +1687,34 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
        if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
        {
            agg->cnt += 1;
-           agg->sum_latency += latency;
-           agg->sum2_latency += latency * latency;
+           if (skipped)
+           {
+               /* there is no latency to record if the transaction was skipped */
+               agg->skipped += 1;
+           }
+           else
+           {
+               agg->sum_latency += latency;
+               agg->sum2_latency += latency * latency;
 
-           /* first in this aggregation interval */
-           if ((agg->cnt == 1) || (latency < agg->min_latency))
-               agg->min_latency = latency;
+               /* first in this aggregation interval */
+               if ((agg->cnt == 1) || (latency < agg->min_latency))
+                   agg->min_latency = latency;
 
-           if ((agg->cnt == 1) || (latency > agg->max_latency))
-               agg->max_latency = latency;
+               if ((agg->cnt == 1) || (latency > agg->max_latency))
+                   agg->max_latency = latency;
 
-           /* and the same for schedule lag */
-           if (throttle_delay)
-           {
-               agg->sum_lag += lag;
-               agg->sum2_lag += lag * lag;
+               /* and the same for schedule lag */
+               if (throttle_delay)
+               {
+                   agg->sum_lag += lag;
+                   agg->sum2_lag += lag * lag;
 
-               if ((agg->cnt == 1) || (lag < agg->min_lag))
-                   agg->min_lag = lag;
-               if ((agg->cnt == 1) || (lag > agg->max_lag))
-                   agg->max_lag = lag;
+                   if ((agg->cnt == 1) || (lag < agg->min_lag))
+                       agg->min_lag = lag;
+                   if ((agg->cnt == 1) || (lag > agg->max_lag))
+                       agg->max_lag = lag;
+               }
            }
        }
        else
@@ -1677,11 +1738,15 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
                        agg->min_latency,
                        agg->max_latency);
                if (throttle_delay)
+               {
                    fprintf(logfile, " %.0f %.0f %.0f %.0f",
                            agg->sum_lag,
                            agg->sum2_lag,
                            agg->min_lag,
                            agg->max_lag);
+                   if (latency_limit)
+                       fprintf(logfile, " %d", agg->skipped);
+               }
                fputc('\n', logfile);
 
                /* move to the next inteval */
@@ -1689,6 +1754,7 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
 
                /* reset for "no transaction" intervals */
                agg->cnt = 0;
+               agg->skipped = 0;
                agg->min_latency = 0;
                agg->max_latency = 0;
                agg->sum_latency = 0;
@@ -1701,10 +1767,11 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
 
            /* reset the values to include only the current transaction. */
            agg->cnt = 1;
+           agg->skipped = skipped ? 1 : 0;
            agg->min_latency = latency;
            agg->max_latency = latency;
-           agg->sum_latency = latency;
-           agg->sum2_latency = latency * latency;
+           agg->sum_latency = skipped ? 0.0 : latency;
+           agg->sum2_latency = skipped ? 0.0 : latency * latency;
            agg->min_lag = lag;
            agg->max_lag = lag;
            agg->sum_lag = lag;
@@ -1717,14 +1784,23 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
 #ifndef WIN32
 
        /* This is more than we really ought to know about instr_time */
-       fprintf(logfile, "%d %d %.0f %d %ld %ld",
-               st->id, st->cnt, latency, st->use_file,
-               (long) now->tv_sec, (long) now->tv_usec);
+       if (skipped)
+           fprintf(logfile, "%d %d skipped %d %ld %ld",
+                   st->id, st->cnt, st->use_file,
+                   (long) now->tv_sec, (long) now->tv_usec);
+       else
+           fprintf(logfile, "%d %d %.0f %d %ld %ld",
+                   st->id, st->cnt, latency, st->use_file,
+                   (long) now->tv_sec, (long) now->tv_usec);
 #else
 
        /* On Windows, instr_time doesn't provide a timestamp anyway */
-       fprintf(logfile, "%d %d %.0f %d 0 0",
-               st->id, st->cnt, latency, st->use_file);
+       if (skipped)
+           fprintf(logfile, "%d %d skipped %d 0 0",
+                   st->id, st->cnt, st->use_file);
+       else
+           fprintf(logfile, "%d %d %.0f %d 0 0",
+                   st->id, st->cnt, latency, st->use_file);
 #endif
        if (throttle_delay)
            fprintf(logfile, " %.0f", lag);
@@ -2424,7 +2500,8 @@ printResults(int ttype, int64 normal_xacts, int nclients,
             TState *threads, int nthreads,
             instr_time total_time, instr_time conn_total_time,
             int64 total_latencies, int64 total_sqlats,
-            int64 throttle_lag, int64 throttle_lag_max)
+            int64 throttle_lag, int64 throttle_lag_max,
+            int64 throttle_latency_skipped, int64 latency_late)
 {
    double      time_include,
                tps_include,
@@ -2463,7 +2540,17 @@ printResults(int ttype, int64 normal_xacts, int nclients,
               normal_xacts);
    }
 
-   if (throttle_delay || progress)
+   if (throttle_delay && latency_limit)
+       printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
+              throttle_latency_skipped,
+              100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+
+   if (latency_limit)
+       printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+              latency_limit / 1000.0, latency_late,
+              100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+
+   if (throttle_delay || progress || latency_limit)
    {
        /* compute and show latency average and standard deviation */
        double      latency = 0.001 * total_latencies / normal_xacts;
@@ -2578,6 +2665,7 @@ main(int argc, char **argv)
        {"sampling-rate", required_argument, NULL, 4},
        {"aggregate-interval", required_argument, NULL, 5},
        {"rate", required_argument, NULL, 'R'},
+       {"latency-limit", required_argument, NULL, 'L'},
        {NULL, 0, NULL, 0}
    };
 
@@ -2607,6 +2695,8 @@ main(int argc, char **argv)
    int64       total_sqlats = 0;
    int64       throttle_lag = 0;
    int64       throttle_lag_max = 0;
+   int64       throttle_latency_skipped = 0;
+   int64       latency_late = 0;
 
    int         i;
 
@@ -2651,7 +2741,7 @@ main(int argc, char **argv)
    state = (CState *) pg_malloc(sizeof(CState));
    memset(state, 0, sizeof(CState));
 
-   while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
+   while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
    {
        switch (c)
        {
@@ -2848,6 +2938,18 @@ main(int argc, char **argv)
                    throttle_delay = (int64) (1000000.0 / throttle_value);
                }
                break;
+           case 'L':
+               {
+                   double limit_ms = atof(optarg);
+                   if (limit_ms <= 0.0)
+                   {
+                       fprintf(stderr, "invalid latency limit: %s\n", optarg);
+                       exit(1);
+                   }
+                   benchmarking_option_set = true;
+                   latency_limit = (int64) (limit_ms * 1000);
+               }
+               break;
            case 0:
                /* This covers long options which take no argument. */
                if (foreign_keys || unlogged_tables)
@@ -3143,6 +3245,8 @@ main(int argc, char **argv)
        thread->random_state[0] = random();
        thread->random_state[1] = random();
        thread->random_state[2] = random();
+       thread->throttle_latency_skipped = 0;
+       thread->latency_late = 0;
 
        if (is_latencies)
        {
@@ -3217,6 +3321,8 @@ main(int argc, char **argv)
            total_latencies += r->latencies;
            total_sqlats += r->sqlats;
            throttle_lag += r->throttle_lag;
+           throttle_latency_skipped += r->throttle_latency_skipped;
+           latency_late += r->latency_late;
            if (r->throttle_lag_max > throttle_lag_max)
                throttle_lag_max = r->throttle_lag_max;
            INSTR_TIME_ADD(conn_total_time, r->conn_time);
@@ -3239,7 +3345,8 @@ main(int argc, char **argv)
    INSTR_TIME_SUBTRACT(total_time, start_time);
    printResults(ttype, total_xacts, nclients, threads, nthreads,
                 total_time, conn_total_time, total_latencies, total_sqlats,
-                throttle_lag, throttle_lag_max);
+                throttle_lag, throttle_lag_max, throttle_latency_skipped,
+                latency_late);
 
    return 0;
 }
@@ -3264,7 +3371,8 @@ threadRun(void *arg)
    int64       last_count = 0,
                last_lats = 0,
                last_sqlats = 0,
-               last_lags = 0;
+               last_lags = 0,
+               last_skipped = 0;
 
    AggVals     aggs;
 
@@ -3467,7 +3575,8 @@ threadRun(void *arg)
                /* generate and show report */
                int64       count = 0,
                            lats = 0,
-                           sqlats = 0;
+                           sqlats = 0,
+                           skipped = 0;
                int64       lags = thread->throttle_lag;
                int64       run = now - last_report;
                double      tps,
@@ -3490,23 +3599,26 @@ threadRun(void *arg)
                sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
                stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
                lag = 0.001 * (lags - last_lags) / (count - last_count);
+               skipped = thread->throttle_latency_skipped - last_skipped;
 
+               fprintf(stderr,
+                       "progress %d: %.1f s, %.1f tps, "
+                       "lat %.3f ms stddev %.3f",
+                       thread->tid, total_run, tps, latency, stdev);
                if (throttle_delay)
-                   fprintf(stderr,
-                           "progress %d: %.1f s, %.1f tps, "
-                           "lat %.3f ms stddev %.3f, lag %.3f ms\n",
-                           thread->tid, total_run, tps, latency, stdev, lag);
-               else
-                   fprintf(stderr,
-                           "progress %d: %.1f s, %.1f tps, "
-                           "lat %.3f ms stddev %.3f\n",
-                           thread->tid, total_run, tps, latency, stdev);
+               {
+                   fprintf(stderr, ", lag %.3f ms", lag);
+                   if (latency_limit)
+                       fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
+               }
+               fprintf(stderr, "\n");
 
                last_count = count;
                last_lats = lats;
                last_sqlats = sqlats;
                last_lags = lags;
                last_report = now;
+               last_skipped = thread->throttle_latency_skipped;
                next_report += (int64) progress *1000000;
            }
        }
@@ -3525,7 +3637,8 @@ threadRun(void *arg)
                int64       count = 0,
                            lats = 0,
                            sqlats = 0,
-                           lags = 0;
+                           lags = 0,
+                           skipped = 0;
                int64       run = now - last_report;
                double      tps,
                            total_run,
@@ -3550,23 +3663,26 @@ threadRun(void *arg)
                sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
                stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
                lag = 0.001 * (lags - last_lags) / (count - last_count);
+               skipped = thread->throttle_latency_skipped - last_skipped;
 
+               fprintf(stderr,
+                       "progress: %.1f s, %.1f tps, "
+                       "lat %.3f ms stddev %.3f",
+                       total_run, tps, latency, stdev);
                if (throttle_delay)
-                   fprintf(stderr,
-                           "progress: %.1f s, %.1f tps, "
-                           "lat %.3f ms stddev %.3f, lag %.3f ms\n",
-                           total_run, tps, latency, stdev, lag);
-               else
-                   fprintf(stderr,
-                           "progress: %.1f s, %.1f tps, "
-                           "lat %.3f ms stddev %.3f\n",
-                           total_run, tps, latency, stdev);
+               {
+                   fprintf(stderr, ", lag %.3f ms", lag);
+                   if (latency_limit)
+                       fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
+               }
+               fprintf(stderr, "\n");
 
                last_count = count;
                last_lats = lats;
                last_sqlats = sqlats;
                last_lags = lags;
                last_report = now;
+               last_skipped = thread->throttle_latency_skipped;
                next_report += (int64) progress *1000000;
            }
        }
@@ -3587,6 +3703,9 @@ done:
    }
    result->throttle_lag = thread->throttle_lag;
    result->throttle_lag_max = thread->throttle_lag_max;
+   result->throttle_latency_skipped = thread->throttle_latency_skipped;
+   result->latency_late = thread->latency_late;
+
    INSTR_TIME_SET_CURRENT(end);
    INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
    if (logfile)
index c4e0cbd79ae3c827a56f756a0df938218b1aa106..7d203cda84c7740197d97510dbaa81700808e737 100644 (file)
@@ -344,6 +344,24 @@ pgbench  options  dbname
       
      
 
+     
+       limit
+      limit
+      
+       
+        Transaction which last more than limit milliseconds
+        are counted and reported separately, as late.
+       
+       
+        When throttling is used (
+        lag behind schedule by more than limit ms, and thus
+        have no hope of meeting the latency limit, are not sent to the server
+        at all. They are counted and reported separately as
+        skipped.
+       
+       
+     
+
      
        querymode
       querymode
@@ -453,6 +471,15 @@ pgbench  options  dbname
         latency.
        
 
+       
+        If 
+        a transaction can lag behind so much that it is already over the
+        latency limit when the previous transaction ends, because the latency
+        is calculated from the scheduled start time. Such transactions are
+        not sent to the server, but are skipped altogether and counted
+        separately.
+       
+
        
         A high schedule lag time is an indication that the system cannot
         process transactions at the specified rate, with the chosen number of
@@ -940,7 +967,7 @@ END;
    The format of the log is:
 
 
-client_id transaction_no time file_no time_epoch time_us [schedule_lag]
+client_id transaction_no time file_no time_epoch time_us schedule_lag
 
 
    where time is the total elapsed transaction time in microseconds,
@@ -950,20 +977,40 @@ END;
    UNIX epoch format timestamp and an offset
    in microseconds (suitable for creating an ISO 8601
    timestamp with fractional seconds) showing when
-   the transaction completed. The last field, schedule_lag, is
-   the difference between the transaction's scheduled start time, and the
-   time it actually started, in microseconds. It is only present when the
-   
+   the transaction completed.
+   Field schedule_lag is the difference between the
+   transaction's scheduled start time, and the time it actually started, in
+   microseconds. It is only present when the 
+   The last field skipped_transactions reports the number of
+   transactions skipped because they were too far behind schedule. It is only
+   present when both options 
+   are used.
   
 
   
-   Here are example outputs:
+   Here is a snippet of the log file generated:
 
  0 199 2241 0 1175850568 995598
  0 200 2465 0 1175850568 998079
  0 201 2513 0 1175850569 608
  0 202 2038 0 1175850569 2663
-
+
+
+   Another example with --rate=100 and --latency-limit=5 (note the additional
+   schedule_lag column):
+
+ 0 81 4621 0 1412881037 912698 3005
+ 0 82 6173 0 1412881037 914578 4304
+ 0 83 skipped 0 1412881037 914578 5217
+ 0 83 skipped 0 1412881037 914578 5099
+ 0 83 4722 0 1412881037 916203 3108
+ 0 84 4142 0 1412881037 918023 2333
+ 0 85 2465 0 1412881037 919759 740
+
+   In this example, transaction 82 was late, because it's latency (6.173 ms) was
+   over the 5 ms limit. The next two transactions were skipped, because they
+   were already late before they were even started.
+  
 
   
    When running a long test on hardware that can handle a lot of transactions,
@@ -979,7 +1026,7 @@ END;
    With the  option, the logs use a bit different format:
 
 
-interval_start num_of_transactions latency_sum latency_2_sum min_latency max_latency [lag_sum lag_2_sum min_lag max_lag]
+interval_start num_of_transactions latency_sum latency_2_sum min_latency max_latency lag_sum lag_2_sum min_lag max_lag skipped_transactions
 
 
    where interval_start is the start of the interval (UNIX epoch
@@ -990,8 +1037,11 @@ END;
    latency_2_sum is a sum of 2nd powers of latencies. The last two
    fields are min_latency - a minimum latency within the interval, and
    max_latency - maximum latency within the interval. A transaction is
-   counted into the interval when it was committed. The last four fields, 
-   lag_sum, lag_2_sum, min_lag, and max_lag, are only present if the --rate option is used.
+   counted into the interval when it was committed. The fields in the end,
+   lag_sum, lag_2_sum, min_lag,
+   and max_lag, are only present if the 
+   option is used. The very last one, skipped_transactions,
+   is only present if the option 
    They are calculated from the time each transaction had to wait for the
    previous one to finish, i.e. the difference between each transaction's
    scheduled start time and the time it actually started.