From baa9f9a3368c8d318b9711340727f822d8fc8a34 Mon Sep 17 00:00:00 2001
From: Martin Mares
Date: Fri, 2 Feb 2007 19:09:59 +0100
Subject: [PATCH] More bits of the sorter.

In-memory presorting (the general case, but without unification) seems
to work. However, I am not happy with the bucket mechanics yet, it's
too complicated.
---
 lib/sorter/common.h     |  40 +++++---
 lib/sorter/config.c     |   2 +
 lib/sorter/govern.c     | 208 ++++++++++++++++++++++++++++++++++------
 lib/sorter/s-internal.h |  98 ++++++++++++++++++-
 lib/sorter/s-twoway.h   |  16 ++--
 lib/sorter/sorter.h     |  31 ++++--
 6 files changed, 336 insertions(+), 59 deletions(-)
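Note on the changed presorter hook (placed here, below the diffstat, where git-am ignores text): PREFIX_presort() now receives the destination fastbuf plus a page-aligned buffer of bufsize bytes, writes one sorted batch into dest, and returns 1 on success or 0 on EOF. The following is only a minimal sketch of such a callback; the record layout and the fetch_record() source are made-up placeholders, and only the signature and the return convention come from sorter.h below.

#include "lib/lib.h"
#include "lib/fastbuf.h"
#include <stdlib.h>

struct my_rec {                         /* made-up fixed-size record */
  u32 key;
};

int fetch_record(struct my_rec *r);     /* hypothetical input source, 1 = got a record */

static int my_cmp(const void *a, const void *b)
{
  u32 x = ((const struct my_rec *)a)->key, y = ((const struct my_rec *)b)->key;
  return (x > y) - (x < y);
}

/* Shape required by the new hook:
 * int (*custom_presort)(struct fastbuf *dest, byte *buf, size_t bufsize) */
static int my_presort(struct fastbuf *dest, byte *buf, size_t bufsize)
{
  struct my_rec *recs = (struct my_rec *) buf;
  size_t max = bufsize / sizeof(*recs), n = 0;
  while (n < max && fetch_record(&recs[n]))
    n++;
  if (!n)
    return 0;                           /* EOF, no batch written */
  qsort(recs, n, sizeof(*recs), my_cmp);
  bwrite(dest, recs, n * sizeof(*recs)); /* one pre-sorted run goes to dest */
  return 1;                             /* ask to be called again */
}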
diff --git a/lib/sorter/common.h b/lib/sorter/common.h
index 9747a5e0..15083663 100644
--- a/lib/sorter/common.h
+++ b/lib/sorter/common.h
@@ -14,6 +14,10 @@
 /* Configuration, some of the variables are used by the old sorter, too. */
 
 extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
+extern u64 sorter_bufsize;
+
+#define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
+#define SORT_XTRACE(x...) do { if (sorter_trace > 1) log(L_DEBUG, x); } while(0)
 
 struct sort_bucket {
   cnode n;
@@ -21,14 +25,15 @@ struct sort_bucket {
   struct fastbuf *fb;
   byte *name;
   u64 size;                             // Size in bytes
-  uns runs;                             // Number of runs, 0 if unknown
+  uns runs;                             // Number of runs, 0 if not sorted
   uns hash_bits;                        // Remaining bits of the hash function
   byte *ident;                          // Identifier used in debug messages
 };
 
 enum sort_bucket_flags {
-  SBF_FINAL = 1,                        // This bucket corresponds to the final output file
-  SBF_SOURCE = 2,                       // Contains the source file
+  SBF_FINAL = 1,                        // This bucket corresponds to the final output file (always 1 run)
+  SBF_SOURCE = 2,                       // Contains the source file (always 0 runs)
+  SBF_CUSTOM_PRESORT = 4,               // Contains source to read via custom presorter
 };
 
 struct sort_context {
@@ -38,24 +43,35 @@ struct sort_context {
   struct mempool *pool;
   clist bucket_list;
 
-  byte *big_buf, *big_buf_half;
-  uns big_buf_size, big_buf_half_size;
+  void *big_buf, *big_buf_half;
+  size_t big_buf_size, big_buf_half_size;
 
-  struct fastbuf *(*custom_presort)(void);
+  int (*custom_presort)(struct fastbuf *dest, byte *buf, size_t bufsize);
   // Take as much as possible from the source bucket, sort it in memory and dump to destination bucket.
   // Return 1 if there is more data available in the source bucket.
-  int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out);
+  int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only);
   // Two-way split/merge: merge up to 2 source buckets to up to 2 destination buckets.
   // Bucket arrays are NULL-terminated.
   void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
+
+  // State variables of internal_sort
+  void *key_buf;
+  int more_keys;
 };
 
 void sorter_run(struct sort_context *ctx);
-struct sort_bucket *sorter_new_bucket(struct sort_context *ctx);
-struct fastbuf *sorter_open_read(struct sort_bucket *b);
-struct fastbuf *sorter_open_write(struct sort_bucket *b);
-void sorter_close_read(struct sort_bucket *b);
-void sorter_close_write(struct sort_bucket *b);
+void *sorter_alloc(struct sort_context *ctx, uns size);
+void sorter_alloc_buf(struct sort_context *ctx);
+void sorter_free_buf(struct sort_context *ctx);
+
+// Operations on buckets
+struct sort_bucket *sbuck_new(struct sort_context *ctx);
+void sbuck_drop(struct sort_bucket *b);
+int sbuck_can_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_write(struct sort_bucket *b);
+void sbuck_close_read(struct sort_bucket *b);
+void sbuck_close_write(struct sort_bucket *b);
 
 #endif
diff --git a/lib/sorter/config.c b/lib/sorter/config.c
index 0026316b..b1504047 100644
--- a/lib/sorter/config.c
+++ b/lib/sorter/config.c
@@ -15,12 +15,14 @@
 uns sorter_trace;
 uns sorter_presort_bufsize = 65536;
 uns sorter_stream_bufsize = 65536;
+u64 sorter_bufsize = 65536;
 
 static struct cf_section sorter_config = {
   CF_ITEMS {
     CF_UNS("Trace", &sorter_trace),
     CF_UNS("PresortBuffer", &sorter_presort_bufsize),
     CF_UNS("StreamBuffer", &sorter_stream_bufsize),
+    CF_U64("SortBuffer", &sorter_bufsize),
     CF_END
   }
 };
diff --git a/lib/sorter/govern.c b/lib/sorter/govern.c
index aece061c..97942dee 100644
--- a/lib/sorter/govern.c
+++ b/lib/sorter/govern.c
@@ -12,14 +12,38 @@
 #include "lib/mempool.h"
 #include "lib/sorter/common.h"
 
+void *
+sorter_alloc(struct sort_context *ctx, uns size)
+{
+  return mp_alloc_zero(ctx->pool, size);
+}
+
 struct sort_bucket *
-sorter_new_bucket(struct sort_context *ctx)
+sbuck_new(struct sort_context *ctx)
 {
-  return mp_alloc_zero(ctx->pool, sizeof(struct sort_bucket));
+  return sorter_alloc(ctx, sizeof(struct sort_bucket));
+}
+
+void
+sbuck_drop(struct sort_bucket *b)
+{
+  if (b)
+    {
+      if (b->n.prev)
+        clist_remove(&b->n);
+      bclose(b->fb);
+      bzero(b, sizeof(*b));
+    }
+}
+
+int
+sbuck_can_read(struct sort_bucket *b)
+{
+  return b && b->size;
 }
 
 struct fastbuf *
-sorter_open_read(struct sort_bucket *b)
+sbuck_open_read(struct sort_bucket *b)
 {
   /* FIXME: These functions should handle buckets with no fb and only name. */
   ASSERT(b->fb);
@@ -27,7 +51,7 @@ sorter_open_read(struct sort_bucket *b)
 }
 
 struct fastbuf *
-sorter_open_write(struct sort_bucket *b)
+sbuck_open_write(struct sort_bucket *b)
 {
   if (!b->fb)
     b->fb = bopen_tmp(sorter_stream_bufsize);
@@ -35,7 +59,7 @@ sorter_open_write(struct sort_bucket *b)
 }
 
 void
-sorter_close_read(struct sort_bucket *b)
+sbuck_close_read(struct sort_bucket *b)
 {
   if (!b)
     return;
@@ -45,46 +69,174 @@ sorter_close_read(struct sort_bucket *b)
 }
 
 void
-sorter_close_write(struct sort_bucket *b)
+sbuck_close_write(struct sort_bucket *b)
 {
   if (b->fb)
     {
      b->size = btell(b->fb);
      brewind(b->fb);
    }
-  /* FIXME: Remove empty buckets from the list automatically? */
 }
 
 void
-sorter_run(struct sort_context *ctx)
+sorter_alloc_buf(struct sort_context *ctx)
 {
-  ctx->pool = mp_new(4096);
-  ASSERT(!ctx->custom_presort);
-  ASSERT(!ctx->out_fb);
-  clist_init(&ctx->bucket_list);
+  if (ctx->big_buf)
+    return;
+  u64 bs = MAX(sorter_bufsize/2, 1);
+  bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
+  ctx->big_buf = big_alloc(2*bs);
+  ctx->big_buf_size = 2*bs;
+  ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
+  ctx->big_buf_half_size = bs;
+  SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
+}
 
-  /* FIXME: There should be a way how to detect size of the input file */
+void
+sorter_free_buf(struct sort_context *ctx)
+{
+  if (!ctx->big_buf)
+    return;
+  big_free(ctx->big_buf, ctx->big_buf_size);
+  ctx->big_buf = NULL;
+  SORT_XTRACE("Freed sorting buffer");
+}
 
-  /* Trivial 2-way merge with no presorting (just a testing hack) */
-  struct sort_bucket *bin = sorter_new_bucket(ctx);
-  bin->flags = SBF_SOURCE;
-  bin->fb = ctx->in_fb;
-  bin->ident = "src";
+static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
+{
+  /* FIXME: Mode with no presorting (mostly for debugging) */
+  sorter_alloc_buf(ctx);
+  if (in->flags & SBF_CUSTOM_PRESORT)
+    {
+      struct fastbuf *f = sbuck_open_write(out);
+      return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);  // FIXME: out_only optimization?
+    }
+  return ctx->internal_sort(ctx, in, out, out_only);
+}
+
+static inline struct sort_bucket *
+sbuck_join_to(struct sort_bucket *b)
+{
+  struct sort_bucket *out = (struct sort_bucket *) b->n.prev;  // Such bucket is guaranteed to exist
+  return (out->flags & SBF_FINAL) ? out : NULL;
+}
+
+static void
+sorter_join(struct sort_bucket *b)
+{
+  struct sort_bucket *join = sbuck_join_to(b);
+  ASSERT(join);
+
+  // FIXME: What if the final bucket doesn't contain any file yet?
+
+  SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) b->size);
+  struct fastbuf *src = sbuck_open_read(b);
+  struct fastbuf *dest = sbuck_open_write(join);
+  bbcopy(src, dest, ~0U);
+  sbuck_drop(b);
+}
+
+static void
+sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
+{
   struct sort_bucket *ins[3], *outs[3];
-  ins[0] = bin;
-  ins[1] = NULL;
+  struct sort_bucket *join = sbuck_join_to(b);
+
+  SORT_TRACE("Presorting");
+  ins[0] = sbuck_new(ctx);
+  sbuck_open_read(b);
+  if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
+    {
+      if (join)
+        sbuck_drop(ins[0]);
+      else
+        clist_insert_after(&ins[0]->n, &b->n);
+      sbuck_drop(b);
+      return;
+    }
+  ins[1] = sbuck_new(ctx);
+  ins[2] = NULL;
+  int i = 1;
+  while (sorter_presort(ctx, b, ins[i], ins[i]))
+    i = 1-i;
+  sbuck_close_read(b);
+  sbuck_close_write(ins[0]);
+  sbuck_close_write(ins[1]);
+
+  SORT_TRACE("Main sorting");
   do
     {
-      outs[0] = sorter_new_bucket(ctx);
-      outs[1] = sorter_new_bucket(ctx);
+      if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)  // FIXME: Debug switch for disabling joining optimizations
+        {
+          // This is guaranteed to produce a single run, so join if possible
+          outs[0] = join;
+          outs[1] = NULL;
+          ctx->twoway_merge(ctx, ins, outs);
+          ASSERT(outs[0]->runs == 2);
+          outs[0]->runs--;
+          SORT_TRACE("Pass done (joined final run)");
+          sbuck_drop(b);
+          return;
+        }
+      outs[0] = sbuck_new(ctx);
+      outs[1] = sbuck_new(ctx);
       outs[2] = NULL;
-      log(L_DEBUG, "Pass...");
       ctx->twoway_merge(ctx, ins, outs);
-      log(L_DEBUG, "Done (%d+%d runs)", outs[0]->runs, outs[1]->runs);
-      sorter_close_write(outs[0]);
-      sorter_close_write(outs[1]);
+      sbuck_close_write(outs[0]);
+      sbuck_close_write(outs[1]);
+      SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) outs[0]->size, (uintmax_t) outs[1]->size);
+      sbuck_drop(ins[0]);
+      sbuck_drop(ins[1]);
      memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
-    } while (ins[1]->fb);
+    } while (ins[1]->size);
+
+  sbuck_drop(ins[1]);
+  clist_insert_after(&ins[0]->n, &b->n);
+  sbuck_drop(b);
+}
+
+void
+sorter_run(struct sort_context *ctx)
+{
+  ctx->pool = mp_new(4096);
+  clist_init(&ctx->bucket_list);
+
+  /* FIXME: There should be a way how to detect size of the input file */
+  /* FIXME: Remember to test sorting of empty files */
+
+  // Create bucket containing the source
+  struct sort_bucket *bin = sbuck_new(ctx);
+  bin->flags = SBF_SOURCE;
+  if (ctx->custom_presort)
+    bin->flags |= SBF_CUSTOM_PRESORT;
+  else
+    bin->fb = ctx->in_fb;
+  bin->ident = "in";
+  bin->size = ~(u64)0;
+  bin->hash_bits = ctx->hash_bits;
+  clist_add_tail(&ctx->bucket_list, &bin->n);
+
+  // Create bucket for the output
+  struct sort_bucket *bout = sbuck_new(ctx);
+  bout->flags = SBF_FINAL;
+  bout->fb = ctx->out_fb;
+  bout->ident = "out";
+  bout->runs = 1;
+  clist_add_head(&ctx->bucket_list, &bout->n);
+
+  struct sort_bucket *b;
+  while (b = clist_next(&ctx->bucket_list, &bout->n))
+    {
+      if (!b->size)
+        sbuck_drop(b);
+      else if (b->runs == 1)
+        sorter_join(b);
+      else
+        sorter_twoway(ctx, b);
+    }
 
-  ctx->out_fb = sorter_open_read(ins[0]);
+  sorter_free_buf(ctx);
+  sbuck_close_write(bout);
+  SORT_XTRACE("Final size: %jd", (uintmax_t) bout->size);
+  ctx->out_fb = sbuck_open_read(bout);
 }
diff --git a/lib/sorter/s-internal.h b/lib/sorter/s-internal.h
index 911b74e2..7f779401 100644
--- a/lib/sorter/s-internal.h
+++ b/lib/sorter/s-internal.h
@@ -7,8 +7,100 @@
  *  of the GNU Lesser General Public License.
  */
 
-static int P(internal)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out)
+typedef struct {
+  P(key) *key;
+  // FIXME: Add the hash here to save cache misses
+} P(internal_item_t);
+
+#define ASORT_PREFIX(x) SORT_PREFIX(array_##x)
+#define ASORT_KEY_TYPE P(internal_item_t)
+#define ASORT_ELT(i) ary[i]
+#define ASORT_LT(x,y) (P(compare)((x).key, (y).key) < 0)
+#define ASORT_EXTRA_ARGS , P(internal_item_t) *ary
+#include "lib/arraysort.h"
+
+static int P(internal)(struct sort_context *ctx, struct sort_bucket *bin, struct sort_bucket *bout, struct sort_bucket *bout_only)
 {
-  /* FIXME :) */
-  return 0;
+  sorter_alloc_buf(ctx);
+  ASSERT(bin->fb);                      // Expects the input bucket to be already open for reading
+  struct fastbuf *in = bin->fb;
+
+  P(key) key, *keybuf = ctx->key_buf;
+  if (!keybuf)
+    keybuf = ctx->key_buf = sorter_alloc(ctx, sizeof(key));
+  if (ctx->more_keys)
+    {
+      key = *keybuf;
+      ctx->more_keys = 0;
+    }
+  else if (!P(read_key)(in, &key))
+    return 0;
+
+#ifdef SORT_VAR_DATA
+  if (sizeof(key) + 1024 + SORT_DATA_SIZE(key) > ctx->big_buf_half_size)
+    {
+      SORT_XTRACE("s-internal: Generating a giant run");
+      struct fastbuf *out = sorter_open_write(bout);  /* FIXME: Using a non-direct buffer would be nice here */
+      P(copy_data)(&key, in, out);
+      bout->runs++;
+      return 1;                         // We don't know, but 1 is always safe
+    }
+#endif
+
+  size_t bufsize = ctx->big_buf_half_size;  /* FIXME: In some cases, we can use the whole buffer */
+  bufsize = MIN((u64)bufsize, (u64)~0U * sizeof(P(internal_item_t)));  // The number of records must fit in uns
+
+  SORT_XTRACE("s-internal: Reading (bufsize=%zd)", bufsize);
+  P(internal_item_t) *item_array = ctx->big_buf, *item = item_array, *last_item;
+  byte *end = (byte *) ctx->big_buf + bufsize;
+  do
+    {
+      uns ksize = SORT_KEY_SIZE(key);
+#ifdef SORT_UNIFY
+      uns ksize_aligned = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#else
+      uns ksize_aligned = ksize;
+#endif
+      uns dsize = SORT_DATA_SIZE(key);
+      uns recsize = ALIGN_TO(ksize_aligned + dsize, CPU_STRUCT_ALIGN);
+      if (unlikely(sizeof(P(internal_item_t)) + recsize > (size_t)(end - (byte *) item)))
+        {
+          ctx->more_keys = 1;
+          *keybuf = key;
+          break;
+        }
+      end -= recsize;
+      memcpy(end, &key, ksize);
+#ifdef SORT_VAR_DATA
+      breadb(in, end + ksize_aligned, dsize);
+#endif
+      item->key = (P(key)*) end;
+      item++;
+    }
+  while (P(read_key)(in, &key));
+  last_item = item;
+
+  uns count = last_item - item_array;
+  SORT_XTRACE("s-internal: Sorting %d items", count);
+  P(array_sort)(count, item_array);
+
+  SORT_XTRACE("s-internal: Writing");
+  if (!ctx->more_keys)
+    bout = bout_only;
+  struct fastbuf *out = sbuck_open_write(bout);
+  bout->runs++;
+  /* FIXME: No unification done yet */
+  for (item = item_array; item < last_item; item++)
+    {
+      P(write_key)(out, item->key);
+#ifdef SORT_VAR_DATA
+      uns ksize = SORT_KEY_SIZE(*item->key);
+#ifdef SORT_UNIFY
+      ksize = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#endif
+      bwrite(out, (byte *) item->key + ksize, SORT_DATA_SIZE(*item->key));
+#endif
+    }
+
+  return ctx->more_keys;
 }
diff --git a/lib/sorter/s-twoway.h b/lib/sorter/s-twoway.h
index 6bbdf28d..16a6720b 100644
--- a/lib/sorter/s-twoway.h
+++ b/lib/sorter/s-twoway.h
@@ -10,7 +10,7 @@
 /* FIXME: There is a plenty of room for further optimization */
 /* FIXME: Swap outputs if there already are some runs? */
 
-static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs)
+static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket **ins, struct sort_bucket **outs)
 {
   struct fastbuf *fin1, *fin2, *fout1, *fout2, *ftmp;
   P(key) kbuf1, kbuf2, kbuf3, kbuf4;
@@ -20,11 +20,11 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
   int comp;
   uns run_count = 0;
 
-  fin1 = sorter_open_read(ins[0]);
+  fin1 = sbuck_open_read(ins[0]);
   next1 = P(read_key)(fin1, kin1);
-  if (ins[1])
+  if (sbuck_can_read(ins[1]))
     {
-      fin2 = sorter_open_read(ins[1]);
+      fin2 = sbuck_open_read(ins[1]);
      next2 = P(read_key)(fin2, kin2);
    }
  else
@@ -50,9 +50,9 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
       if (unlikely(!fout1))
        {
          if (!fout2)
-           fout1 = sorter_open_write(outs[0]);
+           fout1 = sbuck_open_write(outs[0]);
          else if (outs[1])
-           fout1 = sorter_open_write(outs[1]);
+           fout1 = sbuck_open_write(outs[1]);
          else
            fout1 = fout2;
        }
@@ -100,8 +100,8 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
        }
     }
 
-  sorter_close_read(ins[0]);
-  sorter_close_read(ins[1]);
+  sbuck_close_read(ins[0]);
+  sbuck_close_read(ins[1]);
   if (fout2 && fout2 != fout1)
     outs[1]->runs += run_count / 2;
   if (fout1)
diff --git a/lib/sorter/sorter.h b/lib/sorter/sorter.h
index fe6cefc1..a380bec7 100644
--- a/lib/sorter/sorter.h
+++ b/lib/sorter/sorter.h
@@ -56,7 +56,7 @@
  *
  *  Unification:
  *
- *  SORT_MERGE          merge items with identical keys, needs the following functions:
+ *  SORT_UNIFY          merge items with identical keys, needs the following functions:
  *                      void PREFIX_write_merged(struct fastbuf *f, SORT_KEY **keys, uns n, byte *buf)
  *                      takes n records in memory with keys which compare equal and writes
  *                      a single record to the given fastbuf. Data for each key can
@@ -70,10 +70,11 @@
  *
  *  SORT_INPUT_FILE     file of a given name
  *  SORT_INPUT_FB       fastbuf stream
- *  SORT_INPUT_PRESORT  custom presorter: call function PREFIX_presorter (see below)
- *                      to get successive batches of pre-sorted data as temporary
- *                      fastbuf streams or NULL if no more data is available.
+ *  SORT_INPUT_PRESORT  custom presorter. Calls function
+ *                      int PREFIX_presort(struct fastbuf *dest, byte *buf, size_t bufsize);
+ *                      to get successive batches of pre-sorted data.
  *                      The function is passed a page-aligned presorting buffer.
+ *                      It returns 1 on success or 0 on EOF.
  *
  *  Output (chose one of these):
  *
@@ -143,7 +144,7 @@ static inline int P(hash) (P(key) *x)
 #endif
 #endif
 
-#ifdef SORT_MERGE
+#ifdef SORT_UNIFY
 #define LESS <
 #else
 #define LESS <=
@@ -154,10 +155,22 @@ static inline int P(hash) (P(key) *x)
 #define SORT_ASSERT_UNIQUE
 #endif
 
+#ifdef SORT_KEY_SIZE
+#define SORT_VAR_KEY
+#else
+#define SORT_KEY_SIZE(key) sizeof(key)
+#endif
+
+#ifdef SORT_DATA_SIZE
+#define SORT_VAR_DATA
+#else
+#define SORT_DATA_SIZE(key) 0
+#endif
+
 static inline void P(copy_data)(P(key) *key, struct fastbuf *in, struct fastbuf *out)
 {
   bwrite(out, key, sizeof(P(key)));
-#ifdef SORT_DATA_SIZE
+#ifdef SORT_VAR_DATA
   bbcopy(in, out, SORT_DATA_SIZE(*key));
 #else
   (void) in;
@@ -192,7 +205,7 @@ static struct fastbuf *P(sort)(
   ctx.in_fb = in;
 #elif defined(SORT_INPUT_PRESORT)
   ASSERT(!in);
-  ctx.custom_presort = P(presorter);
+  ctx.custom_presort = P(presort);
 #else
 #error No input given.
 #endif
@@ -234,9 +247,11 @@ static struct fastbuf *P(sort)(
 #undef SORT_KEY_REGULAR
 #undef SORT_KEY_SIZE
 #undef SORT_DATA_SIZE
+#undef SORT_VAR_KEY
+#undef SORT_VAR_DATA
 #undef SORT_INT
 #undef SORT_HASH_BITS
-#undef SORT_MERGE
+#undef SORT_UNIFY
 #undef SORT_INPUT_FILE
 #undef SORT_INPUT_FB
 #undef SORT_INPUT_PRESORT
-- 
2.39.2
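For context, sorter.h is a generator: the caller defines the SORT_* options and the PREFIX_ callbacks, then includes the header, which emits a PREFIX_sort() function. The sketch below shows one plausible instantiation. SORT_PREFIX, SORT_KEY_REGULAR, SORT_INPUT_FB and PREFIX_compare all appear in the header text above; SORT_OUTPUT_FB and the exact argument list of the generated id_sort() are assumptions and may differ in the real header.

#include "lib/lib.h"
#include "lib/fastbuf.h"

struct id_key {                         /* fixed-size key, hence SORT_KEY_REGULAR */
  u32 id;
};

#define SORT_PREFIX(x) id_##x
#define SORT_KEY_REGULAR struct id_key
#define SORT_INPUT_FB                   /* read the unsorted records from a fastbuf */
#define SORT_OUTPUT_FB                  /* assumed output option, by analogy with SORT_INPUT_FB */

static inline int id_compare(struct id_key *a, struct id_key *b)
{
  return (a->id > b->id) - (a->id < b->id);
}

#include "lib/sorter/sorter.h"

/* Assumed calling convention of the generated entry point:
 *   struct fastbuf *sorted = id_sort(unsorted, NULL);
 * With SORT_INPUT_PRESORT defined instead of SORT_INPUT_FB, a callback of the
 * shape sketched below the diffstat would feed the sorter in place of the
 * input fastbuf. */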