-Testing:
-o Giant runs.
-o Records of odd lengths.
-o Empty files.
-
Cleanups:
-o Clean up data types and make sure they cannot overflow. (size_t vs. u64 vs. sh_off_t vs. uns)
-o Clean up log levels.
o Clean up introductory comments.
o Log messages should show both the original and the new size of the data. The
speed should probably be calculated from the former.
#define ASORT_MIN_SHIFT 2
+#define ASORT_TRACE(x...) ASORT_XTRACE(1, x)
+#define ASORT_XTRACE(level, x...) do { if (sorter_trace_array >= level) msg(L_DEBUG, x); } while(0)
+
static void
asort_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
{
asort_threads_use_count++;
if (run && !asort_threads_ready)
{
- SORT_XTRACE(2, "Initializing thread pool (%d threads)", sorter_threads);
+ ASORT_TRACE("Initializing thread pool (%d threads)", sorter_threads);
asort_thread_pool.num_threads = sorter_threads;
worker_pool_init(&asort_thread_pool);
asort_threads_ready = 1;
ucwlib_lock();
if (!--asort_threads_use_count && asort_threads_ready)
{
- SORT_XTRACE(2, "Shutting down thread pool");
+ ASORT_TRACE("Shutting down thread pool");
worker_pool_cleanup(&asort_thread_pool);
asort_threads_ready = 0;
}
void
asort_run(struct asort_context *ctx)
{
- SORT_XTRACE(10, "Array-sorting %d items per %d bytes, hash_bits=%d", ctx->num_elts, ctx->elt_size, ctx->hash_bits);
+ ASORT_TRACE("Array-sorting %d items per %d bytes, hash_bits=%d", ctx->num_elts, ctx->elt_size, ctx->hash_bits);
uns allow_threads UNUSED = (sorter_threads > 1 &&
ctx->num_elts * ctx->elt_size >= sorter_thread_threshold &&
!(sorter_debug & SORT_DEBUG_ASORT_NO_THREADS));
#ifdef CONFIG_UCW_THREADS
if (allow_threads)
{
- SORT_XTRACE(12, "Decided to use parallel quicksort");
+ ASORT_XTRACE(2, "Decided to use parallel quicksort");
threaded_quicksort(ctx);
return;
}
#endif
- SORT_XTRACE(12, "Decided to use sequential quicksort");
+ ASORT_XTRACE(2, "Decided to use sequential quicksort");
ctx->quicksort(ctx->array, ctx->num_elts);
}
else
#ifdef CONFIG_UCW_THREADS
if (allow_threads)
{
- SORT_XTRACE(12, "Decided to use parallel radix-sort (swap=%d)", swap);
+ ASORT_XTRACE(2, "Decided to use parallel radix-sort (swap=%d)", swap);
threaded_radixsort(ctx, swap);
return;
}
#endif
- SORT_XTRACE(12, "Decided to use sequential radix-sort (swap=%d)", swap);
+ ASORT_XTRACE(2, "Decided to use sequential radix-sort (swap=%d)", swap);
asort_radix(ctx, ctx->array, ctx->buffer, ctx->num_elts, ctx->hash_bits, swap);
if (swap)
ctx->array = ctx->buffer;
}
- SORT_XTRACE(11, "Array-sort finished");
+ ASORT_XTRACE(2, "Array-sort finished");
}
#include "lib/clists.h"
/* Configuration, some of the variables are used by the old sorter, too. */
-extern uns sorter_trace, sorter_stream_bufsize;
+extern uns sorter_trace, sorter_trace_array, sorter_stream_bufsize;
extern uns sorter_debug, sorter_min_radix_bits, sorter_max_radix_bits, sorter_add_radix_bits;
extern uns sorter_min_multiway_bits, sorter_max_multiway_bits;
extern uns sorter_threads, sorter_thread_threshold, sorter_thread_chunk;
#include "lib/sorter/common.h"
uns sorter_trace;
+uns sorter_trace_array;
u64 sorter_bufsize = 65536;
uns sorter_debug;
uns sorter_min_radix_bits;
static struct cf_section sorter_config = {
CF_ITEMS {
CF_UNS("Trace", &sorter_trace),
+ CF_UNS("TraceArray", &sorter_trace_array),
CF_SECTION("FileAccess", &sorter_fb_params, &fbpar_cf),
CF_SECTION("SmallFileAccess", &sorter_fb_params, &fbpar_cf),
CF_U64("SmallInput", &sorter_small_input),
{
// The final bucket doesn't have any file associated yet, so replace
// it with the new bucket.
- SORT_XTRACE(2, "Replaced final bucket");
+ SORT_XTRACE(3, "Replaced final bucket");
b->flags |= SBF_FINAL;
sbuck_drop(join);
}
{
sorter_stop_timer(ctx, &ctx->total_pre_time);
sh_off_t size = sbuck_ins_or_join(ins[0], list_pos, join, join_size);
- SORT_XTRACE(((b->flags & SBF_SOURCE) ? 1 : 2), "Sorted in memory (%s, %dMB/s)", stk_fsize(size), sorter_speed(ctx, size));
+ SORT_XTRACE(((b->flags & SBF_SOURCE) ? 1 : 3), "Sorted in memory (%s, %dMB/s)", stk_fsize(size), sorter_speed(ctx, size));
sbuck_drop(b);
return;
}
cnode *list_pos = b->n.prev;
sh_off_t join_size;
struct sort_bucket *join = sbuck_join_to(b, &join_size);
- uns trace_level = (b->flags & SBF_SOURCE) ? 1 : 2;
+ uns trace_level = (b->flags & SBF_SOURCE) ? 1 : 3;
clist_init(&parts);
ASSERT(!(sorter_debug & SORT_DEBUG_NO_PRESORT));
uns max_ways = 1 << sorter_max_multiway_bits;
struct sort_bucket *ways[max_ways+1];
- SORT_XTRACE(2, "Starting up to %d-way merge", max_ways);
+ SORT_XTRACE(3, "Starting up to %d-way merge", max_ways);
for (;;)
{
uns n = 0;
bits = MIN(bits + sorter_add_radix_bits, sorter_max_radix_bits);
uns nbuck = 1 << bits;
- SORT_XTRACE(2, "Running radix split on %s with hash %d bits of %d (expecting %s buckets)",
+ SORT_XTRACE(3, "Running radix split on %s with hash %d bits of %d (expecting %s buckets)",
F_BSIZE(b), bits, b->hash_bits, stk_fsize(sbuck_size(b) / nbuck));
sorter_free_buf(ctx);
sorter_start_timer(ctx);
// Drop empty buckets
if (!sbuck_have(b))
{
- SORT_XTRACE(3, "Dropping empty bucket");
+ SORT_XTRACE(4, "Dropping empty bucket");
sbuck_drop(b);
return;
}
multiway_bits = 0;
}
- SORT_XTRACE(2, "Decisions: size=%s max=%s runs=%d bits=%d hash=%d -> radix=%d multi=%d",
+ SORT_XTRACE(3, "Decisions: size=%s max=%s runs=%d bits=%d hash=%d -> radix=%d multi=%d",
stk_fsize(insize), stk_fsize(mem), b->runs, bits, b->hash_bits,
radix_bits, multiway_bits);
P(key) *buf = ctx->big_buf;
uns maxkeys = P(internal_num_keys)(ctx);
- SORT_XTRACE(4, "s-fixint: Reading (maxkeys=%u, hash_bits=%d)", maxkeys, bin->hash_bits);
+ SORT_XTRACE(5, "s-fixint: Reading (maxkeys=%u, hash_bits=%d)", maxkeys, bin->hash_bits);
uns n = 0;
while (n < maxkeys && P(read_key)(in, &buf[n]))
n++;
return 0;
void *workspace UNUSED = ALIGN_PTR(&buf[n], CPU_PAGE_SIZE);
- SORT_XTRACE(3, "s-fixint: Sorting %u items (%s items, %s workspace)",
+ SORT_XTRACE(4, "s-fixint: Sorting %u items (%s items, %s workspace)",
n,
stk_fsize(n * sizeof(P(key))),
stk_fsize(n * P(internal_workspace)()));
);
ctx->total_int_time += get_timer(&timer);
- SORT_XTRACE(4, "s-fixint: Writing");
+ SORT_XTRACE(5, "s-fixint: Writing");
if (n < maxkeys)
bout = bout_only;
struct fastbuf *out = sbuck_write(bout);
P(write_key)(out, &buf[i]);
}
#ifdef SORT_UNIFY
- SORT_XTRACE(3, "Merging reduced %d records", merged);
+ SORT_XTRACE(4, "Merging reduced %d records", merged);
#endif
return (n == maxkeys);
#ifdef SORT_VAR_DATA
if (sizeof(key) + 2*CPU_PAGE_SIZE + SORT_DATA_SIZE(key) + P(internal_workspace)(&key) > bufsize)
{
- SORT_XTRACE(3, "s-internal: Generating a giant run");
+ SORT_XTRACE(4, "s-internal: Generating a giant run");
struct fastbuf *out = sbuck_write(bout);
P(copy_data)(&key, in, out);
bout->runs++;
}
#endif
- SORT_XTRACE(4, "s-internal: Reading");
+ SORT_XTRACE(5, "s-internal: Reading");
P(internal_item_t) *item_array = ctx->big_buf, *item = item_array, *last_item;
byte *end = (byte *) ctx->big_buf + bufsize;
size_t remains = bufsize - CPU_PAGE_SIZE;
uns count = last_item - item_array;
void *workspace UNUSED = ALIGN_PTR(last_item, CPU_PAGE_SIZE);
- SORT_XTRACE(3, "s-internal: Read %u items (%s items, %s workspace, %s data)",
+ SORT_XTRACE(4, "s-internal: Read %u items (%s items, %s workspace, %s data)",
count,
stk_fsize((byte*)last_item - (byte*)item_array),
stk_fsize(end - (byte*)last_item - remains),
);
ctx->total_int_time += get_timer(&timer);
- SORT_XTRACE(4, "s-internal: Writing");
+ SORT_XTRACE(5, "s-internal: Writing");
if (!ctx->more_keys)
bout = bout_only;
struct fastbuf *out = sbuck_write(bout);
#endif
}
#ifdef SORT_UNIFY
- SORT_XTRACE(3, "Merging reduced %u records", merged);
+ SORT_XTRACE(4, "Merging reduced %u records", merged);
#endif
return ctx->more_keys;
if (ctx->big_buf)
return;
ctx->big_buf = big_alloc(ctx->big_buf_size);
- SORT_XTRACE(2, "Allocated sorting buffer (%s)", stk_fsize(ctx->big_buf_size));
+ SORT_XTRACE(3, "Allocated sorting buffer (%s)", stk_fsize(ctx->big_buf_size));
}
void
return;
big_free(ctx->big_buf, ctx->big_buf_size);
ctx->big_buf = NULL;
- SORT_XTRACE(2, "Freed sorting buffer");
+ SORT_XTRACE(3, "Freed sorting buffer");
}