X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fworkqueue.c;h=8833e46f9d7a5ebcc29f5c4cc32e8e64d8a44b09;hb=88714d18176f047eb4d298bb3f22520217671513;hp=a5ea7b8dca1eea58075163a99e713b197c6b5c45;hpb=859769b129993a67598d61bb36d72d268082276b;p=libucw.git diff --git a/lib/workqueue.c b/lib/workqueue.c index a5ea7b8d..8833e46f 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -8,7 +8,9 @@ */ #include "lib/lib.h" +#include "lib/threads.h" #include "lib/workqueue.h" +#include "lib/heap.h" static void * worker_thread_init(void *arg) @@ -48,7 +50,7 @@ worker_pool_init(struct worker_pool *p) pthread_attr_t attr; if (pthread_attr_init(&attr) < 0 || - pthread_attr_setstacksize(&attr, p->stack_size) < 0) + pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0) ASSERT(0); for (uns i=0; i < p->num_threads; i++) @@ -96,39 +98,83 @@ void raw_queue_init(struct raw_queue *q) { pthread_mutex_init(&q->queue_mutex, NULL); - clist_init(&q->queue); + clist_init(&q->pri0_queue); q->queue_sem = sem_alloc(); + q->pri_heap = NULL; + q->heap_cnt = q->heap_max = 0; } void raw_queue_cleanup(struct raw_queue *q) { - ASSERT(clist_empty(&q->queue)); + ASSERT(clist_empty(&q->pri0_queue)); + ASSERT(!q->heap_cnt); + xfree(q->pri_heap); sem_free(q->queue_sem); pthread_mutex_destroy(&q->queue_mutex); } +#define PRI_LESS(x,y) ((x)->priority > (y)->priority) + void raw_queue_put(struct raw_queue *q, struct work *w) { pthread_mutex_lock(&q->queue_mutex); - clist_add_tail(&q->queue, &w->n); + if (!w->priority) + clist_add_tail(&q->pri0_queue, &w->n); + else + { + if (unlikely(q->heap_cnt >= q->heap_max)) + { + struct work **old_heap = q->pri_heap; + q->heap_max = (q->heap_max ? 2*q->heap_max : 16); + q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *)); + } + struct work **heap = q->pri_heap; + heap[++q->heap_cnt] = w; + HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); sem_post(q->queue_sem); } -struct work * -raw_queue_get(struct raw_queue *q) +static inline struct work * +raw_queue_do_get(struct raw_queue *q) { - sem_wait(q->queue_sem); pthread_mutex_lock(&q->queue_mutex); - struct work *w = clist_head(&q->queue); - ASSERT(w); - clist_remove(&w->n); + struct work *w; + if (!q->heap_cnt) + { + w = clist_head(&q->pri0_queue); + ASSERT(w); + clist_remove(&w->n); + } + else + { + struct work **heap = q->pri_heap; + w = heap[1]; + HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); return w; } +struct work * +raw_queue_get(struct raw_queue *q) +{ + sem_wait(q->queue_sem); + return raw_queue_do_get(q); +} + +struct work * +raw_queue_try_get(struct raw_queue *q) +{ + if (!sem_trywait(q->queue_sem)) + return raw_queue_do_get(q); + else + return NULL; +} + void work_queue_init(struct worker_pool *p, struct work_queue *q) { @@ -153,19 +199,28 @@ work_submit(struct work_queue *q, struct work *w) q->nr_running++; } +static struct work * +work_do_wait(struct work_queue *q, int try) +{ + if (!q->nr_running) + return NULL; + struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished); + if (!w) + return NULL; + q->nr_running--; + return w; +} + struct work * work_wait(struct work_queue *q) { - while (q->nr_running) - { - struct work *w = raw_queue_get(&q->finished); - q->nr_running--; - if (w->returned) - w->returned(q, w); - else - return w; - } - return NULL; + return work_do_wait(q, 0); +} + +struct work * +work_try_wait(struct work_queue *q) +{ + return work_do_wait(q, 1); } #ifdef TEST @@ -189,7 +244,7 @@ struct w { static void go(struct worker_thread *t, struct work *w) { - log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id); + log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority); usleep(1); } @@ -209,8 +264,10 @@ int main(void) { struct w *w = xmalloc_zero(sizeof(*w)); w->w.go = go; + w->w.priority = (i < 250 ? i : 0); w->id = i; work_submit(&q, &w->w); + log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority); } struct w *w;