]> mj.ucw.cz Git - libucw.git/blobdiff - ucw/workqueue.c
Merge branch 'master' into dev-sizet
[libucw.git] / ucw / workqueue.c
index 314177b8757ca45445894742a24a4b4355d0a490..b01f258c1caf1948b1b2ca6adf5e59ddf79adeae 100644 (file)
@@ -7,10 +7,10 @@
  *     of the GNU Lesser General Public License.
  */
 
-#include "ucw/lib.h"
-#include "ucw/threads.h"
-#include "ucw/workqueue.h"
-#include "ucw/heap.h"
+#include <ucw/lib.h>
+#include <ucw/threads.h>
+#include <ucw/workqueue.h>
+#include <ucw/heap.h>
 
 static void *
 worker_thread_init(void *arg)
@@ -50,10 +50,10 @@ worker_pool_init(struct worker_pool *p)
 
   pthread_attr_t attr;
   if (pthread_attr_init(&attr) < 0 ||
-      pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
+      pthread_attr_setstacksize(&attr, p->stack_size ? : ucwlib_thread_stack_size) < 0)
     ASSERT(0);
 
-  for (uns i=0; i < p->num_threads; i++)
+  for (uint i=0; i < p->num_threads; i++)
     {
       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
       t->pool = p;
@@ -71,7 +71,7 @@ worker_pool_init(struct worker_pool *p)
 void
 worker_pool_cleanup(struct worker_pool *p)
 {
-  for (uns i=0; i < p->num_threads; i++)
+  for (uint i=0; i < p->num_threads; i++)
     {
       struct work w = {
        .go = worker_thread_signal_finish
@@ -131,8 +131,7 @@ raw_queue_put(struct raw_queue *q, struct work *w)
          q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
        }
       struct work **heap = q->pri_heap;
-      heap[++q->heap_cnt] = w;
-      HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+      HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP, w);
     }
   pthread_mutex_unlock(&q->queue_mutex);
   sem_post(q->queue_sem);
@@ -153,7 +152,7 @@ raw_queue_do_get(struct raw_queue *q)
     {
       struct work **heap = q->pri_heap;
       w = heap[1];
-      HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
+      HEAP_DELETE_MIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
     }
   pthread_mutex_unlock(&q->queue_mutex);
   return w;
@@ -229,22 +228,22 @@ work_try_wait(struct work_queue *q)
 
 static void wt_init(struct worker_thread *t)
 {
-  log(L_INFO, "INIT %d", t->id);
+  msg(L_INFO, "INIT %d", t->id);
 }
 
 static void wt_cleanup(struct worker_thread *t)
 {
-  log(L_INFO, "CLEANUP %d", t->id);
+  msg(L_INFO, "CLEANUP %d", t->id);
 }
 
 struct w {
   struct work w;
-  uns id;
+  uint id;
 };
 
 static void go(struct worker_thread *t, struct work *w)
 {
-  log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
+  msg(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
   usleep(1);
 }
 
@@ -260,19 +259,19 @@ int main(void)
 
   struct work_queue q;
   work_queue_init(&pool, &q);
-  for (uns i=0; i<500; i++)
+  for (uint i=0; i<500; i++)
     {
       struct w *w = xmalloc_zero(sizeof(*w));
       w->w.go = go;
       w->w.priority = (i < 250 ? i : 0);
       w->id = i;
       work_submit(&q, &w->w);
-      log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
+      msg(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
     }
 
   struct w *w;
   while (w = (struct w *) work_wait(&q))
-    log(L_INFO, "Finished request %d", w->id);
+    msg(L_INFO, "Finished request %d", w->id);
 
   work_queue_cleanup(&q);
   worker_pool_cleanup(&pool);