Add the additional information to the logical replication worker errcontext.
authorAmit Kapila
Tue, 8 Mar 2022 02:38:32 +0000 (08:08 +0530)
committerAmit Kapila
Tue, 8 Mar 2022 02:38:32 +0000 (08:08 +0530)
This commits adds both the finish LSN (commit_lsn in case transaction got
committed, prepare_lsn in case of a prepared transaction, etc.) and
replication origin name to the existing error context message.

This will help users in specifying the origin name and transaction finish
LSN to pg_replication_origin_advance() SQL function to skip a particular
transaction.

Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila
Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com

doc/src/sgml/logical-replication.sgml
src/backend/replication/logical/worker.c

index fb4472356d5b329177d648383347075f9288c1d3..82326c3901934d6d3f3f4aecddf87dfbb7da7f9d 100644 (file)
   
    The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the 
+   transaction that conflicts with the existing data.  When a conflict produces
+   an error, the replication won't proceed, and the logical replication worker will
+   emit the following kind of message to the subscriber's server log:
+
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (c)=(1) already exists.
+CONTEXT:  processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+
+   The LSN of the transaction that contains the change violating the constraint and
+   the replication origin name can be found from the server log (LSN 0/14C0378 and
+   replication origin pg_16395 in the above case).  To skip the
+   transaction, the subscription needs to be disabled temporarily by
+   ALTER SUBSCRIPTION ... DISABLE first. Then, the transaction
+   can be skipped by calling the
+   
    pg_replication_origin_advance() function with
-   a node_name corresponding to the subscription name,
-   and a position.  The current position of origins can be seen in the
+   the node_name (i.e., pg_16395) and the
+   next LSN of the transaction's LSN (i.e., LSN 0/14C0379).  After that the replication
+   can be resumed by ALTER SUBSCRIPTION ... ENABLE.  The current
+   position of origins can be seen in the
    
    pg_replication_origin_status system view.
   
index 92aa794706dfc611d9718af0c1ad830b7f0b2fc2..8653e1d8402a7f918d74daf154761ab575794281 100644 (file)
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
    /* Remote node information */
    int         remote_attnum;  /* -1 if invalid */
    TransactionId remote_xid;
+   XLogRecPtr  finish_lsn;
+   char       *origin_name;
 } ApplyErrorCallbackArg;
 
 static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
    .rel = NULL,
    .remote_attnum = -1,
    .remote_xid = InvalidTransactionId,
+   .finish_lsn = InvalidXLogRecPtr,
+   .origin_name = NULL,
 };
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
    LogicalRepBeginData begin_data;
 
    logicalrep_read_begin(s, &begin_data);
-   set_apply_error_context_xact(begin_data.xid);
+   set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
    remote_final_lsn = begin_data.final_lsn;
 
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
                 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
    logicalrep_read_begin_prepare(s, &begin_data);
-   set_apply_error_context_xact(begin_data.xid);
+   set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
    remote_final_lsn = begin_data.prepare_lsn;
 
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
    char        gid[GIDSIZE];
 
    logicalrep_read_commit_prepared(s, &prepare_data);
-   set_apply_error_context_xact(prepare_data.xid);
+   set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
    /* Compute GID for two_phase transactions. */
    TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
    char        gid[GIDSIZE];
 
    logicalrep_read_rollback_prepared(s, &rollback_data);
-   set_apply_error_context_xact(rollback_data.xid);
+   set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
    /* Compute GID for two_phase transactions. */
    TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
                 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
    logicalrep_read_stream_prepare(s, &prepare_data);
-   set_apply_error_context_xact(prepare_data.xid);
+   set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
 
    elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-   set_apply_error_context_xact(stream_xid);
+   set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
 
    /*
     * Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
     */
    if (xid == subxid)
    {
-       set_apply_error_context_xact(xid);
+       set_apply_error_context_xact(xid, InvalidXLogRecPtr);
        stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    }
    else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
        bool        found = false;
        char        path[MAXPGPATH];
 
-       set_apply_error_context_xact(subxid);
+       set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
 
        subidx = -1;
        begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
    xid = logicalrep_read_stream_commit(s, &commit_data);
-   set_apply_error_context_xact(xid);
+   set_apply_error_context_xact(xid, commit_data.commit_lsn);
 
    elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
        myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
        pfree(syncslotname);
+
+       /*
+        * Allocate the origin name in long-lived context for error context
+        * message.
+        */
+       ReplicationOriginNameForTablesync(MySubscription->oid,
+                                         MyLogicalRepWorker->relid,
+                                         originname,
+                                         sizeof(originname));
+       apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                  originname);
    }
    else
    {
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
         * does some initializations on the upstream so let's still call it.
         */
        (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+       /*
+        * Allocate the origin name in long-lived context for error context
+        * message.
+        */
+       apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                  originname);
    }
 
    /*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
    if (apply_error_callback_arg.command == 0)
        return;
 
+   Assert(errarg->origin_name);
+
    if (errarg->rel == NULL)
    {
        if (!TransactionIdIsValid(errarg->remote_xid))
-           errcontext("processing remote data during \"%s\"",
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+                      errarg->origin_name,
                       logicalrep_message_type(errarg->command));
-       else
-           errcontext("processing remote data during \"%s\" in transaction %u",
+       else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+                      errarg->origin_name,
                       logicalrep_message_type(errarg->command),
                       errarg->remote_xid);
+       else
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+                      errarg->origin_name,
+                      logicalrep_message_type(errarg->command),
+                      errarg->remote_xid,
+                      LSN_FORMAT_ARGS(errarg->finish_lsn));
    }
    else if (errarg->remote_attnum < 0)
-       errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+       errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+                  errarg->origin_name,
                   logicalrep_message_type(errarg->command),
                   errarg->rel->remoterel.nspname,
                   errarg->rel->remoterel.relname,
-                  errarg->remote_xid);
+                  errarg->remote_xid,
+                  LSN_FORMAT_ARGS(errarg->finish_lsn));
    else
-       errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+       errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+                  errarg->origin_name,
                   logicalrep_message_type(errarg->command),
                   errarg->rel->remoterel.nspname,
                   errarg->rel->remoterel.relname,
                   errarg->rel->remoterel.attnames[errarg->remote_attnum],
-                  errarg->remote_xid);
+                  errarg->remote_xid,
+                  LSN_FORMAT_ARGS(errarg->finish_lsn));
 }
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
 {
    apply_error_callback_arg.remote_xid = xid;
+   apply_error_callback_arg.finish_lsn = lsn;
 }
 
 /* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
    apply_error_callback_arg.command = 0;
    apply_error_callback_arg.rel = NULL;
    apply_error_callback_arg.remote_attnum = -1;
-   set_apply_error_context_xact(InvalidTransactionId);
+   set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }