Attached is a fairly sizeable update to contrib/dblink. I'd love to get
authorBruce Momjian
Mon, 2 Sep 2002 06:13:31 +0000 (06:13 +0000)
committerBruce Momjian
Mon, 2 Sep 2002 06:13:31 +0000 (06:13 +0000)
review/feedback if anyone is interested and can spend the time. But I'd
also love to get this committed and address changes as incremental
patches ;-), so if there are no objections, please apply.

Below I'll give a synopsis of the changes. More detailed descriptions
are now in a new doc directory under contrib/dblink. There is also a new

dblink.test.sql file which will give a pretty good overview of the
functions and their use.

Joe Conway

contrib/README
contrib/dblink/README.dblink
contrib/dblink/dblink.c
contrib/dblink/dblink.h
contrib/dblink/dblink.sql.in

index 2b0085483e6ea1862c587aa52bf7ccc95a8458a4..0d3b04fd1f90f8b7cfe315b5c54ea52e7551a960 100644 (file)
@@ -48,7 +48,7 @@ dbase -
 
 dblink -
    Allows remote query execution
-   by Joe Conway <joe.conway@mail.com>
+   by Joe Conway <mail@joeconway.com>
 
 dbmirror -
    Replication server
@@ -73,7 +73,7 @@ fulltextindex -
 
 fuzzystrmatch -
    Levenshtein, metaphone, and soundex fuzzy string matching
-   by Joe Conway <joseph.conway@home.com>, Joel Burton 
+   by Joe Conway <mail@joeconway.com>, Joel Burton 
 
 intagg -
    Integer aggregator
index 8e6adf069f13dc0dc80a8d5dd34cafa9ba488520..f304b7729d19cf34db6f1d0181e58a9a00e4391f 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway , 2001, 2002,
+ * Joe Conway 
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  * 
  * Permission to use, copy, modify, and distribute this software and its
  *
  */
 
-
-Version 0.4 (7 April, 2002):
-  Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
-  various utility functions.
-  Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
+Version 0.5 (25 August, 2002):
+  Major overhaul to work with new backend "table function" capability. Removed
+  dblink_strtok() and dblink_replace() functions because they are now
+  available as backend functions (split() and replace() respectively).
+  Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version
+  is no longer backwards portable to PostgreSQL 7.2.
 
 Release Notes:
+  Version 0.5
+    - dblink now supports use directly as a table function; this is the new
+      preferred usage going forward
+    - Use of dblink_tok is now deprecated; original form of dblink is also
+      deprecated. They _will_ be removed in the next version.
+    - dblink_last_oid is also deprecated; use dblink_exec() which returns
+      the command status as a single row, single column result.
+    - Original dblink, dblink_tok, and dblink_last_oid are commented out in
+      dblink.sql; remove the comments to use the deprecated functions.
+    - dblink_strtok() and dblink_replace() functions were removed. Use
+      split() and replace() respectively (new backend functions in
+      PostgreSQL 7.3) instead.
+    - New functions: dblink_exec() for non-SELECT queries; dblink_connect()
+      opens connection that persists for duration of a backend;
+      dblink_disconnect() closes a persistent connection; dblink_open()
+      opens a cursor; dblink_fetch() fetches results from an open cursor.
+      dblink_close() closes a cursor.
+    - New test suite: dblink_check.sh, dblink.test.sql,
+      dblink.test.expected.out. Execute dblink_check.sh from the same
+      directory as the other two files. Output is dblink.test.out and
+      dblink.test.diff. Note that dblink.test.sql is a good source
+      of example usage.
 
   Version 0.4
     - removed cursor wrap around input sql to allow for remote
@@ -59,16 +84,48 @@ Installation:
 
   installs following functions into database template1:
 
-     dblink(text,text) RETURNS setof int
-       - returns a resource id for results from remote query
-     dblink_tok(int,int) RETURNS text
-       - extracts and returns individual field results
-     dblink_strtok(text,text,int) RETURNS text
-       - extracts and returns individual token from delimited text
+     connection
+     ------------
+     dblink_connect(text) RETURNS text
+       - opens a connection that will persist for duration of current
+         backend or until it is disconnected
+     dblink_disconnect() RETURNS text
+       - disconnects a persistent connection
+
+     cursor
+     ------------
+     dblink_open(text,text) RETURNS text
+       - opens a cursor using connection already opened with dblink_connect()
+         that will persist for duration of current backend or until it is
+         closed
+     dblink_fetch(text, int) RETURNS setof record
+       - fetches data from an already opened cursor
+     dblink_close(text) RETURNS text
+       - closes a cursor
+
+     query
+     ------------
+     dblink(text,text) RETURNS setof record
+       - returns a set of results from remote SELECT query
+         (Note: comment out in dblink.sql to use deprecated version)
+     dblink(text) RETURNS setof record
+       - returns a set of results from remote SELECT query, using connection
+         already opened with dblink_connect()
+
+     execute
+     ------------
+     dblink_exec(text, text) RETURNS text
+       - executes an INSERT/UPDATE/DELETE query remotely
+     dblink_exec(text) RETURNS text
+       - executes an INSERT/UPDATE/DELETE query remotely, using connection
+         already opened with dblink_connect()
+
+     misc
+     ------------
+     dblink_current_query() RETURNS text
+       - returns the current query string
      dblink_get_pkey(text) RETURNS setof text
        - returns the field names of a relation's primary key fields
-     dblink_last_oid(int) RETURNS oid
-       - returns the last inserted oid
      dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
        - builds an insert statement using a local tuple, replacing the
          selection key field values with alternate supplied values
@@ -78,338 +135,30 @@ Installation:
      dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
        - builds an update statement using a local tuple, replacing the
          selection key field values with alternate supplied values
-     dblink_current_query() RETURNS text
-       - returns the current query string
-     dblink_replace(text,text,text) RETURNS text
-       - replace all occurences of substring-a in the input-string 
-         with substring-b
-
-Documentation
-==================================================================
-Name
-
-dblink -- Returns a resource id for a data set from a remote database
-
-Synopsis
-
-dblink(text connstr, text sql)
-
-Inputs
-
-  connstr
-
-    standard libpq format connection srting, 
-    e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
-
-  sql
-
-    sql statement that you wish to execute on the remote host
-    e.g. "select * from pg_class"
-
-Outputs
-
-  Returns setof int (res_id)
-
-Example usage
-
-  select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-               ,'select f1, f2 from mytable');
-
-
-==================================================================
-
-Name
-
-dblink_tok -- Returns individual select field results from a dblink remote query
-
-Synopsis
-
-dblink_tok(int res_id, int fnumber)
-
-Inputs
-
-  res_id
-
-    a resource id returned by a call to dblink()
-
-  fnumber
-
-    the ordinal position (zero based) of the field to be returned from the dblink result set
-
-Outputs
-
-  Returns text
-
-Example usage
-
-  select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
-  from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-                     ,'select f1, f2 from mytable') as dblink_p) as t1;
-
-
-==================================================================
-
-A more convenient way to use dblink may be to create a view:
-
- create view myremotetable as
- select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
- from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=template1 user=postgres password=postgres'
-                    ,'select proname, prosrc from pg_proc') as dblink_p) as t1;
-
-Then you can simply write:
-
-   select f1, f2 from myremotetable where f1 like 'bytea%';
-
-==================================================================
-Name
-
-dblink_strtok -- Extracts and returns individual token from delimited text
-
-Synopsis
-
-dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
-
-Inputs
-
-  inputstring
-
-    any string you want to parse a token out of;
-    e.g. 'f=1&g=3&h=4'
-
-  delimiter
-
-    a single character to use as the delimiter;
-    e.g. '&' or '='
-
-  posn
-
-    the position of the token of interest, 0 based;
-    e.g. 1
-
-Outputs
-
-  Returns text
-
-Example usage
-
-test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
- dblink_strtok
----------------
- 3
-(1 row)
-
-==================================================================
-Name
-
-dblink_get_pkey -- returns the field names of a relation's primary
-                   key fields
-
-Synopsis
-
-dblink_get_pkey(text relname) RETURNS setof text
-
-Inputs
-
-  relname
-
-    any relation name;
-    e.g. 'foobar'
-
-Outputs
-
-  Returns setof text -- one row for each primary key field, in order of
-                        precedence
 
-Example usage
-
-test=# select dblink_get_pkey('foobar');
- dblink_get_pkey
------------------
- f1
- f2
- f3
- f4
- f5
-(5 rows)
-
-
-==================================================================
-Name
-
-dblink_last_oid -- Returns last inserted oid
-
-Synopsis
-
-dblink_last_oid(int res_id) RETURNS oid
-
-Inputs
-
-  res_id
-
-    any resource id returned by dblink function;
-
-Outputs
-
-  Returns oid of last inserted tuple
-
-Example usage
-
-test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
-               ,'insert into mytable (f1, f2) values (1,2)'));
-
- dblink_last_oid
-----------------
- 16553
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_build_sql_insert -- builds an insert statement using a local
-                           tuple, replacing the selection key field
-                           values with alternate supplied values
-dblink_build_sql_delete -- builds a delete statement using supplied
-                           values for selection key field values
-dblink_build_sql_update -- builds an update statement using a local
-                           tuple, replacing the selection key field
-                           values with alternate supplied values
-
-
-Synopsis
-
-dblink_build_sql_insert(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text src_pk_att_vals_array
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_delete(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-dblink_build_sql_update(text relname
-                         ,int2vector primary_key_attnums
-                         ,int2 num_primary_key_atts
-                         ,_text src_pk_att_vals_array
-                         ,_text tgt_pk_att_vals_array) RETURNS text
-
-Inputs
-
-  relname
-
-    any relation name;
-    e.g. 'foobar'
-
-  primary_key_attnums
-
-    vector of primary key attnums (1 based, see pg_index.indkey);
-    e.g. '1 2'
-
-  num_primary_key_atts
-
-    number of primary key attnums in the vector; e.g. 2
-
-  src_pk_att_vals_array
-
-    array of primary key values, used to look up the local matching
-    tuple, the values of which are then used to construct the SQL
-    statement
-
-  tgt_pk_att_vals_array
-
-    array of primary key values, used to replace the local tuple
-    values in the SQL statement
-
-Outputs
-
-  Returns text -- requested SQL statement
-
-Example usage
-
-test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
-             dblink_build_sql_insert
---------------------------------------------------
- INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
-(1 row)
-
-test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
-           dblink_build_sql_delete
----------------------------------------------
- DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
-(1 row)
-
-test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
-                   dblink_build_sql_update
--------------------------------------------------------------
- UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_current_query -- returns the current query string
-
-Synopsis
-
-dblink_current_query () RETURNS text
-
-Inputs
-
-  None
-
-Outputs
-
-  Returns text -- a copy of the currently executing query
-
-Example usage
-
-test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
-                                                                dblink_current_query
------------------------------------------------------------------------------------------------------------------------------------------------------
- select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
-(1 row)
-
-
-==================================================================
-Name
-
-dblink_replace -- replace all occurences of substring-a in the
-                  input-string with substring-b
-
-Synopsis
-
-dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
-
-Inputs
-
-  input-string
-
-    the starting string, before replacement of substring-a
-
-  substring-a
-
-    the substring to find and replace
-
-  substring-b
-
-    the substring to be substituted in place of substring-a
-
-Outputs
-
-  Returns text -- a copy of the starting string, but with all occurences of
-                  substring-a replaced with substring-b
+  Not installed by default
+     deprecated
+     ------------
+     dblink(text,text) RETURNS setof int
+       - *DEPRECATED* returns a resource id for results from remote query
+         (Note: must uncomment in dblink.sql to use)
+     dblink_tok(int,int) RETURNS text
+       - *DEPRECATED* extracts and returns individual field results; used
+         only in conjunction with the *DEPRECATED* form of dblink
+         (Note: must uncomment in dblink.sql to use)
+     dblink_last_oid(int) RETURNS oid
+       - *DEPRECATED* returns the last inserted oid
 
-Example usage
+Documentation:
 
-test=# select dblink_replace('12345678901234567890','56','hello');
-       dblink_replace
-----------------------------
- 1234hello78901234hello7890
-(1 row)
+  See the following files:
+     doc/connection
+     doc/cursor
+     doc/query
+     doc/execute
+     doc/misc
+     doc/deprecated
 
 ==================================================================
-
-
 -- Joe Conway
 
index 0401e06f4f123e75ca29b3aa5704e5cd67108ed3..a6ede5ae1c1c81cd71c11f43f7ae06fe6cef84c7 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway , 2001, 2002,
+ * Joe Conway 
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
  * Permission to use, copy, modify, and distribute this software and its
  *
  */
 
-#include "dblink.h"
+#include 
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "libpq-int.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/tupdesc.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_index.h"
+#include "catalog/pg_type.h"
+#include "executor/executor.h"
+#include "executor/spi.h"
+#include "lib/stringinfo.h"
+#include "nodes/nodes.h"
+#include "nodes/execnodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_type.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
+#include "dblink.h"
 
 /*
  * Internal declarations
  */
 static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
-static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
 static char **get_pkey_attnames(Oid relid, int16 *numatts);
-static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
 static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
 static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
@@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr);
 static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
 static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
 static Oid get_relid_from_relname(text *relname_text);
-static dblink_results  *get_res_ptr(int32 res_id_index);
+static dblink_results *get_res_ptr(int32 res_id_index);
 static void append_res_ptr(dblink_results *results);
 static void remove_res_ptr(dblink_results *results);
+static TupleDesc pgresultGetTupleDesc(PGresult *res);
 
 /* Global */
-List   *res_id = NIL;
-int        res_id_index = 0;
+List      *res_id = NIL;
+int            res_id_index = 0;
+PGconn    *persistent_conn = NULL;
+
+#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
+#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
+#define xpfree(var_) \
+   do { \
+       if (var_ != NULL) \
+       { \
+           pfree(var_); \
+           var_ = NULL; \
+       } \
+   } while (0)
+
+
+/*
+ * Create a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_connect);
+Datum
+dblink_connect(PG_FUNCTION_ARGS)
+{
+   char           *connstr = GET_STR(PG_GETARG_TEXT_P(0));
+   char           *msg;
+   text           *result_text;
+   MemoryContext   oldcontext;
+
+   if (persistent_conn != NULL)
+       PQfinish(persistent_conn);
+
+   oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+   persistent_conn = PQconnectdb(connstr);
+   MemoryContextSwitchTo(oldcontext);
+
+   if (PQstatus(persistent_conn) == CONNECTION_BAD)
+   {
+       msg = pstrdup(PQerrorMessage(persistent_conn));
+       PQfinish(persistent_conn);
+       persistent_conn = NULL;
+       elog(ERROR, "dblink_connect: connection error: %s", msg);
+   }
+
+   result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+   PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Clear a persistent connection to another database
+ */
+PG_FUNCTION_INFO_V1(dblink_disconnect);
+Datum
+dblink_disconnect(PG_FUNCTION_ARGS)
+{
+   text           *result_text;
+
+   if (persistent_conn != NULL)
+       PQfinish(persistent_conn);
+
+   persistent_conn = NULL;
+
+   result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+   PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * opens a cursor using a persistent connection
+ */
+PG_FUNCTION_INFO_V1(dblink_open);
+Datum
+dblink_open(PG_FUNCTION_ARGS)
+{
+   char           *msg;
+   PGresult       *res = NULL;
+   PGconn         *conn = NULL;
+   text           *result_text;
+   char           *curname = GET_STR(PG_GETARG_TEXT_P(0));
+   char           *sql = GET_STR(PG_GETARG_TEXT_P(1));
+   StringInfo      str = makeStringInfo();
+
+   if (persistent_conn != NULL)
+       conn = persistent_conn;
+   else
+       elog(ERROR, "dblink_open: no connection available");
+
+   res = PQexec(conn, "BEGIN");
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+   {
+       msg = pstrdup(PQerrorMessage(conn));
+       PQclear(res);
+
+       PQfinish(conn);
+       persistent_conn = NULL;
+
+       elog(ERROR, "dblink_open: begin error: %s", msg);
+   }
+   PQclear(res);
+
+   appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql);
+   res = PQexec(conn, str->data);
+   if (!res ||
+       (PQresultStatus(res) != PGRES_COMMAND_OK &&
+        PQresultStatus(res) != PGRES_TUPLES_OK))
+   {
+       msg = pstrdup(PQerrorMessage(conn));
+
+       PQclear(res);
+
+       PQfinish(conn);
+       persistent_conn = NULL;
+
+       elog(ERROR, "dblink: sql error: %s", msg);
+   }
+
+   result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+   PG_RETURN_TEXT_P(result_text);
+}
 
+/*
+ * closes a cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_close);
+Datum
+dblink_close(PG_FUNCTION_ARGS)
+{
+   PGconn         *conn = NULL;
+   PGresult       *res = NULL;
+   char           *curname = GET_STR(PG_GETARG_TEXT_P(0));
+   StringInfo      str = makeStringInfo();
+   text           *result_text;
+   char           *msg;
+
+   if (persistent_conn != NULL)
+       conn = persistent_conn;
+   else
+       elog(ERROR, "dblink_close: no connection available");
+
+   appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
+
+   /* close the cursor */
+   res = PQexec(conn, str->data);
+   if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+   {
+       msg = pstrdup(PQerrorMessage(conn));
+       PQclear(res);
+
+       PQfinish(persistent_conn);
+       persistent_conn = NULL;
+
+       elog(ERROR, "dblink_close: sql error: %s", msg);
+   }
+
+   PQclear(res);
+
+   /* commit the transaction */
+   res = PQexec(conn, "COMMIT");
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+   {
+       msg = pstrdup(PQerrorMessage(conn));
+       PQclear(res);
+
+       PQfinish(persistent_conn);
+       persistent_conn = NULL;
+
+       elog(ERROR, "dblink_close: commit error: %s", msg);
+   }
+   PQclear(res);
+
+   result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK")));
+   PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Fetch results from an open cursor
+ */
+PG_FUNCTION_INFO_V1(dblink_fetch);
+Datum
+dblink_fetch(PG_FUNCTION_ARGS)
+{
+   FuncCallContext    *funcctx;
+   TupleDesc           tupdesc = NULL;
+   int                 call_cntr;
+   int                 max_calls;
+   TupleTableSlot     *slot;
+   AttInMetadata      *attinmeta;
+   char               *msg;
+   PGresult           *res = NULL;
+   MemoryContext       oldcontext;
+
+   /* stuff done only on the first call of the function */
+   if(SRF_IS_FIRSTCALL())
+   {
+       Oid             functypeid;
+       char            functyptype;
+       Oid             funcid = fcinfo->flinfo->fn_oid;
+       PGconn         *conn = NULL;
+       StringInfo      str = makeStringInfo();
+       char           *curname = GET_STR(PG_GETARG_TEXT_P(0));
+       int             howmany = PG_GETARG_INT32(1);
+
+       /* create a function context for cross-call persistence */
+       funcctx = SRF_FIRSTCALL_INIT();
+
+       /* switch to memory context appropriate for multiple function calls */
+       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+       if (persistent_conn != NULL)
+           conn = persistent_conn;
+       else
+           elog(ERROR, "dblink_fetch: no connection available");
+
+       appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname));
+
+       res = PQexec(conn, str->data);
+       if (!res ||
+           (PQresultStatus(res) != PGRES_COMMAND_OK &&
+            PQresultStatus(res) != PGRES_TUPLES_OK))
+       {
+           msg = pstrdup(PQerrorMessage(conn));
+           PQclear(res);
+
+           PQfinish(persistent_conn);
+           persistent_conn = NULL;
+
+           elog(ERROR, "dblink_fetch: sql error: %s", msg);
+       }
+       else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+           /* cursor does not exist - closed already or bad name */
+           PQclear(res);
+           elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname));
+       }
+
+       funcctx->max_calls = PQntuples(res);
+
+       /* got results, keep track of them */
+       funcctx->user_fctx = res;
+
+       /* fast track when no results */
+       if (funcctx->max_calls < 1)
+           SRF_RETURN_DONE(funcctx);
+
+       /* check typtype to see if we have a predetermined return type */
+       functypeid = get_func_rettype(funcid);
+       functyptype = get_typtype(functypeid);
+
+       if (functyptype == 'c')
+           tupdesc = TypeGetTupleDesc(functypeid, NIL);
+       else if (functyptype == 'p' && functypeid == RECORDOID)
+           tupdesc = pgresultGetTupleDesc(res);
+       else if (functyptype == 'b')
+           elog(ERROR, "dblink_fetch: invalid kind of return type specified for function");
+       else
+           elog(ERROR, "dblink_fetch: unknown kind of return type specified for function");
+
+       /* store needed metadata for subsequent calls */
+       slot = TupleDescGetSlot(tupdesc);
+       funcctx->slot = slot;
+       attinmeta = TupleDescGetAttInMetadata(tupdesc);
+       funcctx->attinmeta = attinmeta;
+
+       MemoryContextSwitchTo(oldcontext);
+    }
+
+   /* stuff done on every call of the function */
+   funcctx = SRF_PERCALL_SETUP();
+
+   /*
+    * initialize per-call variables
+    */
+   call_cntr = funcctx->call_cntr;
+   max_calls = funcctx->max_calls;
+
+   slot = funcctx->slot;
+
+   res = (PGresult *) funcctx->user_fctx;
+   attinmeta = funcctx->attinmeta;
+   tupdesc = attinmeta->tupdesc;
+
+   if (call_cntr < max_calls)  /* do when there is more left to send */
+   {
+       char      **values;
+       HeapTuple   tuple;
+       Datum       result;
+       int     i;
+       int     nfields = PQnfields(res);
+
+       values = (char **) palloc(nfields * sizeof(char *));
+       for (i = 0; i < nfields; i++)
+       {
+           if (PQgetisnull(res, call_cntr, i) == 0)
+               values[i] = PQgetvalue(res, call_cntr, i);
+           else
+               values[i] = NULL;
+       }
+
+       /* build the tuple */
+       tuple = BuildTupleFromCStrings(attinmeta, values);
+
+       /* make the tuple into a datum */
+       result = TupleGetDatum(slot, tuple);
+
+       SRF_RETURN_NEXT(funcctx, result);
+   }
+   else    /* do when there is no more left */
+   {
+       PQclear(res);
+       SRF_RETURN_DONE(funcctx);
+   }
+}
+
+/*
+ * Note: this is the new preferred version of dblink
+ */
+PG_FUNCTION_INFO_V1(dblink_record);
+Datum
+dblink_record(PG_FUNCTION_ARGS)
+{
+   FuncCallContext    *funcctx;
+   TupleDesc           tupdesc = NULL;
+   int                 call_cntr;
+   int                 max_calls;
+   TupleTableSlot     *slot;
+   AttInMetadata      *attinmeta;
+   char               *msg;
+   PGresult           *res = NULL;
+   bool                is_sql_cmd = false;
+   char               *sql_cmd_status = NULL;
+   MemoryContext       oldcontext;
+
+   /* stuff done only on the first call of the function */
+   if(SRF_IS_FIRSTCALL())
+   {
+       Oid             functypeid;
+       char            functyptype;
+       Oid             funcid = fcinfo->flinfo->fn_oid;
+       PGconn         *conn = NULL;
+       char           *connstr = NULL;
+       char           *sql = NULL;
+
+       /* create a function context for cross-call persistence */
+       funcctx = SRF_FIRSTCALL_INIT();
+
+       /* switch to memory context appropriate for multiple function calls */
+       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+       if (fcinfo->nargs == 2)
+       {
+           connstr = GET_STR(PG_GETARG_TEXT_P(0));
+           sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+           conn = PQconnectdb(connstr);
+           if (PQstatus(conn) == CONNECTION_BAD)
+           {
+               msg = pstrdup(PQerrorMessage(conn));
+               PQfinish(conn);
+               elog(ERROR, "dblink: connection error: %s", msg);
+           }
+       }
+       else if (fcinfo->nargs == 1)
+       {
+           sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+           if (persistent_conn != NULL)
+               conn = persistent_conn;
+           else
+               elog(ERROR, "dblink: no connection available");
+       }
+       else
+           elog(ERROR, "dblink: wrong number of arguments");
+
+       res = PQexec(conn, sql);
+       if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+       {
+           msg = pstrdup(PQerrorMessage(conn));
+           PQclear(res);
+           PQfinish(conn);
+           if (fcinfo->nargs == 1)
+               persistent_conn = NULL;
+
+           elog(ERROR, "dblink: sql error: %s", msg);
+       }
+       else
+       {
+           if (PQresultStatus(res) == PGRES_COMMAND_OK)
+           {
+               is_sql_cmd = true;
+
+               /* need a tuple descriptor representing one TEXT column */
+               tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+               TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                                  TEXTOID, -1, 0, false);
+
+               /*
+                * and save a copy of the command status string to return
+                * as our result tuple
+                */
+               sql_cmd_status = PQcmdStatus(res);
+               funcctx->max_calls = 1;
+           }
+           else
+               funcctx->max_calls = PQntuples(res);
+
+           /* got results, keep track of them */
+           funcctx->user_fctx = res;
+
+           /* if needed, close the connection to the database and cleanup */
+           if (fcinfo->nargs == 2)
+               PQfinish(conn);
+       }
+
+       /* fast track when no results */
+       if (funcctx->max_calls < 1)
+           SRF_RETURN_DONE(funcctx);
+
+       /* check typtype to see if we have a predetermined return type */
+       functypeid = get_func_rettype(funcid);
+       functyptype = get_typtype(functypeid);
+
+       if (!is_sql_cmd)
+       {
+           if (functyptype == 'c')
+               tupdesc = TypeGetTupleDesc(functypeid, NIL);
+           else if (functyptype == 'p' && functypeid == RECORDOID)
+               tupdesc = pgresultGetTupleDesc(res);
+           else if (functyptype == 'b')
+               elog(ERROR, "Invalid kind of return type specified for function");
+           else
+               elog(ERROR, "Unknown kind of return type specified for function");
+       }
+
+       /* store needed metadata for subsequent calls */
+       slot = TupleDescGetSlot(tupdesc);
+       funcctx->slot = slot;
+       attinmeta = TupleDescGetAttInMetadata(tupdesc);
+       funcctx->attinmeta = attinmeta;
+
+       MemoryContextSwitchTo(oldcontext);
+    }
+
+   /* stuff done on every call of the function */
+   funcctx = SRF_PERCALL_SETUP();
+
+   /*
+    * initialize per-call variables
+    */
+   call_cntr = funcctx->call_cntr;
+   max_calls = funcctx->max_calls;
+
+   slot = funcctx->slot;
+
+   res = (PGresult *) funcctx->user_fctx;
+   attinmeta = funcctx->attinmeta;
+   tupdesc = attinmeta->tupdesc;
+
+   if (call_cntr < max_calls)  /* do when there is more left to send */
+   {
+       char      **values;
+       HeapTuple   tuple;
+       Datum       result;
+
+       if (!is_sql_cmd)
+       {
+           int     i;
+           int     nfields = PQnfields(res);
+
+           values = (char **) palloc(nfields * sizeof(char *));
+           for (i = 0; i < nfields; i++)
+           {
+               if (PQgetisnull(res, call_cntr, i) == 0)
+                   values[i] = PQgetvalue(res, call_cntr, i);
+               else
+                   values[i] = NULL;
+           }
+       }
+       else
+       {
+           values = (char **) palloc(1 * sizeof(char *));
+           values[0] = sql_cmd_status;
+       }
+
+       /* build the tuple */
+       tuple = BuildTupleFromCStrings(attinmeta, values);
+
+       /* make the tuple into a datum */
+       result = TupleGetDatum(slot, tuple);
+
+       SRF_RETURN_NEXT(funcctx, result);
+   }
+   else    /* do when there is no more left */
+   {
+       PQclear(res);
+       SRF_RETURN_DONE(funcctx);
+   }
+}
+
+/*
+ * Execute an SQL non-SELECT command
+ */
+PG_FUNCTION_INFO_V1(dblink_exec);
+Datum
+dblink_exec(PG_FUNCTION_ARGS)
+{
+   char           *msg;
+   PGresult       *res = NULL;
+   char           *sql_cmd_status = NULL;
+   TupleDesc       tupdesc = NULL;
+   text           *result_text;
+   PGconn         *conn = NULL;
+   char           *connstr = NULL;
+   char           *sql = NULL;
+
+   if (fcinfo->nargs == 2)
+   {
+       connstr = GET_STR(PG_GETARG_TEXT_P(0));
+       sql = GET_STR(PG_GETARG_TEXT_P(1));
+
+       conn = PQconnectdb(connstr);
+       if (PQstatus(conn) == CONNECTION_BAD)
+       {
+           msg = pstrdup(PQerrorMessage(conn));
+           PQfinish(conn);
+           elog(ERROR, "dblink_exec: connection error: %s", msg);
+       }
+   }
+   else if (fcinfo->nargs == 1)
+   {
+       sql = GET_STR(PG_GETARG_TEXT_P(0));
+
+       if (persistent_conn != NULL)
+           conn = persistent_conn;
+       else
+           elog(ERROR, "dblink_exec: no connection available");
+   }
+   else
+       elog(ERROR, "dblink_exec: wrong number of arguments");
+
+
+   res = PQexec(conn, sql);
+   if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+   {
+       msg = pstrdup(PQerrorMessage(conn));
+       PQclear(res);
+       PQfinish(conn);
+       if (fcinfo->nargs == 1)
+           persistent_conn = NULL;
+
+       elog(ERROR, "dblink_exec: sql error: %s", msg);
+   }
+   else
+   {
+       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+           /* need a tuple descriptor representing one TEXT column */
+           tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID);
+           TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+                              TEXTOID, -1, 0, false);
+
+           /*
+            * and save a copy of the command status string to return
+            * as our result tuple
+            */
+           sql_cmd_status = PQcmdStatus(res);
+       }
+       else
+           elog(ERROR, "dblink_exec: queries returning results not allowed");
+   }
+   PQclear(res);
+
+   /* if needed, close the connection to the database and cleanup */
+   if (fcinfo->nargs == 2)
+       PQfinish(conn);
+
+   result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status)));
+   PG_RETURN_TEXT_P(result_text);
+}
+
+/*
+ * Note: this original version of dblink is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ */
 PG_FUNCTION_INFO_V1(dblink);
 Datum
 dblink(PG_FUNCTION_ARGS)
@@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS)
    PG_RETURN_NULL();
 }
 
-
 /*
+ * Note: dblink_tok is DEPRECATED;
+ * it *will* be removed in favor of the new version on next release
+ *
  * dblink_tok
  * parse dblink output string
  * return fldnum item (0 based)
  * based on provided field separator
  */
-
 PG_FUNCTION_INFO_V1(dblink_tok);
 Datum
 dblink_tok(PG_FUNCTION_ARGS)
@@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS)
    }
 }
 
-
-/*
- * dblink_strtok
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-PG_FUNCTION_INFO_V1(dblink_strtok);
-Datum
-dblink_strtok(PG_FUNCTION_ARGS)
-{
-   char        *fldtext;
-   char        *fldsep;
-   int         fldnum;
-   char        *buffer;
-   text        *result_text;
-
-   fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
-   fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
-   fldnum = PG_GETARG_INT32(2);
-
-   if (fldtext[0] == '\0')
-   {
-       elog(ERROR, "get_strtok: blank list not permitted");
-   }
-   if (fldsep[0] == '\0')
-   {
-       elog(ERROR, "get_strtok: blank field separator not permitted");
-   }
-
-   buffer = get_strtok(fldtext, fldsep, fldnum);
-
-   pfree(fldtext);
-   pfree(fldsep);
-
-   if (buffer == NULL)
-   {
-       PG_RETURN_NULL();
-   }
-   else
-   {
-       result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
-       pfree(buffer);
-
-       PG_RETURN_TEXT_P(result_text);
-   }
-}
-
-
 /*
  * dblink_get_pkey
  * 
- * Return comma delimited list of primary key
- * fields for the supplied relation,
+ * Return list of primary key fields for the supplied relation,
  * or NULL if none exists.
  */
 PG_FUNCTION_INFO_V1(dblink_get_pkey);
 Datum
 dblink_get_pkey(PG_FUNCTION_ARGS)
 {
-   text                    *relname_text;
-   Oid                     relid;
-   char                    **result;
-   text                    *result_text;
-   int16                   numatts;
-   ReturnSetInfo           *rsi;
-   dblink_array_results    *ret_set;
+   int16               numatts;
+   Oid                 relid;
+   char              **results;
+   FuncCallContext    *funcctx;
+   int32               call_cntr;
+   int32               max_calls;
+   TupleTableSlot     *slot;
+   AttInMetadata      *attinmeta;
+   MemoryContext       oldcontext;
+
+   /* stuff done only on the first call of the function */
+   if(SRF_IS_FIRSTCALL())
+   {
+       TupleDesc       tupdesc = NULL;
+
+       /* create a function context for cross-call persistence */
+       funcctx = SRF_FIRSTCALL_INIT();
+
+       /* switch to memory context appropriate for multiple function calls */
+       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+       /* convert relname to rel Oid */
+       relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
+       if (!OidIsValid(relid))
+           elog(ERROR, "dblink_get_pkey: relation does not exist");
 
-   if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
-       elog(ERROR, "dblink: function called in context that does not accept a set result");
+       /* need a tuple descriptor representing one INT and one TEXT column */
+       tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
+                          INT4OID, -1, 0, false);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
+                          TEXTOID, -1, 0, false);
 
-   if (fcinfo->flinfo->fn_extra == NULL)
-   {
-       relname_text = PG_GETARG_TEXT_P(0);
+       /* allocate a slot for a tuple with this tupdesc */
+       slot = TupleDescGetSlot(tupdesc);
 
-       /*
-        * Convert relname to rel OID.
-        */
-       relid = get_relid_from_relname(relname_text);
-       if (!OidIsValid(relid))
-           elog(ERROR, "dblink_get_pkey: relation does not exist");
+       /* assign slot to function context */
+       funcctx->slot = slot;
 
        /*
-        * get an array of attnums.
+        * Generate attribute metadata needed later to produce tuples from raw
+        * C strings
         */
-       result = get_pkey_attnames(relid, &numatts);
+       attinmeta = TupleDescGetAttInMetadata(tupdesc);
+       funcctx->attinmeta = attinmeta;
+
+       /* get an array of attnums */
+       results = get_pkey_attnames(relid, &numatts);
 
-       if ((result != NULL) && (numatts > 0))
+       if ((results != NULL) && (numatts > 0))
        {
-           ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+           funcctx->max_calls = numatts;
 
-           ret_set->elem_num = 0;
-           ret_set->num_elems = numatts;
-           ret_set->res = result;
+           /* got results, keep track of them */
+           funcctx->user_fctx = results;
+       }
+       else    /* fast track when no results */
+           SRF_RETURN_DONE(funcctx);
 
-           fcinfo->flinfo->fn_extra = (void *) ret_set;
+       MemoryContextSwitchTo(oldcontext);
+    }
 
-           rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-           rsi->isDone = ExprMultipleResult;
+   /* stuff done on every call of the function */
+   funcctx = SRF_PERCALL_SETUP();
 
-           result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+   /*
+    * initialize per-call variables
+    */
+   call_cntr = funcctx->call_cntr;
+   max_calls = funcctx->max_calls;
 
-           PG_RETURN_TEXT_P(result_text);
-       }
-       else
-       {
-           rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-           rsi->isDone = ExprEndResult;
+   slot = funcctx->slot;
 
-           PG_RETURN_NULL();
-       }
-   }
-   else
-   {
-       /*
-        * check for more results
-        */
-       ret_set = fcinfo->flinfo->fn_extra;
-       ret_set->elem_num++;
-       result = ret_set->res;
+   results = (char **) funcctx->user_fctx;
+   attinmeta = funcctx->attinmeta;
 
-       if (ret_set->elem_num < ret_set->num_elems)
-       {
-           /*
-            * fetch next one
-            */
-           rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-           rsi->isDone = ExprMultipleResult;
+   if (call_cntr < max_calls)  /* do when there is more left to send */
+   {
+       char      **values;
+       HeapTuple   tuple;
+       Datum       result;
 
-           result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
-           PG_RETURN_TEXT_P(result_text);
-       }
-       else
-       {
-           int     i;
+       values = (char **) palloc(2 * sizeof(char *));
+       values[0] = (char *) palloc(12);    /* sign, 10 digits, '\0' */
 
-           /*
-            * or if no more, clean things up
-            */
-           for (i = 0; i < ret_set->num_elems; i++)
-               pfree(result[i]);
+       sprintf(values[0], "%d", call_cntr + 1);
 
-           pfree(ret_set->res);
-           pfree(ret_set);
+       values[1] = results[call_cntr];
 
-           rsi = (ReturnSetInfo *) fcinfo->resultinfo;
-           rsi->isDone = ExprEndResult;
+       /* build the tuple */
+       tuple = BuildTupleFromCStrings(attinmeta, values);
 
-           PG_RETURN_NULL();
-       }
+       /* make the tuple into a datum */
+       result = TupleGetDatum(slot, tuple);
+
+       SRF_RETURN_NEXT(funcctx, result);
    }
-   PG_RETURN_NULL();
+   else    /* do when there is no more left */
+       SRF_RETURN_DONE(funcctx);
 }
 
-
 /*
+ * Note: dblink_last_oid is DEPRECATED;
+ * it *will* be removed on next release
+ *
  * dblink_last_oid
  * return last inserted oid
  */
@@ -447,23 +1011,26 @@ Datum
 dblink_build_sql_insert(PG_FUNCTION_ARGS)
 {
    Oid         relid;
-   text        *relname_text;
-   int16       *pkattnums;
+   text       *relname_text;
+   int16      *pkattnums;
    int16       pknumatts;
-   char        **src_pkattvals;
-   char        **tgt_pkattvals;
-   ArrayType   *src_pkattvals_arry;
-   ArrayType   *tgt_pkattvals_arry;
+   char      **src_pkattvals;
+   char      **tgt_pkattvals;
+   ArrayType  *src_pkattvals_arry;
+   ArrayType  *tgt_pkattvals_arry;
    int         src_ndim;
-   int         *src_dim;
+   int        *src_dim;
    int         src_nitems;
    int         tgt_ndim;
    int         *tgt_dim;
    int         tgt_nitems;
    int         i;
-   char        *ptr;
-   char        *sql;
-   text        *sql_text;
+   char       *ptr;
+   char       *sql;
+   text       *sql_text;
+   int16       typlen;
+   bool        typbyval;
+   char        typalign;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
     * get array of pointers to c-strings from the input source array
     */
    Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+   get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+                                   &typlen, &typbyval, &typalign);
+
    src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
    ptr = ARR_DATA_PTR(src_pkattvals_arry);
    for (i = 0; i < src_nitems; i++)
    {
        src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-       ptr += INTALIGN(*(int32 *) ptr);
+       ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+       ptr = (char *) att_align(ptr, typalign);
    }
 
    /*
@@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
     * get array of pointers to c-strings from the input target array
     */
    Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+   get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                   &typlen, &typbyval, &typalign);
+
    tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
    ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
    for (i = 0; i < tgt_nitems; i++)
    {
        tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-       ptr += INTALIGN(*(int32 *) ptr);
+       ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+       ptr = (char *) att_align(ptr, typalign);
    }
 
    /*
@@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
    char        *ptr;
    char        *sql;
    text        *sql_text;
+   int16       typlen;
+   bool        typbyval;
+   char        typalign;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
     * get array of pointers to c-strings from the input target array
     */
    Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+   get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                   &typlen, &typbyval, &typalign);
+
    tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
    ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
    for (i = 0; i < tgt_nitems; i++)
    {
        tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-       ptr += INTALIGN(*(int32 *) ptr);
+       ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+       ptr = (char *) att_align(ptr, typalign);
    }
 
    /*
@@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    char        *ptr;
    char        *sql;
    text        *sql_text;
+   int16       typlen;
+   bool        typbyval;
+   char        typalign;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
     * get array of pointers to c-strings from the input source array
     */
    Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
+   get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry),
+                                   &typlen, &typbyval, &typalign);
+
    src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
    ptr = ARR_DATA_PTR(src_pkattvals_arry);
    for (i = 0; i < src_nitems; i++)
    {
        src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-       ptr += INTALIGN(*(int32 *) ptr);
+       ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+       ptr = (char *) att_align(ptr, typalign);
    }
 
    /*
@@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
     * get array of pointers to c-strings from the input target array
     */
    Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
+   get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry),
+                                   &typlen, &typbyval, &typalign);
+
    tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
    ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
    for (i = 0; i < tgt_nitems; i++)
    {
        tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
-       ptr += INTALIGN(*(int32 *) ptr);
+       ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr));
+       ptr = (char *) att_align(ptr, typalign);
    }
 
    /*
@@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    PG_RETURN_TEXT_P(sql_text);
 }
 
-
 /*
  * dblink_current_query
  * return the current query string
@@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS)
 }
 
 
-/*
- * dblink_replace_text
- * replace all occurences of 'old_sub_str' in 'orig_str'
- * with 'new_sub_str' to form 'new_str'
- * 
- * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
- * otherwise returns 'new_str' 
- */
-PG_FUNCTION_INFO_V1(dblink_replace_text);
-Datum
-dblink_replace_text(PG_FUNCTION_ARGS)
-{
-   text        *left_text;
-   text        *right_text;
-   text        *buf_text;
-   text        *ret_text;
-   char        *ret_str;
-   int         curr_posn;
-   text        *src_text = PG_GETARG_TEXT_P(0);
-   int         src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
-   text        *from_sub_text = PG_GETARG_TEXT_P(1);
-   int         from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
-   text        *to_sub_text = PG_GETARG_TEXT_P(2);
-   char        *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
-   StringInfo  str = makeStringInfo();
-
-   if (src_text_len == 0 || from_sub_text_len == 0)
-       PG_RETURN_TEXT_P(src_text);
-
-   buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
-   curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
-
-   while (curr_posn > 0)
-   {
-       left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
-       right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
-
-       appendStringInfo(str, "%s",
-                        DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
-       appendStringInfo(str, "%s", to_sub_str);
-
-       pfree(buf_text);
-       pfree(left_text);
-       buf_text = right_text;
-       curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
-   }
-
-   appendStringInfo(str, "%s",
-                    DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
-   pfree(buf_text);
-
-   ret_str = pstrdup(str->data);
-   ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
-
-   PG_RETURN_TEXT_P(ret_text);
-}
-
-
 /*************************************************************
  * internal functions
  */
@@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt)
    return retval;
 }
 
-
-/*
- * init_dblink_array_results
- *  - create an empty dblink_array_results data structure
- */
-static dblink_array_results *
-init_dblink_array_results(MemoryContext fn_mcxt)
-{
-   MemoryContext oldcontext;
-   dblink_array_results *retval;
-
-   oldcontext = MemoryContextSwitchTo(fn_mcxt);
-
-   retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
-   MemSet(retval, 0, sizeof(dblink_array_results));
-
-   retval->elem_num = -1;
-   retval->num_elems = 0;
-   retval->res = NULL;
-
-   MemoryContextSwitchTo(oldcontext);
-
-   return retval;
-}
-
 /*
  * get_pkey_attnames
  * 
@@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts)
    Relation        rel;
    TupleDesc       tupdesc;
 
-   /*
-    * Open relation using relid, get tupdesc
-    */
+   /* open relation using relid, get tupdesc */
    rel = relation_open(relid, AccessShareLock);
    tupdesc = rel->rd_att;
 
-   /*
-    * Initialize numatts to 0 in case no primary key
-    * exists
-    */
+   /* initialize numatts to 0 in case no primary key exists */
    *numatts = 0;
 
-   /*
-    * Use relid to get all related indexes
-    */
+   /* use relid to get all related indexes */
    indexRelation = heap_openr(IndexRelationName, AccessShareLock);
    ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
                           F_OIDEQ, ObjectIdGetDatum(relid));
@@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
    {
        Form_pg_index   index = (Form_pg_index) GETSTRUCT(indexTuple);
 
-       /*
-        * We're only interested if it is the primary key
-        */
+       /* we're only interested if it is the primary key */
        if (index->indisprimary == TRUE)
        {
            i = 0;
@@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts)
            if (*numatts > 0)
            {
                result = (char **) palloc(*numatts * sizeof(char *));
+
                for (i = 0; i < *numatts; i++)
                    result[i] = SPI_fname(tupdesc, index->indkey[i]);
            }
@@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
    return result;
 }
 
-
-/*
- * get_strtok
- * 
- * parse input string
- * return ord item (0 based)
- * based on provided field separator
- */
-static char *
-get_strtok(char *fldtext, char *fldsep, int fldnum)
-{
-   int         j = 0;
-   char        *result;
-
-   if (fldnum < 0)
-   {
-       elog(ERROR, "get_strtok: field number < 0 not permitted");
-   }
-
-   if (fldsep[0] == '\0')
-   {
-       elog(ERROR, "get_strtok: blank field separator not permitted");
-   }
-
-   result = strtok(fldtext, fldsep);
-   for (j = 1; j < fldnum + 1; j++)
-   {
-       result = strtok(NULL, fldsep);
-       if (result == NULL)
-           return NULL;
-   } 
-
-   return pstrdup(result);
-}
-
 static char *
 get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
@@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
    natts = tupdesc->natts;
 
    tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+   if (!tuple)
+       elog(ERROR, "dblink_build_sql_insert: row not found");
 
    appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
 
@@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval
    natts = tupdesc->natts;
 
    tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+   if (!tuple)
+       elog(ERROR, "dblink_build_sql_update: row not found");
 
    appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
 
@@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
     */
    rel = relation_open(relid, AccessShareLock);
    relname =  RelationGetRelationName(rel);
-   tupdesc = rel->rd_att;
+   tupdesc = CreateTupleDescCopy(rel->rd_att);
+   relation_close(rel, AccessShareLock);
 
    /*
     * Connect to SPI manager
@@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p
 static Oid
 get_relid_from_relname(text *relname_text)
 {
-#ifdef NamespaceRelationName
    RangeVar   *relvar;
    Relation    rel;
    Oid         relid;
@@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text)
    rel = heap_openrv(relvar, AccessShareLock);
    relid = RelationGetRelid(rel);
    relation_close(rel, AccessShareLock);
-#else
-   char       *relname;
-   Relation    rel;
-   Oid         relid;
-
-   relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
-   rel = relation_openr(relname, AccessShareLock);
-   relid = RelationGetRelid(rel);
-   relation_close(rel, AccessShareLock);
-#endif   /* NamespaceRelationName */
 
    return relid;
 }
@@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results)
        res_id_index = 0;
 }
 
+static TupleDesc
+pgresultGetTupleDesc(PGresult *res)
+{
+   int             natts;
+   AttrNumber      attnum;
+   TupleDesc       desc;
+   char           *attname;
+   int32           atttypmod;
+   int             attdim;
+   bool            attisset;
+   Oid             atttypid;
+   int             i;
+
+   /*
+    * allocate a new tuple descriptor
+    */
+   natts = PQnfields(res);
+   if (natts < 1)
+       elog(ERROR, "cannot create a description for empty results");
+
+   desc = CreateTemplateTupleDesc(natts, WITHOUTOID);
+
+   attnum = 0;
+
+   for (i = 0; i < natts; i++)
+   {
+       /*
+        * for each field, get the name and type information from the query
+        * result and have TupleDescInitEntry fill in the attribute
+        * information we need.
+        */
+       attnum++;
+
+       attname = PQfname(res, i);
+       atttypid = PQftype(res, i);
+       atttypmod = PQfmod(res, i);
+
+       if (PQfsize(res, i) != get_typlen(atttypid))
+           elog(ERROR, "Size of remote field \"%s\" does not match size "
+               "of local type \"%s\"",
+               attname,
+               format_type_with_typemod(atttypid, atttypmod));
+
+       attdim = 0;
+       attisset = false;
+
+       TupleDescInitEntry(desc, attnum, attname, atttypid,
+                          atttypmod, attdim, attisset);
+   }
+
+   return desc;
+}
index 4d53005ac604b1cda72bd2d30897deba224f62e5..ddca6241c4a03e89e26b69e730249e8a83143365 100644 (file)
@@ -3,7 +3,9 @@
  *
  * Functions returning results from a remote database
  *
- * Copyright (c) Joseph Conway , 2001, 2002,
+ * Joe Conway 
+ *
+ * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
  * Permission to use, copy, modify, and distribute this software and its
 #ifndef DBLINK_H
 #define DBLINK_H
 
-#include 
-#include "postgres.h"
-#include "libpq-fe.h"
-#include "libpq-int.h"
-#include "fmgr.h"
-#include "access/tupdesc.h"
-#include "access/heapam.h"
-#include "catalog/catname.h"
-#include "catalog/pg_index.h"
-#include "catalog/pg_type.h"
-#include "executor/executor.h"
-#include "executor/spi.h"
-#include "lib/stringinfo.h"
-#include "nodes/nodes.h"
-#include "nodes/execnodes.h"
-#include "nodes/pg_list.h"
-#include "parser/parse_type.h"
-#include "tcop/tcopprot.h"
-#include "utils/builtins.h"
-#include "utils/fmgroids.h"
-#include "utils/array.h"
-#include "utils/syscache.h"
-
-#ifdef NamespaceRelationName
-#include "catalog/namespace.h"
-#endif   /* NamespaceRelationName */
-
-/*
- * Max SQL statement size
- */
-#define DBLINK_MAX_SQLSTATE_SIZE       16384
-
 /*
  * This struct holds the results of the remote query.
  * Use fn_extra to hold a pointer to it across calls
@@ -82,43 +52,27 @@ typedef struct
    PGresult   *res;
 }  dblink_results;
 
-
-/*
- * This struct holds results in the form of an array.
- * Use fn_extra to hold a pointer to it across calls
- */
-typedef struct
-{
-   /*
-    * elem being accessed
-    */
-   int         elem_num;
-
-   /*
-    * number of elems
-    */
-   int         num_elems;
-
-   /*
-    * the actual array
-    */
-   void        *res;
-
-}  dblink_array_results;
-
 /*
  * External declarations
  */
+/* deprecated */
 extern Datum dblink(PG_FUNCTION_ARGS);
 extern Datum dblink_tok(PG_FUNCTION_ARGS);
-extern Datum dblink_strtok(PG_FUNCTION_ARGS);
+
+/* supported */
+extern Datum dblink_connect(PG_FUNCTION_ARGS);
+extern Datum dblink_disconnect(PG_FUNCTION_ARGS);
+extern Datum dblink_open(PG_FUNCTION_ARGS);
+extern Datum dblink_close(PG_FUNCTION_ARGS);
+extern Datum dblink_fetch(PG_FUNCTION_ARGS);
+extern Datum dblink_record(PG_FUNCTION_ARGS);
+extern Datum dblink_exec(PG_FUNCTION_ARGS);
 extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
 extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
 extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
 extern Datum dblink_current_query(PG_FUNCTION_ARGS);
-extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
 
 extern char    *debug_query_string;
 
index bea4378907244508a937a86cad9aa37e4ea47e13..b92801a5c51ecac8e3510ca3dde125a75b881ffb 100644 (file)
@@ -1,21 +1,58 @@
-CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
-  AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
+-- Uncomment the following 9 lines to use original DEPRECATED functions
+--CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
+--  AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
+--  WITH (isstrict);
+--CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
+--  AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
+--  WITH (isstrict);
+--CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
+--  AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
+--  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
+CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text
+  AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
-  WITH (iscachable, isstrict);
+CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'c'
+  WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof text
-  AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
+CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'c'
+  WITH (isstrict);
+
+-- Note: if this is a first time install of dblink, the following DROP
+-- FUNCTION line is expected to fail.
+-- Comment out the following 4 lines if the DEPRECATED functions are used.
+DROP FUNCTION dblink (text,text);
+CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink (text) RETURNS setof record
+  AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_exec (text,text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
+  WITH (isstrict);
+
+CREATE OR REPLACE FUNCTION dblink_exec (text) RETURNS text
+  AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c'
   WITH (isstrict);
 
-CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
-  AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
+CREATE TYPE dblink_pkey_results AS (position int4, colname text);
+
+CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof dblink_pkey_results
+  AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
   WITH (isstrict);
 
 CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text
@@ -32,7 +69,3 @@ CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _tex
 
 CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
-
-CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
-  AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
-  WITH (iscachable, isstrict);