/* Configuration; some of the variables are used by the old sorter, too. */
extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
+extern u64 sorter_bufsize;
+
+#define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
+#define SORT_XTRACE(x...) do { if (sorter_trace > 1) log(L_DEBUG, x); } while(0)
struct sort_bucket {
cnode n;
struct fastbuf *fb;
byte *name;
u64 size; // Size in bytes
- uns runs; // Number of runs, 0 if unknown
+ uns runs; // Number of runs, 0 if not sorted
uns hash_bits; // Remaining bits of the hash function
byte *ident; // Identifier used in debug messages
};
enum sort_bucket_flags {
- SBF_FINAL = 1, // This bucket corresponds to the final output file
- SBF_SOURCE = 2, // Contains the source file
+ SBF_FINAL = 1, // This bucket corresponds to the final output file (always 1 run)
+ SBF_SOURCE = 2, // Contains the source file (always 0 runs)
+ SBF_CUSTOM_PRESORT = 4, // Contains source to read via custom presorter
};
struct sort_context {
struct mempool *pool;
clist bucket_list;
- byte *big_buf, *big_buf_half;
- uns big_buf_size, big_buf_half_size;
+ void *big_buf, *big_buf_half;
+ size_t big_buf_size, big_buf_half_size;
- struct fastbuf *(*custom_presort)(void);
+ int (*custom_presort)(struct fastbuf *dest, byte *buf, size_t bufsize);
// Take as much as possible from the source bucket, sort it in memory and dump it to the destination bucket.
// Return 1 if there is more data available in the source bucket.
- int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out);
+ // If a single call consumes all of the remaining input, the run is written to `out_only'
+ // instead, which lets the caller optimize the case of all data fitting in memory.
+ int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only);
// Two-way split/merge: merge up to 2 source buckets to up to 2 destination buckets.
// Bucket arrays are NULL-terminated.
void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
+
+ // State variables of internal_sort
+ void *key_buf;	// A key which was read ahead, but not processed yet
+ int more_keys;	// The key in key_buf is valid
};
void sorter_run(struct sort_context *ctx);
-struct sort_bucket *sorter_new_bucket(struct sort_context *ctx);
-struct fastbuf *sorter_open_read(struct sort_bucket *b);
-struct fastbuf *sorter_open_write(struct sort_bucket *b);
-void sorter_close_read(struct sort_bucket *b);
-void sorter_close_write(struct sort_bucket *b);
+void *sorter_alloc(struct sort_context *ctx, uns size);
+void sorter_alloc_buf(struct sort_context *ctx);
+void sorter_free_buf(struct sort_context *ctx);
+
+// Operations on buckets
+struct sort_bucket *sbuck_new(struct sort_context *ctx);
+void sbuck_drop(struct sort_bucket *b);
+int sbuck_can_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_write(struct sort_bucket *b);
+void sbuck_close_read(struct sort_bucket *b);
+void sbuck_close_write(struct sort_bucket *b);
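+
+/* An illustrative sketch of how the pieces fit together (the callback names
+ * are hypothetical; the real glue code is generated by the sorter header):
+ *
+ *	struct sort_context ctx = {
+ *	  .in_fb = input,			// or set custom_presort instead
+ *	  .internal_sort = my_internal_sort,
+ *	  .twoway_merge = my_twoway_merge,
+ *	};
+ *	sorter_run(&ctx);
+ *	// ctx.out_fb can now be read to get the sorted data
+ */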
#endif
uns sorter_trace;
uns sorter_presort_bufsize = 65536;
uns sorter_stream_bufsize = 65536;
+u64 sorter_bufsize = 65536;
static struct cf_section sorter_config = {
CF_ITEMS {
CF_UNS("Trace", &sorter_trace),
CF_UNS("PresortBuffer", &sorter_presort_bufsize),
CF_UNS("StreamBuffer", &sorter_stream_bufsize),
+ CF_U64("SortBuffer", &sorter_bufsize),
CF_END
}
};
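+
+/* The matching configuration stanza could look like this (a sketch; the
+ * section name depends on how sorter_config is registered, which is not
+ * shown here):
+ *
+ *	Sorter {
+ *	  Trace 2		# >1 also enables SORT_XTRACE messages
+ *	  SortBuffer 16777216
+ *	}
+ */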
#include "lib/mempool.h"
#include "lib/sorter/common.h"
+void *
+sorter_alloc(struct sort_context *ctx, uns size)
+{
+ return mp_alloc_zero(ctx->pool, size);
+}
+
struct sort_bucket *
-sorter_new_bucket(struct sort_context *ctx)
+sbuck_new(struct sort_context *ctx)
{
- return mp_alloc_zero(ctx->pool, sizeof(struct sort_bucket));
+ return sorter_alloc(ctx, sizeof(struct sort_bucket));
+}
+
+void
+sbuck_drop(struct sort_bucket *b)
+{
+ if (b)
+ {
+ if (b->n.prev)
+ clist_remove(&b->n);
+ bclose(b->fb);
+ bzero(b, sizeof(*b));
+ }
+}
+
+int
+sbuck_can_read(struct sort_bucket *b)
+{
+ return b && b->size;
}
struct fastbuf *
-sorter_open_read(struct sort_bucket *b)
+sbuck_open_read(struct sort_bucket *b)
{
/* FIXME: These functions should handle buckets with no fb and only a name. */
ASSERT(b->fb);
}
struct fastbuf *
-sorter_open_write(struct sort_bucket *b)
+sbuck_open_write(struct sort_bucket *b)
{
if (!b->fb)
b->fb = bopen_tmp(sorter_stream_bufsize);
}
void
-sorter_close_read(struct sort_bucket *b)
+sbuck_close_read(struct sort_bucket *b)
{
if (!b)
return;
}
void
-sorter_close_write(struct sort_bucket *b)
+sbuck_close_write(struct sort_bucket *b)
{
if (b->fb)
{
b->size = btell(b->fb);
brewind(b->fb);
}
- /* FIXME: Remove empty buckets from the list automatically? */
}
void
-sorter_run(struct sort_context *ctx)
+sorter_alloc_buf(struct sort_context *ctx)
{
- ctx->pool = mp_new(4096);
- ASSERT(!ctx->custom_presort);
- ASSERT(!ctx->out_fb);
- clist_init(&ctx->bucket_list);
+ if (ctx->big_buf)
+ return;
+ u64 bs = MAX(sorter_bufsize/2, 1);
+ bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
+ ctx->big_buf = big_alloc(2*bs);
+ ctx->big_buf_size = 2*bs;
+ ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
+ ctx->big_buf_half_size = bs;
+ SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
+}
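+
+/* Layout of the buffer allocated above (bs = page-aligned half size):
+ *
+ *	big_buf                       big_buf_half
+ *	|<----------- bs ----------->|<----------- bs ----------->|
+ *	big_buf_size = 2*bs, big_buf_half_size = bs
+ */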
- /* FIXME: There should be a way how to detect size of the input file */
+void
+sorter_free_buf(struct sort_context *ctx)
+{
+ if (!ctx->big_buf)
+ return;
+ big_free(ctx->big_buf, ctx->big_buf_size);
+ ctx->big_buf = NULL;
+ SORT_XTRACE("Freed sorting buffer");
+}
- /* Trivial 2-way merge with no presorting (just a testing hack) */
- struct sort_bucket *bin = sorter_new_bucket(ctx);
- bin->flags = SBF_SOURCE;
- bin->fb = ctx->in_fb;
- bin->ident = "src";
+static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
+{
+ /* FIXME: Mode with no presorting (mostly for debugging) */
+ sorter_alloc_buf(ctx);
+ if (in->flags & SBF_CUSTOM_PRESORT)
+ {
+ struct fastbuf *f = sbuck_open_write(out);
+ return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size); // FIXME: out_only optimization?
+ }
+ return ctx->internal_sort(ctx, in, out, out_only);
+}
+
+static inline struct sort_bucket *
+sbuck_join_to(struct sort_bucket *b)
+{
+  struct sort_bucket *out = (struct sort_bucket *) b->n.prev;	// Such a bucket is guaranteed to exist: the final bucket sits at the head of the list
+ return (out->flags & SBF_FINAL) ? out : NULL;
+}
+
+static void
+sorter_join(struct sort_bucket *b)
+{
+ struct sort_bucket *join = sbuck_join_to(b);
+ ASSERT(join);
+
+ // FIXME: What if the final bucket doesn't contain any file yet?
+
+ SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) b->size);
+ struct fastbuf *src = sbuck_open_read(b);
+ struct fastbuf *dest = sbuck_open_write(join);
+ bbcopy(src, dest, ~0U);
+ sbuck_drop(b);
+}
+
+static void
+sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
+{
struct sort_bucket *ins[3], *outs[3];
- ins[0] = bin;
- ins[1] = NULL;
+ struct sort_bucket *join = sbuck_join_to(b);
+
+ SORT_TRACE("Presorting");
+ ins[0] = sbuck_new(ctx);
+ sbuck_open_read(b);
+ if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
+ {
+ if (join)
+ sbuck_drop(ins[0]);
+ else
+ clist_insert_after(&ins[0]->n, &b->n);
+ sbuck_drop(b);
+ return;
+ }
+ ins[1] = sbuck_new(ctx);
+ ins[2] = NULL;
+ int i = 1;
+ while (sorter_presort(ctx, b, ins[i], ins[i]))
+ i = 1-i;
+ sbuck_close_read(b);
+ sbuck_close_write(ins[0]);
+ sbuck_close_write(ins[1]);
+
+ SORT_TRACE("Main sorting");
do {
- outs[0] = sorter_new_bucket(ctx);
- outs[1] = sorter_new_bucket(ctx);
+ if (ins[0]->runs == 1 && ins[1]->runs == 1 && join) // FIXME: Debug switch for disabling joining optimizations
+ {
+ // This is guaranteed to produce a single run, so join if possible
+ outs[0] = join;
+ outs[1] = NULL;
+ ctx->twoway_merge(ctx, ins, outs);
+	  ASSERT(outs[0]->runs == 2);
+	  outs[0]->runs--;	// Restore the SBF_FINAL invariant of a single run (see sort_bucket_flags)
+ SORT_TRACE("Pass done (joined final run)");
+ sbuck_drop(b);
+ return;
+ }
+ outs[0] = sbuck_new(ctx);
+ outs[1] = sbuck_new(ctx);
outs[2] = NULL;
- log(L_DEBUG, "Pass...");
ctx->twoway_merge(ctx, ins, outs);
- log(L_DEBUG, "Done (%d+%d runs)", outs[0]->runs, outs[1]->runs);
- sorter_close_write(outs[0]);
- sorter_close_write(outs[1]);
+ sbuck_close_write(outs[0]);
+ sbuck_close_write(outs[1]);
+ SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) outs[0]->size, (uintmax_t) outs[1]->size);
+ sbuck_drop(ins[0]);
+ sbuck_drop(ins[1]);
memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
- } while (ins[1]->fb);
+ } while (ins[1]->size);
+
+ sbuck_drop(ins[1]);
+ clist_insert_after(&ins[0]->n, &b->n);
+ sbuck_drop(b);
+}
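+
+/* A worked example of the merging phase above: if presorting yields 5 runs,
+ * they are distributed 3+2 between ins[0] and ins[1]; successive twoway_merge
+ * passes reduce this to 2+1, then 1+1, then a single run, i.e., the number of
+ * passes grows logarithmically with the number of presorted runs. */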
+
+void
+sorter_run(struct sort_context *ctx)
+{
+ ctx->pool = mp_new(4096);
+ clist_init(&ctx->bucket_list);
+
+  /* FIXME: There should be a way to detect the size of the input file */
+ /* FIXME: Remember to test sorting of empty files */
+
+ // Create bucket containing the source
+ struct sort_bucket *bin = sbuck_new(ctx);
+ bin->flags = SBF_SOURCE;
+ if (ctx->custom_presort)
+ bin->flags |= SBF_CUSTOM_PRESORT;
+ else
+ bin->fb = ctx->in_fb;
+ bin->ident = "in";
+  bin->size = ~(u64)0;	// Unknown size
+ bin->hash_bits = ctx->hash_bits;
+ clist_add_tail(&ctx->bucket_list, &bin->n);
+
+ // Create bucket for the output
+ struct sort_bucket *bout = sbuck_new(ctx);
+ bout->flags = SBF_FINAL;
+ bout->fb = ctx->out_fb;
+ bout->ident = "out";
+ bout->runs = 1;
+ clist_add_head(&ctx->bucket_list, &bout->n);
+
+ struct sort_bucket *b;
+ while (b = clist_next(&ctx->bucket_list, &bout->n))
+ {
+ if (!b->size)
+ sbuck_drop(b);
+ else if (b->runs == 1)
+ sorter_join(b);
+ else
+ sorter_twoway(ctx, b);
+ }
- ctx->out_fb = sorter_open_read(ins[0]);
+ sorter_free_buf(ctx);
+ sbuck_close_write(bout);
+ SORT_XTRACE("Final size: %jd", (uintmax_t) bout->size);
+ ctx->out_fb = sbuck_open_read(bout);
}
* of the GNU Lesser General Public License.
*/
-static int P(internal)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out)
+typedef struct {
+ P(key) *key;
+ // FIXME: Add the hash here to save cache misses
+} P(internal_item_t);
+
+#define ASORT_PREFIX(x) SORT_PREFIX(array_##x)
+#define ASORT_KEY_TYPE P(internal_item_t)
+#define ASORT_ELT(i) ary[i]
+#define ASORT_LT(x,y) (P(compare)((x).key, (y).key) < 0)
+#define ASORT_EXTRA_ARGS , P(internal_item_t) *ary
+#include "lib/arraysort.h"
+
+static int P(internal)(struct sort_context *ctx, struct sort_bucket *bin, struct sort_bucket *bout, struct sort_bucket *bout_only)
{
- /* FIXME :) */
- return 0;
+ sorter_alloc_buf(ctx);
+ ASSERT(bin->fb); // Expects the input bucket to be already open for reading
+ struct fastbuf *in = bin->fb;
+
+ P(key) key, *keybuf = ctx->key_buf;
+ if (!keybuf)
+ keybuf = ctx->key_buf = sorter_alloc(ctx, sizeof(key));
+ if (ctx->more_keys)
+ {
+ key = *keybuf;
+ ctx->more_keys = 0;
+ }
+ else if (!P(read_key)(in, &key))
+ return 0;
+
+#ifdef SORT_VAR_DATA
+ if (sizeof(key) + 1024 + SORT_DATA_SIZE(key) > ctx->big_buf_half_size)
+ {
+ SORT_XTRACE("s-internal: Generating a giant run");
+      struct fastbuf *out = sbuck_open_write(bout); /* FIXME: Using a non-direct buffer would be nice here */
+ P(copy_data)(&key, in, out);
+ bout->runs++;
+ return 1; // We don't know, but 1 is always safe
+ }
+#endif
+
+ size_t bufsize = ctx->big_buf_half_size; /* FIXME: In some cases, we can use the whole buffer */
+ bufsize = MIN((u64)bufsize, (u64)~0U * sizeof(P(internal_item_t))); // The number of records must fit in uns
+
+ SORT_XTRACE("s-internal: Reading (bufsize=%zd)", bufsize);
+ P(internal_item_t) *item_array = ctx->big_buf, *item = item_array, *last_item;
+ byte *end = (byte *) ctx->big_buf + bufsize;
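+  /* Item pointers are filled from the start of the buffer upwards, while the
+   * key (+ data) records they point to are stacked downwards from `end'; the
+   * batch ends once the two regions would collide:
+   *
+   *	big_buf                                          big_buf + bufsize
+   *	| item[0] item[1] ... ->      free      <- ... record[1] record[0] |
+   */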
+ do
+ {
+ uns ksize = SORT_KEY_SIZE(key);
+#ifdef SORT_UNIFY
+ uns ksize_aligned = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#else
+ uns ksize_aligned = ksize;
+#endif
+ uns dsize = SORT_DATA_SIZE(key);
+ uns recsize = ALIGN_TO(ksize_aligned + dsize, CPU_STRUCT_ALIGN);
+ if (unlikely(sizeof(P(internal_item_t)) + recsize > (size_t)(end - (byte *) item)))
+ {
+ ctx->more_keys = 1;
+ *keybuf = key;
+ break;
+ }
+ end -= recsize;
+ memcpy(end, &key, ksize);
+#ifdef SORT_VAR_DATA
+ breadb(in, end + ksize_aligned, dsize);
+#endif
+ item->key = (P(key)*) end;
+ item++;
+ }
+ while (P(read_key)(in, &key));
+ last_item = item;
+
+ uns count = last_item - item_array;
+ SORT_XTRACE("s-internal: Sorting %d items", count);
+ P(array_sort)(count, item_array);
+
+ SORT_XTRACE("s-internal: Writing");
+ if (!ctx->more_keys)
+ bout = bout_only;
+ struct fastbuf *out = sbuck_open_write(bout);
+ bout->runs++;
+ /* FIXME: No unification done yet */
+ for (item = item_array; item < last_item; item++)
+ {
+ P(write_key)(out, item->key);
+#ifdef SORT_VAR_DATA
+ uns ksize = SORT_KEY_SIZE(*item->key);
+#ifdef SORT_UNIFY
+ ksize = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#endif
+ bwrite(out, (byte *) item->key + ksize, SORT_DATA_SIZE(*item->key));
+#endif
+ }
+
+ return ctx->more_keys;
}
/* FIXME: There is plenty of room for further optimization */
/* FIXME: Swap outputs if there already are some runs? */
-static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs)
+static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket **ins, struct sort_bucket **outs)
{
struct fastbuf *fin1, *fin2, *fout1, *fout2, *ftmp;
P(key) kbuf1, kbuf2, kbuf3, kbuf4;
int comp;
uns run_count = 0;
- fin1 = sorter_open_read(ins[0]);
+ fin1 = sbuck_open_read(ins[0]);
next1 = P(read_key)(fin1, kin1);
- if (ins[1])
+ if (sbuck_can_read(ins[1]))
{
- fin2 = sorter_open_read(ins[1]);
+ fin2 = sbuck_open_read(ins[1]);
next2 = P(read_key)(fin2, kin2);
}
else
if (unlikely(!fout1))
{
if (!fout2)
- fout1 = sorter_open_write(outs[0]);
+ fout1 = sbuck_open_write(outs[0]);
else if (outs[1])
- fout1 = sorter_open_write(outs[1]);
+ fout1 = sbuck_open_write(outs[1]);
else
fout1 = fout2;
}
}
}
- sorter_close_read(ins[0]);
- sorter_close_read(ins[1]);
+ sbuck_close_read(ins[0]);
+ sbuck_close_read(ins[1]);
if (fout2 && fout2 != fout1)
outs[1]->runs += run_count / 2;
if (fout1)
*
* Unification:
*
- * SORT_MERGE merge items with identical keys, needs the following functions:
+ * SORT_UNIFY merge items with identical keys, needs the following functions:
* void PREFIX_write_merged(struct fastbuf *f, SORT_KEY **keys, uns n, byte *buf)
* takes n records in memory with keys which compare equal and writes
* a single record to the given fastbuf. Data for each key can
*
* SORT_INPUT_FILE file of a given name
* SORT_INPUT_FB fastbuf stream
- * SORT_INPUT_PRESORT custom presorter: call function PREFIX_presorter (see below)
- * to get successive batches of pre-sorted data as temporary
- * fastbuf streams or NULL if no more data is available.
+ * SORT_INPUT_PRESORT custom presorter. Calls function
+ * int PREFIX_presort(struct fastbuf *dest, byte *buf, size_t bufsize);
+ * to get successive batches of pre-sorted data.
* The function is passed a page-aligned presorting buffer.
+ * It returns 1 on success or 0 on EOF.
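+ *
+ *			An illustrative skeleton (the function name is
+ *			hypothetical):
+ *
+ *			  static int my_presort(struct fastbuf *dest,
+ *			                        byte *buf, size_t bufsize)
+ *			  {
+ *			    // generate at most bufsize bytes of records in buf,
+ *			    // sort them, write them to dest and return 1;
+ *			    // return 0 if no more data are available
+ *			  }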
*
 * Output (choose one of these):
*
#endif
#endif
-#ifdef SORT_MERGE
+#ifdef SORT_UNIFY
#define LESS <
#else
#define LESS <=
#define SORT_ASSERT_UNIQUE
#endif
+#ifdef SORT_KEY_SIZE
+#define SORT_VAR_KEY
+#else
+#define SORT_KEY_SIZE(key) sizeof(key)
+#endif
+
+#ifdef SORT_DATA_SIZE
+#define SORT_VAR_DATA
+#else
+#define SORT_DATA_SIZE(key) 0
+#endif
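+/* For example, a user of this header might write (an illustrative sketch,
+ * names are hypothetical):
+ *
+ *	struct my_key { uns len; byte text[1024]; };
+ *	#define SORT_KEY struct my_key
+ *	#define SORT_KEY_SIZE(k) (sizeof(uns) + (k).len)	// variable-size keys
+ *
+ * Leaving SORT_KEY_SIZE undefined instead declares fixed-size keys occupying
+ * sizeof(struct my_key) bytes each. */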
+
static inline void P(copy_data)(P(key) *key, struct fastbuf *in, struct fastbuf *out)
{
bwrite(out, key, sizeof(P(key)));
-#ifdef SORT_DATA_SIZE
+#ifdef SORT_VAR_DATA
bbcopy(in, out, SORT_DATA_SIZE(*key));
#else
(void) in;
ctx.in_fb = in;
#elif defined(SORT_INPUT_PRESORT)
ASSERT(!in);
- ctx.custom_presort = P(presorter);
+ ctx.custom_presort = P(presort);
#else
#error No input given.
#endif
#undef SORT_KEY_REGULAR
#undef SORT_KEY_SIZE
#undef SORT_DATA_SIZE
+#undef SORT_VAR_KEY
+#undef SORT_VAR_DATA
#undef SORT_INT
#undef SORT_HASH_BITS
-#undef SORT_MERGE
+#undef SORT_UNIFY
#undef SORT_INPUT_FILE
#undef SORT_INPUT_FB
#undef SORT_INPUT_PRESORT