]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/array.c
Cleaned up tracing levels. Array sorter now has its own control knob.
[libucw.git] / lib / sorter / array.c
1 /*
2  *      UCW Library -- Optimized Array Sorter
3  *
4  *      (c) 2003--2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/sorter/common.h"
14
15 #include <string.h>
16 #include <alloca.h>
17
18 #define ASORT_MIN_SHIFT 2
19
20 #define ASORT_TRACE(x...) ASORT_XTRACE(1, x)
21 #define ASORT_XTRACE(level, x...) do { if (sorter_trace_array >= level) msg(L_DEBUG, x); } while(0)
22
23 static void
24 asort_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
25 {
26   // swap_output == 0 if result should be returned in `array', otherwise in `buffer'
27   uns buckets = (1 << ctx->radix_bits);
28   uns shift = (hash_bits > ctx->radix_bits) ? (hash_bits - ctx->radix_bits) : 0;
29   uns cnt[buckets];
30
31 #if 0
32   static int reported[64];
33   if (!reported[hash_bits]++)
34 #endif
35   DBG(">>> n=%d h=%d s=%d sw=%d", num_elts, hash_bits, shift, swapped_output);
36
37   bzero(cnt, sizeof(cnt));
38   ctx->radix_count(array, num_elts, cnt, shift);
39
40   uns pos = 0;
41   for (uns i=0; i<buckets; i++)
42     {
43       uns j = cnt[i];
44       cnt[i] = pos;
45       pos += j;
46     }
47   ASSERT(pos == num_elts);
48
49   ctx->radix_split(array, buffer, num_elts, cnt, shift);
50   pos = 0;
51   for (uns i=0; i<buckets; i++)
52     {
53       uns n = cnt[i] - pos;
54       if (n * ctx->elt_size < sorter_radix_threshold || shift < ASORT_MIN_SHIFT)
55         {
56           ctx->quicksort(buffer, n);
57           if (!swapped_output)
58             memcpy(array, buffer, n * ctx->elt_size);
59         }
60       else
61         asort_radix(ctx, buffer, array, n, shift, !swapped_output);
62       array += n * ctx->elt_size;
63       buffer += n * ctx->elt_size;
64       pos = cnt[i];
65     }
66 }
67
68 #ifdef CONFIG_UCW_THREADS
69
70 #include "lib/threads.h"
71 #include "lib/workqueue.h"
72 #include "lib/eltpool.h"
73
74 static uns asort_threads_use_count;
75 static uns asort_threads_ready;
76 static struct worker_pool asort_thread_pool;
77
78 void
79 asort_start_threads(uns run)
80 {
81   ucwlib_lock();
82   asort_threads_use_count++;
83   if (run && !asort_threads_ready)
84     {
85       ASORT_TRACE("Initializing thread pool (%d threads)", sorter_threads);
86       asort_thread_pool.num_threads = sorter_threads;
87       worker_pool_init(&asort_thread_pool);
88       asort_threads_ready = 1;
89     }
90   ucwlib_unlock();
91 }
92
93 void
94 asort_stop_threads(void)
95 {
96   ucwlib_lock();
97   if (!--asort_threads_use_count && asort_threads_ready)
98     {
99       ASORT_TRACE("Shutting down thread pool");
100       worker_pool_cleanup(&asort_thread_pool);
101       asort_threads_ready = 0;
102     }
103   ucwlib_unlock();
104 }
105
106 struct qs_work {
107   struct work w;
108   struct asort_context *ctx;
109   void *array;
110   uns num_elts;
111   int left, right;
112 #define LR_UNDEF -100
113 };
114
115 static void
116 qs_handle_work(struct worker_thread *thr UNUSED, struct work *ww)
117 {
118   struct qs_work *w = (struct qs_work *) ww;
119
120   DBG("Thread %d: got %d elts", thr->id, w->num_elts);
121   if (w->num_elts * w->ctx->elt_size < sorter_thread_threshold)
122     {
123       w->ctx->quicksort(w->array, w->num_elts);
124       w->left = w->right = LR_UNDEF;
125     }
126   else
127     w->ctx->quicksplit(w->array, w->num_elts, &w->left, &w->right);
128   DBG("Thread %d: returning l=%d r=%d", thr->id, w->left, w->right);
129 }
130
131 static struct qs_work *
132 qs_alloc_work(struct asort_context *ctx)
133 {
134   struct qs_work *w = ep_alloc(ctx->eltpool);
135   w->w.priority = 0;
136   w->w.go = qs_handle_work;
137   w->ctx = ctx;
138   return w;
139 }
140
141 static void
142 threaded_quicksort(struct asort_context *ctx)
143 {
144   struct work_queue q;
145   struct qs_work *v, *w;
146
147   asort_start_threads(1);
148   work_queue_init(&asort_thread_pool, &q);
149   ctx->eltpool = ep_new(sizeof(struct qs_work), 1000);
150
151   w = qs_alloc_work(ctx);
152   w->array = ctx->array;
153   w->num_elts = ctx->num_elts;
154   work_submit(&q, &w->w);
155
156   while (v = (struct qs_work *) work_wait(&q))
157     {
158       if (v->left != LR_UNDEF)
159         {
160           if (v->right > 0)
161             {
162               w = qs_alloc_work(ctx);
163               w->array = v->array;
164               w->num_elts = v->right + 1;
165               w->w.priority = v->w.priority + 1;
166               work_submit(&q, &w->w);
167             }
168           if (v->left < (int)v->num_elts - 1)
169             {
170               w = qs_alloc_work(ctx);
171               w->array = v->array + v->left * ctx->elt_size;
172               w->num_elts = v->num_elts - v->left;
173               w->w.priority = v->w.priority + 1;
174               work_submit(&q, &w->w);
175             }
176         }
177       ep_free(ctx->eltpool, v);
178     }
179
180   ep_delete(ctx->eltpool);
181   work_queue_cleanup(&q);
182   asort_stop_threads();
183 }
184
185 struct rs_work {
186   struct work w;
187   struct asort_context *ctx;
188   void *array, *buffer;         // Like asort_radix().
189   uns num_elts;
190   uns shift;
191   uns swap_output;
192   uns cnt[0];
193 };
194
195 static void
196 rs_count(struct worker_thread *thr UNUSED, struct work *ww)
197 {
198   struct rs_work *w = (struct rs_work *) ww;
199
200   DBG("Thread %d: Counting %d items, shift=%d", thr->id, w->num_elts, w->shift);
201   w->ctx->radix_count(w->array, w->num_elts, w->cnt, w->shift);
202   DBG("Thread %d: Counting done", thr->id);
203 }
204
205 static void
206 rs_split(struct worker_thread *thr UNUSED, struct work *ww)
207 {
208   struct rs_work *w = (struct rs_work *) ww;
209
210   DBG("Thread %d: Splitting %d items, shift=%d", thr->id, w->num_elts, w->shift);
211   w->ctx->radix_split(w->array, w->buffer, w->num_elts, w->cnt, w->shift);
212   DBG("Thread %d: Splitting done", thr->id);
213 }
214
215 static void
216 rs_finish(struct worker_thread *thr UNUSED, struct work *ww)
217 {
218   struct rs_work *w = (struct rs_work *) ww;
219
220   if (thr)
221     DBG("Thread %d: Finishing %d items, shift=%d", thr->id, w->num_elts, w->shift);
222   if (w->shift < ASORT_MIN_SHIFT || w->num_elts * w->ctx->elt_size < sorter_radix_threshold)
223     {
224       w->ctx->quicksort(w->array, w->num_elts);
225       if (w->swap_output)
226         memcpy(w->buffer, w->array, w->num_elts * w->ctx->elt_size);
227     }
228   else
229     asort_radix(w->ctx, w->array, w->buffer, w->num_elts, w->shift, w->swap_output);
230   if (thr)
231     DBG("Thread %d: Finishing done", thr->id);
232 }
233
234 static void
235 rs_wait_small(struct asort_context *ctx)
236 {
237   struct rs_work *w;
238
239   while (w = (struct rs_work *) work_wait(ctx->rs_work_queue))
240     {
241       DBG("Reaping small chunk of %d items", w->num_elts);
242       ep_free(ctx->eltpool, w);
243     }
244 }
245
246 static void
247 rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
248 {
249   uns buckets = (1 << ctx->radix_bits);
250   uns shift = (hash_bits > ctx->radix_bits) ? (hash_bits - ctx->radix_bits) : 0;
251   uns cnt[buckets];
252   uns blksize = num_elts / sorter_threads;
253   DBG(">>> n=%d h=%d s=%d blk=%d sw=%d", num_elts, hash_bits, shift, blksize, swapped_output);
254
255   // If there are any small chunks in progress, wait for them to finish
256   rs_wait_small(ctx);
257
258   // Start parallel counting
259   void *iptr = array;
260   for (uns i=0; i<sorter_threads; i++)
261     {
262       struct rs_work *w = ctx->rs_works[i];
263       w->w.priority = 0;
264       w->w.go = rs_count;
265       w->ctx = ctx;
266       w->array = iptr;
267       w->buffer = buffer;
268       w->num_elts = blksize;
269       if (i == sorter_threads-1)
270         w->num_elts += num_elts % sorter_threads;
271       w->shift = shift;
272       iptr += w->num_elts * ctx->elt_size;
273       bzero(w->cnt, sizeof(uns) * buckets);
274       work_submit(ctx->rs_work_queue, &w->w);
275     }
276
277   // Get bucket sizes from the counts
278   bzero(cnt, sizeof(cnt));
279   for (uns i=0; i<sorter_threads; i++)
280     {
281       struct rs_work *w = (struct rs_work *) work_wait(ctx->rs_work_queue);
282       ASSERT(w);
283       for (uns j=0; j<buckets; j++)
284         cnt[j] += w->cnt[j];
285     }
286
287   // Calculate bucket starts
288   uns pos = 0;
289   for (uns i=0; i<buckets; i++)
290     {
291       uns j = cnt[i];
292       cnt[i] = pos;
293       pos += j;
294     }
295   ASSERT(pos == num_elts);
296
297   // Start parallel splitting
298   for (uns i=0; i<sorter_threads; i++)
299     {
300       struct rs_work *w = ctx->rs_works[i];
301       w->w.go = rs_split;
302       for (uns j=0; j<buckets; j++)
303         {
304           uns k = w->cnt[j];
305           w->cnt[j] = cnt[j];
306           cnt[j] += k;
307         }
308       work_submit(ctx->rs_work_queue, &w->w);
309     }
310   ASSERT(cnt[buckets-1] == num_elts);
311
312   // Wait for splits to finish
313   while (work_wait(ctx->rs_work_queue))
314     ;
315
316   // Recurse on buckets
317   pos = 0;
318   for (uns i=0; i<buckets; i++)
319     {
320       uns n = cnt[i] - pos;
321       if (!n)
322         continue;
323       if (n * ctx->elt_size < sorter_thread_threshold)
324         {
325           struct rs_work *w = ep_alloc(ctx->eltpool);
326           w->w.priority = 0;
327           w->w.go = rs_finish;
328           w->ctx = ctx;
329           w->array = buffer;
330           w->buffer = array;
331           w->num_elts = n;
332           w->shift = shift;
333           w->swap_output = !swapped_output;
334           if (n * ctx->elt_size < sorter_thread_chunk)
335             {
336               DBG("Sorting block %d+%d inline", pos, n);
337               rs_finish(NULL, &w->w);
338               ep_free(ctx->eltpool, w);
339             }
340           else
341             {
342               DBG("Scheduling block %d+%d", pos, n);
343               work_submit(ctx->rs_work_queue, &w->w);
344             }
345         }
346       else
347         rs_radix(ctx, buffer, array, n, shift, !swapped_output);
348       pos = cnt[i];
349       array += n * ctx->elt_size;
350       buffer += n * ctx->elt_size;
351     }
352 }
353
354 static void
355 threaded_radixsort(struct asort_context *ctx, uns swap)
356 {
357   struct work_queue q;
358
359   asort_start_threads(1);
360   work_queue_init(&asort_thread_pool, &q);
361
362   // Prepare work structures for counting and splitting.
363   // We use big_alloc(), because we want to avoid cacheline aliasing between threads.
364   ctx->rs_work_queue = &q;
365   ctx->rs_works = alloca(sizeof(struct rs_work *) * sorter_threads);
366   for (uns i=0; i<sorter_threads; i++)
367     ctx->rs_works[i] = big_alloc(sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
368
369   // Prepare a pool for all remaining small bits which will be sorted on background.
370   ctx->eltpool = ep_new(sizeof(struct rs_work), 1000);
371
372   // Do the big splitting
373   rs_radix(ctx, ctx->array, ctx->buffer, ctx->num_elts, ctx->hash_bits, swap);
374   for (uns i=0; i<sorter_threads; i++)
375     big_free(ctx->rs_works[i], sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
376
377   // Finish the small blocks
378   rs_wait_small(ctx);
379
380   ASSERT(!ctx->eltpool->num_allocated);
381   ep_delete(ctx->eltpool);
382   work_queue_cleanup(&q);
383   asort_stop_threads();
384 }
385
386 #else
387
388 void asort_start_threads(uns run UNUSED) { }
389 void asort_stop_threads(void) { }
390
391 #endif
392
393 static uns
394 predict_swap(struct asort_context *ctx)
395 {
396   uns bits = ctx->radix_bits;
397   uns elts = ctx->num_elts;
398   uns swap = 0;
399
400   while (elts * ctx->elt_size >= sorter_radix_threshold && bits >= ASORT_MIN_SHIFT)
401     {
402       DBG("Predicting pass: %d elts, %d bits", elts, bits);
403       swap = !swap;
404       elts >>= ctx->radix_bits;
405       bits = MAX(bits, ctx->radix_bits) - ctx->radix_bits;
406     }
407   return swap;
408 }
409
410 void
411 asort_run(struct asort_context *ctx)
412 {
413   ASORT_TRACE("Array-sorting %d items per %d bytes, hash_bits=%d", ctx->num_elts, ctx->elt_size, ctx->hash_bits);
414   uns allow_threads UNUSED = (sorter_threads > 1 &&
415                               ctx->num_elts * ctx->elt_size >= sorter_thread_threshold &&
416                               !(sorter_debug & SORT_DEBUG_ASORT_NO_THREADS));
417
418   if (ctx->num_elts * ctx->elt_size < sorter_radix_threshold ||
419       ctx->hash_bits <= ASORT_MIN_SHIFT ||
420       !ctx->radix_split ||
421       (sorter_debug & SORT_DEBUG_ASORT_NO_RADIX))
422     {
423 #ifdef CONFIG_UCW_THREADS
424       if (allow_threads)
425         {
426           ASORT_XTRACE(2, "Decided to use parallel quicksort");
427           threaded_quicksort(ctx);
428           return;
429         }
430 #endif
431       ASORT_XTRACE(2, "Decided to use sequential quicksort");
432       ctx->quicksort(ctx->array, ctx->num_elts);
433     }
434   else
435     {
436       uns swap = predict_swap(ctx);
437 #ifdef CONFIG_UCW_THREADS
438       if (allow_threads)
439         {
440           ASORT_XTRACE(2, "Decided to use parallel radix-sort (swap=%d)", swap);
441           threaded_radixsort(ctx, swap);
442           return;
443         }
444 #endif
445       ASORT_XTRACE(2, "Decided to use sequential radix-sort (swap=%d)", swap);
446       asort_radix(ctx, ctx->array, ctx->buffer, ctx->num_elts, ctx->hash_bits, swap);
447       if (swap)
448         ctx->array = ctx->buffer;
449     }
450
451   ASORT_XTRACE(2, "Array-sort finished");
452 }