Code review/prettification for generic_xlog.c.
authorTom Lane
Sat, 9 Apr 2016 19:02:19 +0000 (15:02 -0400)
committerTom Lane
Sat, 9 Apr 2016 19:02:19 +0000 (15:02 -0400)
Improve commentary, use more specific names for the delta fields,
const-ify pointer arguments where possible, avoid assuming that
initializing only the first element of a local array will guarantee
that the remaining elements end up as we need them.  (I think that
code in generic_redo actually worked, but only because InvalidBuffer
is zero; this is a particularly ugly way of depending on that ...)

src/backend/access/transam/generic_xlog.c

index bd0a1b908dfe61b97d80d63149350cb044f5326d..a32f1711cf3c1f693442536535016de8a9c9b6a6 100644 (file)
  * - length of page region (OffsetNumber)
  * - data - the data to place into the region ('length' number of bytes)
  *
- * Unchanged regions of a page are not represented in its delta.  As a
- * result, a delta can be more compact than the full page image.  But having
- * an unchanged region in the middle of two fragments that is smaller than
- * the fragment header (offset and length) does not pay off in terms of the
- * overall size of the delta. For this reason, we break fragments only if
- * the unchanged region is bigger than MATCH_THRESHOLD.
+ * Unchanged regions of a page are not represented in its delta.  As a result,
+ * a delta can be more compact than the full page image.  But having an
+ * unchanged region between two fragments that is smaller than the fragment
+ * header (offset+length) does not pay off in terms of the overall size of
+ * the delta.  For this reason, we merge adjacent fragments if the unchanged
+ * region between them is <= MATCH_THRESHOLD bytes.
  *
  * The worst case for delta sizes occurs when we did not find any unchanged
  * region in the page.  The size of the delta will be the size of the page plus
  */
 #define FRAGMENT_HEADER_SIZE   (2 * sizeof(OffsetNumber))
 #define MATCH_THRESHOLD            FRAGMENT_HEADER_SIZE
-#define MAX_DELTA_SIZE         BLCKSZ + FRAGMENT_HEADER_SIZE
+#define MAX_DELTA_SIZE         (BLCKSZ + FRAGMENT_HEADER_SIZE)
 
 /* Struct of generic xlog data for single page */
 typedef struct
 {
    Buffer      buffer;         /* registered buffer */
-   char        image[BLCKSZ];  /* copy of page image for modification */
-   char        data[MAX_DELTA_SIZE];   /* delta between page images */
-   int         dataLen;        /* space consumed in data field */
    bool        fullImage;      /* are we taking a full image of this page? */
+   int         deltaLen;       /* space consumed in delta field */
+   char        image[BLCKSZ];  /* copy of page image for modification */
+   char        delta[MAX_DELTA_SIZE];  /* delta between page images */
 } PageData;
 
 /* State of generic xlog record construction */
@@ -61,22 +61,26 @@ struct GenericXLogState
 };
 
 static void writeFragment(PageData *pageData, OffsetNumber offset,
-             OffsetNumber len, Pointer data);
-static void writeDelta(PageData *pageData);
-static void applyPageRedo(Page page, Pointer data, Size dataSize);
+             OffsetNumber len, const char *data);
+static void computeDelta(PageData *pageData);
+static void applyPageRedo(Page page, const char *delta, Size deltaSize);
+
 
 /*
- * Write next fragment into delta.
+ * Write next fragment into pageData's delta.
+ *
+ * The fragment has the given offset and length, and data points to the
+ * actual data (of length length).
  */
 static void
 writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
-             Pointer data)
+             const char *data)
 {
-   Pointer     ptr = pageData->data + pageData->dataLen;
+   char       *ptr = pageData->delta + pageData->deltaLen;
 
-   /* Check if we have enough space */
-   Assert(pageData->dataLen + sizeof(offset) +
-          sizeof(length) + length <= sizeof(pageData->data));
+   /* Verify we have enough space */
+   Assert(pageData->deltaLen + sizeof(offset) +
+          sizeof(length) + length <= sizeof(pageData->delta));
 
    /* Write fragment data */
    memcpy(ptr, &offset, sizeof(offset));
@@ -86,14 +90,14 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
    memcpy(ptr, data, length);
    ptr += length;
 
-   pageData->dataLen = ptr - pageData->data;
+   pageData->deltaLen = ptr - pageData->delta;
 }
 
 /*
- * Make delta for given page.
+ * Compute the delta record for given page.
  */
 static void
-writeDelta(PageData *pageData)
+computeDelta(PageData *pageData)
 {
    Page        page = BufferGetPage(pageData->buffer, NULL, NULL,
                                     BGP_NO_SNAPSHOT_TEST),
@@ -106,6 +110,8 @@ writeDelta(PageData *pageData)
                imageLower = ((PageHeader) image)->pd_lower,
                imageUpper = ((PageHeader) image)->pd_upper;
 
+   pageData->deltaLen = 0;
+
    for (i = 0; i < BLCKSZ; i++)
    {
        bool        match;
@@ -181,22 +187,22 @@ writeDelta(PageData *pageData)
        char        tmp[BLCKSZ];
 
        memcpy(tmp, image, BLCKSZ);
-       applyPageRedo(tmp, pageData->data, pageData->dataLen);
-       if (memcmp(tmp, page, pageLower)
-           || memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper))
+       applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
+       if (memcmp(tmp, page, pageLower) != 0 ||
+         memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper) != 0)
            elog(ERROR, "result of generic xlog apply does not match");
    }
 #endif
 }
 
 /*
- * Start new generic xlog record.
+ * Start new generic xlog record for modifications to specified relation.
  */
 GenericXLogState *
 GenericXLogStart(Relation relation)
 {
-   int         i;
    GenericXLogState *state;
+   int         i;
 
    state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
 
@@ -209,24 +215,30 @@ GenericXLogStart(Relation relation)
 
 /*
  * Register new buffer for generic xlog record.
+ *
+ * Returns pointer to the page's image in the GenericXLogState, which
+ * is what the caller should modify.
+ *
+ * If the buffer is already registered, just return its existing entry.
  */
 Page
 GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
 {
    int         block_id;
 
-   /* Place new buffer to unused slot in array */
+   /* Search array for existing entry or first unused slot */
    for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
    {
        PageData   *page = &state->pages[block_id];
 
        if (BufferIsInvalid(page->buffer))
        {
+           /* Empty slot, so use it (there cannot be a match later) */
            page->buffer = buffer;
-           memcpy(page->image, BufferGetPage(buffer, NULL, NULL,
-                                             BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-           page->dataLen = 0;
            page->fullImage = isNew;
+           memcpy(page->image,
+                  BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
+                  BLCKSZ);
            return (Page) page->image;
        }
        else if (page->buffer == buffer)
@@ -239,15 +251,16 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
        }
    }
 
-   elog(ERROR, "maximum number of %d generic xlog buffers is exceeded",
+   elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
         MAX_GENERIC_XLOG_PAGES);
-
    /* keep compiler quiet */
    return NULL;
 }
 
 /*
  * Unregister particular buffer for generic xlog record.
+ *
+ * XXX this is dangerous and should go away.
  */
 void
 GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
@@ -274,7 +287,8 @@ GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
 }
 
 /*
- * Put all changes in registered buffers to generic xlog record.
+ * Apply changes represented by GenericXLogState to the actual buffers,
+ * and emit a generic xlog record.
  */
 XLogRecPtr
 GenericXLogFinish(GenericXLogState *state)
@@ -291,33 +305,35 @@ GenericXLogFinish(GenericXLogState *state)
 
        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
+           PageData   *pageData = &state->pages[i];
+           Page        page;
            char        tmp[BLCKSZ];
-           PageData   *page = &state->pages[i];
 
-           if (BufferIsInvalid(page->buffer))
+           if (BufferIsInvalid(pageData->buffer))
                continue;
 
+           page = BufferGetPage(pageData->buffer, NULL, NULL,
+                                BGP_NO_SNAPSHOT_TEST);
+
            /* Swap current and saved page image. */
-           memcpy(tmp, page->image, BLCKSZ);
-           memcpy(page->image, BufferGetPage(page->buffer, NULL, NULL,
-                                             BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-           memcpy(BufferGetPage(page->buffer, NULL, NULL,
-                                BGP_NO_SNAPSHOT_TEST), tmp, BLCKSZ);
+           memcpy(tmp, pageData->image, BLCKSZ);
+           memcpy(pageData->image, page, BLCKSZ);
+           memcpy(page, tmp, BLCKSZ);
 
-           if (page->fullImage)
+           if (pageData->fullImage)
            {
                /* A full page image does not require anything special */
-               XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE);
+               XLogRegisterBuffer(i, pageData->buffer, REGBUF_FORCE_IMAGE);
            }
            else
            {
                /*
-                * In normal mode, calculate delta and write it as data
+                * In normal mode, calculate delta and write it as xlog data
                 * associated with this page.
                 */
-               XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD);
-               writeDelta(page);
-               XLogRegisterBufData(i, page->data, page->dataLen);
+               XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
+               computeDelta(pageData);
+               XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
            }
        }
 
@@ -327,13 +343,13 @@ GenericXLogFinish(GenericXLogState *state)
        /* Set LSN and mark buffers dirty */
        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
-           PageData   *page = &state->pages[i];
+           PageData   *pageData = &state->pages[i];
 
-           if (BufferIsInvalid(page->buffer))
+           if (BufferIsInvalid(pageData->buffer))
                continue;
-           PageSetLSN(BufferGetPage(page->buffer, NULL, NULL,
+           PageSetLSN(BufferGetPage(pageData->buffer, NULL, NULL,
                                     BGP_NO_SNAPSHOT_TEST), lsn);
-           MarkBufferDirty(page->buffer);
+           MarkBufferDirty(pageData->buffer);
        }
        END_CRIT_SECTION();
    }
@@ -343,13 +359,15 @@ GenericXLogFinish(GenericXLogState *state)
        START_CRIT_SECTION();
        for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
        {
-           PageData   *page = &state->pages[i];
+           PageData   *pageData = &state->pages[i];
 
-           if (BufferIsInvalid(page->buffer))
+           if (BufferIsInvalid(pageData->buffer))
                continue;
-           memcpy(BufferGetPage(page->buffer, NULL, NULL,
-                                BGP_NO_SNAPSHOT_TEST), page->image, BLCKSZ);
-           MarkBufferDirty(page->buffer);
+           memcpy(BufferGetPage(pageData->buffer, NULL, NULL,
+                                BGP_NO_SNAPSHOT_TEST),
+                  pageData->image,
+                  BLCKSZ);
+           MarkBufferDirty(pageData->buffer);
        }
        END_CRIT_SECTION();
    }
@@ -360,7 +378,9 @@ GenericXLogFinish(GenericXLogState *state)
 }
 
 /*
- * Abort generic xlog record.
+ * Abort generic xlog record construction.  No changes are applied to buffers.
+ *
+ * Note: caller is responsible for releasing locks/pins on buffers, if needed.
  */
 void
 GenericXLogAbort(GenericXLogState *state)
@@ -372,10 +392,10 @@ GenericXLogAbort(GenericXLogState *state)
  * Apply delta to given page image.
  */
 static void
-applyPageRedo(Page page, Pointer data, Size dataSize)
+applyPageRedo(Page page, const char *delta, Size deltaSize)
 {
-   Pointer     ptr = data,
-               end = data + dataSize;
+   const char *ptr = delta;
+   const char *end = delta + deltaSize;
 
    while (ptr < end)
    {
@@ -399,10 +419,11 @@ applyPageRedo(Page page, Pointer data, Size dataSize)
 void
 generic_redo(XLogReaderState *record)
 {
-   uint8       block_id;
-   Buffer      buffers[MAX_GENERIC_XLOG_PAGES] = {InvalidBuffer};
    XLogRecPtr  lsn = record->EndRecPtr;
+   Buffer      buffers[MAX_GENERIC_XLOG_PAGES];
+   uint8       block_id;
 
+   /* Protect limited size of buffers[] array */
    Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
 
    /* Iterate over blocks */
@@ -411,20 +432,24 @@ generic_redo(XLogReaderState *record)
        XLogRedoAction action;
 
        if (!XLogRecHasBlockRef(record, block_id))
+       {
+           buffers[block_id] = InvalidBuffer;
            continue;
+       }
 
        action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
 
        /* Apply redo to given block if needed */
        if (action == BLK_NEEDS_REDO)
        {
-           Pointer     blockData;
-           Size        blockDataSize;
            Page        page;
+           char       *blockDelta;
+           Size        blockDeltaSize;
 
-           page = BufferGetPage(buffers[block_id], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
-           blockData = XLogRecGetBlockData(record, block_id, &blockDataSize);
-           applyPageRedo(page, blockData, blockDataSize);
+           page = BufferGetPage(buffers[block_id], NULL, NULL,
+                                BGP_NO_SNAPSHOT_TEST);
+           blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
+           applyPageRedo(page, blockDelta, blockDeltaSize);
 
            PageSetLSN(page, lsn);
            MarkBufferDirty(buffers[block_id]);