sem_post(q->queue_sem);
}
-struct work *
-raw_queue_get(struct raw_queue *q)
+static inline struct work *
+raw_queue_do_get(struct raw_queue *q)
{
- sem_wait(q->queue_sem);
pthread_mutex_lock(&q->queue_mutex);
struct work *w = clist_head(&q->queue);
ASSERT(w);
return w;
}
+struct work *
+raw_queue_get(struct raw_queue *q)
+{
+ sem_wait(q->queue_sem);
+ return raw_queue_do_get(q);
+}
+
+struct work *
+raw_queue_try_get(struct raw_queue *q)
+{
+ if (!sem_trywait(q->queue_sem))
+ return raw_queue_do_get(q);
+ else
+ return NULL;
+}
+
void
work_queue_init(struct worker_pool *p, struct work_queue *q)
{
q->nr_running++;
}
-struct work *
-work_wait(struct work_queue *q)
+static struct work *
+work_do_wait(struct work_queue *q, int try)
{
while (q->nr_running)
{
- struct work *w = raw_queue_get(&q->finished);
+ struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+ if (!w)
+ return NULL;
q->nr_running--;
if (w->returned)
w->returned(q, w);
return NULL;
}
+struct work *
+work_wait(struct work_queue *q)
+{
+ return work_do_wait(q, 0);
+}
+
+struct work *
+work_try_wait(struct work_queue *q)
+{
+ return work_do_wait(q, 1);
+}
+
#ifdef TEST
#include <unistd.h>
void raw_queue_cleanup(struct raw_queue *q);
void raw_queue_put(struct raw_queue *q, struct work *w);
struct work *raw_queue_get(struct raw_queue *q);
+struct work *raw_queue_try_get(struct raw_queue *q);
void work_queue_init(struct worker_pool *p, struct work_queue *q);
void work_queue_cleanup(struct work_queue *q);
void work_submit(struct work_queue *q, struct work *w);
struct work *work_wait(struct work_queue *q);
+struct work *work_try_wait(struct work_queue *q);
#endif /* !_UCW_WORKQUEUE_H */