Fix statistics reporting in logical replication workers
authorPeter Eisentraut
Mon, 8 May 2017 16:07:59 +0000 (12:07 -0400)
committerPeter Eisentraut
Mon, 8 May 2017 16:10:22 +0000 (12:10 -0400)
This new arrangement ensures that statistics are reported right after
commit of transactions.  The previous arrangement didn't get this quite
right and could lead to assertion failures.

Author: Petr Jelinek 
Reported-by: Erik Rijkers
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c

index 0823000f001b4dbabf1a1672dc7c6e28d0b42a61..7e51076b376d9fbb9e3c0eb955aeef26afe04a88 100644 (file)
@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
    static List *table_states = NIL;
    static HTAB *last_start_times = NULL;
    ListCell   *lc;
+   bool        started_tx = false;
 
    Assert(!IsTransactionState());
 
@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        table_states = NIL;
 
        StartTransactionCommand();
+       started_tx = true;
 
        /* Fetch all non-ready tables. */
        rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        }
        MemoryContextSwitchTo(oldctx);
 
-       CommitTransactionCommand();
-
        table_states_valid = true;
    }
 
@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
            {
                rstate->state = SUBREL_STATE_READY;
                rstate->lsn = current_lsn;
-               StartTransactionCommand();
+               if (!started_tx)
+               {
+                   StartTransactionCommand();
+                   started_tx = true;
+               }
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        rstate->relid, rstate->state,
                                        rstate->lsn);
-               CommitTransactionCommand();
            }
        }
        else
@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
            }
        }
    }
+
+   if (started_tx)
+   {
+       CommitTransactionCommand();
+       pgstat_report_stat(false);
+   }
 }
 
 /*
@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                        MyLogicalRepWorker->relstate,
                                        MyLogicalRepWorker->relstate_lsn);
                CommitTransactionCommand();
+               pgstat_report_stat(false);
 
                /*
                 * We want to do the table data sync in single
index 2d7770d4dc1502db71e338a72701c8b60b371f1c..a61240ceee7d3719316926664024c5e9bf9b9fa8 100644 (file)
@@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s)
        replorigin_session_origin_timestamp = commit_data.committime;
 
        CommitTransactionCommand();
+       pgstat_report_stat(false);
 
        store_flush_position(commit_data.end_lsn);
    }
@@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s)
    /* Process any tables that are being synchronized in parallel. */
    process_syncing_tables(commit_data.end_lsn);
 
-   pgstat_report_stat(false);
    pgstat_report_activity(STATE_IDLE, NULL);
 }