#include "lib/lib.h"
#include "lib/threads.h"
#include "lib/workqueue.h"
+#include "lib/heap.h"
static void *
worker_thread_init(void *arg)
raw_queue_init(struct raw_queue *q)
{
pthread_mutex_init(&q->queue_mutex, NULL);
- clist_init(&q->queue);
+ clist_init(&q->pri0_queue);
q->queue_sem = sem_alloc();
+ q->pri_heap = NULL;
+ q->heap_cnt = q->heap_max = 0;
}
void
raw_queue_cleanup(struct raw_queue *q)
{
- ASSERT(clist_empty(&q->queue));
+ ASSERT(clist_empty(&q->pri0_queue));
+ ASSERT(!q->heap_cnt);
+ xfree(q->pri_heap);
sem_free(q->queue_sem);
pthread_mutex_destroy(&q->queue_mutex);
}
+#define PRI_LESS(x,y) ((x)->priority > (y)->priority)
+
void
raw_queue_put(struct raw_queue *q, struct work *w)
{
pthread_mutex_lock(&q->queue_mutex);
- clist_add_tail(&q->queue, &w->n);
+ if (!w->priority)
+ clist_add_tail(&q->pri0_queue, &w->n);
+ else
+ {
+ if (unlikely(q->heap_cnt >= q->heap_max))
+ {
+ struct work **old_heap = q->pri_heap;
+ q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
+ q->pri_heap = xrealloc(old_heap, q->heap_max * sizeof(struct work *));
+ }
+ struct work **heap = q->pri_heap;
+ heap[++q->heap_cnt] = w;
+ HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
sem_post(q->queue_sem);
}
raw_queue_do_get(struct raw_queue *q)
{
pthread_mutex_lock(&q->queue_mutex);
- struct work *w = clist_head(&q->queue);
- ASSERT(w);
- clist_remove(&w->n);
+ struct work *w;
+ if (!q->heap_cnt)
+ {
+ w = clist_head(&q->pri0_queue);
+ ASSERT(w);
+ clist_remove(&w->n);
+ }
+ else
+ {
+ struct work **heap = q->pri_heap;
+ w = heap[1];
+ HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
return w;
}
static void go(struct worker_thread *t, struct work *w)
{
- log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+ log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
usleep(1);
}
{
struct w *w = xmalloc_zero(sizeof(*w));
w->w.go = go;
+ w->w.priority = (i < 250 ? i : 0);
w->id = i;
work_submit(&q, &w->w);
+ log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
}
struct w *w;
* thread pool, it remembers running requests and gathers replies. A single work queue
* should not be used by multiple threads simultaneously.
*
+ * Requests can have priorities. Requests with the highest priority are served first.
+ * Requests of priority 0 are guaranteed to be served on first-come-first-served
+ * basis, requests of higher priorities are unordered.
+ *
* When a thread pool is initialized, new_thread() is called for every thread first,
* allocating struct worker_thread (and user-defined thread context following it) for
* each thread. Then the threads are fired and each of them executes the init_thread()
struct raw_queue { // Generic queue with locking
pthread_mutex_t queue_mutex;
- clist queue;
+ clist pri0_queue; // Ordinary queue for requests with priority=0
+ struct work **pri_heap; // A heap for request with priority>0
+ uns heap_cnt, heap_max;
sem_t *queue_sem; // Number of requests queued
};
struct work { // A single request
cnode n;
+ uns priority;
struct work_queue *reply_to; // Where to queue the request when it's finished
void (*go)(struct worker_thread *t, struct work *w); // Called inside the worker thread
void (*returned)(struct work_queue *q, struct work *w); // Called when returned back, NULL if work_wait should return