/* Configuration; some of the variables are also used by the old sorter. */
extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
-extern uns sorter_debug;
+extern uns sorter_debug, sorter_min_radix_bits, sorter_max_radix_bits;
extern u64 sorter_bufsize;
#define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
size_t big_buf_size, big_buf_half_size;
int (*custom_presort)(struct fastbuf *dest, void *buf, size_t bufsize);
+
// Take as much data as possible from the source bucket, sort it in memory and dump it to the destination bucket.
// Return 1 if there is more data available in the source bucket.
int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only);
+
+ // Estimate how much input data from `b' will fit in the internal sorting buffer.
+ u64 (*internal_estimate)(struct sort_context *ctx, struct sort_bucket *b);
+
// Two-way split/merge: merge up to 2 source buckets to up to 2 destination buckets.
// Bucket arrays are NULL-terminated.
void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
+
// Radix split according to the hash function.
void (*radix_split)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket **outs, uns bitpos, uns numbits);
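
For orientation, a radix_split hook is expected to route each record into outs[] according to a slice of its hash; below is a minimal sketch of that contract, in which every example_* name is a hypothetical stand-in invented for illustration, not part of this API:

/* Sketch only: example_key, example_read_key(), example_hash() and
   example_copy_record() are invented helpers standing in for the
   type-specific code generated elsewhere. */
static void
example_radix_split(struct sort_context *ctx UNUSED, struct sort_bucket *in,
                    struct sort_bucket **outs, uns bitpos, uns numbits)
{
  uns mask = (1 << numbits) - 1;        // `numbits' consecutive bits of the hash,
  struct example_key k;                 // starting at bit `bitpos'
  while (example_read_key(in, &k))
    example_copy_record(in, outs[(example_hash(&k) >> bitpos) & mask], &k);
}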
/* Buffers */
void *sorter_alloc(struct sort_context *ctx, uns size);
+void sorter_prepare_buf(struct sort_context *ctx);
void sorter_alloc_buf(struct sort_context *ctx);
void sorter_free_buf(struct sort_context *ctx);
uns sorter_stream_bufsize = 65536;
u64 sorter_bufsize = 65536;
uns sorter_debug;
+uns sorter_min_radix_bits;
+uns sorter_max_radix_bits;
static struct cf_section sorter_config = {
CF_ITEMS {
CF_UNS("StreamBuffer", &sorter_stream_bufsize),
CF_U64("SortBuffer", &sorter_bufsize),
CF_UNS("Debug", &sorter_debug),
+ CF_UNS("MinRadixBits", &sorter_min_radix_bits),
+ CF_UNS("MaxRadixBits", &sorter_max_radix_bits),
CF_END
}
};
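
The two new knobs would then be set from a configuration file like any other item of this section; a hypothetical stanza (the name under which sorter_config is registered is not shown in this excerpt, so `Sorter' is an assumption, and the buffer size is given in plain bytes):

Sorter {
  SortBuffer    67108864
  MinRadixBits  2
  MaxRadixBits  10
}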
clist_insert_after(&ins[0]->n, list_pos);
}
-static int
-sorter_radix_p(struct sort_context *ctx, struct sort_bucket *b)
+static uns
+sorter_radix_bits(struct sort_context *ctx, struct sort_bucket *b)
{
- return b->hash_bits && ctx->radix_split &&
- !(sorter_debug & SORT_DEBUG_NO_RADIX) &&
- sbuck_size(b) > (sh_off_t)sorter_bufsize;
+ if (!b->hash_bits || !ctx->radix_split || (sorter_debug & SORT_DEBUG_NO_RADIX))
+ return 0;
+
+ u64 in = sbuck_size(b);
+ u64 mem = ctx->internal_estimate(ctx, b);
+ if (in < mem)
+ return 0;
+
+ uns n;
+ for (n = sorter_min_radix_bits; n < sorter_max_radix_bits && n < b->hash_bits; n++)
+ if ((in >> n) < mem)
+ break;
+ return n;
}
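
A worked example of the loop above: with sorter_min_radix_bits = 2, a 1 GiB input bucket and a 64 MiB estimate, n = 2 leaves 256 MiB per sub-bucket, n = 3 leaves 128 MiB and n = 4 leaves 64 MiB, which is still not strictly below the estimate; n = 5 brings it down to 32 MiB, so the function returns 5, i.e., a split into 32 sub-buckets (provided b->hash_bits and sorter_max_radix_bits allow going that far).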
static void
-sorter_radix(struct sort_context *ctx, struct sort_bucket *b)
+sorter_radix(struct sort_context *ctx, struct sort_bucket *b, uns bits)
{
- uns bits = MIN(b->hash_bits, 4); /* FIXME */
uns nbuck = 1 << bits;
- SORT_XTRACE(2, "Running radix sort on %s with %d bits of %d", F_BSIZE(b), bits, b->hash_bits);
+ SORT_XTRACE(2, "Running radix sort on %s with %d bits of %d (expected size %s)",
+ F_BSIZE(b), bits, b->hash_bits, stk_fsize(sbuck_size(b) / nbuck));
sorter_start_timer(ctx);
struct sort_bucket **outs = alloca(nbuck * sizeof(struct sort_bucket *));
min = MIN(min, s);
max = MAX(max, s);
sum += s;
+ if (nbuck > 4)
+ sbuck_swap_out(outs[i]);
}
SORT_TRACE("Radix split (%d buckets, %s min, %s max, %s avg, %dMB/s)", nbuck,
{
ctx->pool = mp_new(4096);
clist_init(&ctx->bucket_list);
+ sorter_prepare_buf(ctx);
/* FIXME: Remember to test sorting of empty files */
clist_add_head(&ctx->bucket_list, &bout->n);
struct sort_bucket *b;
+ uns bits;
while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
{
SORT_XTRACE(2, "Next block: %s, %d hash bits", F_BSIZE(b), b->hash_bits);
sbuck_drop(b);
else if (b->runs == 1)
sorter_join(b);
- else if (sorter_radix_p(ctx, b))
- sorter_radix(ctx, b);
+ else if (bits = sorter_radix_bits(ctx, b))
+ sorter_radix(ctx, b, bits);
else
sorter_twoway(ctx, b);
}
return (n == maxkeys);
}
+
+static u64
+P(internal_estimate)(struct sort_context *ctx, struct sort_bucket *b UNUSED)
+{
+ return ctx->big_buf_half_size;
+}
return ctx->more_keys;
}
+
+static u64
+P(internal_estimate)(struct sort_context *ctx, struct sort_bucket *b UNUSED)
+{
+ uns avg;
+#ifdef SORT_VAR_KEY
+ avg = ALIGN_TO(sizeof(P(key))/4, CPU_STRUCT_ALIGN); // Wild guess...
+#else
+ avg = ALIGN_TO(sizeof(P(key)), CPU_STRUCT_ALIGN);
+#endif
+ // We ignore the data part of the records; it probably won't make the estimate much worse.
+ return (ctx->big_buf_half_size / (avg + sizeof(P(internal_item_t))) * avg);
+}
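
A worked example of this estimate, under the assumptions that CPU_STRUCT_ALIGN is 8 and P(internal_item_t) is a single 8-byte pointer (both platform-dependent): a 24-byte variable-size P(key) gives avg = ALIGN_TO(24/4, 8) = 8, each item then costs avg + 8 = 16 bytes of buffer, so a 64 MiB half-buffer holds 4 Mi items and the function reports 4 Mi * 8 = 32 MiB of input as fitting.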
}
void
-sorter_alloc_buf(struct sort_context *ctx)
+sorter_prepare_buf(struct sort_context *ctx)
{
- if (ctx->big_buf)
- return;
u64 bs = MAX(sorter_bufsize/2, 1);
bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
- ctx->big_buf = big_alloc(2*bs);
ctx->big_buf_size = 2*bs;
- ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
ctx->big_buf_half_size = bs;
- SORT_XTRACE(2, "Allocated sorting buffer (2*%s)", stk_fsize(bs));
+}
+
+void
+sorter_alloc_buf(struct sort_context *ctx)
+{
+ if (ctx->big_buf)
+ return;
+ ctx->big_buf = big_alloc(ctx->big_buf_size);
+ ctx->big_buf_half = ((byte*) ctx->big_buf) + ctx->big_buf_half_size;
+ SORT_XTRACE(2, "Allocated sorting buffer (2*%s)", stk_fsize(ctx->big_buf_half_size));
}
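
The point of the split is that the buffer geometry becomes known before any memory is committed, so the radix decision can be made first; a sketch of the sequence as this patch wires it up (using only functions from this patch):

sorter_prepare_buf(ctx);           /* at context setup: compute the sizes only */
...
bits = sorter_radix_bits(ctx, b);  /* consults ctx->internal_estimate(), which
                                      reads big_buf_half_size -- no allocation yet */
...
sorter_alloc_buf(ctx);             /* big_alloc() is deferred until a pass
                                      actually needs the buffer */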
void
#endif
ctx.internal_sort = P(internal);
+ ctx.internal_estimate = P(internal_estimate);
ctx.twoway_merge = P(twoway_merge);
sorter_run(&ctx);