&found);
if (!found)
{
- SpinLockInit(&waitLSN->waitersHeapMutex);
pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);
memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
procInfo->procnum = MyProcNumber;
procInfo->waitLSN = lsn;
- SpinLockAcquire(&waitLSN->waitersHeapMutex);
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode);
procInfo->inHeap = true;
updateMinWaitedLSN();
- SpinLockRelease(&waitLSN->waitersHeapMutex);
+ LWLockRelease(WaitLSNLock);
}
/*
{
WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
- SpinLockAcquire(&waitLSN->waitersHeapMutex);
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
if (!procInfo->inHeap)
{
- SpinLockRelease(&waitLSN->waitersHeapMutex);
+ LWLockRelease(WaitLSNLock);
return;
}
procInfo->inHeap = false;
updateMinWaitedLSN();
- SpinLockRelease(&waitLSN->waitersHeapMutex);
+ LWLockRelease(WaitLSNLock);
}
/*
wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
- SpinLockAcquire(&waitLSN->waitersHeapMutex);
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
/*
* Iterate the pairing heap of waiting processes till we find LSN not yet
updateMinWaitedLSN();
- SpinLockRelease(&waitLSN->waitersHeapMutex);
+ LWLockRelease(WaitLSNLock);
/*
* Set latches for processes, whose waited LSNs are already replayed. This
/*
* A pairing heap of waiting processes order by LSN values (least LSN is
- * on top).
+ * on top). Protected by WaitLSNLock.
*/
pairingheap waitersHeap;
- /* A mutex protecting the pairing heap above */
- slock_t waitersHeapMutex;
-
/* An array with per-process information, indexed by the process number */
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
} WaitLSNState;