Refactor other replication commands to use DestRemoteSimple.
authorRobert Haas
Wed, 1 Feb 2017 18:42:41 +0000 (13:42 -0500)
committerRobert Haas
Wed, 1 Feb 2017 18:42:41 +0000 (13:42 -0500)
Commit a84069d9350400c860d5e932b50dfd337aa407b0 added a new type of
DestReceiver to avoid duplicating the existing code for the SHOW
command, but it turns out we can leverage that new DestReceiver
type in a few more places, saving some code.

Michael Paquier, reviewed by Andres Freund and by me.

Discussion: http://postgr.es/m/CAB7nPqSdFOQC0evc0r1nJeQyGBqjBrR41MC4rcMqUUpoJaZbtQ%40mail.gmail.com
Discussion: http://postgr.es/m/CAB7nPqT2K4XFT1JgqufFBjsOc-NUKXg5qBDucHPMbk6Xi1kYaA@mail.gmail.com

src/backend/access/common/printsimple.c
src/backend/access/common/tupdesc.c
src/backend/replication/walsender.c

index 420de65e2073d18f63b0768573b74beab199793b..5fe1c72da8385690b4f8a222319133d4730eb09d 100644 (file)
@@ -22,6 +22,7 @@
 #include "catalog/pg_type.h"
 #include "fmgr.h"
 #include "libpq/pqformat.h"
+#include "utils/builtins.h"
 
 /*
  * At startup time, send a RowDescription message.
@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
                }
                break;
 
+           case INT4OID:
+               {
+                   int32   num = DatumGetInt32(value);
+                   char    str[12];    /* sign, 10 digits and '\0' */
+
+                   pg_ltoa(num, str);
+                   pq_sendcountedtext(&buf, str, strlen(str), false);
+               }
+               break;
+
+           case INT8OID:
+               {
+                   int64   num = DatumGetInt64(value);
+                   char    str[23];    /* sign, 21 digits and '\0' */
+
+                   pg_lltoa(num, str);
+                   pq_sendcountedtext(&buf, str, strlen(str), false);
+               }
+               break;
+
            default:
                elog(ERROR, "unsupported type OID: %u", attr->atttypid);
        }
index 083c0303dcfd0cd020862d6bf2d849d2e11633c9..4e2ebe1ae7efdf529e3335c27d810001ce824c6c 100644 (file)
@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
            att->attstorage = 'p';
            att->attcollation = InvalidOid;
            break;
+
+       case INT8OID:
+           att->attlen = 8;
+           att->attbyval = FLOAT8PASSBYVAL;
+           att->attalign = 'd';
+           att->attstorage = 'p';
+           att->attcollation = InvalidOid;
+           break;
    }
 }
 
index 5909b7dd8c5c5cbcbdcce3c50cefb9fd4321630d..76f09fbdbf2256f5dec2f349194639bc690a2421 100644 (file)
@@ -302,13 +302,15 @@ WalSndShutdown(void)
 static void
 IdentifySystem(void)
 {
-   StringInfoData buf;
    char        sysid[32];
-   char        tli[11];
    char        xpos[MAXFNAMELEN];
    XLogRecPtr  logptr;
    char       *dbname = NULL;
-   Size        len;
+   DestReceiver *dest;
+   TupOutputState *tstate;
+   TupleDesc   tupdesc;
+   Datum       values[4];
+   bool        nulls[4];
 
    /*
     * Reply with a result set with one row, four columns. First col is system
@@ -328,8 +330,6 @@ IdentifySystem(void)
    else
        logptr = GetFlushRecPtr();
 
-   snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
    snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
    if (MyDatabaseId != InvalidOid)
@@ -346,79 +346,42 @@ IdentifySystem(void)
        MemoryContextSwitchTo(cur);
    }
 
-   /* Send a RowDescription message */
-   pq_beginmessage(&buf, 'T');
-   pq_sendint(&buf, 4, 2);     /* 4 fields */
-
-   /* first field */
-   pq_sendstring(&buf, "systemid");    /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-
-   /* second field */
-   pq_sendstring(&buf, "timeline");    /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, INT4OID, 4);       /* type oid */
-   pq_sendint(&buf, 4, 2);     /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-
-   /* third field */
-   pq_sendstring(&buf, "xlogpos");     /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
+   dest = CreateDestReceiver(DestRemoteSimple);
+   MemSet(nulls, false, sizeof(nulls));
 
-   /* fourth field */
-   pq_sendstring(&buf, "dbname");      /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-   pq_endmessage(&buf);
+   /* need a tuple descriptor representing four columns */
+   tupdesc = CreateTemplateTupleDesc(4, false);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
+                             TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
+                             INT4OID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
+                             TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
+                             TEXTOID, -1, 0);
 
-   /* Send a DataRow message */
-   pq_beginmessage(&buf, 'D');
-   pq_sendint(&buf, 4, 2);     /* # of columns */
+   /* prepare for projection of tuples */
+   tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
    /* column 1: system identifier */
-   len = strlen(sysid);
-   pq_sendint(&buf, len, 4);
-   pq_sendbytes(&buf, (char *) &sysid, len);
+   values[0] = CStringGetTextDatum(sysid);
 
    /* column 2: timeline */
-   len = strlen(tli);
-   pq_sendint(&buf, len, 4);
-   pq_sendbytes(&buf, (char *) tli, len);
+   values[1] = Int32GetDatum(ThisTimeLineID);
 
    /* column 3: xlog position */
-   len = strlen(xpos);
-   pq_sendint(&buf, len, 4);
-   pq_sendbytes(&buf, (char *) xpos, len);
+   values[2] = CStringGetTextDatum(xpos);
 
    /* column 4: database name, or NULL if none */
    if (dbname)
-   {
-       len = strlen(dbname);
-       pq_sendint(&buf, len, 4);
-       pq_sendbytes(&buf, (char *) dbname, len);
-   }
+       values[3] = CStringGetTextDatum(dbname);
    else
-   {
-       pq_sendint(&buf, -1, 4);
-   }
+       nulls[3] = true;
 
-   pq_endmessage(&buf);
+   /* send it to dest */
+   do_tup_output(tstate, values, nulls);
+
+   end_tup_output(tstate);
 }
 
 
@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
     */
    if (sendTimeLineIsHistoric)
    {
-       char        tli_str[11];
        char        startpos_str[8 + 1 + 8 + 1];
-       Size        len;
+       DestReceiver *dest;
+       TupOutputState *tstate;
+       TupleDesc   tupdesc;
+       Datum       values[2];
+       bool        nulls[2];
 
-       snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);
 
-       pq_beginmessage(&buf, 'T');     /* RowDescription */
-       pq_sendint(&buf, 2, 2); /* 2 fields */
-
-       /* Field header */
-       pq_sendstring(&buf, "next_tli");
-       pq_sendint(&buf, 0, 4); /* table oid */
-       pq_sendint(&buf, 0, 2); /* attnum */
+       dest = CreateDestReceiver(DestRemoteSimple);
+       MemSet(nulls, false, sizeof(nulls));
 
        /*
+        * Need a tuple descriptor representing two columns.
         * int8 may seem like a surprising data type for this, but in theory
         * int4 would not be wide enough for this, as TimeLineID is unsigned.
         */
-       pq_sendint(&buf, INT8OID, 4);   /* type oid */
-       pq_sendint(&buf, -1, 2);
-       pq_sendint(&buf, 0, 4);
-       pq_sendint(&buf, 0, 2);
-
-       pq_sendstring(&buf, "next_tli_startpos");
-       pq_sendint(&buf, 0, 4); /* table oid */
-       pq_sendint(&buf, 0, 2); /* attnum */
-       pq_sendint(&buf, TEXTOID, 4);   /* type oid */
-       pq_sendint(&buf, -1, 2);
-       pq_sendint(&buf, 0, 4);
-       pq_sendint(&buf, 0, 2);
-       pq_endmessage(&buf);
+       tupdesc = CreateTemplateTupleDesc(2, false);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
+                                 INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
+                                 TEXTOID, -1, 0);
 
-       /* Data row */
-       pq_beginmessage(&buf, 'D');
-       pq_sendint(&buf, 2, 2); /* number of columns */
+       /* prepare for projection of tuple */
+       tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
-       len = strlen(tli_str);
-       pq_sendint(&buf, len, 4);       /* length */
-       pq_sendbytes(&buf, tli_str, len);
+       values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
+       values[1] = CStringGetTextDatum(startpos_str);
 
-       len = strlen(startpos_str);
-       pq_sendint(&buf, len, 4);       /* length */
-       pq_sendbytes(&buf, startpos_str, len);
+       /* send it to dest */
+       do_tup_output(tstate, values, nulls);
 
-       pq_endmessage(&buf);
+       end_tup_output(tstate);
    }
 
    /* Send CommandComplete message */
@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 {
    const char *snapshot_name = NULL;
    char        xpos[MAXFNAMELEN];
-   StringInfoData buf;
-   Size        len;
+   char       *slot_name;
+   DestReceiver *dest;
+   TupOutputState *tstate;
+   TupleDesc   tupdesc;
+   Datum       values[4];
+   bool        nulls[4];
 
    Assert(!MyReplicationSlot);
 
@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
             (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
             (uint32) MyReplicationSlot->data.confirmed_flush);
 
-   pq_beginmessage(&buf, 'T');
-   pq_sendint(&buf, 4, 2);     /* 4 fields */
-
-   /* first field: slot name */
-   pq_sendstring(&buf, "slot_name");   /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-
-   /* second field: LSN at which we became consistent */
-   pq_sendstring(&buf, "consistent_point");    /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-
-   /* third field: exported snapshot's name */
-   pq_sendstring(&buf, "snapshot_name");       /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
-
-   /* fourth field: output plugin */
-   pq_sendstring(&buf, "output_plugin");       /* col name */
-   pq_sendint(&buf, 0, 4);     /* table oid */
-   pq_sendint(&buf, 0, 2);     /* attnum */
-   pq_sendint(&buf, TEXTOID, 4);       /* type oid */
-   pq_sendint(&buf, -1, 2);    /* typlen */
-   pq_sendint(&buf, 0, 4);     /* typmod */
-   pq_sendint(&buf, 0, 2);     /* format code */
+   dest = CreateDestReceiver(DestRemoteSimple);
+   MemSet(nulls, false, sizeof(nulls));
 
-   pq_endmessage(&buf);
-
-   /* Send a DataRow message */
-   pq_beginmessage(&buf, 'D');
-   pq_sendint(&buf, 4, 2);     /* # of columns */
+   /*
+    * Need a tuple descriptor representing four columns:
+    * - first field: the slot name
+    * - second field: LSN at which we became consistent
+    * - third field: exported snapshot's name
+    * - fourth field: output plugin
+    */
+   tupdesc = CreateTemplateTupleDesc(4, false);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+                             TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
+                             TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
+                             TEXTOID, -1, 0);
+   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
+                             TEXTOID, -1, 0);
+
+   /* prepare for projection of tuples */
+   tstate = begin_tup_output_tupdesc(dest, tupdesc);
 
    /* slot_name */
-   len = strlen(NameStr(MyReplicationSlot->data.name));
-   pq_sendint(&buf, len, 4);   /* col1 len */
-   pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
+   slot_name = NameStr(MyReplicationSlot->data.name);
+   values[0] = CStringGetTextDatum(slot_name);
 
    /* consistent wal location */
-   len = strlen(xpos);
-   pq_sendint(&buf, len, 4);
-   pq_sendbytes(&buf, xpos, len);
+   values[1] = CStringGetTextDatum(xpos);
 
    /* snapshot name, or NULL if none */
    if (snapshot_name != NULL)
-   {
-       len = strlen(snapshot_name);
-       pq_sendint(&buf, len, 4);
-       pq_sendbytes(&buf, snapshot_name, len);
-   }
+       values[2] = CStringGetTextDatum(snapshot_name);
    else
-       pq_sendint(&buf, -1, 4);
+       nulls[2] = true;
 
    /* plugin, or NULL if none */
    if (cmd->plugin != NULL)
-   {
-       len = strlen(cmd->plugin);
-       pq_sendint(&buf, len, 4);
-       pq_sendbytes(&buf, cmd->plugin, len);
-   }
+       values[3] = CStringGetTextDatum(cmd->plugin);
    else
-       pq_sendint(&buf, -1, 4);
+       nulls[3] = true;
 
-   pq_endmessage(&buf);
+   /* send it to dest */
+   do_tup_output(tstate, values, nulls);
+   end_tup_output(tstate);
 
    ReplicationSlotRelease();
 }