- logical-replication-mode" xreflabel="logical_replication_mode">
- logical_replication_mode (enum )
+ debug-logical-replication-streaming" xreflabel="debug_logical_replication_streaming">
+ debug_logical_replication_streaming (enum )
-
logical_replication_mode configuration parameter
+
debug_logical_replication_streaming configuration parameter
immediate . The default is buffered .
This parameter is intended to be used to test logical decoding and
replication of large transactions. The effect of
- logical_replication_mode is different for the
+ debug_logical_replication_streaming is different for the
publisher and subscriber:
- On the publisher side, logical_replication_mode
+ On the publisher side, debug_logical_replication_streaming
allows streaming or serializing changes immediately in logical decoding.
When set to immediate , stream each change if the
streaming
On the subscriber side, if the streaming option is set to
- parallel , logical_replication_mode
+ parallel , debug_logical_replication_streaming
can be used to direct the leader apply worker to send changes to the
shared memory queue or to serialize all changes to the file. When set to
buffered , the leader sends changes to parallel apply
* We don't try to send data to parallel worker for 'immediate' mode. This
* is primarily used for testing purposes.
*/
- if (unlikely(logical_replication_mode == LOGICAL_REP_MODE _IMMEDIATE))
+ if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING _IMMEDIATE))
return false;
/*
static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* GUC variable */
-int logical_replication_mode = LOGICAL_REP_MODE _BUFFERED;
+int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING _BUFFERED;
/* ---------------------------------------
* primary reorderbuffer support routines
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
* disk or send to the output plugin until we reach under the memory limit.
*
- * If logical_replication_mode is set to "immediate", stream or serialize the
- * changes immediately.
+ * If debug_logical_replication_streaming is set to "immediate", stream or
+ * serialize the changes immediately.
*
* XXX At this point we select the transactions until we reach under the memory
* limit, but we might also adapt a more elaborate eviction strategy - for example
ReorderBufferTXN *txn;
/*
- * Bail out if logical_replication_mode is buffered and we haven't
- * exceeded the memory limit.
+ * Bail out if debug_logical_replication_streaming is buffered and we
+ * haven't exceeded the memory limit.
*/
- if (logical_replication_mode == LOGICAL_REP_MODE _BUFFERED &&
+ if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING _BUFFERED &&
rb->size < logical_decoding_work_mem * 1024L)
return;
/*
- * If logical_replication_mode is immediate, loop until there's no change.
- * Otherwise, loop until we reach under the memory limit. One might think
- * that just by evicting the largest (sub)transaction we will come under
- * the memory limit based on assumption that the selected transaction is
- * at least as large as the most recent change (which caused us to go over
- * the memory limit). However, that is not true because a user can reduc e
- * the logical_decoding_work_mem to a smaller value before the most recent
- * change.
+ * If debug_logical_replication_streaming is immediate, loop until there's
+ * no change. Otherwise, loop until we reach under the memory limit. One
+ * might think that just by evicting the largest (sub)transaction we will
+ * come under the memory limit based on assumption that the selected
+ * transaction is at least as large as the most recent change (which
+ * caused us to go over the memory limit). However, that is not tru e
+ * because a user can reduce the logical_decoding_work_mem to a smaller
+ * value before the most recent change.
*/
while (rb->size >= logical_decoding_work_mem * 1024L ||
- (logical_replication_mode == LOGICAL_REP_MODE _IMMEDIATE &&
+ (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING _IMMEDIATE &&
rb->size > 0))
{
/*
{NULL, 0, false}
};
-static const struct config_enum_entry logical_replication_mode _options[] = {
- {"buffered", LOGICAL_REP_MODE _BUFFERED, false},
- {"immediate", LOGICAL_REP_MODE _IMMEDIATE, false},
+static const struct config_enum_entry debug_logical_replication_streaming _options[] = {
+ {"buffered", DEBUG_LOGICAL_REP_STREAMING _BUFFERED, false},
+ {"immediate", DEBUG_LOGICAL_REP_STREAMING _IMMEDIATE, false},
{NULL, 0, false}
};
},
{
- {"logical_replication_mode ", PGC_USERSET, DEVELOPER_OPTIONS,
- gettext_noop("Controls when to replicate or apply each change ."),
+ {"debug_logical_replication_streaming ", PGC_USERSET, DEVELOPER_OPTIONS,
+ gettext_noop("Forces immediate streaming or serialization of changes in large transactions ."),
gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding. "
"On the subscriber, it allows serialization of all changes to files and notifies the "
"parallel apply workers to read and apply them at the end of the transaction."),
GUC_NOT_IN_SAMPLE
},
- &logical_replication_mode ,
- LOGICAL_REP_MODE_BUFFERED, logical_replication_mode _options,
+ &debug_logical_replication_streaming ,
+ DEBUG_LOGICAL_REP_STREAMING_BUFFERED, debug_logical_replication_streaming _options,
NULL, NULL, NULL
},
/* GUC variables */
extern PGDLLIMPORT int logical_decoding_work_mem;
-extern PGDLLIMPORT int logical_replication_mode ;
+extern PGDLLIMPORT int debug_logical_replication_streaming ;
-/* possible values for logical_replication_mode */
+/* possible values for debug_logical_replication_streaming */
typedef enum
{
- LOGICAL_REP_MODE _BUFFERED,
- LOGICAL_REP_MODE _IMMEDIATE
-} LogicalRep Mode;
+ DEBUG_LOGICAL_REP_STREAMING _BUFFERED,
+ DEBUG_LOGICAL_REP_STREAMING _IMMEDIATE
+} DebugLogicalRepStreaming Mode;
/* an individual tuple, stored in one chunk of memory */
typedef struct ReorderBufferTupleBuf
# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf',
"log_min_messages = warning");
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
$node_publisher->start;
# Create subscriber node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
$node_publisher->start;
# Create subscriber node
# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf',
"log_min_messages = warning");
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
$node_publisher->start;
# Create subscriber node
$node_publisher->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
-logical_replication_mode = immediate
+debug_logical_replication_streaming = immediate
));
$node_publisher->start;
# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
- 'logical_replication_mode = immediate');
+ 'debug_logical_replication_streaming = immediate');
# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf',
"log_min_messages = warning");