libpq: Add PQsendPipelineSync()
authorMichael Paquier
Tue, 16 Jan 2024 01:13:42 +0000 (10:13 +0900)
committerMichael Paquier
Tue, 16 Jan 2024 01:13:42 +0000 (10:13 +0900)
This new function is equivalent to PQpipelineSync(), except that it does
not flush anything to the server except if the size threshold of the
output buffer is reached; the user must subsequently call PQflush()
instead.

Its purpose is to reduce the system call overhead of pipeline mode, by
giving to applications more control over the timing of the flushes when
manipulating commands in pipeline mode.

Author: Anton Kirilov
Reviewed-by: Jelte Fennema-Nio, Robert Haas, Álvaro Herrera, Denis
Laxalde, Michael Paquier
Discussion: https://postgr.es/m/CACV6eE5arHFZEA717=iKEa_OewpVFfWJOmsOdGrqqsr8CJVfWQ@mail.gmail.com

doc/src/sgml/libpq.sgml
src/interfaces/libpq/exports.txt
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/libpq-fe.h
src/test/modules/libpq_pipeline/libpq_pipeline.c
src/test/modules/libpq_pipeline/traces/multi_pipelines.trace

index 0bbb84744e655bf77ecdf4e9d1cb56dfdb02c28f..173ab779a08540241bc5c8d4ad4a8c959156abb4 100644 (file)
@@ -3547,8 +3547,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
           
            
             The PGresult represents a
-            synchronization point in pipeline mode, requested by
-            .
+            synchronization point in pipeline mode, requested by either
+             or
+            .
             This status occurs only when pipeline mode has been selected.
            
           
@@ -5122,7 +5123,8 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
        ,
        ,
        ,
-       , or
+       ,
+       , or
        
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5507,8 +5509,9 @@ int PQflush(PGconn *conn);
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
      Note that results are buffered on the server side; the server flushes
-     that buffer when a synchronization point is established with
-     PQpipelineSync, or when
+     that buffer when a synchronization point is established with either
+     PQpipelineSync or
+     PQsendPipelineSync, or when
      PQsendFlushRequest is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
@@ -5565,7 +5568,8 @@ int PQflush(PGconn *conn);
      PGresult types PGRES_PIPELINE_SYNC
      and PGRES_PIPELINE_ABORTED.
      PGRES_PIPELINE_SYNC is reported exactly once for each
-     PQpipelineSync at the corresponding point
+     PQpipelineSync or
+     PQsendPipelineSync at the corresponding point
      in the pipeline.
      PGRES_PIPELINE_ABORTED is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5603,7 +5607,8 @@ int PQflush(PGconn *conn);
      PQresultStatus will report a
      PGRES_PIPELINE_ABORTED result for each remaining queued
      operation in an aborted pipeline. The result for
-     PQpipelineSync is reported as
+     PQpipelineSync or
+     PQsendPipelineSync is reported as
      PGRES_PIPELINE_SYNC to signal the end of the aborted pipeline
      and resumption of normal result processing.
     
@@ -5810,6 +5815,32 @@ int PQpipelineSync(PGconn *conn);
      
     
 
+    
+     PQsendPipelineSyncPQsendPipelineSync
+
+     
+      
+       Marks a synchronization point in a pipeline by sending a
+       sync message
+       without flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see .
+
+
+int PQsendPipelineSync(PGconn *conn);
+
+      
+      
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       sync message
+       failed.
+       Note that the message is not itself flushed to the server automatically;
+       use PQflush if necessary.
+      
+     
+    
+
     
      PQsendFlushRequestPQsendFlushRequest
 
index 28b861fd93bcfcb26523ee6502fa970a9306d9b6..088592deb160fb4774fec78b2af3d8a0115dc9e1 100644 (file)
@@ -192,3 +192,4 @@ PQclosePortal             189
 PQsendClosePrepared       190
 PQsendClosePortal         191
 PQchangePassword          192
+PQsendPipelineSync        193
index e1768d5475d3e18d3d0a1c26f2b54899d1f8eb2e..52d41658c1f32e39f6d478fd3cfe5741b3f59c85 100644 (file)
@@ -81,6 +81,7 @@ static int    PQsendTypedCommand(PGconn *conn, char command, char type,
                               const char *target);
 static int check_field_number(const PGresult *res, int field_num);
 static void pqPipelineProcessQueue(PGconn *conn);
+static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
 static int pqPipelineFlush(PGconn *conn);
 
 
@@ -3224,25 +3225,48 @@ pqPipelineProcessQueue(PGconn *conn)
 /*
  * PQpipelineSync
  *     Send a Sync message as part of a pipeline, and flush to server
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+   return pqPipelineSyncInternal(conn, true);
+}
+
+/*
+ * PQsendPipelineSync
+ *     Send a Sync message as part of a pipeline, without flushing to server
+ */
+int
+PQsendPipelineSync(PGconn *conn)
+{
+   return pqPipelineSyncInternal(conn, false);
+}
+
+/*
+ * Workhorse function for PQpipelineSync and PQsendPipelineSync.
  *
  * It's legal to start submitting more commands in the pipeline immediately,
  * without waiting for the results of the current pipeline. There's no need to
  * end pipeline mode and start it again.
  *
- * If a command in a pipeline fails, every subsequent command up to and including
- * the result to the Sync message sent by PQpipelineSync gets set to
- * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
- * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ * If a command in a pipeline fails, every subsequent command up to and
+ * including the result to the Sync message sent by pqPipelineSyncInternal
+ * gets set to PGRES_PIPELINE_ABORTED state. If the whole pipeline is
+ * processed without error, a PGresult with PGRES_PIPELINE_SYNC is produced.
  *
- * Queries can already have been sent before PQpipelineSync is called, but
- * PQpipelineSync needs to be called before retrieving command results.
+ * Queries can already have been sent before pqPipelineSyncInternal is called,
+ * but pqPipelineSyncInternal needs to be called before retrieving command
+ * results.
  *
  * The connection will remain in pipeline mode and unavailable for new
  * synchronous command execution functions until all results from the pipeline
  * are processed by the client.
+ *
+ * immediate_flush controls if the flush happens immediately after sending the
+ * Sync message or not.
  */
-int
-PQpipelineSync(PGconn *conn)
+static int
+pqPipelineSyncInternal(PGconn *conn, bool immediate_flush)
 {
    PGcmdQueueEntry *entry;
 
@@ -3288,9 +3312,19 @@ PQpipelineSync(PGconn *conn)
    /*
     * Give the data a push.  In nonblock mode, don't complain if we're unable
     * to send it all; PQgetResult() will do any additional flushing needed.
+    * If immediate_flush is disabled, the data is pushed if we are past the
+    * size threshold.
     */
-   if (PQflush(conn) < 0)
-       goto sendFailed;
+   if (immediate_flush)
+   {
+       if (pqFlush(conn) < 0)
+           goto sendFailed;
+   }
+   else
+   {
+       if (pqPipelineFlush(conn) < 0)
+           goto sendFailed;
+   }
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
index f0ec660cb69ff1fd8797ec7afff6301f0f4e3eb9..defc415fa3fb8e380cad27a62356819c229137b8 100644 (file)
@@ -474,6 +474,7 @@ extern int  PQenterPipelineMode(PGconn *conn);
 extern int PQexitPipelineMode(PGconn *conn);
 extern int PQpipelineSync(PGconn *conn);
 extern int PQsendFlushRequest(PGconn *conn);
+extern int PQsendPipelineSync(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
index 71cd04c5f2332de3b31bf8059a88871feb377d05..5f43aa40de4399d5f02ca57f64a68d2e48827281 100644 (file)
@@ -162,6 +162,7 @@ test_multi_pipelines(PGconn *conn)
    if (PQenterPipelineMode(conn) != 1)
        pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
 
+   /* first pipeline */
    if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
                          dummy_params, NULL, NULL, 0) != 1)
        pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
@@ -169,14 +170,27 @@ test_multi_pipelines(PGconn *conn)
    if (PQpipelineSync(conn) != 1)
        pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
 
+   /* second pipeline */
    if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
                          dummy_params, NULL, NULL, 0) != 1)
        pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
 
+   /* Skip flushing once. */
+   if (PQsendPipelineSync(conn) != 1)
+       pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+   /* third pipeline */
+   if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+                         dummy_params, NULL, NULL, 0) != 1)
+       pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
+
    if (PQpipelineSync(conn) != 1)
        pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
 
    /* OK, start processing the results */
+
+   /* first pipeline */
+
    res = PQgetResult(conn);
    if (res == NULL)
        pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
@@ -214,6 +228,35 @@ test_multi_pipelines(PGconn *conn)
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pg_fatal("Unexpected result code %s from second pipeline item",
                 PQresStatus(PQresultStatus(res)));
+   PQclear(res);
+   res = NULL;
+
+   if (PQgetResult(conn) != NULL)
+       pg_fatal("PQgetResult returned something extra after first result");
+
+   if (PQexitPipelineMode(conn) != 0)
+       pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when sync result expected: %s",
+                PQerrorMessage(conn));
+
+   if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+       pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+                PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+   PQclear(res);
+
+   /* third pipeline */
+
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+                PQerrorMessage(conn));
+
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       pg_fatal("Unexpected result code %s from third pipeline item",
+                PQresStatus(PQresultStatus(res)));
 
    res = PQgetResult(conn);
    if (res != NULL)
index 4b9ab07ca42f8053e7d02f50fa9c7c53387eb526..1ee21f61dce13175d62910caed7010257e8530e9 100644 (file)
@@ -8,6 +8,17 @@ F  19  Bind     "" "" 0 1 1 '1' 1 0
 F  6   Describe     P ""
 F  9   Execute  "" 0
 F  4   Sync
+F  21  Parse    "" "SELECT $1" 1 NNNN
+F  19  Bind     "" "" 0 1 1 '1' 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  4   Sync
+B  4   ParseComplete
+B  4   BindComplete
+B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0
+B  11  DataRow  1 1 '1'
+B  13  CommandComplete  "SELECT 1"
+B  5   ReadyForQuery    I
 B  4   ParseComplete
 B  4   BindComplete
 B  33  RowDescription   1 "?column?" NNNN 0 NNNN 4 -1 0