X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;ds=sidebyside;f=lib%2Fworkqueue.c;h=8833e46f9d7a5ebcc29f5c4cc32e8e64d8a44b09;hb=88714d18176f047eb4d298bb3f22520217671513;hp=3d87a74e191672c8e9a118e89a39e34ed948a648;hpb=e6b2a1b6b54dd9bf49f47431769225028dc8aa8d;p=libucw.git diff --git a/lib/workqueue.c b/lib/workqueue.c index 3d87a74e..8833e46f 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -10,6 +10,7 @@ #include "lib/lib.h" #include "lib/threads.h" #include "lib/workqueue.h" +#include "lib/heap.h" static void * worker_thread_init(void *arg) @@ -97,23 +98,42 @@ void raw_queue_init(struct raw_queue *q) { pthread_mutex_init(&q->queue_mutex, NULL); - clist_init(&q->queue); + clist_init(&q->pri0_queue); q->queue_sem = sem_alloc(); + q->pri_heap = NULL; + q->heap_cnt = q->heap_max = 0; } void raw_queue_cleanup(struct raw_queue *q) { - ASSERT(clist_empty(&q->queue)); + ASSERT(clist_empty(&q->pri0_queue)); + ASSERT(!q->heap_cnt); + xfree(q->pri_heap); sem_free(q->queue_sem); pthread_mutex_destroy(&q->queue_mutex); } +#define PRI_LESS(x,y) ((x)->priority > (y)->priority) + void raw_queue_put(struct raw_queue *q, struct work *w) { pthread_mutex_lock(&q->queue_mutex); - clist_add_tail(&q->queue, &w->n); + if (!w->priority) + clist_add_tail(&q->pri0_queue, &w->n); + else + { + if (unlikely(q->heap_cnt >= q->heap_max)) + { + struct work **old_heap = q->pri_heap; + q->heap_max = (q->heap_max ? 2*q->heap_max : 16); + q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *)); + } + struct work **heap = q->pri_heap; + heap[++q->heap_cnt] = w; + HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); sem_post(q->queue_sem); } @@ -122,9 +142,19 @@ static inline struct work * raw_queue_do_get(struct raw_queue *q) { pthread_mutex_lock(&q->queue_mutex); - struct work *w = clist_head(&q->queue); - ASSERT(w); - clist_remove(&w->n); + struct work *w; + if (!q->heap_cnt) + { + w = clist_head(&q->pri0_queue); + ASSERT(w); + clist_remove(&w->n); + } + else + { + struct work **heap = q->pri_heap; + w = heap[1]; + HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); return w; } @@ -172,18 +202,13 @@ work_submit(struct work_queue *q, struct work *w) static struct work * work_do_wait(struct work_queue *q, int try) { - while (q->nr_running) - { - struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished); - if (!w) - return NULL; - q->nr_running--; - if (w->returned) - w->returned(q, w); - else - return w; - } - return NULL; + if (!q->nr_running) + return NULL; + struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished); + if (!w) + return NULL; + q->nr_running--; + return w; } struct work * @@ -219,7 +244,7 @@ struct w { static void go(struct worker_thread *t, struct work *w) { - log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id); + log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority); usleep(1); } @@ -239,8 +264,10 @@ int main(void) { struct w *w = xmalloc_zero(sizeof(*w)); w->w.go = go; + w->w.priority = (i < 250 ? i : 0); w->id = i; work_submit(&q, &w->w); + log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority); } struct w *w;