]> mj.ucw.cz Git - libucw.git/commitdiff
Rewritten handling of small chunks in the parallel radix-sorter.
authorMartin Mares <mj@ucw.cz>
Mon, 10 Sep 2007 18:05:10 +0000 (20:05 +0200)
committerMartin Mares <mj@ucw.cz>
Mon, 10 Sep 2007 18:05:10 +0000 (20:05 +0200)
They are now immediately submitted to the worker thread instead of keeping
them in a list of delayed chunks for the whole course of the algorithm.
This avoids memory usage explosions caused by queueing lots of smallish
chunks. We prefer to wait for the small chunks to complete before starting
the next large chunk.

lib/sorter/array.c
lib/sorter/common.h

index d724f84128ed5a1bd14bd32e6bc68d0e40e52054..efa314a8702c1fd01339c9467cc473b5c70d96f3 100644 (file)
@@ -228,6 +228,18 @@ rs_finish(struct worker_thread *thr UNUSED, struct work *ww)
     DBG("Thread %d: Finishing done", thr->id);
 }
 
+static void
+rs_wait_small(struct asort_context *ctx)
+{
+  struct rs_work *w;
+
+  while (w = (struct rs_work *) work_wait(ctx->rs_work_queue))
+    {
+      DBG("Reaping small chunk of %d items", w->num_elts);
+      ep_free(ctx->eltpool, w);
+    }
+}
+
 static void
 rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
 {
@@ -237,6 +249,9 @@ rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns
   uns blksize = num_elts / sorter_threads;
   DBG(">>> n=%d h=%d s=%d blk=%d sw=%d", num_elts, hash_bits, shift, blksize, swapped_output);
 
+  // If there are any small chunks in progress, wait for them to finish
+  rs_wait_small(ctx);
+
   // Start parallel counting
   void *iptr = array;
   for (uns i=0; i<sorter_threads; i++)
@@ -321,8 +336,8 @@ rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns
            }
          else
            {
-             clist_add_tail(&ctx->rs_bits, &w->w.n);
              DBG("Scheduling block %d+%d", pos, n);
+             work_submit(ctx->rs_work_queue, &w->w);
            }
        }
       else
@@ -348,8 +363,7 @@ threaded_radixsort(struct asort_context *ctx)
   for (uns i=0; i<sorter_threads; i++)
     ctx->rs_works[i] = big_alloc(sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
 
-  // Prepare work structures for all remaining small bits which will be sorted later.
-  clist_init(&ctx->rs_bits);
+  // Prepare a pool for all remaining small bits which will be sorted on background.
   ctx->eltpool = ep_new(sizeof(struct rs_work), 1000);
 
   // Do the big splitting
@@ -359,12 +373,9 @@ threaded_radixsort(struct asort_context *ctx)
     big_free(ctx->rs_works[i], sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
 
   // Finish the small blocks
-  struct rs_work *w, *tmp;
-  CLIST_WALK_DELSAFE(w, ctx->rs_bits, tmp)
-    work_submit(&q, &w->w);
-  while (work_wait(&q))
-    ;
+  rs_wait_small(ctx);
 
+  ASSERT(!ctx->eltpool->num_allocated);
   ep_delete(ctx->eltpool);
   work_queue_cleanup(&q);
   asort_stop_threads();
index cd8053e11ce7b8d06737b24c8b3a342e54e096e1..50cbf34b789bf0b72475d50a61b20274de45d7fe 100644 (file)
@@ -135,7 +135,6 @@ struct asort_context {
   // Used internally by array.c
   struct rs_work **rs_works;
   struct work_queue *rs_work_queue;
-  clist rs_bits;
   struct eltpool *eltpool;
 };