Exported Snapshots
- When a new replication slot is created using the streaming replication interface,
- a snapshot is exported
+ When a new replication slot is created using the streaming replication
+ interface (see ), a
+ snapshot is exported
(see ), which will show
exactly the state of the database after which all changes will be
included in the change stream. This can be used to create a new replica by
database's state at that point in time, which afterwards can be updated
using the slot's contents without losing any changes.
+ Creation of a snapshot is not always possible. In particular, it will
+ fail when connected to a hot standby. Applications that do not require
+ snapshot export may suppress it with the NOEXPORT_SNAPSHOT>
+ option.
+
-
- CREATE_REPLICATION_SLOT slot_name> [ TEMPORARY> ] { PHYSICAL> [ RESERVE_WAL> ] | LOGICAL> output_plugin> }
+ id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
+ CREATE_REPLICATION_SLOT slot_name> [ TEMPORARY> ] { PHYSICAL> [ RESERVE_WAL> ] | LOGICAL> output_plugin> [ EXPORT_SNAPSHOT> | NOEXPORT_SNAPSHOT> ] }
+
+
+ EXPORT_SNAPSHOT>
+ NOEXPORT_SNAPSHOT>
+
+ Decides what to do with the snapshot created during logical slot
+ initialization. EXPORT_SNAPSHOT>, which is the default,
+ will export the snapshot for use in other sessions. This option can't
+ be used inside a transaction. NOEXPORT_SNAPSHOT> will
+ just use the snapshot for logical decoding as normal but won't do
+ anything else with it.
+
+
+
PG_TRY();
{
- walrcv_create_slot(wrconn, slotname, false, &lsn);
+ /*
+ * Create permanent slot for the subscription. We won't use the
+ * initial snapshot for anything, so no need to export it.
+ */
+ walrcv_create_slot(wrconn, slotname, false, false, &lsn);
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
slotname)));
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
+ bool export_snapshot,
XLogRecPtr *lsn);
static bool libpqrcv_command(WalReceiverConn *conn,
const char *cmd, char **err);
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
- bool temporary, XLogRecPtr *lsn)
+ bool temporary, bool export_snapshot, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
initStringInfo(&cmd);
- appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+ appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
if (temporary)
- appendStringInfo(&cmd, "TEMPORARY ");
+ appendStringInfo(&cmd, " TEMPORARY");
if (conn->logical)
- appendStringInfo(&cmd, "LOGICAL pgoutput");
+ {
+ appendStringInfo(&cmd, " LOGICAL pgoutput");
+ if (export_snapshot)
+ appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
+ else
+ appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
+ }
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_EXPORT_SNAPSHOT
+%token K_NOEXPORT_SNAPSHOT
%type command
%type base_backup start_replication start_logical_replication
%type plugin_opt_elem
%type plugin_opt_arg
%type opt_slot var_name
-%type opt_reserve_wal opt_temporary
+%type opt_temporary
+%type create_slot_opt_list
+%type create_slot_opt
%%
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
+ K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
cmd->temporary = $3;
- cmd->reserve_wal = $5;
+ cmd->options = $5;
$$ = (Node *) cmd;
}
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
+ | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->slotname = $2;
cmd->temporary = $3;
cmd->plugin = $5;
+ cmd->options = $6;
$$ = (Node *) cmd;
}
;
+create_slot_opt_list:
+ create_slot_opt_list create_slot_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+create_slot_opt:
+ K_EXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | K_NOEXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(FALSE), -1);
+ }
+ | K_RESERVE_WAL
+ {
+ $$ = makeDefElem("reserve_wal",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ ;
+
/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT
| /* EMPTY */
;
-opt_reserve_wal:
- K_RESERVE_WAL { $$ = true; }
- | /* EMPTY */ { $$ = false; }
- ;
-
opt_temporary:
K_TEMPORARY { $$ = true; }
| /* EMPTY */ { $$ = false; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
+NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
"," { return ','; }
";" { return ';'; }
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
+#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
return count;
}
+/*
+ * Process extra options given to CREATE_REPLICATION_SLOT.
+ */
+static void
+parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
+ bool *reserve_wal,
+ bool *export_snapshot)
+{
+ ListCell *lc;
+ bool snapshot_action_given = false;
+ bool reserve_wal_given = false;
+
+ /* Parse options */
+ foreach (lc, cmd->options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lc);
+
+ if (strcmp(defel->defname, "export_snapshot") == 0)
+ {
+ if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ snapshot_action_given = true;
+ *export_snapshot = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "reserve_wal") == 0)
+ {
+ if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ reserve_wal_given = true;
+ *reserve_wal = true;
+ }
+ else
+ elog(ERROR, "unrecognized option: %s", defel->defname);
+ }
+}
+
/*
* Create a new replication slot.
*/
const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN];
char *slot_name;
+ bool reserve_wal = false;
+ bool export_snapshot = true;
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Assert(!MyReplicationSlot);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
+
/* setup state for XLogReadPage */
sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID;
DecodingContextFindStartpoint(ctx);
/*
- * Export a plain (not of the snapbuild.c type) snapshot to the user
- * that can be imported into another session.
+ * Export the snapshot if we've been asked to do so.
+ *
+ * NB. We will convert the snapbuild.c kind of snapshot to normal
+ * snapshot when doing this.
*/
- snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+ if (export_snapshot)
+ snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
if (!cmd->temporary)
ReplicationSlotPersist();
}
- else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
+ else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
{
ReplicationSlotReserveWal();
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
slot_name);
else
+ {
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
slot_name, plugin);
+ if (PQserverVersion(conn) >= 100000)
+ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+ appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+ }
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
ReplicationKind kind;
char *plugin;
bool temporary;
- bool reserve_wal;
+ List *options;
} CreateReplicationSlotCmd;
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- XLogRecPtr *lsn);
+ bool export_snapshot, XLogRecPtr *lsn);
typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
char **err);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
#define walrcv_command(conn, cmd, err) \
WalReceiverFunctions->walrcv_command(conn, cmd, err)
#define walrcv_disconnect(conn) \