--- /dev/null
+
+
+
Logical Decoding
+
+
+ PostgreSQL provides infrastructure to stream the modifications performed
+ via SQL to external consumers. This functionality can be used to for a
+ variety of purposes, including replication solutions and auditing.
+
+
+ Changes are sent out in streams identified by logical replication slots.
+ Each stream outputs each change exactly once.
+
+
+ The format in which those changes are streamed is determined by the output
+ plugin used. An example plugin is provided, and additional plugins can be
+ written to extend the choice of available formats without modifying any
+ core code.
+ Every output plugin has access to each individual new row produced
+ by INSERT and the new row version created
+ by UPDATE. Availability of old row versions for
+ UPDATE and delete DELETE depends on
+ the configured
+ REPLICA
+ IDENTITY.
+
+
+ Changes can be consumed either using the streaming replication protocol
+ (see and
+ ), or by calling functions
+ via SQL (see ). It is also possible
+ to write additional methods of consuming the output of a replication slot
+ without modifying core code
+ (see ).
+
+
+
+
Logical Decoding Example
+ The following example demonstartes the SQL interface.
+
+ Before you can use logical decoding, you must set
+ to logical and
+ ot at least 1.
+ Then, you should connect to the target database (in the example
+ below, postgres) as a superuser.
+
+postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ slotname | xlog_position
+-----------------+---------------
+ regression_slot | 0/16B1970
+(1 row)
+
+postgres=# SELECT * FROM pg_replication_slots;
+ slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
+-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
+ regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408
+(1 row)
+
+postgres=# -- There are no changes to see yet
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
+postgres=# CREATE TABLE data(id serial primary key, data text);
+CREATE TABLE
+
+postgres=# -- DDL isn't replicated, so all you'll see is the transaction
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+-----------+-----+------------
+ 0/16D5D48 | 688 | BEGIN 688
+ 0/16E0380 | 688 | COMMIT 688
+(2 rows)
+
+postgres=# -- Once changes are read, they're consumed and not emitted
+postgres=# -- in a subsequent call:
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+----------+-----+------
+(0 rows)
+
+postgres=# BEGIN;
+postgres=# INSERT INTO data(data) VALUES('1');
+postgres=# INSERT INTO data(data) VALUES('2');
+postgres=# COMMIT;
+
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ location | xid | data
+-----------+-----+-----------------------------------------------
+ 0/16E0478 | 689 | BEGIN 689
+ 0/16E0478 | 689 | table public.data: INSERT: id[int4]:1 data[text]:'1'
+ 0/16E0580 | 689 | table public.data: INSERT: id[int4]:2 data[text]:'2'
+ 0/16E0650 | 689 | COMMIT 689
+(4 rows)
+
+postgres=# INSERT INTO data(data) VALUES('3');
+
+postgres=# -- You can also peek ahead in the change stream without consuming changes
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ location | xid | data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690
+(3 rows)
+
+postgres=# -- You can also peek ahead in the change stream without consuming changes
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ location | xid | data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690
+(3 rows)
+
+postgres=# -- options can be passed to output plugin, to influence the formatting
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
+ location | xid | data
+-----------+-----+-----------------------------------------------
+ 0/16E09C0 | 690 | BEGIN 690
+ 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3'
+ 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01)
+(3 rows)
+
+postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
+postgres=# -- server resources:
+postgres=# SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+-----------------------
+
+(1 row)
+
+ The following example shows usage of the walsender interface using
+ the pg_recvlogical
+ shell command. It requires the replication configurations to be allowed
+ (see )
+ and max_wal_senders to be set sufficiently high for
+ another connection.
+
+# pg_recvlogical -d testdb --slot test --create
+# pg_recvlogical -d testdb --slot test --start -f -
+CTRL-Z
+# psql -c "INSERT INTO data(data) VALUES('4');"
+# fg
+BEGIN 693
+table public.data: INSERT: id[int4]:4 data[text]:'4'
+COMMIT 693
+CTRL-C
+# pg_recvlogical -d testdb --slot test --drop
+
+
+
+
Logical Decoding Concepts
+
+
+
+
Logical Decoding
+ Logical decoding is the the process of extracting all persistent changes
+ to a database's tables into a coherent, easy to understand format which
+ can be interpreted without detailed knowledge of the database's internal
+ state.
+
+ In
PostgreSQL, logical decoding is implemented
+ by decoding the contents of the write-ahead
+ log, which describe changes on a storage level, into an
+ application-specific form such as a stream of tuples or SQL statements.
+
+
+
+
+
+
Logical Replication Slot
+
+
+
+
Replication Slots
+ In the context of logical replication, a slot represents a stream of
+ changes which can be replayed to a client in the order they were made on
+ the origin server. Each slot streams a sequence of changes from a single
+ database, sending each change exactly once (except when peeking forward
+ in the stream).
+
+
+
PostgreSQL also has streaming replication slots
+ (see ), but they are used somewhat
+ differently there.
+
+
+ Replication slots have an identifier which is unique across all databases
+ in a
PostgreSQL cluster. Slots persist
+ independently of the connection using them and are crash-safe.
+
+ Multiple independent slots may exist for a single database. Each slot has
+ its own state, allowing different consumers to receive changes from
+ different points in the database change stream. For most applications, a
+ separate slot will be required for each consumer.
+
+ A logical replication slot knows nothing about the state of the
+ receiver(s). It's even possible to have multiple different receivers using
+ the same slot at different times; they'll just get the changes following
+ on from when the last receiver stopped consuming them. Only one receiver
+ may consume changes from a slot at any given time.
+
+
+ Replication slots persist across crashes and know nothing about the state
+ of their consumer(s). They will prevent removal of required resources
+ even when there is no connection using them. This consumes storage
+ because neither required WAL nor required rows from the system catalogs
+ can be removed by VACUUM as long as they are required by a replication
+ slot, so if a slot is no longer required it should be dropped.
+
+
+
+
+
Output Plugins
+ Output plugins transform the data from the write-ahead log's internal
+ representation into the format the consumer of a replication slot desires.
+
+
+
+
Exported Snapshots
+ When a new replication slot is created using the walsender interface a
+ snapshot is exported
+ (see ) which will show
+ exactly the state of the database after which all changes will be
+ included in the change stream. This can be used to create a new replica by
+ using SET TRANSACTION
+ SNAPSHOT to read the state of the database at the moment
+ the slot was created. This transaction can then be used to dump the
+ database's state at that point in time which afterwards can be updated
+ using the slot's contents without loosing any changes.
+
+
+
+
+
Streaming Replication Protocol Interface
+ The CREATE_REPLICATION_SLOT SLOT slotname LOGICAL
+ options, DROP_REPLICATION_SLOT SLOT slotname
+ and START_REPLICATION SLOT slotname LOGICAL options
+ commands can be used to create, drop and stream changes from a replication
+ slot respectively. These commands are only available over a replication
+ connection; they cannot be used via SQL.
+ See .
+
+ The pg_recvlogical command
+ (see ) can be used to control logical
+ decoding over a walsender connection.
+
+
+
+
Logical Decoding SQL Interface
+ See for detailed documentation on
+ the SQL-level API for interacting with logical decoding.
+
+ Synchronous replication (see ) is
+ only supported on replication slots used over the walsender interface. The
+ function interface and additional, non-core interfaces do not support
+ synchronous replication.
+
+
+
+
System catalogs related to logical decoding
+ The pg_replication_slots
+ view and the
+ pg_stat_replication
+ view provide information about the current state of replication slots and
+ walsender connections respectively. These views apply to both physical and
+ logical replication.
+
+
+
+
Logical Decoding Output Plugins
+ An example output plugin can be found in the
+
+ contrib/test_decoding
+
+ subdirectory of the PostgreSQL source tree.
+
+
+
Initialization Function
+
+
+ An output plugin is loaded by dynamically loading a shared library with
+ the output plugin's name as the library basename. The normal library
+ search path is used to locate the library. To provide the required output
+ plugin callbacks and to indicate that the library is actually an output
+ plugin it needs to provide a function named
+ _PG_output_plugin_init. This function is passed a
+ struct that needs to be filled with the callback function pointers for
+ individual actions.
+typedef struct OutputPluginCallbacks
+{
+ LogicalDecodeStartupCB startup_cb;
+ LogicalDecodeBeginCB begin_cb;
+ LogicalDecodeChangeCB change_cb;
+ LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeShutdownCB shutdown_cb;
+} OutputPluginCallbacks;
+typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
+
+ The begin_cb, change_cb
+ and commit_cb callbacks are required,
+ while startup_cb
+ and shutdown_cb are optional.
+
+
+
+
+
Capabilities
+ To decode, format and output changes, output plugins can use most of the
+ backend's normal infrastructure, including calling output functions. Read
+ only access to relations is permitted as long as only relations are
+ accessed that either have been created by initdb in
+ the pg_catalog schema, or have are marked as user
+ provided catalog tables using
+ALTER TABLE user_catalog_table SET (user_catalog_table = true);
+CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
+
+ Any actions leading to xid assignment are prohibited. That, among others,
+ includes writing to tables, performing DDL changes and
+ calling txid_current().
+
+
+
+
+
Output Plugin Callbacks
+ An output plugin gets notified about changes that are happening via
+ various callbacks it needs to provide.
+
+ Concurrent transactions are decoded in commit order and only changes
+ belonging to a specific transaction are decoded inbetween
+ the begin and commit
+ callbacks. Transactions that were rolled back explicitly or implicitly
+ never get
+ decoded. Successfull SAVEPOINTs are
+ folded into the transaction containing them in the order they were
+ exectuded within that transaction.
+
+
+ Only transactions that have already safely been flushed to disk will be
+ decoded. That can lead to a COMMIT not immediately being decoded in a
+ directly following pg_logical_slot_get_changes()
+ when synchronous_commit is set
+ to off.
+
+
+
+
Startup Callback
+ The optional startup_cb callback is called whenever
+ an replication slot is created or asked to stream changes, independent
+ of the number of changes that are ready to be put out.
+typedef void (*LogicalDecodeStartupCB) (
+ struct LogicalDecodingContext *ctx,
+ OutputPluginOptions *options,
+ bool is_init
+);
+
+ The is_init paramter will be true when the
+ replication slot is being created and false
+ otherwise.
options points to a struct of options
+ that output plugins can set:
+typedef struct OutputPluginOptions
+{
+ OutputPluginOutputType output_type;
+} OutputPluginOptions;
+
+ output_type has to either be set to
+ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
+ or OUTPUT_PLUGIN_BINARY_OUTPUT.
+
+ The startup callback should validate the options present in
+ ctx->output_plugin_options. If the output plugin
+ needs to have a state, it can
+ use ctx->output_plugin_private to store it.
+
+
+
+
Shutdown Callback
+ The optional shutdown_cb callback is called
+ whenever a formerly active replication slot is not used anymore and can
+ be used to deallocate resources private to the output plugin. The slot
+ isn't necessarily being dropped, streaming is just being stopped.
+typedef void (*LogicalDecodeShutdownCB) (
+ struct LogicalDecodingContext *ctx
+);
+
+
+
+
+
Transaction Begin Callback
+ The required begin_cb callback is called whenever a
+ start of a commited transaction has been decoded. Aborted transactions
+ and their contents never get decoded.
+typedef void (*LogicalDecodeBeginCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn
+);
+
+ The
txn parameter contains meta information about
+ the transaction, like the timestamp at which it has been committed and
+ its XID.
+
+
+
+
Transaction End Callback
+ The required commit_cb callback is called whenever
+ a transaction commit has been
+ decoded. The change_cb callbacks for all modified
+ rows will have been called before this, if there have been any modified
+ rows.
+typedef void (*LogicalDecodeCommitCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn
+);
+
+
+
+
+
Callback called for each individual change in a
+ transaction
+ The required change_cb callback is called for every
+ individual row modification inside a transaction, may it be
+ an INSERT, UPDATE
+ or DELETE. Even if the original command modified
+ several rows at once the callback will be called indvidually for each
+ row.
+typedef void (*LogicalDecodeChangeCB) (
+ struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change
+);
+
+ The
ctx and
txn parameters
+ have the same contents as for the begin_cb
+ and commit_cb callbacks, but additionally the
+ relation descriptor
relation points to the
+ relation the row belongs to and a struct
+
change describing the row modification are passed
+ in.
+
+
+ Only changes in user defined tables that are not unlogged
+ (see ) and not temporary
+ (see ) can be extracted using
+ logical decoding.
+
+
+
+
+
+
Functions for producing output from an output plugin
+ To actually produce output, output plugins can write data to
+ the StringInfo output buffer
+ in ctx->out when inside
+ the begin_cb, commit_cb
+ or change_cb callbacks. Before writing to the output
+ buffer OutputPluginPrepareWrite(ctx, last_write) has
+ to be called, and after finishing writing to the
+ buffer OutputPluginWrite(ctx, last_write) has to be
+ called to perform the write. The
last_write
+ indicates whether a particular write was the callback's last write.
+
+ The following example shows how to output data to the consumer of an
+ output plugin:
+OutputPluginPrepareWrite(ctx, true);
+appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
+OutputPluginWrite(ctx, true);
+
+
+
+
+
+
Logical Decoding Output Writers
+ It is possible to add more output methods for logical decoding.
+ For details, see
+ src/backend/replication/logical/logicalfuncs.c.
+ Essentially, three functions need to be provided: one to read WAL, one to
+ prepare writing output, and one to write the output
+ (see ).
+
+
+
+
Synchronous replication support for Logical Decoding
+ Logical decoding may be used to to build
+ synchronous
+ replication solutions with the same user interface as synchronous
+ replication for streaming
+ replication. To do this, the walsender interface
+ (see ) must be used to stream out
+ data. Clients have to send Standby status update (F)
+ (see ) messages, just like streaming
+ replication clients do.
+
+
+ A synchronous replica receiving changes via logical decoding will work in
+ the scope of a single database. Since, in contrast to
+ that,
synchronous_standby_names currently is
+ server wide, this means this technique will not work properly if more
+ than one database is actively used.
+
+
+
+