libpq: Improve idle state handling in pipeline mode
authorAlvaro Herrera
Tue, 5 Jul 2022 12:21:20 +0000 (14:21 +0200)
committerAlvaro Herrera
Tue, 5 Jul 2022 12:21:20 +0000 (14:21 +0200)
We were going into IDLE state too soon when executing queries via
PQsendQuery in pipeline mode, causing several scenarios to misbehave in
different ways -- most notably, as reported by Daniele Varrazzo, that a
warning message is produced by libpq:
  message type 0x33 arrived from server while idle
But it is also possible, if queries are sent and results consumed not in
lockstep, for the expected mediating NULL result values from PQgetResult
to be lost (a problem which has not been reported, but which is more
serious).

Fix this by introducing two new concepts: one is a command queue element
PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server
response to the Close message that is sent by PQsendQuery.  Because the
application is not expecting any PGresult from this, the mechanism to
consume it is a bit hackish.

The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE
state for libpq's state machine to differentiate "really idle" from
merely "the idle state that occurs in between reading results from the
server for elements in the pipeline".  This makes libpq not go fully
IDLE when the libpq command queue contains entries; in normal cases, we
only go IDLE once at the end of the pipeline, when the server response
to the final SYNC message is received.  (However, there are corner cases
it doesn't fix, such as terminating the query sequence by
PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is
what requires PGQUERY_CLOSE bit above.)

This last bit helps make the libpq state machine clearer; in particular
we can get rid of an ugly hack in pqParseInput3 to avoid considering
IDLE as such when the command queue contains entries.

A new test mode is added to libpq_pipeline.c to tickle some related
problematic cases.

Reported-by: Daniele Varrazzo
Co-authored-by: Kyotaro Horiguchi
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com

src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-int.h
src/test/modules/libpq_pipeline/libpq_pipeline.c
src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
src/test/modules/libpq_pipeline/traces/pipeline_idle.trace [new file with mode: 0644]

index 41806831940ccdb38f48ef3a7c27ed075ea49f38..e22d0814f096ea19ef0cbf66840e3722fc28b553 100644 (file)
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
             * itself consume commands from the queue; if we're in any other
             * state, we don't have to do anything.
             */
-           if (conn->asyncStatus == PGASYNC_IDLE)
+           if (conn->asyncStatus == PGASYNC_IDLE ||
+               conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
            {
                resetPQExpBuffer(&conn->errorMessage);
                pqPipelineProcessQueue(conn);
@@ -1338,6 +1339,7 @@ static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
    PGcmdQueueEntry *entry = NULL;
+   PGcmdQueueEntry *entry2 = NULL;
 
    if (!PQsendQueryStart(conn, newQuery))
        return 0;
@@ -1353,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
    entry = pqAllocCmdQueueEntry(conn);
    if (entry == NULL)
        return 0;               /* error msg already set */
+   if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+   {
+       entry2 = pqAllocCmdQueueEntry(conn);
+       if (entry2 == NULL)
+           goto sendFailed;
+   }
 
    /* Send the query message(s) */
    if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1422,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
+
+   /*
+    * When pipeline mode is in use, we need a second entry in the command
+    * queue to represent Close Portal message.  This allows us later to wait
+    * for the CloseComplete message to be received before getting in IDLE
+    * state.
+    */
+   if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+   {
+       entry2->queryclass = PGQUERY_CLOSE;
+       entry2->query = NULL;
+       pqAppendCmdQueueEntry(conn, entry2);
+   }
+
    return 1;
 
 sendFailed:
@@ -1667,11 +1689,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
        switch (conn->asyncStatus)
        {
            case PGASYNC_IDLE:
+           case PGASYNC_PIPELINE_IDLE:
            case PGASYNC_READY:
            case PGASYNC_READY_MORE:
            case PGASYNC_BUSY:
                /* ok to queue */
                break;
+
            case PGASYNC_COPY_IN:
            case PGASYNC_COPY_OUT:
            case PGASYNC_COPY_BOTH:
@@ -2047,19 +2071,22 @@ PQgetResult(PGconn *conn)
    {
        case PGASYNC_IDLE:
            res = NULL;         /* query is complete */
-           if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-           {
-               /*
-                * We're about to return the NULL that terminates the round of
-                * results from the current query; prepare to send the results
-                * of the next query when we're called next.  Also, since this
-                * is the start of the results of the next query, clear any
-                * prior error message.
-                */
-               resetPQExpBuffer(&conn->errorMessage);
-               pqPipelineProcessQueue(conn);
-           }
            break;
+       case PGASYNC_PIPELINE_IDLE:
+           Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+           /*
+            * We're about to return the NULL that terminates the round of
+            * results from the current query; prepare to send the results
+            * of the next query, if any, when we're called next.  If there's
+            * no next element in the command queue, this gets us in IDLE
+            * state.
+            */
+           resetPQExpBuffer(&conn->errorMessage);
+           pqPipelineProcessQueue(conn);
+           res = NULL;         /* query is complete */
+           break;
+
        case PGASYNC_READY:
 
            /*
@@ -2080,7 +2107,7 @@ PQgetResult(PGconn *conn)
                 * We're about to send the results of the current query.  Set
                 * us idle now, and ...
                 */
-               conn->asyncStatus = PGASYNC_IDLE;
+               conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
                /*
                 * ... in cases when we're sending a pipeline-sync result,
@@ -2124,6 +2151,22 @@ PQgetResult(PGconn *conn)
            break;
    }
 
+   /* If the next command we expect is CLOSE, read and consume it */
+   if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+       conn->cmd_queue_head &&
+       conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+   {
+       if (res && res->resultStatus != PGRES_FATAL_ERROR)
+       {
+           conn->asyncStatus = PGASYNC_BUSY;
+           parseInput(conn);
+           conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+       }
+       else
+           /* we won't ever see the Close */
+           pqCommandQueueAdvance(conn);
+   }
+
    if (res)
    {
        int         i;
@@ -2932,7 +2975,10 @@ PQexitPipelineMode(PGconn *conn)
    if (!conn)
        return 0;
 
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+   if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+       (conn->asyncStatus == PGASYNC_IDLE ||
+        conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+       conn->cmd_queue_head == NULL)
        return 1;
 
    switch (conn->asyncStatus)
@@ -2949,9 +2995,16 @@ PQexitPipelineMode(PGconn *conn)
                                 libpq_gettext("cannot exit pipeline mode while busy\n"));
            return 0;
 
-       default:
+       case PGASYNC_IDLE:
+       case PGASYNC_PIPELINE_IDLE:
            /* OK */
            break;
+
+       case PGASYNC_COPY_IN:
+       case PGASYNC_COPY_OUT:
+       case PGASYNC_COPY_BOTH:
+           appendPQExpBufferStr(&conn->errorMessage,
+                                libpq_gettext("cannot exit pipeline mode while in COPY\n"));
    }
 
    /* still work to process */
@@ -2988,6 +3041,10 @@ pqCommandQueueAdvance(PGconn *conn)
    prevquery = conn->cmd_queue_head;
    conn->cmd_queue_head = conn->cmd_queue_head->next;
 
+   /* If the queue is now empty, reset the tail too */
+   if (conn->cmd_queue_head == NULL)
+       conn->cmd_queue_tail = NULL;
+
    /* and make it recyclable */
    prevquery->next = NULL;
    pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3010,15 +3067,35 @@ pqPipelineProcessQueue(PGconn *conn)
        case PGASYNC_BUSY:
            /* client still has to process current query or results */
            return;
+
        case PGASYNC_IDLE:
+           /*
+            * If we're in IDLE mode and there's some command in the queue,
+            * get us into PIPELINE_IDLE mode and process normally.  Otherwise
+            * there's nothing for us to do.
+            */
+           if (conn->cmd_queue_head != NULL)
+           {
+               conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+               break;
+           }
+           return;
+
+       case PGASYNC_PIPELINE_IDLE:
+           Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
            /* next query please */
            break;
    }
 
-   /* Nothing to do if not in pipeline mode, or queue is empty */
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-       conn->cmd_queue_head == NULL)
+   /*
+    * If there are no further commands to process in the queue, get us in
+    * "real idle" mode now.
+    */
+   if (conn->cmd_queue_head == NULL)
+   {
+       conn->asyncStatus = PGASYNC_IDLE;
        return;
+   }
 
    /* Initialize async result-accumulation state */
    pqClearAsyncResult(conn);
@@ -3105,6 +3182,7 @@ PQpipelineSync(PGconn *conn)
        case PGASYNC_READY_MORE:
        case PGASYNC_BUSY:
        case PGASYNC_IDLE:
+       case PGASYNC_PIPELINE_IDLE:
            /* OK to send sync */
            break;
    }
index 9ab3bf1fcb61a7105bb5ca5f233dd2e2f8568662..c33f904db43ce29168091ee6492ce8a86c1399b2 100644 (file)
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
            if (conn->asyncStatus != PGASYNC_IDLE)
                return;
 
-           /*
-            * We're also notionally not-IDLE when in pipeline mode the state
-            * says "idle" (so we have completed receiving the results of one
-            * query from the server and dispatched them to the application)
-            * but another query is queued; yield back control to caller so
-            * that they can initiate processing of the next query in the
-            * queue.
-            */
-           if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-               conn->cmd_queue_head != NULL)
-               return;
-
            /*
             * Unexpected message in IDLE state; need to recover somehow.
             * ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
                    }
                    break;
                case '2':       /* Bind Complete */
+                   /* Nothing to do for this message type */
+                   break;
                case '3':       /* Close Complete */
-                   /* Nothing to do for these message types */
+                   /*
+                    * If we get CloseComplete when waiting for it, consume
+                    * the queue element and keep going.  A result is not
+                    * expected from this message; it is just there so that
+                    * we know to wait for it when PQsendQuery is used in
+                    * pipeline mode, before going in IDLE state.  Failing to
+                    * do this makes us receive CloseComplete when IDLE, which
+                    * creates problems.
+                    */
+                   if (conn->cmd_queue_head &&
+                       conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+                   {
+                       pqCommandQueueAdvance(conn);
+                   }
+
                    break;
                case 'S':       /* parameter status */
                    if (getParameterStatus(conn))
index 334aea4b6ea7164099438fde09696964a3da555e..df2f17721cc478e051e3867ceab0c35c4156c101 100644 (file)
@@ -224,7 +224,8 @@ typedef enum
                                 * query */
    PGASYNC_COPY_IN,            /* Copy In data transfer in progress */
    PGASYNC_COPY_OUT,           /* Copy Out data transfer in progress */
-   PGASYNC_COPY_BOTH           /* Copy In/Out data transfer in progress */
+   PGASYNC_COPY_BOTH,          /* Copy In/Out data transfer in progress */
+   PGASYNC_PIPELINE_IDLE,      /* "Idle" between commands in pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
@@ -310,7 +311,8 @@ typedef enum
    PGQUERY_EXTENDED,           /* full Extended protocol (PQexecParams) */
    PGQUERY_PREPARE,            /* Parse only (PQprepare) */
    PGQUERY_DESCRIBE,           /* Describe Statement or Portal */
-   PGQUERY_SYNC                /* Sync (at end of a pipeline) */
+   PGQUERY_SYNC,               /* Sync (at end of a pipeline) */
+   PGQUERY_CLOSE
 } PGQueryClass;
 
 /*
index c27c4e0adaf052749574681dc4d2c50ccab9b08b..dfab924965d2f1528cedd71b8cba916f1cbb7403 100644 (file)
@@ -581,8 +581,6 @@ test_pipeline_abort(PGconn *conn)
    if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
        pg_fatal("exiting pipeline mode didn't seem to work");
 
-   fprintf(stderr, "ok\n");
-
    /*-
     * Since we fired the pipelines off without a surrounding xact, the results
     * should be:
@@ -614,6 +612,8 @@ test_pipeline_abort(PGconn *conn)
    }
 
    PQclear(res);
+
+   fprintf(stderr, "ok\n");
 }
 
 /* State machine enum for test_pipelined_insert */
@@ -968,6 +968,207 @@ test_prepared(PGconn *conn)
    fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+   int    *n_notices = (int *) arg;
+
+   (*n_notices)++;
+   fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+   PGresult   *res;
+   int         n_notices = 0;
+
+   fprintf(stderr, "\npipeline idle...\n");
+
+   PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+   /*
+    * Cause a Close message to be sent to the server, and watch libpq's
+    * reaction to the resulting CloseComplete.  libpq must not get in IDLE
+    * state until that has been received.
+    */
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+   if (PQsendQuery(conn, "SELECT 1") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("Unexpected result code %s from first pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   PQclear(res);
+
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("expected NULL result");
+
+   if (PQpipelineSync(conn) != 1)
+       pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+       pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+                PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+   PQclear(res);
+   res = NULL;
+
+   if (PQexitPipelineMode(conn) != 1)
+       pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+                PQerrorMessage(conn));
+
+   /*
+    * Must not have got any notices here; note bug as described in
+    * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+    */
+   if (n_notices > 0)
+       pg_fatal("got %d notice(s)", n_notices);
+   fprintf(stderr, "ok - 1\n");
+
+   /*
+    * Verify that we can send a query using simple query protocol after one
+    * in pipeline mode.
+    */
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+   if (PQsendQuery(conn, "SELECT 1") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from first pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("got unexpected non-null result");
+   /* We can exit pipeline mode now */
+   if (PQexitPipelineMode(conn) != 1)
+       pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+                PQerrorMessage(conn));
+   res = PQexec(conn, "SELECT 2");
+   if (n_notices > 0)
+       pg_fatal("got %d notice(s)", n_notices);
+   if (res == NULL)
+       pg_fatal("PQexec returned NULL");
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from non-pipeline query",
+                PQresStatus(PQresultStatus(res)));
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("did not receive terminating NULL");
+   if (n_notices > 0)
+       pg_fatal("got %d notice(s)", n_notices);
+   fprintf(stderr, "ok - 2\n");
+
+   /*
+    * Case 2: exiting pipeline mode is not OK if a second command is sent.
+    */
+
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+   if (PQsendQuery(conn, "SELECT 1") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from first pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   if (PQsendQuery(conn, "SELECT 2") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   /* read terminating null from first query */
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("did not receive terminating NULL");
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from first pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("did not receive terminating NULL");
+   if (PQexitPipelineMode(conn) != 1)
+       pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+                PQerrorMessage(conn));
+
+   /* Try to exit pipeline mode in pipeline-idle state */
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+   if (PQsendQuery(conn, "SELECT 1") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from first pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   PQclear(res);
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("did not receive terminating NULL");
+   if (PQsendQuery(conn, "SELECT 2") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   if (PQexitPipelineMode(conn) == 1)
+       pg_fatal("exiting pipeline succeeded when it shouldn't");
+   if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+               strlen("cannot exit pipeline mode")) != 0)
+       pg_fatal("did not get expected error; got: %s",
+                PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s from second pipeline item",
+                PQresStatus(PQresultStatus(res)));
+   PQclear(res);
+   res = PQgetResult(conn);
+   if (res != NULL)
+       pg_fatal("did not receive terminating NULL");
+   if (PQexitPipelineMode(conn) != 1)
+       pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+   if (n_notices > 0)
+       pg_fatal("got %d notice(s)", n_notices);
+   fprintf(stderr, "ok - 3\n");
+
+   /* Have a WARNING in the middle of a resultset */
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
+   if (PQsendQuery(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)") != 1)
+       pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+   PQsendFlushRequest(conn);
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("unexpected NULL result received");
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
+   if (PQexitPipelineMode(conn) != 1)
+       pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
+   fprintf(stderr, "ok - 4\n");
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
@@ -1160,6 +1361,8 @@ test_singlerowmode(PGconn *conn)
 
    if (PQexitPipelineMode(conn) != 1)
        pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+   fprintf(stderr, "ok\n");
 }
 
 /*
@@ -1549,6 +1752,7 @@ print_test_list(void)
    printf("multi_pipelines\n");
    printf("nosync\n");
    printf("pipeline_abort\n");
+   printf("pipeline_idle\n");
    printf("pipelined_insert\n");
    printf("prepared\n");
    printf("simple_pipeline\n");
@@ -1630,7 +1834,10 @@ main(int argc, char **argv)
    /* Set the trace file, if requested */
    if (tracefile != NULL)
    {
-       trace = fopen(tracefile, "w");
+       if (strcmp(tracefile, "-") == 0)
+           trace = stdout;
+       else
+           trace = fopen(tracefile, "w");
        if (trace == NULL)
            pg_fatal("could not open file \"%s\": %m", tracefile);
 
@@ -1650,6 +1857,8 @@ main(int argc, char **argv)
        test_nosync(conn);
    else if (strcmp(testname, "pipeline_abort") == 0)
        test_pipeline_abort(conn);
+   else if (strcmp(testname, "pipeline_idle") == 0)
+       test_pipeline_idle(conn);
    else if (strcmp(testname, "pipelined_insert") == 0)
        test_pipelined_insert(conn, numrows);
    else if (strcmp(testname, "prepared") == 0)
index d8d496c995abc3b61ee850b71bc993158324f933..b02928cad29320130386ce206b56edf1b7e97dd9 100644 (file)
@@ -26,7 +26,8 @@ for my $testname (@tests)
    my @extraargs = ('-r', $numrows);
    my $cmptrace = grep(/^$testname$/,
        qw(simple_pipeline nosync multi_pipelines prepared singlerow
-         pipeline_abort transaction disallowed_in_pipeline)) > 0;
+         pipeline_abort pipeline_idle transaction
+         disallowed_in_pipeline)) > 0;
 
    # For a bunch of tests, generate a libpq trace file too.
    my $traceout = "$TestLib::tmp_check/traces/$testname.trace";
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644 (file)
index 0000000..3957ee4
--- /dev/null
@@ -0,0 +1,93 @@
+F  16  Parse    "" "SELECT 1" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '1'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  4   Sync
+B  5   ReadyForQuery    I
+F  16  Parse    "" "SELECT 1" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '1'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  13  Query    "SELECT 2"
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '2'
+B  13  CommandComplete  "SELECT 1"
+B  5   ReadyForQuery    I
+F  16  Parse    "" "SELECT 1" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '1'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  16  Parse    "" "SELECT 2" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '2'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  16  Parse    "" "SELECT 1" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '1'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  16  Parse    "" "SELECT 2" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '2'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  49  Parse    "" "SELECT pg_catalog.pg_advisory_unlock(1,1)" 0
+F  12  Bind     "" "" 0 0 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  6   Close    P ""
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  43  RowDescription   1 "pg_advisory_unlock" NNNN 0 NNNN 1 -1 0
+B  NN  NoticeResponse   S "WARNING" V "WARNING" C "01000" M "you don't own a lock of type ExclusiveLock" F "SSSS" L "SSSS" R "SSSS" \x00
+B  11  DataRow  1 1 'f'
+B  13  CommandComplete  "SELECT 1"
+B  4   CloseComplete
+F  4   Terminate