* each source run; we repeatedly output the smallest tuple and insert the
* next tuple from its source tape (if any). When the heap empties, the merge
* is complete. The basic merge algorithm thus needs very little memory ---
- * only M tuples for an M-way merge, and M is at most six in the present code.
+ * only M tuples for an M-way merge, and M is constrained to a small number.
* However, we can still make good use of our full workMem allocation by
* pre-reading additional tuples from each source tape. Without prereading,
* our access pattern to the temporary file would be very erratic; on average
* on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
* saves one cycle of writing all the data out to disk and reading it in.
*
+ * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
+ * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
+ * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
+ * tape drives are expensive beasts, and in particular that there will always
+ * be many more runs than tape drives. In our implementation a "tape drive"
+ * doesn't cost much more than a few Kb of memory buffers, so we can afford
+ * to have lots of them. In particular, if we can have as many tape drives
+ * as sorted runs, we can eliminate any repeated I/O at all. In the current
+ * code we determine the number of tapes M on the basis of workMem: we want
+ * workMem/M to be large enough that we read a fair amount of data each time
+ * we preread from a tape, so as to maintain the locality of access described
+ * above. Nonetheless, with large workMem we can have many tapes.
+ *
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.57 2006/01/05 01:56:29 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.58 2006/02/19 05:54:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
} TupSortStatus;
/*
- * We use a seven-tape polyphase merge, which is the "sweet spot" on the
- * tapes-to-passes curve according to Knuth's figure 70 (section 5.4.2).
+ * Parameters for calculation of number of tapes to use --- see inittapes().
+ *
+ * In this calculation we assume that each tape will cost us about 3 blocks
+ * worth of buffer space (which is an underestimate for very large data
+ * volumes, but it's probably close enough --- see logtape.c).
+ *
+ * MERGE_BUFFER_SIZE is how much data we'd like to read from each
+ * tape during a preread cycle (see discussion at top of file).
*/
-#define MAXTAPES 7 /* Knuth's T */
-#define TAPERANGE (MAXTAPES-1) /* Knuth's P */
+#define MINTAPES 7 /* minimum number of tapes */
+#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
+#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
/*
* Private state of a Tuplesort operation.
bool randomAccess; /* did caller request random access? */
long availMem; /* remaining memory available, in bytes */
long allowedMem; /* total memory allowed, in bytes */
+ int maxTapes; /* number of tapes (Knuth's T) */
+ int tapeRange; /* maxTapes-1 (Knuth's P) */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
/*
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
* and FINALMERGE, the tuples are organized in "heap" order per Algorithm
* H. (Note that memtupcount only counts the tuples that are part of the
- * heap --- during merge passes, memtuples[] entries beyond TAPERANGE are
+ * heap --- during merge passes, memtuples[] entries beyond tapeRange are
* never in the heap and are used to hold pre-read tuples.) In state
* SORTEDONTAPE, the array is not used.
*/
*/
int currentRun;
+ /*
+ * Unless otherwise noted, all pointer variables below are pointers
+ * to arrays of length maxTapes, holding per-tape data.
+ */
+
/*
* These variables are only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
* in these lists, because memtuples[0] is part of the merge heap and is
* never a pre-read tuple.
*/
- bool mergeactive[MAXTAPES]; /* Active input run source? */
- int mergenext[MAXTAPES]; /* first preread tuple for each source */
- int mergelast[MAXTAPES]; /* last preread tuple for each source */
- long mergeavailmem[MAXTAPES]; /* availMem for prereading
- * tapes */
+ bool *mergeactive; /* Active input run source? */
+ int *mergenext; /* first preread tuple for each source */
+ int *mergelast; /* last preread tuple for each source */
+ long *mergeavailmem; /* availMem for prereading tapes */
long spacePerTape; /* actual per-tape target usage */
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
*/
int Level; /* Knuth's l */
int destTape; /* current output tape (Knuth's j, less 1) */
- int tp_fib[MAXTAPES]; /* Target Fibonacci run counts (A[]) */
- int tp_runs[MAXTAPES]; /* # of real runs on each tape */
- int tp_dummy[MAXTAPES]; /* # of dummy runs for each tape (D[]) */
- int tp_tapenum[MAXTAPES]; /* Actual tape numbers (TAPE[]) */
+ int *tp_fib; /* Target Fibonacci run counts (A[]) */
+ int *tp_runs; /* # of real runs on each tape */
+ int *tp_dummy; /* # of dummy runs for each tape (D[]) */
+ int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
/*
* These variables are used after completion of sorting to keep track of
*/
TupleDesc tupDesc;
int nKeys;
- ScanKey scanKeys;
- SortFunctionKind *sortFnKinds;
+ ScanKey scanKeys; /* array of length nKeys */
+ SortFunctionKind *sortFnKinds; /* array of length nKeys */
/*
* These variables are specific to the IndexTuple case; they are set by
state->currentRun = 0;
- /* Algorithm D variables will be initialized by inittapes, if needed */
+ /*
+ * maxTapes, tapeRange, and Algorithm D variables will be initialized by
+ * inittapes(), if needed
+ */
state->result_tape = -1; /* flag that result tape has not been formed */
return true;
}
+/*
+ * tuplesort_merge_order - report merge order we'll use for given memory
+ *
+ * This is exported for use by the planner. allowedMem is in bytes.
+ *
+ * This must match the calculation in inittapes. The only reason we
+ * don't fold the code together is that inittapes wants to know if the
+ * MINTAPES limitation applies or not.
+ */
+int
+tuplesort_merge_order(long allowedMem)
+{
+ int maxTapes;
+
+ /* see inittapes for comments */
+ maxTapes = (int) ((allowedMem - TAPE_BUFFER_OVERHEAD) /
+ (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
+
+ maxTapes = Max(maxTapes, MINTAPES);
+
+ /* The merge order is one less than the number of tapes */
+ return maxTapes - 1;
+}
/*
* inittapes - initialize for tape sorting.
static void
inittapes(Tuplesortstate *state)
{
- int ntuples,
+ int maxTapes,
+ ntuples,
j;
+ /*
+ * Determine the number of tapes to use based on allowed memory.
+ *
+ * We need T+1 tapes to do a T-way merge, and we want MERGE_BUFFER_SIZE
+ * tuple workspace for each input tape of the merge. The output tape
+ * doesn't account for tuple workspace but it does need tape buffer space.
+ *
+ * Keep this code in sync with tuplesort_merge_order!
+ */
+ maxTapes = (int) ((state->allowedMem - TAPE_BUFFER_OVERHEAD) /
+ (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
+
+ /*
+ * We will use at least MINTAPES regardless, but otherwise we decrease
+ * availMem to reflect the space that goes into buffers.
+ */
+ if (maxTapes >= MINTAPES)
+ {
+ /* maxTapes is OK, adjust availMem */
+ USEMEM(state, maxTapes * TAPE_BUFFER_OVERHEAD);
+ }
+ else
+ {
+ /*
+ * Force minimum tape count. In this path we ignore the tape buffers
+ * in our space calculation, to avoid driving availMem permanently
+ * negative if allowedMem is really tiny. (This matches the pre-8.2
+ * behavior which was to ignore the tape buffers always, on the
+ * grounds that they were fixed-size overhead.)
+ */
+ maxTapes = MINTAPES;
+ }
+ state->maxTapes = maxTapes;
+ state->tapeRange = maxTapes - 1;
+
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "switching to external sort: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "switching to external sort with %d tapes: %s",
+ maxTapes, pg_rusage_show(&state->ru_start));
#endif
- state->tapeset = LogicalTapeSetCreate(MAXTAPES);
+ /*
+ * Create the tape set and allocate the per-tape data arrays.
+ */
+ state->tapeset = LogicalTapeSetCreate(maxTapes);
+
+ state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+ state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
+ state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
+ state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
+ state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
/*
* Allocate the memtupindex array, same size as memtuples.
/*
* Initialize variables of Algorithm D (step D1).
*/
- for (j = 0; j < MAXTAPES; j++)
+ for (j = 0; j < maxTapes; j++)
{
state->tp_fib[j] = 1;
state->tp_runs[j] = 0;
state->tp_dummy[j] = 1;
state->tp_tapenum[j] = j;
}
- state->tp_fib[TAPERANGE] = 0;
- state->tp_dummy[TAPERANGE] = 0;
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 0;
state->Level = 1;
state->destTape = 0;
/* Step D4: increase level */
state->Level++;
a = state->tp_fib[0];
- for (j = 0; j < TAPERANGE; j++)
+ for (j = 0; j < state->tapeRange; j++)
{
state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
state->tp_fib[j] = a + state->tp_fib[j + 1];
}
/* End of step D2: rewind all output tapes to prepare for merging */
- for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
+ for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
for (;;)
{
/* Step D5: merge runs onto tape[T] until tape[P] is empty */
- while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
+ while (state->tp_runs[state->tapeRange - 1] ||
+ state->tp_dummy[state->tapeRange - 1])
{
bool allDummy = true;
bool allOneRun = true;
- for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
+ for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] == 0)
allDummy = false;
}
if (allDummy)
{
- state->tp_dummy[TAPERANGE]++;
- for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
+ state->tp_dummy[state->tapeRange]++;
+ for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
state->tp_dummy[tapenum]--;
}
else
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
+ LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
false);
/* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
+ LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
true);
- state->tp_runs[TAPERANGE - 1] = 0;
+ state->tp_runs[state->tapeRange - 1] = 0;
/*
* reassign tape units per step D6; note we no longer care about A[]
*/
- svTape = state->tp_tapenum[TAPERANGE];
- svDummy = state->tp_dummy[TAPERANGE];
- svRuns = state->tp_runs[TAPERANGE];
- for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
+ svTape = state->tp_tapenum[state->tapeRange];
+ svDummy = state->tp_dummy[state->tapeRange];
+ svRuns = state->tp_runs[state->tapeRange];
+ for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
{
state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
* output tape while rewinding it. The last iteration of step D6 would be
* a waste of cycles anyway...
*/
- state->result_tape = state->tp_tapenum[TAPERANGE];
+ state->result_tape = state->tp_tapenum[state->tapeRange];
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
}
static void
mergeonerun(Tuplesortstate *state)
{
- int destTape = state->tp_tapenum[TAPERANGE];
+ int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
int tupIndex;
void *tup;
* output tape, and increment its count of real runs.
*/
markrunend(state, destTape);
- state->tp_runs[TAPERANGE]++;
+ state->tp_runs[state->tapeRange]++;
#ifdef TRACE_SORT
if (trace_sort)
Assert(state->memtupcount == 0);
/* Clear merge-pass state variables */
- memset(state->mergeactive, 0, sizeof(state->mergeactive));
- memset(state->mergenext, 0, sizeof(state->mergenext));
- memset(state->mergelast, 0, sizeof(state->mergelast));
- memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
+ memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
+ memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
+ memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
+ memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = MAXTAPES; /* first slot available for preread */
+ state->mergefirstfree = state->maxTapes; /* 1st slot avail for preread */
/* Adjust run counts and mark the active tapes */
activeTapes = 0;
- for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
+ for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] > 0)
state->tp_dummy[tapenum]--;
*/
Assert(activeTapes > 0);
state->spacePerTape = state->availMem / activeTapes;
- for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
state->mergeavailmem[srcTape] = state->spacePerTape;
mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
- for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
int tupIndex = state->mergenext[srcTape];
void *tup;
long priorAvail,
spaceUsed;
- for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (!state->mergeactive[srcTape])
continue;
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished writing%s run %d: %s",
+ elog(LOG, "finished writing%s run %d to tape %d: %s",
(state->memtupcount == 0) ? " final" : "",
- state->currentRun,
+ state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif