*/
#include "lib/lib.h"
+#include "lib/threads.h"
#include "lib/workqueue.h"
+#include "lib/heap.h"
static void *
worker_thread_init(void *arg)
pthread_attr_t attr;
if (pthread_attr_init(&attr) < 0 ||
- pthread_attr_setstacksize(&attr, p->stack_size) < 0)
+ pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
ASSERT(0);
for (uns i=0; i < p->num_threads; i++)
raw_queue_init(struct raw_queue *q)
{
pthread_mutex_init(&q->queue_mutex, NULL);
- clist_init(&q->queue);
+ clist_init(&q->pri0_queue);
q->queue_sem = sem_alloc();
+ q->pri_heap = NULL;
+ q->heap_cnt = q->heap_max = 0;
}
void
raw_queue_cleanup(struct raw_queue *q)
{
- ASSERT(clist_empty(&q->queue));
+ ASSERT(clist_empty(&q->pri0_queue));
+ ASSERT(!q->heap_cnt);
+ xfree(q->pri_heap);
sem_free(q->queue_sem);
pthread_mutex_destroy(&q->queue_mutex);
}
+#define PRI_LESS(x,y) ((x)->priority > (y)->priority)
+
void
raw_queue_put(struct raw_queue *q, struct work *w)
{
pthread_mutex_lock(&q->queue_mutex);
- clist_add_tail(&q->queue, &w->n);
+ if (!w->priority)
+ clist_add_tail(&q->pri0_queue, &w->n);
+ else
+ {
+ if (unlikely(q->heap_cnt >= q->heap_max))
+ {
+ struct work **old_heap = q->pri_heap;
+ q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
+ q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
+ }
+ struct work **heap = q->pri_heap;
+ heap[++q->heap_cnt] = w;
+ HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
sem_post(q->queue_sem);
}
-struct work *
-raw_queue_get(struct raw_queue *q)
+static inline struct work *
+raw_queue_do_get(struct raw_queue *q)
{
- sem_wait(q->queue_sem);
pthread_mutex_lock(&q->queue_mutex);
- struct work *w = clist_head(&q->queue);
- ASSERT(w);
- clist_remove(&w->n);
+ struct work *w;
+ if (!q->heap_cnt)
+ {
+ w = clist_head(&q->pri0_queue);
+ ASSERT(w);
+ clist_remove(&w->n);
+ }
+ else
+ {
+ struct work **heap = q->pri_heap;
+ w = heap[1];
+ HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+ }
pthread_mutex_unlock(&q->queue_mutex);
return w;
}
+struct work *
+raw_queue_get(struct raw_queue *q)
+{
+ sem_wait(q->queue_sem);
+ return raw_queue_do_get(q);
+}
+
+struct work *
+raw_queue_try_get(struct raw_queue *q)
+{
+ if (!sem_trywait(q->queue_sem))
+ return raw_queue_do_get(q);
+ else
+ return NULL;
+}
+
void
work_queue_init(struct worker_pool *p, struct work_queue *q)
{
q->nr_running++;
}
+static struct work *
+work_do_wait(struct work_queue *q, int try)
+{
+ if (!q->nr_running)
+ return NULL;
+ struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+ if (!w)
+ return NULL;
+ q->nr_running--;
+ return w;
+}
+
struct work *
work_wait(struct work_queue *q)
{
- while (q->nr_running)
- {
- struct work *w = raw_queue_get(&q->finished);
- q->nr_running--;
- if (w->returned)
- w->returned(q, w);
- else
- return w;
- }
- return NULL;
+ return work_do_wait(q, 0);
+}
+
+struct work *
+work_try_wait(struct work_queue *q)
+{
+ return work_do_wait(q, 1);
}
#ifdef TEST
static void go(struct worker_thread *t, struct work *w)
{
- log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+ log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
usleep(1);
}
{
struct w *w = xmalloc_zero(sizeof(*w));
w->w.go = go;
+ w->w.priority = (i < 250 ? i : 0);
w->id = i;
work_submit(&q, &w->w);
+ log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
}
struct w *w;