Improve efficiency of dblink by using libpq's new row processor API.
authorTom Lane
Wed, 4 Apr 2012 22:39:08 +0000 (18:39 -0400)
committerTom Lane
Wed, 4 Apr 2012 22:39:08 +0000 (18:39 -0400)
This patch provides a test case for libpq's row processor API.
contrib/dblink can deal with very large result sets by dumping them into
a tuplestore (which can spill to disk) --- but until now, the intermediate
storage of the query result in a PGresult meant memory bloat for any large
result.  Now we use a row processor to convert the data to tuple form and
dump it directly into the tuplestore.

A limitation is that this only works for plain dblink() queries, not
dblink_send_query() followed by dblink_get_result().  In the latter
case we don't know the desired tuple rowtype soon enough.  While hack
solutions to that are possible, a different user-level API would
probably be a better answer.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane

contrib/dblink/dblink.c
doc/src/sgml/dblink.sgml

index 46c7cc5923f18d3f0d614dbd327c37bef46c4af8..8154cae7bdb90870dabf6cc2473645dbc0e968dc 100644 (file)
@@ -63,12 +63,28 @@ typedef struct remoteConn
    bool        newXactForCursor;       /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+   FunctionCallInfo fcinfo;
+   Tuplestorestate *tuplestore;
+   AttInMetadata *attinmeta;
+   MemoryContext tmpcontext;
+   char      **cstrs;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
 static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
+static void materializeQueryResult(FunctionCallInfo fcinfo,
+                      PGconn *conn,
+                      const char *conname,
+                      const char *sql,
+                      bool fail);
+static int storeHandler(PGresult *res, const PGdataValue *columns,
+            const char **errmsgp, void *param);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -629,100 +645,118 @@ dblink_get_result(PG_FUNCTION_ARGS)
 static Datum
 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 {
-   char       *msg;
-   PGresult   *res = NULL;
-   PGconn     *conn = NULL;
-   char       *connstr = NULL;
-   char       *sql = NULL;
-   char       *conname = NULL;
-   remoteConn *rconn = NULL;
-   bool        fail = true;    /* default to backward compatible */
-   bool        freeconn = false;
+   PGconn     *volatile conn = NULL;
+   volatile bool freeconn = false;
 
    prepTuplestoreResult(fcinfo);
 
    DBLINK_INIT;
 
-   if (!is_async)
+   PG_TRY();
    {
-       if (PG_NARGS() == 3)
-       {
-           /* text,text,bool */
-           DBLINK_GET_CONN;
-           sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-           fail = PG_GETARG_BOOL(2);
-       }
-       else if (PG_NARGS() == 2)
+       char       *msg;
+       char       *connstr = NULL;
+       char       *sql = NULL;
+       char       *conname = NULL;
+       remoteConn *rconn = NULL;
+       bool        fail = true;    /* default to backward compatible */
+
+       if (!is_async)
        {
-           /* text,text or text,bool */
-           if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+           if (PG_NARGS() == 3)
            {
+               /* text,text,bool */
+               DBLINK_GET_CONN;
+               sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+               fail = PG_GETARG_BOOL(2);
+           }
+           else if (PG_NARGS() == 2)
+           {
+               /* text,text or text,bool */
+               if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+               {
+                   conn = pconn->conn;
+                   sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+                   fail = PG_GETARG_BOOL(1);
+               }
+               else
+               {
+                   DBLINK_GET_CONN;
+                   sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+               }
+           }
+           else if (PG_NARGS() == 1)
+           {
+               /* text */
                conn = pconn->conn;
                sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-               fail = PG_GETARG_BOOL(1);
            }
            else
+               /* shouldn't happen */
+               elog(ERROR, "wrong number of arguments");
+       }
+       else    /* is_async */
+       {
+           /* get async result */
+           if (PG_NARGS() == 2)
            {
-               DBLINK_GET_CONN;
-               sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+               /* text,bool */
+               DBLINK_GET_NAMED_CONN;
+               fail = PG_GETARG_BOOL(1);
            }
+           else if (PG_NARGS() == 1)
+           {
+               /* text */
+               DBLINK_GET_NAMED_CONN;
+           }
+           else
+               /* shouldn't happen */
+               elog(ERROR, "wrong number of arguments");
        }
-       else if (PG_NARGS() == 1)
+
+       if (!conn)
+           DBLINK_CONN_NOT_AVAIL;
+
+       if (!is_async)
        {
-           /* text */
-           conn = pconn->conn;
-           sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+           /* synchronous query, use efficient tuple collection method */
+           materializeQueryResult(fcinfo, conn, conname, sql, fail);
        }
        else
-           /* shouldn't happen */
-           elog(ERROR, "wrong number of arguments");
-   }
-   else    /* is_async */
-   {
-       /* get async result */
-       if (PG_NARGS() == 2)
-       {
-           /* text,bool */
-           DBLINK_GET_NAMED_CONN;
-           fail = PG_GETARG_BOOL(1);
-       }
-       else if (PG_NARGS() == 1)
        {
-           /* text */
-           DBLINK_GET_NAMED_CONN;
+           /* async result retrieval, do it the old way */
+           PGresult   *res = PQgetResult(conn);
+
+           /* NULL means we're all done with the async results */
+           if (res)
+           {
+               if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+                   PQresultStatus(res) != PGRES_TUPLES_OK)
+               {
+                   dblink_res_error(conname, res, "could not execute query",
+                                    fail);
+                   /* if fail isn't set, we'll return an empty query result */
+               }
+               else
+               {
+                   materializeResult(fcinfo, res);
+               }
+           }
        }
-       else
-           /* shouldn't happen */
-           elog(ERROR, "wrong number of arguments");
    }
-
-   if (!conn)
-       DBLINK_CONN_NOT_AVAIL;
-
-   /* synchronous query, or async result retrieval */
-   if (!is_async)
-       res = PQexec(conn, sql);
-   else
+   PG_CATCH();
    {
-       res = PQgetResult(conn);
-       /* NULL means we're all done with the async results */
-       if (!res)
-           return (Datum) 0;
+       /* if needed, close the connection to the database */
+       if (freeconn)
+           PQfinish(conn);
+       PG_RE_THROW();
    }
+   PG_END_TRY();
 
-   /* if needed, close the connection to the database and cleanup */
+   /* if needed, close the connection to the database */
    if (freeconn)
        PQfinish(conn);
 
-   if (!res ||
-       (PQresultStatus(res) != PGRES_COMMAND_OK &&
-        PQresultStatus(res) != PGRES_TUPLES_OK))
-   {
-       dblink_res_error(conname, res, "could not execute query", fail);
-       return (Datum) 0;
-   }
-
-   materializeResult(fcinfo, res);
    return (Datum) 0;
 }
 
@@ -890,6 +924,259 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
    PG_END_TRY();
 }
 
+/*
+ * Execute the given SQL command and store its results into a tuplestore
+ * to be returned as the result of the current function.
+ * This is equivalent to PQexec followed by materializeResult, but we make
+ * use of libpq's "row processor" API to reduce per-row overhead.
+ */
+static void
+materializeQueryResult(FunctionCallInfo fcinfo,
+                      PGconn *conn,
+                      const char *conname,
+                      const char *sql,
+                      bool fail)
+{
+   ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+   PGresult   *volatile res = NULL;
+   storeInfo   sinfo;
+
+   /* prepTuplestoreResult must have been called previously */
+   Assert(rsinfo->returnMode == SFRM_Materialize);
+
+   PG_TRY();
+   {
+       /* initialize storeInfo to empty */
+       memset(&sinfo, 0, sizeof(sinfo));
+       sinfo.fcinfo = fcinfo;
+
+       /* We'll collect tuples using storeHandler */
+       PQsetRowProcessor(conn, storeHandler, &sinfo);
+
+       res = PQexec(conn, sql);
+
+       /* We don't keep the custom row processor installed permanently */
+       PQsetRowProcessor(conn, NULL, NULL);
+
+       if (!res ||
+           (PQresultStatus(res) != PGRES_COMMAND_OK &&
+            PQresultStatus(res) != PGRES_TUPLES_OK))
+       {
+           /*
+            * dblink_res_error will clear the passed PGresult, so we need
+            * this ugly dance to avoid doing so twice during error exit
+            */
+           PGresult   *res1 = res;
+
+           res = NULL;
+           dblink_res_error(conname, res1, "could not execute query", fail);
+           /* if fail isn't set, we'll return an empty query result */
+       }
+       else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+           /*
+            * storeHandler didn't get called, so we need to convert the
+            * command status string to a tuple manually
+            */
+           TupleDesc   tupdesc;
+           AttInMetadata *attinmeta;
+           Tuplestorestate *tupstore;
+           HeapTuple   tuple;
+           char       *values[1];
+           MemoryContext oldcontext;
+
+           /*
+            * need a tuple descriptor representing one TEXT column to return
+            * the command status string as our result tuple
+            */
+           tupdesc = CreateTemplateTupleDesc(1, false);
+           TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                              TEXTOID, -1, 0);
+           attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+           oldcontext = MemoryContextSwitchTo(
+                                   rsinfo->econtext->ecxt_per_query_memory);
+           tupstore = tuplestore_begin_heap(true, false, work_mem);
+           rsinfo->setResult = tupstore;
+           rsinfo->setDesc = tupdesc;
+           MemoryContextSwitchTo(oldcontext);
+
+           values[0] = PQcmdStatus(res);
+
+           /* build the tuple and put it into the tuplestore. */
+           tuple = BuildTupleFromCStrings(attinmeta, values);
+           tuplestore_puttuple(tupstore, tuple);
+
+           PQclear(res);
+       }
+       else
+       {
+           Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+           /* storeHandler should have created a tuplestore */
+           Assert(rsinfo->setResult != NULL);
+
+           PQclear(res);
+       }
+   }
+   PG_CATCH();
+   {
+       /* be sure to unset the custom row processor */
+       PQsetRowProcessor(conn, NULL, NULL);
+       /* be sure to release any libpq result we collected */
+       if (res)
+           PQclear(res);
+       /* and clear out any pending data in libpq */
+       while ((res = PQskipResult(conn)) != NULL)
+           PQclear(res);
+       PG_RE_THROW();
+   }
+   PG_END_TRY();
+}
+
+/*
+ * Custom row processor for materializeQueryResult.
+ * Prototype of this function must match PQrowProcessor.
+ */
+static int
+storeHandler(PGresult *res, const PGdataValue *columns,
+            const char **errmsgp, void *param)
+{
+   storeInfo  *sinfo = (storeInfo *) param;
+   int         nfields = PQnfields(res);
+   char      **cstrs = sinfo->cstrs;
+   HeapTuple   tuple;
+   char       *pbuf;
+   int         pbuflen;
+   int         i;
+   MemoryContext oldcontext;
+
+   if (columns == NULL)
+   {
+       /* Prepare for new result set */
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
+       TupleDesc   tupdesc;
+
+       /*
+        * It's possible to get more than one result set if the query string
+        * contained multiple SQL commands.  In that case, we follow PQexec's
+        * traditional behavior of throwing away all but the last result.
+        */
+       if (sinfo->tuplestore)
+           tuplestore_end(sinfo->tuplestore);
+       sinfo->tuplestore = NULL;
+
+       /* get a tuple descriptor for our result type */
+       switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
+       {
+           case TYPEFUNC_COMPOSITE:
+               /* success */
+               break;
+           case TYPEFUNC_RECORD:
+               /* failed to determine actual type of RECORD */
+               ereport(ERROR,
+                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("function returning record called in context "
+                               "that cannot accept type record")));
+               break;
+           default:
+               /* result type isn't composite */
+               elog(ERROR, "return type must be a row type");
+               break;
+       }
+
+       /* make sure we have a persistent copy of the tupdesc */
+       tupdesc = CreateTupleDescCopy(tupdesc);
+
+       /* check result and tuple descriptor have the same number of columns */
+       if (nfields != tupdesc->natts)
+           ereport(ERROR,
+                   (errcode(ERRCODE_DATATYPE_MISMATCH),
+                    errmsg("remote query result rowtype does not match "
+                           "the specified FROM clause rowtype")));
+
+       /* Prepare attinmeta for later data conversions */
+       sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+       /* Create a new, empty tuplestore */
+       oldcontext = MemoryContextSwitchTo(
+                                   rsinfo->econtext->ecxt_per_query_memory);
+       sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->setResult = sinfo->tuplestore;
+       rsinfo->setDesc = tupdesc;
+       MemoryContextSwitchTo(oldcontext);
+
+       /*
+        * Set up sufficiently-wide string pointers array; this won't change
+        * in size so it's easy to preallocate.
+        */
+       if (sinfo->cstrs)
+           pfree(sinfo->cstrs);
+       sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
+
+       /* Create short-lived memory context for data conversions */
+       if (!sinfo->tmpcontext)
+           sinfo->tmpcontext =
+               AllocSetContextCreate(CurrentMemoryContext,
+                                     "dblink temporary context",
+                                     ALLOCSET_DEFAULT_MINSIZE,
+                                     ALLOCSET_DEFAULT_INITSIZE,
+                                     ALLOCSET_DEFAULT_MAXSIZE);
+
+       return 1;
+   }
+
+   CHECK_FOR_INTERRUPTS();
+
+   /*
+    * Do the following work in a temp context that we reset after each tuple.
+    * This cleans up not only the data we have direct access to, but any
+    * cruft the I/O functions might leak.
+    */
+   oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
+
+   /*
+    * The strings passed to us are not null-terminated, but the datatype
+    * input functions we're about to call require null termination.  Copy the
+    * strings and add null termination.  As a micro-optimization, allocate
+    * all the strings with one palloc.
+    */
+   pbuflen = nfields;      /* count the null terminators themselves */
+   for (i = 0; i < nfields; i++)
+   {
+       int         len = columns[i].len;
+
+       if (len > 0)
+           pbuflen += len;
+   }
+   pbuf = (char *) palloc(pbuflen);
+
+   for (i = 0; i < nfields; i++)
+   {
+       int         len = columns[i].len;
+
+       if (len < 0)
+           cstrs[i] = NULL;
+       else
+       {
+           cstrs[i] = pbuf;
+           memcpy(pbuf, columns[i].value, len);
+           pbuf += len;
+           *pbuf++ = '\0';
+       }
+   }
+
+   /* Convert row to a tuple, and add it to the tuplestore */
+   tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+
+   tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+   /* Clean up */
+   MemoryContextSwitchTo(oldcontext);
+   MemoryContextReset(sinfo->tmpcontext);
+
+   return 1;
+}
+
 /*
  * List all open dblink connections by name.
  * Returns an array of all connection names.
index 855495c54d5e9ab8034bf4a2cfeffd22fda07da9..72ca765be73cff4f8f915c95d75b623c234f3bd4 100644 (file)
@@ -425,14 +425,6 @@ SELECT *
   
    Notes
 
-   
-    dblink fetches the entire remote query result before
-    returning any of it to the local system.  If the query is expected
-    to return a large number of rows, it's better to open it as a cursor
-    with dblink_open and then fetch a manageable number
-    of rows at a time.
-   
-
    
     A convenient way to use dblink with predetermined
     queries is to create a view.
@@ -1432,6 +1424,18 @@ dblink_get_result(text connname [, bool fail_on_error]) returns setof record
     sent, and one additional time to obtain an empty set result,
     before the connection can be used again.
    
+
+   
+    When using dblink_send_query and
+    dblink_get_result, dblink fetches the entire
+    remote query result before returning any of it to the local query
+    processor.  If the query returns a large number of rows, this can result
+    in transient memory bloat in the local session.  It may be better to open
+    such a query as a cursor with dblink_open and then fetch a
+    manageable number of rows at a time.  Alternatively, use plain
+    dblink(), which avoids memory bloat by spooling large result
+    sets to disk.
+