#include
#include "access/parallel.h"
+#include "port/pg_bitutils.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
#define BARRIER_SHOULD_CHECK(flags, type) \
(((flags) & (((uint32) 1) << (uint32) (type))) != 0)
+/* Clear the relevant type bit from the flags. */
+#define BARRIER_CLEAR_BIT(flags, type) \
+ ((flags) &= ~(((uint32) 1) << (uint32) (type)))
+
static ProcSignalHeader *ProcSignal = NULL;
static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
static bool CheckProcSignal(ProcSignalReason reason);
static void CleanupProcSignalState(int status, Datum arg);
-static void ProcessBarrierPlaceholder(void);
+static void ResetProcSignalBarrierBits(uint32 flags);
+static bool ProcessBarrierPlaceholder(void);
/*
* ProcSignalShmemSize
volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
uint64 oldval;
+ /*
+ * It's important that we check only pss_barrierGeneration here and
+ * not pss_barrierCheckMask. Bits in pss_barrierCheckMask get cleared
+ * before the barrier is actually absorbed, but pss_barrierGeneration
+ * is updated only afterward.
+ */
oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
while (oldval < generation)
{
{
uint64 local_gen;
uint64 shared_gen;
- uint32 flags;
+ volatile uint32 flags;
Assert(MyProcSignalSlot);
* read of the barrier generation above happens before we atomically
* extract the flags, and that any subsequent state changes happen
* afterward.
+ *
+ * NB: In order to avoid race conditions, we must zero pss_barrierCheckMask
+ * first and only afterwards try to do barrier processing. If we did it
+ * in the other order, someone could send us another barrier of some
+ * type right after we called the barrier-processing function but before
+ * we cleared the bit. We would have no way of knowing that the bit needs
+ * to stay set in that case, so the need to call the barrier-processing
+ * function again would just get forgotten. So instead, we tentatively
+ * clear all the bits and then put back any for which we don't manage
+ * to successfully absorb the barrier.
*/
flags = pg_atomic_exchange_u32(&MyProcSignalSlot->pss_barrierCheckMask, 0);
/*
- * Process each type of barrier. It's important that nothing we call from
- * here throws an error, because pss_barrierCheckMask has already been
- * cleared. If we jumped out of here before processing all barrier types,
- * then we'd forget about the need to do so later.
- *
- * NB: It ought to be OK to call the barrier-processing functions
- * unconditionally, but it's more efficient to call only the ones that
- * might need us to do something based on the flags.
+ * If there are no flags set, then we can skip doing any real work.
+ * Otherwise, establish a PG_TRY block, so that we don't lose track of
+ * which types of barrier processing are needed if an ERROR occurs.
*/
- if (BARRIER_SHOULD_CHECK(flags, PROCSIGNAL_BARRIER_PLACEHOLDER))
- ProcessBarrierPlaceholder();
+ if (flags != 0)
+ {
+ bool success = true;
+
+ PG_TRY();
+ {
+ /*
+ * Process each type of barrier. The barrier-processing functions
+ * should normally return true, but may return false if the barrier
+ * can't be absorbed at the current time. This should be rare,
+ * because it's pretty expensive. Every single
+ * CHECK_FOR_INTERRUPTS() will return here until we manage to
+ * absorb the barrier, and that cost will add up in a hurry.
+ *
+ * NB: It ought to be OK to call the barrier-processing functions
+ * unconditionally, but it's more efficient to call only the ones
+ * that might need us to do something based on the flags.
+ */
+ while (flags != 0)
+ {
+ ProcSignalBarrierType type;
+ bool processed = true;
+
+ type = (ProcSignalBarrierType) pg_rightmost_one_pos32(flags);
+ switch (type)
+ {
+ case PROCSIGNAL_BARRIER_PLACEHOLDER:
+ processed = ProcessBarrierPlaceholder();
+ break;
+ }
+
+ /*
+ * To avoid an infinite loop, we must always unset the bit
+ * in flags.
+ */
+ BARRIER_CLEAR_BIT(flags, type);
+
+ /*
+ * If we failed to process the barrier, reset the shared bit
+ * so we try again later, and set a flag so that we don't bump
+ * our generation.
+ */
+ if (!processed)
+ {
+ ResetProcSignalBarrierBits(((uint32) 1) << type);
+ success = false;
+ }
+ }
+ }
+ PG_CATCH();
+ {
+ /*
+ * If an ERROR occurred, we'll need to try again later to handle
+ * that barrier type and any others that haven't been handled yet
+ * or weren't successfully absorbed.
+ */
+ ResetProcSignalBarrierBits(flags);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ /*
+ * If some barrier types were not successfully absorbed, we will have
+ * to try again later.
+ */
+ if (!success)
+ return;
+ }
/*
* State changes related to all types of barriers that might have been
pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, shared_gen);
}
+/*
+ * If it turns out that we couldn't absorb one or more barrier types, either
+ * because the barrier-processing functions returned false or due to an error,
+ * arrange for processing to be retried later.
+ */
static void
+ResetProcSignalBarrierBits(uint32 flags)
+{
+ pg_atomic_fetch_or_u32(&MyProcSignalSlot->pss_barrierCheckMask, flags);
+ ProcSignalBarrierPending = true;
+ InterruptPending = true;
+}
+
+static bool
ProcessBarrierPlaceholder(void)
{
/*
* appropriately descriptive. Get rid of this function and instead have
* ProcessBarrierSomethingElse. Most likely, that function should live in
* the file pertaining to that subsystem, rather than here.
+ *
+ * The return value should be 'true' if the barrier was successfully
+ * absorbed and 'false' if not. Note that returning 'false' can lead to
+ * very frequent retries, so try hard to make that an uncommon case.
*/
+ return true;
}
/*