- Allows streaming or serializing changes immediately in logical decoding.
- The allowed values of logical_replication_mode are
- buffered and immediate . When set
- to immediate , stream each change if
+ The allowed values are buffered and
+ immediate . The default is buffered .
+ This parameter is intended to be used to test logical decoding and
+ replication of large transactions. The effect of
+ logical_replication_mode is different for the
+ publisher and subscriber:
+
+
+ On the publisher side, logical_replication_mode
+ allows streaming or serializing changes immediately in logical decoding.
+ When set to immediate , stream each change if the
streaming option (see optional parameters set by
CREATE SUBSCRIPTION )
is enabled, otherwise, serialize each change. When set to
- buffered , which is the default, decoding will stream
- or serialize changes when logical_decoding_work_mem
- is reached.
+ buffered , the decoding will stream or serialize
+ changes when logical_decoding_work_mem is reached.
+
- This parameter is intended to be used to test logical decoding and
- replication of large transactions for which otherwise we need to
- generate the changes till logical_decoding_work_mem
- is reached.
+ On the subscriber side, if the streaming option is set to
+ parallel , logical_replication_mode
+ can be used to direct the leader apply worker to send changes to the
+ shared memory queue or to serialize all changes to the file. When set to
+ buffered , the leader sends changes to parallel apply
+ workers via a shared memory queue. When set to
+ immediate , the leader serializes all changes to files
+ and notifies the parallel apply workers to read and apply them at the
+ end of the transaction.
Assert(!IsTransactionState());
Assert(!winfo->serialize_changes);
+ /*
+ * We don't try to send data to parallel worker for 'immediate' mode. This
+ * is primarily used for testing purposes.
+ */
+ if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
+ return false;
+
/*
* This timeout is a bit arbitrary but testing revealed that it is sufficient
* to send the message unless the parallel apply worker is waiting on some
startTime = GetCurrentTimestamp();
else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
SHM_SEND_TIMEOUT_MS))
- {
- ereport(LOG,
- (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
- winfo->shared->xid)));
return false;
- }
}
}
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked)
{
+ ereport(LOG,
+ (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+ winfo->shared->xid)));
+
/*
* The parallel apply worker could be stuck for some reason (say waiting
* on some lock by other backend), so stop trying to send data directly to
{
{"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
- gettext_noop("Controls when to replicate each change."),
- gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."),
+ gettext_noop("Controls when to replicate or apply each change."),
+ gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. "
+ "On the subscriber, it allows serialization of all changes to files and notifies the "
+ "parallel apply workers to read and apply them at the end of the transaction."),
GUC_NOT_IN_SAMPLE
},
&logical_replication_mode,
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(15000), 'parallel apply worker replayed all changes from file');
+
$node_subscriber->stop;
$node_publisher->stop;
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2 ");
my $appname = 'tap_sub';
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is aborted on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(0), 'check rollback was reflected on subscriber');
+
+# Serialize the ABORT sub-transaction.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ SAVEPOINT sp;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK TO sp;
+ COMMIT;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only sub-transaction is aborted on subscriber.
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'check rollback to savepoint was reflected on subscriber');
+
$node_subscriber->stop;
$node_publisher->stop;
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication (streaming = on)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2 ");
my $appname = 'tap_sub';
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ PREPARE TRANSACTION 'xact';
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'transaction is committed on subscriber');
+
###############################
# check all the cleanup
###############################