static void
IdentifySystem(void)
{
- StringInfoData buf;
char sysid[32];
- char tli[11];
char xpos[MAXFNAMELEN];
XLogRecPtr logptr;
char *dbname = NULL;
- Size len;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[4];
+ bool nulls[4];
/*
* Reply with a result set with one row, four columns. First col is system
else
logptr = GetFlushRecPtr();
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
if (MyDatabaseId != InvalidOid)
MemoryContextSwitchTo(cur);
}
- /* Send a RowDescription message */
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 4, 2); /* 4 fields */
-
- /* first field */
- pq_sendstring(&buf, "systemid"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field */
- pq_sendstring(&buf, "timeline"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, INT4OID, 4); /* type oid */
- pq_sendint(&buf, 4, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* third field */
- pq_sendstring(&buf, "xlogpos"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
- /* fourth field */
- pq_sendstring(&buf, "dbname"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
- pq_endmessage(&buf);
+ /* need a tuple descriptor representing four columns */
+ tupdesc = CreateTemplateTupleDesc(4, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
+ INT4OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
+ TEXTOID, -1, 0);
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 4, 2); /* # of columns */
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
/* column 1: system identifier */
- len = strlen(sysid);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) &sysid, len);
+ values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
- len = strlen(tli);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) tli, len);
+ values[1] = Int32GetDatum(ThisTimeLineID);
/* column 3: xlog position */
- len = strlen(xpos);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) xpos, len);
+ values[2] = CStringGetTextDatum(xpos);
/* column 4: database name, or NULL if none */
if (dbname)
- {
- len = strlen(dbname);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, (char *) dbname, len);
- }
+ values[3] = CStringGetTextDatum(dbname);
else
- {
- pq_sendint(&buf, -1, 4);
- }
+ nulls[3] = true;
- pq_endmessage(&buf);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
+
+ end_tup_output(tstate);
}
*/
if (sendTimeLineIsHistoric)
{
- char tli_str[11];
char startpos_str[8 + 1 + 8 + 1];
- Size len;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[2];
+ bool nulls[2];
- snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 2, 2); /* 2 fields */
-
- /* Field header */
- pq_sendstring(&buf, "next_tli");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
/*
+ * Need a tuple descriptor representing two columns.
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
*/
- pq_sendint(&buf, INT8OID, 4); /* type oid */
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
-
- pq_sendstring(&buf, "next_tli_startpos");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
+ tupdesc = CreateTemplateTupleDesc(2, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
+ INT8OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
+ TEXTOID, -1, 0);
- /* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 2, 2); /* number of columns */
+ /* prepare for projection of tuple */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
- len = strlen(tli_str);
- pq_sendint(&buf, len, 4); /* length */
- pq_sendbytes(&buf, tli_str, len);
+ values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
+ values[1] = CStringGetTextDatum(startpos_str);
- len = strlen(startpos_str);
- pq_sendint(&buf, len, 4); /* length */
- pq_sendbytes(&buf, startpos_str, len);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
- pq_endmessage(&buf);
+ end_tup_output(tstate);
}
/* Send CommandComplete message */
{
const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN];
- StringInfoData buf;
- Size len;
+ char *slot_name;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[4];
+ bool nulls[4];
Assert(!MyReplicationSlot);
(uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
(uint32) MyReplicationSlot->data.confirmed_flush);
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 4, 2); /* 4 fields */
-
- /* first field: slot name */
- pq_sendstring(&buf, "slot_name"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field: LSN at which we became consistent */
- pq_sendstring(&buf, "consistent_point"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* third field: exported snapshot's name */
- pq_sendstring(&buf, "snapshot_name"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* fourth field: output plugin */
- pq_sendstring(&buf, "output_plugin"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ MemSet(nulls, false, sizeof(nulls));
- pq_endmessage(&buf);
-
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 4, 2); /* # of columns */
+ /*
+ * Need a tuple descriptor representing four columns:
+ * - first field: the slot name
+ * - second field: LSN at which we became consistent
+ * - third field: exported snapshot's name
+ * - fourth field: output plugin
+ */
+ tupdesc = CreateTemplateTupleDesc(4, false);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
+ TEXTOID, -1, 0);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc);
/* slot_name */
- len = strlen(NameStr(MyReplicationSlot->data.name));
- pq_sendint(&buf, len, 4); /* col1 len */
- pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
+ slot_name = NameStr(MyReplicationSlot->data.name);
+ values[0] = CStringGetTextDatum(slot_name);
/* consistent wal location */
- len = strlen(xpos);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, xpos, len);
+ values[1] = CStringGetTextDatum(xpos);
/* snapshot name, or NULL if none */
if (snapshot_name != NULL)
- {
- len = strlen(snapshot_name);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, snapshot_name, len);
- }
+ values[2] = CStringGetTextDatum(snapshot_name);
else
- pq_sendint(&buf, -1, 4);
+ nulls[2] = true;
/* plugin, or NULL if none */
if (cmd->plugin != NULL)
- {
- len = strlen(cmd->plugin);
- pq_sendint(&buf, len, 4);
- pq_sendbytes(&buf, cmd->plugin, len);
- }
+ values[3] = CStringGetTextDatum(cmd->plugin);
else
- pq_sendint(&buf, -1, 4);
+ nulls[3] = true;
- pq_endmessage(&buf);
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
+ end_tup_output(tstate);
ReplicationSlotRelease();
}