- The following example shows how logical decoding is controlled over the
+ The following examples shows how logical decoding is controlled over the
streaming replication protocol, using the
program included in the PostgreSQL
distribution. This requires that client authentication is set up to allow
replication connections
(see ) and
that max_wal_senders is set sufficiently high to allow
- an additional connection.
+ an additional connection. The second example shows how to stream two-phase
+ transactions. Before you use two-phase commands, you must set
+ to atleast 1.
+Example 1:
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
Control Z
COMMIT 693
Control C
$ pg_recvlogical -d postgres --slot=test --drop-slot
+
+Example 2:
+$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
+$ pg_recvlogical -d postgres --slot=test --start -f -
+Control Z
+$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
+$ fg
+BEGIN 694
+table public.data: INSERT: id[integer]:5 data[text]:'5'
+PREPARE TRANSACTION 'test', txid 694
+Control Z
+$ psql -d postgres -c "COMMIT PREPARED 'test';"
+$ fg
+COMMIT PREPARED 'test', txid 694
+Control C
+$ pg_recvlogical -d postgres --slot=test --drop-slot
- CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] }
+ CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT | TWO_PHASE ] }
+
+ TWO_PHASE
+
+ Specify that this logical replication slot supports decoding of two-phase
+ transactions. With this option, two-phase commands like
+ PREPARE TRANSACTION , COMMIT PREPARED
+ and ROLLBACK PREPARED are decoded and transmitted.
+ The transaction will be decoded and transmitted at
+ PREPARE TRANSACTION time.
+
+
+
+
RESERVE_WAL
--plugin , for the database specified
by --dbname .
+
+ The --two-phase can be specified with
+ --create-slot to enable two-phase decoding.
+
+
+ -t
+ --two-phase
+
+ Enables two-phase decoding. This option should only be specified with
+ --create-slot
+
+
+
+
-v
--verbose
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_TWO_PHASE
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
$$ = makeDefElem("reserve_wal",
(Node *)makeInteger(true), -1);
}
+ | K_TWO_PHASE
+ {
+ $$ = makeDefElem("two_phase",
+ (Node *)makeInteger(true), -1);
+ }
;
/* DROP_REPLICATION_SLOT slot */
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+TWO_PHASE { return K_TWO_PHASE; }
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
- CRSSnapshotAction *snapshot_action)
+ CRSSnapshotAction *snapshot_action,
+ bool *two_phase)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
+ bool two_phase_given = false;
/* Parse options */
foreach(lc, cmd->options)
reserve_wal_given = true;
*reserve_wal = true;
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ *two_phase = true;
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
char xloc[MAXFNAMELEN];
char *slot_name;
bool reserve_wal = false;
+ bool two_phase = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase );
/* setup state for WalSndSegmentOpen */
sendTimeLineIsHistoric = false;
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- fal se);
+ two_pha se);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)
if (temp_replication_slot || create_slot)
{
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
- temp_replication_slot, true, true, false))
+ temp_replication_slot, true, true, false, false ))
exit(1);
if (verbose)
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
- slot_exists_ok))
+ slot_exists_ok, false ))
exit(1);
exit(0);
}
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
+static bool two_phase = false;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
+ printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
{"fsync-interval", required_argument, NULL, 'F'},
{"no-loop", no_argument, NULL, 'n'},
{"verbose", no_argument, NULL, 'v'},
+ {"two-phase", no_argument, NULL, 't'},
{"version", no_argument, NULL, 'V'},
{"help", no_argument, NULL, '?'},
/* connection options */
}
}
- while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "E:f:F:nvt d:h:p:U:wWI:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
case 'v':
verbose++;
break;
+ case 't':
+ two_phase = true;
+ break;
/* connection options */
case 'd':
dbname = pg_strdup(optarg);
exit(1);
}
+ if (two_phase && !do_create_slot)
+ {
+ pg_log_error("--two-phase may only be specified with --create-slot");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
- false, false, slot_exists_ok))
+ false, false, slot_exists_ok, two_phase ))
exit(1);
startpos = InvalidXLogRecPtr;
}
bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
bool is_temporary, bool is_physical, bool reserve_wal,
- bool slot_exists_ok)
+ bool slot_exists_ok, bool two_phase )
{
PQExpBuffer query;
PGresult *res;
Assert((is_physical && plugin == NULL) ||
(!is_physical && plugin != NULL));
+ Assert(!(two_phase && is_physical));
Assert(slot_name != NULL);
/* Build query */
else
{
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+ if (two_phase && PQserverVersion(conn) >= 150000)
+ appendPQExpBufferStr(query, " TWO_PHASE");
+
if (PQserverVersion(conn) >= 100000)
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, bool is_temporary,
bool is_physical, bool reserve_wal,
- bool slot_exists_ok);
+ bool slot_exists_ok, bool two_phase );
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli,
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 15 ;
+use Test::More tests => 20 ;
program_help_ok('pg_recvlogical');
program_version_ok('pg_recvlogical');
max_wal_senders = 4
log_min_messages = 'debug1'
log_error_verbosity = verbose
+max_prepared_transactions = 10
});
$node->dump_info;
$node->start;
'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
],
'replayed a transaction');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--drop-slot'
+ ],
+ 'slot dropped');
+
+#test with two-phase option enabled
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--create-slot', '--two-phase'
+ ],
+ 'slot with two-phase created');
+
+$slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->safe_psql('postgres',
+ "BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
+$node->safe_psql('postgres',
+ "COMMIT PREPARED 'test'");
+$nextlsn =
+ $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+chomp($nextlsn);
+
+$node->command_fails(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
+ ],
+ 'incorrect usage');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+ ],
+ 'replayed a two-phase transaction');