]> mj.ucw.cz Git - libucw.git/commitdiff
Implemented priority queues.
authorMartin Mares <mj@ucw.cz>
Fri, 15 Dec 2006 13:27:21 +0000 (14:27 +0100)
committerMartin Mares <mj@ucw.cz>
Fri, 15 Dec 2006 13:27:21 +0000 (14:27 +0100)
lib/workqueue.c
lib/workqueue.h

index 3d87a74e191672c8e9a118e89a39e34ed948a648..1e2bcb1c2cbfd4fcbefcfa09bc7add95b6b47485 100644 (file)
@@ -10,6 +10,7 @@
 #include "lib/lib.h"
 #include "lib/threads.h"
 #include "lib/workqueue.h"
+#include "lib/heap.h"
 
 static void *
 worker_thread_init(void *arg)
@@ -97,23 +98,42 @@ void
 raw_queue_init(struct raw_queue *q)
 {
   pthread_mutex_init(&q->queue_mutex, NULL);
-  clist_init(&q->queue);
+  clist_init(&q->pri0_queue);
   q->queue_sem = sem_alloc();
+  q->pri_heap = NULL;
+  q->heap_cnt = q->heap_max = 0;
 }
 
 void
 raw_queue_cleanup(struct raw_queue *q)
 {
-  ASSERT(clist_empty(&q->queue));
+  ASSERT(clist_empty(&q->pri0_queue));
+  ASSERT(!q->heap_cnt);
+  xfree(q->pri_heap);
   sem_free(q->queue_sem);
   pthread_mutex_destroy(&q->queue_mutex);
 }
 
+#define PRI_LESS(x,y) ((x)->priority > (y)->priority)
+
 void
 raw_queue_put(struct raw_queue *q, struct work *w)
 {
   pthread_mutex_lock(&q->queue_mutex);
-  clist_add_tail(&q->queue, &w->n);
+  if (!w->priority)
+    clist_add_tail(&q->pri0_queue, &w->n);
+  else
+    {
+      if (unlikely(q->heap_cnt >= q->heap_max))
+       {
+         struct work **old_heap = q->pri_heap;
+         q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
+         q->pri_heap = xrealloc(old_heap, q->heap_max * sizeof(struct work *));
+       }
+      struct work **heap = q->pri_heap;
+      heap[++q->heap_cnt] = w;
+      HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+    }
   pthread_mutex_unlock(&q->queue_mutex);
   sem_post(q->queue_sem);
 }
@@ -122,9 +142,19 @@ static inline struct work *
 raw_queue_do_get(struct raw_queue *q)
 {
   pthread_mutex_lock(&q->queue_mutex);
-  struct work *w = clist_head(&q->queue);
-  ASSERT(w);
-  clist_remove(&w->n);
+  struct work *w;
+  if (!q->heap_cnt)
+    {
+      w = clist_head(&q->pri0_queue);
+      ASSERT(w);
+      clist_remove(&w->n);
+    }
+  else
+    {
+      struct work **heap = q->pri_heap;
+      w = heap[1];
+      HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+    }
   pthread_mutex_unlock(&q->queue_mutex);
   return w;
 }
@@ -219,7 +249,7 @@ struct w {
 
 static void go(struct worker_thread *t, struct work *w)
 {
-  log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+  log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
   usleep(1);
 }
 
@@ -239,8 +269,10 @@ int main(void)
     {
       struct w *w = xmalloc_zero(sizeof(*w));
       w->w.go = go;
+      w->w.priority = (i < 250 ? i : 0);
       w->id = i;
       work_submit(&q, &w->w);
+      log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
     }
 
   struct w *w;
index c16904abc2c694b0d53cb56c408363befc5bbd23..f23198c1d3b2efa386bcd98345a2f4fe1ebdb9c6 100644 (file)
  *  thread pool, it remembers running requests and gathers replies. A single work queue
  *  should not be used by multiple threads simultaneously.
  *
+ *  Requests can have priorities. Requests with the highest priority are served first.
+ *  Requests of priority 0 are guaranteed to be served on first-come-first-served
+ *  basis, requests of higher priorities are unordered.
+ *
  *  When a thread pool is initialized, new_thread() is called for every thread first,
  *  allocating struct worker_thread (and user-defined thread context following it) for
  *  each thread. Then the threads are fired and each of them executes the init_thread()
@@ -40,7 +44,9 @@ struct worker_thread {                                // One of threads serving requests
 
 struct raw_queue {                             // Generic queue with locking
   pthread_mutex_t queue_mutex;
-  clist queue;
+  clist pri0_queue;                            // Ordinary queue for requests with priority=0
+  struct work **pri_heap;                      // A heap for request with priority>0
+  uns heap_cnt, heap_max;
   sem_t *queue_sem;                            // Number of requests queued
 };
 
@@ -64,6 +70,7 @@ struct work_queue {
 
 struct work {                                  // A single request
   cnode n;
+  uns priority;
   struct work_queue *reply_to;                 // Where to queue the request when it's finished
   void (*go)(struct worker_thread *t, struct work *w);         // Called inside the worker thread
   void (*returned)(struct work_queue *q, struct work *w);      // Called when returned back, NULL if work_wait should return