From fe41a1f4c5b1313cbbf900a00c194aafcce7575b Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Fri, 15 Dec 2006 14:24:08 +0100 Subject: [PATCH] Implemented priority queues. --- lib/workqueue.c | 46 +++++++++++++++++++++++++++++++++++++++------- lib/workqueue.h | 9 ++++++++- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/lib/workqueue.c b/lib/workqueue.c index d26ff186..325cedf7 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -10,6 +10,7 @@ #include "lib/lib.h" #include "lib/threads.h" #include "lib/workqueue.h" +#include "lib/heap.h" static void * worker_thread_init(void *arg) @@ -97,23 +98,42 @@ void raw_queue_init(struct raw_queue *q) { pthread_mutex_init(&q->queue_mutex, NULL); - clist_init(&q->queue); + clist_init(&q->pri0_queue); q->queue_sem = sem_alloc(); + q->pri_heap = NULL; + q->heap_cnt = q->heap_max = 0; } void raw_queue_cleanup(struct raw_queue *q) { - ASSERT(clist_empty(&q->queue)); + ASSERT(clist_empty(&q->pri0_queue)); + ASSERT(!q->heap_cnt); + xfree(q->pri_heap); sem_free(q->queue_sem); pthread_mutex_destroy(&q->queue_mutex); } +#define PRI_LESS(x,y) ((x)->priority > (y)->priority) + void raw_queue_put(struct raw_queue *q, struct work *w) { pthread_mutex_lock(&q->queue_mutex); - clist_add_tail(&q->queue, &w->n); + if (!w->priority) + clist_add_tail(&q->pri0_queue, &w->n); + else + { + if (unlikely(q->heap_cnt >= q->heap_max)) + { + struct work **old_heap = q->pri_heap; + q->heap_max = (q->heap_max ? 2*q->heap_max : 16); + q->pri_heap = xrealloc(old_heap, q->heap_max * sizeof(struct work *)); + } + struct work **heap = q->pri_heap; + heap[++q->heap_cnt] = w; + HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); sem_post(q->queue_sem); } @@ -122,9 +142,19 @@ static inline struct work * raw_queue_do_get(struct raw_queue *q) { pthread_mutex_lock(&q->queue_mutex); - struct work *w = clist_head(&q->queue); - ASSERT(w); - clist_remove(&w->n); + struct work *w; + if (!q->heap_cnt) + { + w = clist_head(&q->pri0_queue); + ASSERT(w); + clist_remove(&w->n); + } + else + { + struct work **heap = q->pri_heap; + w = heap[1]; + HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP); + } pthread_mutex_unlock(&q->queue_mutex); return w; } @@ -214,7 +244,7 @@ struct w { static void go(struct worker_thread *t, struct work *w) { - log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id); + log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority); usleep(1); } @@ -234,8 +264,10 @@ int main(void) { struct w *w = xmalloc_zero(sizeof(*w)); w->w.go = go; + w->w.priority = (i < 250 ? i : 0); w->id = i; work_submit(&q, &w->w); + log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority); } struct w *w; diff --git a/lib/workqueue.h b/lib/workqueue.h index e8c762b2..b16a9947 100644 --- a/lib/workqueue.h +++ b/lib/workqueue.h @@ -18,6 +18,10 @@ * thread pool, it remembers running requests and gathers replies. A single work queue * should not be used by multiple threads simultaneously. * + * Requests can have priorities. Requests with the highest priority are served first. + * Requests of priority 0 are guaranteed to be served on first-come-first-served + * basis, requests of higher priorities are unordered. + * * When a thread pool is initialized, new_thread() is called for every thread first, * allocating struct worker_thread (and user-defined thread context following it) for * each thread. Then the threads are fired and each of them executes the init_thread() @@ -40,7 +44,9 @@ struct worker_thread { // One of threads serving requests struct raw_queue { // Generic queue with locking pthread_mutex_t queue_mutex; - clist queue; + clist pri0_queue; // Ordinary queue for requests with priority=0 + struct work **pri_heap; // A heap for request with priority>0 + uns heap_cnt, heap_max; sem_t *queue_sem; // Number of requests queued }; @@ -64,6 +70,7 @@ struct work_queue { struct work { // A single request cnode n; + uns priority; struct work_queue *reply_to; // Where to queue the request when it's finished void (*go)(struct worker_thread *t, struct work *w); // Called inside the worker thread }; -- 2.39.2