Specifies whether transaction commit will wait for WAL records
to be written to disk before the command returns a <quote>success</>
- indication to the client. Valid values are <literal>on</>,
+ indication to the client. Valid values are <literal>on</>, <literal>write</>,
<literal>local</>, and <literal>off</>. The default, and safe, value
is <literal>on</>. When <literal>off</>, there can be a delay between
when success is reported to the client and when the transaction is
If <xref linkend="guc-synchronous-standby-names"> is set, this
parameter also controls whether or not transaction commit will wait
for the transaction's WAL records to be flushed to disk and replicated
- to the standby server. The commit wait will last until a reply from
- the current synchronous standby indicates it has written the commit
- record of the transaction to durable storage. If synchronous
+ to the standby server. When <literal>write</>, the commit wait will
+ last until a reply from the current synchronous standby indicates
+ it has received the commit record of the transaction into memory.
+ Normally this causes no data loss at the time of failover. However,
+ if both primary and standby crash, and the database cluster of
+ the primary gets corrupted, recent committed transactions might
+ be lost. When <literal>on</>, the commit wait will last until a reply
+ from the current synchronous standby indicates it has flushed
+ the commit record of the transaction to durable storage. This
+ avoids any data loss unless the database clusters of both the primary and
+ the standby are corrupted simultaneously. If synchronous
replication is in use, it will normally be sensible either to wait
- both for WAL records to reach both the local and remote disks, or
+ for both local flush and replication of WAL records, or
to allow the transaction to commit asynchronously. However, the
special value <literal>local</> is available for transactions that
wish to wait for local flush to disk, but not synchronous replication.
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
- * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
- * for the flush location on the standby before releasing the waiting backend.
+ * async, we just fastpath out of here. If it is sync, then we wait for
+ * the write or flush location on the standby before releasing the waiting backend.
* Further complexity in that interaction is expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
static bool announce_next_takeover = true;
-static void SyncRepQueueInsert(void);
+static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+
+static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
-static bool SyncRepQueueIsOrderedByLSN(void);
+static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
/*
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XLByteLE(XactCommitLSN, WalSndCtl->lsn))
+ XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
{
LWLockRelease(SyncRepLock);
return;
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
- SyncRepQueueInsert();
- Assert(SyncRepQueueIsOrderedByLSN());
+ SyncRepQueueInsert(SyncRepWaitMode);
+ Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
}
/*
- * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
+ * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
-SyncRepQueueInsert(void)
+SyncRepQueueInsert(int mode)
{
PGPROC *proc;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
- SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
+ SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
}
/*
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
- int numprocs = 0;
+ int numwrite = 0;
+ int numflush = 0;
int priority = 0;
int i;
return;
}
- if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
+ /*
+ * Set the lsn first so that when we wake backends they will release
+ * up to this location.
+ */
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
{
- /*
- * Set the lsn first so that when we wake backends they will release
- * up to this location.
- */
- walsndctl->lsn = MyWalSnd->flush;
- numprocs = SyncRepWakeQueue(false);
+ walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+ numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
+ }
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+ numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to %X/%X",
- numprocs,
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ numwrite,
+ MyWalSnd->write.xlogid,
+ MyWalSnd->write.xrecoff,
+ numflush,
MyWalSnd->flush.xlogid,
MyWalSnd->flush.xrecoff);
}
/*
- * Walk queue from head. Set the state of any backends that need to be woken,
- * remove them from the queue, and then wake them. Pass all = true to wake
- * whole queue; otherwise, just wake up to the walsender's LSN.
+ * Walk the specified queue from head. Set the state of any backends that
+ * need to be woken, remove them from the queue, and then wake them.
+ * Pass all = true to wake whole queue; otherwise, just wake up to
+ * the walsender's LSN.
*
* Must hold SyncRepLock.
*/
int
-SyncRepWakeQueue(bool all)
+SyncRepWakeQueue(bool all, int mode)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
PGPROC *proc = NULL;
PGPROC *thisproc = NULL;
int numprocs = 0;
- Assert(SyncRepQueueIsOrderedByLSN());
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ Assert(SyncRepQueueIsOrderedByLSN(mode));
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
/*
* Assume the queue is ordered by LSN
*/
- if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
+ if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
return numprocs;
/*
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
* wants synchronous replication, we'd better wake them up.
*/
if (!sync_standbys_defined)
- SyncRepWakeQueue(true);
+ {
+ int i;
+
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SyncRepWakeQueue(true, i);
+ }
/*
* Only allow people to join the queue when there are synchronous
#ifdef USE_ASSERT_CHECKING
static bool
-SyncRepQueueIsOrderedByLSN(void)
+SyncRepQueueIsOrderedByLSN(int mode)
{
PGPROC *proc = NULL;
XLogRecPtr lastLSN;
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
lastLSN = proc->waitLSN;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
return true;
}
+/*
+ * Translate a new synchronous_commit setting into this file's
+ * SyncRepWaitMode, the index that selects which WalSndCtl->lsn[] slot and
+ * which WalSndCtl->SyncRepQueue[] a committing backend waits on.
+ *
+ * SYNCHRONOUS_COMMIT_REMOTE_WRITE maps to SYNC_REP_WAIT_WRITE and
+ * SYNCHRONOUS_COMMIT_REMOTE_FLUSH maps to SYNC_REP_WAIT_FLUSH; every other
+ * value maps to SYNC_REP_NO_WAIT.  'extra' is not used.
+ *
+ * NOTE(review): the signature matches a GUC assign hook; presumably this is
+ * registered for synchronous_commit in guc.c -- confirm.
+ */
+void
+assign_synchronous_commit(int newval, void *extra)
+{
+ switch (newval)
+ {
+ case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
+ SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
+ break;
+ case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
+ SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
+ break;
+ default:
+ SyncRepWaitMode = SYNC_REP_NO_WAIT;
+ break;
+ }
+}