]> mj.ucw.cz Git - libucw.git/blobdiff - lib/workqueue.c
Use Threads.DefaultStackSize instead of various local stack sizes.
[libucw.git] / lib / workqueue.c
index a5ea7b8dca1eea58075163a99e713b197c6b5c45..3d87a74e191672c8e9a118e89a39e34ed948a648 100644 (file)
@@ -8,6 +8,7 @@
  */
 
 #include "lib/lib.h"
+#include "lib/threads.h"
 #include "lib/workqueue.h"
 
 static void *
@@ -48,7 +49,7 @@ worker_pool_init(struct worker_pool *p)
 
   pthread_attr_t attr;
   if (pthread_attr_init(&attr) < 0 ||
-      pthread_attr_setstacksize(&attr, p->stack_size) < 0)
+      pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
     ASSERT(0);
 
   for (uns i=0; i < p->num_threads; i++)
@@ -117,10 +118,9 @@ raw_queue_put(struct raw_queue *q, struct work *w)
   sem_post(q->queue_sem);
 }
 
-struct work *
-raw_queue_get(struct raw_queue *q)
+static inline struct work *
+raw_queue_do_get(struct raw_queue *q)
 {
-  sem_wait(q->queue_sem);
   pthread_mutex_lock(&q->queue_mutex);
   struct work *w = clist_head(&q->queue);
   ASSERT(w);
@@ -129,6 +129,22 @@ raw_queue_get(struct raw_queue *q)
   return w;
 }
 
+struct work *
+raw_queue_get(struct raw_queue *q)
+{
+  sem_wait(q->queue_sem);
+  return raw_queue_do_get(q);
+}
+
+struct work *
+raw_queue_try_get(struct raw_queue *q)
+{
+  if (!sem_trywait(q->queue_sem))
+    return raw_queue_do_get(q);
+  else
+    return NULL;
+}
+
 void
 work_queue_init(struct worker_pool *p, struct work_queue *q)
 {
@@ -153,12 +169,14 @@ work_submit(struct work_queue *q, struct work *w)
   q->nr_running++;
 }
 
-struct work *
-work_wait(struct work_queue *q)
+static struct work *
+work_do_wait(struct work_queue *q, int try)
 {
   while (q->nr_running)
     {
-      struct work *w = raw_queue_get(&q->finished);
+      struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+      if (!w)
+       return NULL;
       q->nr_running--;
       if (w->returned)
        w->returned(q, w);
@@ -168,6 +186,18 @@ work_wait(struct work_queue *q)
   return NULL;
 }
 
+struct work *
+work_wait(struct work_queue *q)
+{
+  return work_do_wait(q, 0);
+}
+
+struct work *
+work_try_wait(struct work_queue *q)
+{
+  return work_do_wait(q, 1);
+}
+
 #ifdef TEST
 
 #include <unistd.h>