#include "lib/lib.h"
#include "lib/threads.h"
#include "lib/workqueue.h"
+#include "lib/heap.h"
static void *
worker_thread_init(void *arg)
raw_queue_init(struct raw_queue *q)
{
pthread_mutex_init(&q->queue_mutex, NULL);
- clist_init(&q->queue);
+ clist_init(&q->pri0_queue);
q->queue_sem = sem_alloc();
+ q->pri_heap = NULL;
+ q->heap_cnt = q->heap_max = 0;
}
void
raw_queue_cleanup(struct raw_queue *q)
{
- ASSERT(clist_empty(&q->queue));
+ ASSERT(clist_empty(&q->pri0_queue));
+ ASSERT(!q->heap_cnt);
+ xfree(q->pri_heap);
sem_free(q->queue_sem);
pthread_mutex_destroy(&q->queue_mutex);
}
+#define PRI_LESS(x,y) ((x)->priority > (y)->priority)
+
void
raw_queue_put(struct raw_queue *q, struct work *w)
{
pthread_mutex_lock(&q->queue_mutex);
- clist_add_tail(&q->queue, &w->n);
+ if (!w->priority)
+ clist_add_tail(&q->pri0_queue, &w->n);
+ else
+ {
+ if (unlikely(q->heap_cnt >= q->heap_max))
+ {
+ struct work **old_heap = q->pri_heap;
+ q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
+ q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
+ }
+ struct work **heap = q->pri_heap;
+ heap[++q->heap_cnt] = w;
+ HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
sem_post(q->queue_sem);
}
raw_queue_do_get(struct raw_queue *q)
{
pthread_mutex_lock(&q->queue_mutex);
- struct work *w = clist_head(&q->queue);
- ASSERT(w);
- clist_remove(&w->n);
+ struct work *w;
+ if (!q->heap_cnt)
+ {
+ w = clist_head(&q->pri0_queue);
+ ASSERT(w);
+ clist_remove(&w->n);
+ }
+ else
+ {
+ struct work **heap = q->pri_heap;
+ w = heap[1];
+ HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
return w;
}
static struct work *
work_do_wait(struct work_queue *q, int try)
{
- while (q->nr_running)
- {
- struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
- if (!w)
- return NULL;
- q->nr_running--;
- if (w->returned)
- w->returned(q, w);
- else
- return w;
- }
- return NULL;
+ if (!q->nr_running)
+ return NULL;
+ struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+ if (!w)
+ return NULL;
+ q->nr_running--;
+ return w;
}
struct work *
static void go(struct worker_thread *t, struct work *w)
{
- log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+ log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
usleep(1);
}
{
struct w *w = xmalloc_zero(sizeof(*w));
w->w.go = go;
+ w->w.priority = (i < 250 ? i : 0);
w->id = i;
work_submit(&q, &w->w);
+ log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
}
struct w *w;