Improve shm_mq portability around MAXIMUM_ALIGNOF and sizeof(Size).
authorRobert Haas
Tue, 18 Mar 2014 15:19:13 +0000 (11:19 -0400)
committerRobert Haas
Tue, 18 Mar 2014 15:23:13 +0000 (11:23 -0400)
Revise the original decision to expose a uint64-based interface and
use Size everywhere possible.  Avoid assuming that MAXIMUM_ALIGNOF is
8, or making any assumption about the relationship between that value
and sizeof(Size).  If MAXIMUM_ALIGNOF is bigger, we'll now insert
padding after the length word; if it's smaller, we are now prepared
to read and write the length word in chunks.

Per discussion with Tom Lane.

src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h

index 2d298a35983b1722565fb70f87fa420afeedb43b..b31f4fb693cb640065057f0f746464af40bfa038 100644 (file)
@@ -72,7 +72,7 @@ struct shm_mq
    PGPROC     *mq_sender;
    uint64      mq_bytes_read;
    uint64      mq_bytes_written;
-   uint64      mq_ring_size;
+   Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
@@ -103,15 +103,16 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
- * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
+ * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
  * are expected to retry the call at a later time with the same argument;
  * we need to retain enough state to pick up where we left off.
- * mqh_did_length_word tracks whether we read or wrote the length word,
- * mqh_partial_message_bytes tracks the number of payload bytes read or
- * written, and mqh_expected_bytes - which is used only for reads - tracks
- * the expected total size of the payload.
+ * mqh_length_word_complete tracks whether we are done sending or receiving
+ * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
+ * the number of bytes read or written for either the length word or the
+ * message itself, and mqh_expected_bytes - which is used only for reads -
+ * tracks the expected total size of the payload.
  *
  * mqh_counterparty_attached tracks whether we know the counterparty to have
  * attached to the queue at some previous point.  This lets us avoid some
@@ -128,25 +129,25 @@ struct shm_mq_handle
    dsm_segment *mqh_segment;
    BackgroundWorkerHandle *mqh_handle;
    char       *mqh_buffer;
-   uint64      mqh_buflen;
-   uint64      mqh_consume_pending;
-   uint64      mqh_partial_message_bytes;
-   uint64      mqh_expected_bytes;
-   bool        mqh_did_length_word;
+   Size        mqh_buflen;
+   Size        mqh_consume_pending;
+   Size        mqh_partial_bytes;
+   Size        mqh_expected_bytes;
+   bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    MemoryContext mqh_context;
 };
 
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
-                 void *data, bool nowait, uint64 *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
-                    bool nowait, uint64 *nbytesp, void **datap);
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+                 void *data, bool nowait, Size *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
+                    bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
                     BackgroundWorkerHandle *handle);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
@@ -163,7 +164,7 @@ shm_mq *
 shm_mq_create(void *address, Size size)
 {
    shm_mq     *mq = address;
-   uint64      data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+   Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
 
    /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
    size = MAXALIGN_DOWN(size);
@@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
    mqh->mqh_buflen = 0;
    mqh->mqh_consume_pending = 0;
    mqh->mqh_context = CurrentMemoryContext;
-   mqh->mqh_partial_message_bytes = 0;
-   mqh->mqh_did_length_word = false;
+   mqh->mqh_partial_bytes = 0;
+   mqh->mqh_length_word_complete = false;
    mqh->mqh_counterparty_attached = false;
 
    if (seg != NULL)
@@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 {
    shm_mq_result   res;
    shm_mq         *mq = mqh->mqh_queue;
-   uint64          bytes_written;
+   Size            bytes_written;
 
    Assert(mq->mq_sender == MyProc);
 
-   /* Write the message length into the buffer. */
-   if (!mqh->mqh_did_length_word)
+   /* Try to write, or finish writing, the length word into the buffer. */
+   while (!mqh->mqh_length_word_complete)
    {
-       res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
-                               &bytes_written);
+       Assert(mqh->mqh_partial_bytes < sizeof(Size));
+       res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+                               ((char *) &nbytes) + mqh->mqh_partial_bytes,
+                               nowait, &bytes_written);
+       mqh->mqh_partial_bytes += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
 
-       /*
-        * We're sure to have sent the length in full, since we always
-        * write a MAXALIGN'd chunk.
-        */
-       Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
-       mqh->mqh_did_length_word = true;
+       if (mqh->mqh_partial_bytes >= sizeof(Size))
+       {
+           Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+           mqh->mqh_partial_bytes = 0;
+           mqh->mqh_length_word_complete = true;
+       }
+
+       /* Length word can't be split unless bigger than required alignment. */
+       Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }
 
    /* Write the actual data bytes into the buffer. */
-   Assert(mqh->mqh_partial_message_bytes <= nbytes);
-   res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
-                           ((char *) data) + mqh->mqh_partial_message_bytes,
+   Assert(mqh->mqh_partial_bytes <= nbytes);
+   res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
+                           ((char *) data) + mqh->mqh_partial_bytes,
                            nowait, &bytes_written);
    if (res == SHM_MQ_WOULD_BLOCK)
-       mqh->mqh_partial_message_bytes += bytes_written;
+       mqh->mqh_partial_bytes += bytes_written;
    else
    {
-       mqh->mqh_partial_message_bytes = 0;
-       mqh->mqh_did_length_word = false;
+       mqh->mqh_partial_bytes = 0;
+       mqh->mqh_length_word_complete = false;
    }
    if (res != SHM_MQ_SUCCESS)
        return res;
@@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
  * function again after the process latch has been set.
  */
 shm_mq_result
-shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 {
    shm_mq         *mq = mqh->mqh_queue;
    shm_mq_result   res;
-   uint64          rb = 0;
-   uint64          nbytes;
-   uint64          needed;
+   Size            rb = 0;
+   Size            nbytes;
    void           *rawdata;
 
    Assert(mq->mq_receiver == MyProc);
@@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
        mqh->mqh_consume_pending = 0;
    }
 
-   /* Determine the message length. */
-   if (mqh->mqh_did_length_word)
-   {
-       /* We've partially received a message; recall expected length. */
-       nbytes = mqh->mqh_expected_bytes;
-   }
-   else
+   /* Try to read, or finish reading, the length word from the buffer. */
+   while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
-       res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+       Assert(mqh->mqh_partial_bytes < sizeof(Size));
+       res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+                                  nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
-       Assert(rb >= sizeof(uint64));
-       memcpy(&nbytes, rawdata, sizeof(uint64));
-       mqh->mqh_expected_bytes = nbytes;
 
-       /* If we've already got the whole message, we're done. */
-       needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
-       if (rb >= needed)
+       /*
+        * Hopefully, we'll receive the entire message length word at once.
+        * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
+        * multiple reads.
+        */
+       if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
+           Size            needed;
+
+           nbytes = * (Size *) rawdata;
+
+           /* If we've already got the whole message, we're done. */
+           needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
+           if (rb >= needed)
+           {
+               /*
+                * Technically, we could consume the message length information
+                * at this point, but the extra write to shared memory wouldn't
+                * be free and in most cases we would reap no benefit.
+                */
+               mqh->mqh_consume_pending = needed;
+               *nbytesp = nbytes;
+               *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
+               return SHM_MQ_SUCCESS;
+           }
+
            /*
-            * Technically, we could consume the message length information at
-            * this point, but the extra write to shared memory wouldn't be
-            * free and in most cases we would reap no benefit.
+            * We don't have the whole message, but we at least have the whole
+            * length word.
             */
-           mqh->mqh_consume_pending = needed;
-           *nbytesp = nbytes;
-           *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
-           return SHM_MQ_SUCCESS;
+           mqh->mqh_expected_bytes = nbytes;
+           mqh->mqh_length_word_complete = true;
+           shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+           rb -= MAXALIGN(sizeof(Size));
        }
+       else
+       {
+           Size    lengthbytes;
+
+           /* Can't be split unless bigger than required alignment. */
+           Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
 
-       /* Consume the length word. */
-       shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
-       mqh->mqh_did_length_word = true;
-       rb -= MAXALIGN64(sizeof(uint64));
+           /* Message word is split; need buffer to reassemble. */
+           if (mqh->mqh_buffer == NULL)
+           {
+               mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
+                                                    MQH_INITIAL_BUFSIZE);
+               mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
+           }
+           Assert(mqh->mqh_buflen >= sizeof(Size));
+
+           /* Copy and consume partial length word. */
+           if (mqh->mqh_partial_bytes + rb > sizeof(Size))
+               lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
+           else
+               lengthbytes = rb - mqh->mqh_partial_bytes;
+           memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
+                  lengthbytes);
+           mqh->mqh_partial_bytes += lengthbytes;
+           shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+           rb -= lengthbytes;
+
+           /* If we now have the whole word, we're ready to read payload. */
+           if (mqh->mqh_partial_bytes >= sizeof(Size))
+           {
+               Assert(mqh->mqh_partial_bytes == sizeof(Size));
+               mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer;
+               mqh->mqh_length_word_complete = true;
+               mqh->mqh_partial_bytes = 0;
+           }
+       }
    }
+   nbytes = mqh->mqh_expected_bytes;
 
-   if (mqh->mqh_partial_message_bytes == 0)
+   if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk.  If this works,
@@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
            return res;
        if (rb >= nbytes)
        {
-           mqh->mqh_did_length_word = false;
-           mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+           mqh->mqh_length_word_complete = false;
+           mqh->mqh_consume_pending = MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
@@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
         */
        if (mqh->mqh_buflen < nbytes)
        {
-           uint64      newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+           Size    newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
 
            while (newbuflen < nbytes)
                newbuflen *= 2;
@@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
    /* Loop until we've copied the entire message. */
    for (;;)
    {
-       uint64  still_needed;
+       Size    still_needed;
 
        /* Copy as much as we can. */
-       Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
-       memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
-       mqh->mqh_partial_message_bytes += rb;
+       Assert(mqh->mqh_partial_bytes + rb <= nbytes);
+       memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
+       mqh->mqh_partial_bytes += rb;
 
        /*
         * Update count of bytes read, with alignment padding.  Note
@@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
         * end of a message, because the buffer size is a multiple of
         * MAXIMUM_ALIGNOF, and each read and write is as well.
         */
-       Assert(mqh->mqh_partial_message_bytes == nbytes ||
-               rb == MAXALIGN64(rb));
-       shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+       Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
+       shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
 
        /* If we got all the data, exit the loop. */
-       if (mqh->mqh_partial_message_bytes >= nbytes)
+       if (mqh->mqh_partial_bytes >= nbytes)
            break;
 
        /* Wait for some more data. */
-       still_needed = nbytes - mqh->mqh_partial_message_bytes;
+       still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
@@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
-   mqh->mqh_did_length_word = false;
-   mqh->mqh_partial_message_bytes = 0;
+   mqh->mqh_length_word_complete = false;
+   mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
 }
 
@@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
-                 uint64 *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
+                 Size *bytes_written)
 {
    shm_mq     *mq = mqh->mqh_queue;
-   uint64      sent = 0;
+   Size        sent = 0;
    uint64      used;
-   uint64      ringsize = mq->mq_ring_size;
-   uint64      available;
+   Size        ringsize = mq->mq_ring_size;
+   Size        available;
 
    while (sent < nbytes)
    {
@@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
            res = shm_mq_notify_receiver(mq);
            if (res != SHM_MQ_SUCCESS)
            {
-               *bytes_written = res;
+               *bytes_written = sent;
                return res;
            }
 
@@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
        }
        else
        {
-           uint64  offset = mq->mq_bytes_written % ringsize;
-           uint64  sendnow = Min(available, ringsize - offset);
+           Size    offset = mq->mq_bytes_written % (uint64) ringsize;
+           Size    sendnow = Min(available, ringsize - offset);
 
            /* Write as much data as we can via a single memcpy(). */
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
@@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
-           Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
-           shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+           Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+           shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
            /*
             * For efficiency, we don't set the reader's latch here.  We'll
@@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
  * bytes_needed.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
-                    uint64 *nbytesp, void **datap)
+shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+                    Size *nbytesp, void **datap)
 {
+   Size        ringsize = mq->mq_ring_size;
    uint64      used;
-   uint64      ringsize = mq->mq_ring_size;
    uint64      written;
 
    for (;;)
    {
-       uint64      offset;
+       Size        offset;
        bool        detached;
 
        /* Get bytes written, so we can compute what's available to read. */
        written = shm_mq_get_bytes_written(mq, &detached);
        used = written - mq->mq_bytes_read;
        Assert(used <= ringsize);
-       offset = mq->mq_bytes_read % ringsize;
+       offset = mq->mq_bytes_read % (uint64) ringsize;
 
        /* If we have enough data or buffer has wrapped, we're done. */
        if (used >= bytes_needed || offset + used >= ringsize)
@@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes read.
  */
 static void
-shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 {
    PGPROC     *sender;
 
@@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes written.
  */
 static void
-shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
 {
    SpinLockAcquire(&mq->mq_mutex);
    mq->mq_bytes_written += n;
index 1bc1f5611e71ea1ceb6d271b6b8a876b3b59e041..c7dd90532bf4e74c79e1400ac8acfe03d2d7a343 100644 (file)
@@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-           uint64 nbytes, void *data, bool nowait);
+           Size nbytes, void *data, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
-              uint64 *nbytesp, void **datap, bool nowait);
+              Size *nbytesp, void **datap, bool nowait);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);