SORT_DEBUG_NO_JOIN = 2,
SORT_DEBUG_KEEP_BUCKETS = 4,
SORT_DEBUG_NO_RADIX = 8,
+ SORT_DEBUG_NO_MULTIWAY = 16,
};
struct sort_bucket;
// Bucket arrays are NULL-terminated.
void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
+ // Multi-way merge: merge an arbitrary number of source buckets to a single destination bucket.
+ void (*multiway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket *out);
+
// Radix split according to hash function
void (*radix_split)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket **outs, uns bitpos, uns numbits);
clist_insert_after(&ins[0]->n, list_pos);
}
+static void
+sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
+{
+ clist parts;
+ cnode *list_pos = b->n.prev;
+ struct sort_bucket *join = sbuck_join_to(b);
+
+ clist_init(&parts);
+ ASSERT(!(sorter_debug & SORT_DEBUG_NO_PRESORT));
+ // FIXME: What if the parts will be too small?
+ SORT_XTRACE(3, "%s", ((b->flags & SBF_CUSTOM_PRESORT) ? "Custom presorting" : "Presorting"));
+ uns cont;
+ uns part_cnt = 0;
+ u64 total_size = 0;
+ sorter_start_timer(ctx);
+ do
+ {
+ struct sort_bucket *p = sbuck_new(ctx);
+ clist_add_tail(&parts, &p->n);
+ cont = sorter_presort(ctx, b, p, (!part_cnt && join) ? join : p);
+ part_cnt++;
+ total_size += sbuck_size(p);
+ }
+ while (cont);
+ sorter_stop_timer(ctx, &ctx->total_pre_time);
+
+ // FIXME: This is way too similar to the two-way case.
+ if (part_cnt == 1)
+ {
+ struct sort_bucket *p = clist_head(&parts);
+ SORT_XTRACE(((b->flags & SBF_SOURCE) ? 1 : 2), "Sorted in memory");
+ if (join)
+ {
+ ASSERT(join->runs == 2);
+ join->runs--;
+ sbuck_drop(b);
+ }
+ else
+ clist_insert_after(&p->n, list_pos);
+ sbuck_drop(b);
+ return;
+ }
+
+ SORT_TRACE("Multi-way presorting pass (%d parts, %s, %dMB/s)", part_cnt, stk_fsize(total_size), sorter_speed(ctx, total_size));
+ sbuck_drop(b);
+
+ uns max_ways = 16;
+ struct sort_bucket *ways[max_ways+1];
+ SORT_XTRACE(2, "Starting up to %d-way merge", max_ways);
+ for (;;)
+ {
+ uns n = 0;
+ struct sort_bucket *p;
+ while (n < max_ways && (p = clist_head(&parts)))
+ {
+ clist_remove(&p->n);
+ ways[n++] = p;
+ }
+ ways[n] = NULL;
+ ASSERT(n > 1);
+
+ struct sort_bucket *out;
+ out = sbuck_new(ctx); // FIXME: No joining so far
+ sorter_start_timer(ctx);
+ ctx->multiway_merge(ctx, ways, out);
+ sorter_stop_timer(ctx, &ctx->total_ext_time);
+
+ for (uns i=0; i<n; i++)
+ sbuck_drop(ways[i]);
+
+ if (clist_empty(&parts))
+ {
+ clist_insert_after(&out->n, list_pos);
+ SORT_TRACE("Multi-way merge completed (%s, %dMB/s)", F_BSIZE(out), sorter_speed(ctx, sbuck_size(out)));
+ return;
+ }
+ else
+ {
+ clist_add_tail(&parts, &out->n);
+ SORT_TRACE("Multi-way merge pass (%d ways, %s, %dMB/s)", n, F_BSIZE(out), sorter_speed(ctx, sbuck_size(out)));
+ }
+ }
+}
+
static uns
sorter_radix_bits(struct sort_context *ctx, struct sort_bucket *b)
{
sbuck_drop(b);
else if (b->runs == 1)
sorter_join(b);
+ else if (ctx->multiway_merge && !(sorter_debug & (SORT_DEBUG_NO_MULTIWAY | SORT_DEBUG_NO_PRESORT))) // FIXME: Just kidding...
+ sorter_multiway(ctx, b);
else if (bits = sorter_radix_bits(ctx, b))
sorter_radix(ctx, b, bits);
else
}
}
-static void P(multiway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket **ins, uns num_ins, struct sort_bucket *out)
+static void P(multiway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket **ins, struct sort_bucket *out)
{
+ uns num_ins = 0;
+ while (ins[num_ins])
+ num_ins++;
+
uns n2 = 1;
while (n2 < num_ins)
n2 *= 2;
struct fastbuf *fins[num_ins];
P(key) keys[num_ins]; // FIXME: Tune num_ins according to L1 cache size
int tree[2*n2]; // A complete binary tree, leaves are input streams, each internal vertex contains a minimum of its sons
- for (uns i=1; i<=n2; i++)
+ for (uns i=1; i<2*n2; i++)
tree[i] = -1;
for (uns i=0; i<num_ins; i++)