]> mj.ucw.cz Git - libucw.git/blobdiff - lib/workqueue.c
Added the local copy of the regex library back.
[libucw.git] / lib / workqueue.c
index 3d87a74e191672c8e9a118e89a39e34ed948a648..8833e46f9d7a5ebcc29f5c4cc32e8e64d8a44b09 100644 (file)
@@ -10,6 +10,7 @@
 #include "lib/lib.h"
 #include "lib/threads.h"
 #include "lib/workqueue.h"
+#include "lib/heap.h"
 
 static void *
 worker_thread_init(void *arg)
@@ -97,23 +98,42 @@ void
 raw_queue_init(struct raw_queue *q)
 {
   pthread_mutex_init(&q->queue_mutex, NULL);
-  clist_init(&q->queue);
+  clist_init(&q->pri0_queue);
   q->queue_sem = sem_alloc();
+  q->pri_heap = NULL;
+  q->heap_cnt = q->heap_max = 0;
 }
 
 void
 raw_queue_cleanup(struct raw_queue *q)
 {
-  ASSERT(clist_empty(&q->queue));
+  ASSERT(clist_empty(&q->pri0_queue));
+  ASSERT(!q->heap_cnt);
+  xfree(q->pri_heap);
   sem_free(q->queue_sem);
   pthread_mutex_destroy(&q->queue_mutex);
 }
 
+#define PRI_LESS(x,y) ((x)->priority > (y)->priority)
+
 void
 raw_queue_put(struct raw_queue *q, struct work *w)
 {
   pthread_mutex_lock(&q->queue_mutex);
-  clist_add_tail(&q->queue, &w->n);
+  if (!w->priority)
+    clist_add_tail(&q->pri0_queue, &w->n);
+  else
+    {
+      if (unlikely(q->heap_cnt >= q->heap_max))
+       {
+         struct work **old_heap = q->pri_heap;
+         q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
+         q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
+       }
+      struct work **heap = q->pri_heap;
+      heap[++q->heap_cnt] = w;
+      HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+    }
   pthread_mutex_unlock(&q->queue_mutex);
   sem_post(q->queue_sem);
 }
@@ -122,9 +142,19 @@ static inline struct work *
 raw_queue_do_get(struct raw_queue *q)
 {
   pthread_mutex_lock(&q->queue_mutex);
-  struct work *w = clist_head(&q->queue);
-  ASSERT(w);
-  clist_remove(&w->n);
+  struct work *w;
+  if (!q->heap_cnt)
+    {
+      w = clist_head(&q->pri0_queue);
+      ASSERT(w);
+      clist_remove(&w->n);
+    }
+  else
+    {
+      struct work **heap = q->pri_heap;
+      w = heap[1];
+      HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+    }
   pthread_mutex_unlock(&q->queue_mutex);
   return w;
 }
@@ -172,18 +202,13 @@ work_submit(struct work_queue *q, struct work *w)
 static struct work *
 work_do_wait(struct work_queue *q, int try)
 {
-  while (q->nr_running)
-    {
-      struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
-      if (!w)
-       return NULL;
-      q->nr_running--;
-      if (w->returned)
-       w->returned(q, w);
-      else
-       return w;
-    }
-  return NULL;
+  if (!q->nr_running)
+    return NULL;
+  struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+  if (!w)
+    return NULL;
+  q->nr_running--;
+  return w;
 }
 
 struct work *
@@ -219,7 +244,7 @@ struct w {
 
 static void go(struct worker_thread *t, struct work *w)
 {
-  log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+  log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
   usleep(1);
 }
 
@@ -239,8 +264,10 @@ int main(void)
     {
       struct w *w = xmalloc_zero(sizeof(*w));
       w->w.go = go;
+      w->w.priority = (i < 250 ? i : 0);
       w->id = i;
       work_submit(&q, &w->w);
+      log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
     }
 
   struct w *w;