From 73c12b4b33fc8017585439894aff79c20ef99ea8 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 10 Sep 2007 20:05:10 +0200 Subject: [PATCH] Rewritten handling of small chunks in the parallel radix-sorter. They are now immediately submitted to the worker thread instead of keeping them in a list of delayed chunks for the whole course of the algorithm. This avoids memory usage explosions caused by queueing lots of smallish chunks. We prefer to wait for the small chunks to complete before starting the next large chunk. --- lib/sorter/array.c | 27 +++++++++++++++++++-------- lib/sorter/common.h | 1 - 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/lib/sorter/array.c b/lib/sorter/array.c index d724f841..efa314a8 100644 --- a/lib/sorter/array.c +++ b/lib/sorter/array.c @@ -228,6 +228,18 @@ rs_finish(struct worker_thread *thr UNUSED, struct work *ww) DBG("Thread %d: Finishing done", thr->id); } +static void +rs_wait_small(struct asort_context *ctx) +{ + struct rs_work *w; + + while (w = (struct rs_work *) work_wait(ctx->rs_work_queue)) + { + DBG("Reaping small chunk of %d items", w->num_elts); + ep_free(ctx->eltpool, w); + } +} + static void rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output) { @@ -237,6 +249,9 @@ rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns uns blksize = num_elts / sorter_threads; DBG(">>> n=%d h=%d s=%d blk=%d sw=%d", num_elts, hash_bits, shift, blksize, swapped_output); + // If there are any small chunks in progress, wait for them to finish + rs_wait_small(ctx); + // Start parallel counting void *iptr = array; for (uns i=0; irs_bits, &w->w.n); DBG("Scheduling block %d+%d", pos, n); + work_submit(ctx->rs_work_queue, &w->w); } } else @@ -348,8 +363,7 @@ threaded_radixsort(struct asort_context *ctx) for (uns i=0; irs_works[i] = big_alloc(sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits)); - // Prepare work structures for all 
remaining small bits which will be sorted later. - clist_init(&ctx->rs_bits); + // Prepare a pool for all remaining small bits which will be sorted in the background. ctx->eltpool = ep_new(sizeof(struct rs_work), 1000); // Do the big splitting @@ -359,12 +373,9 @@ threaded_radixsort(struct asort_context *ctx) big_free(ctx->rs_works[i], sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits)); // Finish the small blocks - struct rs_work *w, *tmp; - CLIST_WALK_DELSAFE(w, ctx->rs_bits, tmp) - work_submit(&q, &w->w); - while (work_wait(&q)) - ; + rs_wait_small(ctx); + ASSERT(!ctx->eltpool->num_allocated); ep_delete(ctx->eltpool); work_queue_cleanup(&q); asort_stop_threads(); diff --git a/lib/sorter/common.h b/lib/sorter/common.h index cd8053e1..50cbf34b 100644 --- a/lib/sorter/common.h +++ b/lib/sorter/common.h @@ -135,7 +135,6 @@ struct asort_context { // Used internally by array.c struct rs_work **rs_works; struct work_queue *rs_work_queue; - clist rs_bits; struct eltpool *eltpool; }; -- 2.39.2