]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/array.c
838ad98c61eff7c4335122e76e35d0eb661dc767
[libucw.git] / lib / sorter / array.c
1 /*
2  *      UCW Library -- Optimized Array Sorter
3  *
4  *      (c) 2003--2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/sorter/common.h"
14
15 #include <string.h>
16 #include <alloca.h>
17
18 #define ASORT_MIN_RADIX 5000            // FIXME: var?
19 #define ASORT_MIN_SHIFT 2
20
21 static void
22 asort_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
23 {
24   // swap_output == 0 if result should be returned in `array', otherwise in `buffer'
25   uns buckets = (1 << ctx->radix_bits);
26   uns shift = (hash_bits > ctx->radix_bits) ? (hash_bits - ctx->radix_bits) : 0;
27   uns cnt[buckets];
28
29 #if 0
30   static int reported[64];
31   if (!reported[hash_bits]++)
32 #endif
33   DBG(">>> n=%d h=%d s=%d sw=%d", num_elts, hash_bits, shift, swapped_output);
34
35   bzero(cnt, sizeof(cnt));
36   ctx->radix_count(array, num_elts, cnt, shift);
37
38   uns pos = 0;
39   for (uns i=0; i<buckets; i++)
40     {
41       uns j = cnt[i];
42       cnt[i] = pos;
43       pos += j;
44     }
45   ASSERT(pos == num_elts);
46
47   ctx->radix_split(array, buffer, num_elts, cnt, shift);
48   pos = 0;
49   for (uns i=0; i<buckets; i++)
50     {
51       uns n = cnt[i] - pos;
52       if (n < ASORT_MIN_RADIX || shift < ASORT_MIN_SHIFT)
53         {
54           ctx->quicksort(buffer, n);
55           if (!swapped_output)
56             memcpy(array, buffer, n * ctx->elt_size);
57         }
58       else
59         asort_radix(ctx, buffer, array, n, shift, !swapped_output);
60       array += n * ctx->elt_size;
61       buffer += n * ctx->elt_size;
62       pos = cnt[i];
63     }
64 }
65
66 #ifdef CONFIG_UCW_THREADS
67
68 #include "lib/threads.h"
69 #include "lib/workqueue.h"
70 #include "lib/eltpool.h"
71
72 static uns asort_threads_use_count;
73 static uns asort_threads_ready;
74 static struct worker_pool asort_thread_pool;
75
76 void
77 asort_start_threads(uns run)
78 {
79   ucwlib_lock();
80   asort_threads_use_count++;
81   if (run && !asort_threads_ready)
82     {
83       SORT_XTRACE(2, "Initializing thread pool (%d threads)", sorter_threads);
84       asort_thread_pool.num_threads = sorter_threads;
85       worker_pool_init(&asort_thread_pool);
86       asort_threads_ready = 1;
87     }
88   ucwlib_unlock();
89 }
90
91 void
92 asort_stop_threads(void)
93 {
94   ucwlib_lock();
95   if (!--asort_threads_use_count && asort_threads_ready)
96     {
97       SORT_XTRACE(2, "Shutting down thread pool");
98       worker_pool_cleanup(&asort_thread_pool);
99       asort_threads_ready = 0;
100     }
101   ucwlib_unlock();
102 }
103
104 struct qs_work {
105   struct work w;
106   struct asort_context *ctx;
107   void *array;
108   uns num_elts;
109   int left, right;
110 #define LR_UNDEF -100
111 };
112
113 static void
114 qs_handle_work(struct worker_thread *thr UNUSED, struct work *ww)
115 {
116   struct qs_work *w = (struct qs_work *) ww;
117
118   DBG("Thread %d: got %d elts", thr->id, w->num_elts);
119   if (w->num_elts * w->ctx->elt_size < sorter_thread_threshold)
120     {
121       w->ctx->quicksort(w->array, w->num_elts);
122       w->left = w->right = LR_UNDEF;
123     }
124   else
125     w->ctx->quicksplit(w->array, w->num_elts, &w->left, &w->right);
126   DBG("Thread %d: returning l=%d r=%d", thr->id, w->left, w->right);
127 }
128
129 static struct qs_work *
130 qs_alloc_work(struct asort_context *ctx)
131 {
132   struct qs_work *w = ep_alloc(ctx->eltpool);
133   w->w.priority = 0;
134   w->w.go = qs_handle_work;
135   w->ctx = ctx;
136   return w;
137 }
138
139 static void
140 threaded_quicksort(struct asort_context *ctx)
141 {
142   struct work_queue q;
143   struct qs_work *v, *w;
144
145   asort_start_threads(1);
146   work_queue_init(&asort_thread_pool, &q);
147   ctx->eltpool = ep_new(sizeof(struct qs_work), 1000);
148
149   w = qs_alloc_work(ctx);
150   w->array = ctx->array;
151   w->num_elts = ctx->num_elts;
152   work_submit(&q, &w->w);
153
154   while (v = (struct qs_work *) work_wait(&q))
155     {
156       if (v->left != LR_UNDEF)
157         {
158           if (v->right > 0)
159             {
160               w = qs_alloc_work(ctx);
161               w->array = v->array;
162               w->num_elts = v->right + 1;
163               w->w.priority = v->w.priority + 1;
164               work_submit(&q, &w->w);
165             }
166           if (v->left < (int)v->num_elts - 1)
167             {
168               w = qs_alloc_work(ctx);
169               w->array = v->array + v->left * ctx->elt_size;
170               w->num_elts = v->num_elts - v->left;
171               w->w.priority = v->w.priority + 1;
172               work_submit(&q, &w->w);
173             }
174         }
175       ep_free(ctx->eltpool, v);
176     }
177
178   ep_delete(ctx->eltpool);
179   work_queue_cleanup(&q);
180   asort_stop_threads();
181 }
182
183 struct rs_work {
184   struct work w;
185   struct asort_context *ctx;
186   void *in, *out;
187   uns num_elts;
188   uns shift;
189   uns swap_output;
190   uns cnt[0];
191 };
192
193 static void
194 rs_count(struct worker_thread *thr UNUSED, struct work *ww)
195 {
196   struct rs_work *w = (struct rs_work *) ww;
197
198   DBG("Thread %d: Counting %d items, shift=%d", thr->id, w->num_elts, w->shift);
199   w->ctx->radix_count(w->in, w->num_elts, w->cnt, w->shift);
200   DBG("Thread %d: Counting done", thr->id);
201 }
202
203 static void
204 rs_split(struct worker_thread *thr UNUSED, struct work *ww)
205 {
206   struct rs_work *w = (struct rs_work *) ww;
207
208   DBG("Thread %d: Splitting %d items, shift=%d", thr->id, w->num_elts, w->shift);
209   w->ctx->radix_split(w->in, w->out, w->num_elts, w->cnt, w->shift);
210   DBG("Thread %d: Splitting done", thr->id);
211 }
212
213 static void
214 rs_finish(struct worker_thread *thr UNUSED, struct work *ww)
215 {
216   struct rs_work *w = (struct rs_work *) ww;
217
218   DBG("Thread %d: Finishing %d items, shift=%d", thr->id, w->num_elts, w->shift);
219   if (w->shift < ASORT_MIN_SHIFT || w->num_elts < ASORT_MIN_RADIX)
220     {
221       w->ctx->quicksort(w->in, w->num_elts);
222       if (w->swap_output)
223         memcpy(w->out, w->in, w->num_elts * w->ctx->elt_size);
224     }
225   else
226     asort_radix(w->ctx, w->in, w->out, w->num_elts, w->shift, w->swap_output);
227   DBG("Thread %d: Finishing done", thr->id);
228 }
229
230 static void
231 rs_radix(struct asort_context *ctx, void *array, void *buffer, uns num_elts, uns hash_bits, uns swapped_output)
232 {
233   uns buckets = (1 << ctx->radix_bits);
234   uns shift = (hash_bits > ctx->radix_bits) ? (hash_bits - ctx->radix_bits) : 0;
235   uns cnt[buckets];
236   uns blksize = num_elts / sorter_threads;
237   DBG(">>> n=%d h=%d s=%d blk=%d sw=%d", num_elts, hash_bits, shift, blksize, swapped_output);
238
239   // Start parallel counting
240   void *iptr = array;
241   for (uns i=0; i<sorter_threads; i++)
242     {
243       struct rs_work *w = ctx->rs_works[i];
244       w->w.priority = 0;
245       w->w.go = rs_count;
246       w->ctx = ctx;
247       w->in = iptr;
248       w->out = buffer;
249       w->num_elts = blksize;
250       if (i == sorter_threads-1)
251         w->num_elts += num_elts % sorter_threads;
252       w->shift = shift;
253       iptr += w->num_elts * ctx->elt_size;
254       bzero(w->cnt, sizeof(uns) * buckets);
255       work_submit(ctx->rs_work_queue, &w->w);
256     }
257
258   // Get bucket sizes from the counts
259   bzero(cnt, sizeof(cnt));
260   for (uns i=0; i<sorter_threads; i++)
261     {
262       struct rs_work *w = (struct rs_work *) work_wait(ctx->rs_work_queue);
263       ASSERT(w);
264       for (uns j=0; j<buckets; j++)
265         cnt[j] += w->cnt[j];
266     }
267
268   // Calculate bucket starts
269   uns pos = 0;
270   for (uns i=0; i<buckets; i++)
271     {
272       uns j = cnt[i];
273       cnt[i] = pos;
274       pos += j;
275     }
276   ASSERT(pos == num_elts);
277
278   // Start parallel splitting
279   for (uns i=0; i<sorter_threads; i++)
280     {
281       struct rs_work *w = ctx->rs_works[i];
282       w->w.go = rs_split;
283       for (uns j=0; j<buckets; j++)
284         {
285           uns k = w->cnt[j];
286           w->cnt[j] = cnt[j];
287           cnt[j] += k;
288         }
289       work_submit(ctx->rs_work_queue, &w->w);
290     }
291   ASSERT(cnt[buckets-1] == num_elts);
292
293   // Wait for splits to finish
294   while (work_wait(ctx->rs_work_queue))
295     ;
296
297   // Recurse on buckets
298   pos = 0;
299   for (uns i=0; i<buckets; i++)
300     {
301       uns n = cnt[i] - pos;
302       if (n * ctx->elt_size < sorter_thread_threshold)
303         {
304           struct rs_work *w = ep_alloc(ctx->eltpool);
305           w->w.priority = 0;
306           w->w.go = rs_finish;
307           w->ctx = ctx;
308           w->in = buffer;
309           w->out = array;
310           w->num_elts = n;
311           w->shift = shift;
312           w->swap_output = !swapped_output;
313           clist_add_tail(&ctx->rs_bits, &w->w.n);
314           DBG("Scheduling block %d+%d", pos, n);
315         }
316       else
317         rs_radix(ctx, buffer, array, n, shift, !swapped_output);
318       pos = cnt[i];
319       array += n * ctx->elt_size;
320       buffer += n * ctx->elt_size;
321     }
322 }
323
324 static void
325 threaded_radixsort(struct asort_context *ctx)
326 {
327   struct work_queue q;
328
329   asort_start_threads(1);
330   work_queue_init(&asort_thread_pool, &q);
331
332   // Prepare work structures for counting and splitting.
333   // We use big_alloc(), because we want to avoid cacheline aliasing between threads.
334   ctx->rs_work_queue = &q;
335   ctx->rs_works = alloca(sizeof(struct rs_work *) * sorter_threads);
336   for (uns i=0; i<sorter_threads; i++)
337     ctx->rs_works[i] = big_alloc(sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
338
339   // Prepare work structures for all remaining small bits which will be sorted later.
340   clist_init(&ctx->rs_bits);
341   ctx->eltpool = ep_new(sizeof(struct rs_work), 1000);
342
343   // Do the big splitting
344   // FIXME: Set the swap bit carefully.
345   rs_radix(ctx, ctx->array, ctx->buffer, ctx->num_elts, ctx->hash_bits, 0);
346   for (uns i=0; i<sorter_threads; i++)
347     big_free(ctx->rs_works[i], sizeof(struct rs_work) + sizeof(uns) * (1 << ctx->radix_bits));
348
349   // Finish the small blocks
350   struct rs_work *w, *tmp;
351   CLIST_WALK_DELSAFE(w, ctx->rs_bits, tmp)
352     work_submit(&q, &w->w);
353   while (work_wait(&q))
354     ;
355
356   ep_delete(ctx->eltpool);
357   work_queue_cleanup(&q);
358   asort_stop_threads();
359 }
360
361 #else
362
363 void asort_start_threads(uns run UNUSED) { }
364 void asort_stop_threads(void) { }
365
366 #endif
367
368 void
369 asort_run(struct asort_context *ctx)
370 {
371   SORT_XTRACE(10, "Array-sorting %d items per %d bytes, hash_bits=%d", ctx->num_elts, ctx->elt_size, ctx->hash_bits);
372   uns allow_threads UNUSED = (sorter_threads > 1 &&
373                               ctx->num_elts * ctx->elt_size >= sorter_thread_threshold &&
374                               !(sorter_debug & SORT_DEBUG_ASORT_NO_THREADS));
375
376   if (ctx->num_elts < ASORT_MIN_RADIX ||
377       ctx->hash_bits <= ASORT_MIN_SHIFT ||
378       !ctx->radix_split ||
379       (sorter_debug & SORT_DEBUG_ASORT_NO_RADIX))
380     {
381 #ifdef CONFIG_UCW_THREADS
382       if (allow_threads)
383         {
384           SORT_XTRACE(12, "Decided to use parallel quicksort");
385           threaded_quicksort(ctx);
386           return;
387         }
388 #endif
389       SORT_XTRACE(12, "Decided to use sequential quicksort");
390       ctx->quicksort(ctx->array, ctx->num_elts);
391     }
392   else
393     {
394 #ifdef CONFIG_UCW_THREADS
395       if (allow_threads)
396         {
397           SORT_XTRACE(12, "Decided to use parallel radix-sort");
398           threaded_radixsort(ctx);
399           return;
400         }
401 #endif
402       SORT_XTRACE(12, "Decided to use sequential radix-sort");
403       // FIXME: select dest buffer
404       asort_radix(ctx, ctx->array, ctx->buffer, ctx->num_elts, ctx->hash_bits, 0);
405     }
406
407   SORT_XTRACE(11, "Array-sort finished");
408 }