shm_mq_sendv: Fix flushing bug when receiver not yet attached.
authorRobert Haas
Tue, 24 May 2022 14:55:01 +0000 (10:55 -0400)
committerRobert Haas
Tue, 31 May 2022 12:46:54 +0000 (08:46 -0400)
With the old logic, when the reciever had not yet attached, we would
never call shm_mq_inc_bytes_written(), even if force_flush = true
was specified. That could result in a situation where data that the
sender believes it has sent is never received.

Along the way, remove a useless function prototype for a nonexistent
function from shm_mq.h.

Commit 46846433a03dff4f2e08c8a161e54a842da360d6 introduced these
problems.

Pavan Deolasee, with a few changes by me.

Discussion: https://postgr.es/m/CABOikdPkwtLLCTnzzmpSMXo3QZa2yXq0J7Q61ssdLFAJYrOVvQ@mail.gmail.com

src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h

index 6139c622e0b28205cf11ea888c52a538159414d6..8ca24de8d62beb6edf8d3009d127b7cdbcba0d0e 100644 (file)
@@ -518,8 +518,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
 
    /*
     * If the counterparty is known to have attached, we can read mq_receiver
-    * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
-    * more caution is needed.
+    * without acquiring the spinlock.  Otherwise, more caution is needed.
     */
    if (mqh->mqh_counterparty_attached)
        receiver = mq->mq_receiver;
@@ -528,9 +527,8 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
        SpinLockAcquire(&mq->mq_mutex);
        receiver = mq->mq_receiver;
        SpinLockRelease(&mq->mq_mutex);
-       if (receiver == NULL)
-           return SHM_MQ_SUCCESS;
-       mqh->mqh_counterparty_attached = true;
+       if (receiver != NULL)
+           mqh->mqh_counterparty_attached = true;
    }
 
    /*
@@ -541,7 +539,8 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
    {
        shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
-       SetLatch(&receiver->procLatch);
+       if (receiver != NULL)
+           SetLatch(&receiver->procLatch);
        mqh->mqh_send_pending = 0;
    }
 
index f5220baa781193e7a9c6a2c14897fd6b28b48c66..b6fe68725d23a767cb52ffe9b7eb303a549dce57 100644 (file)
@@ -76,7 +76,6 @@ extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
                                  int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
                                    Size *nbytesp, void **datap, bool nowait);
-extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);