#include "lib/lib.h"
#include "lib/asio.h"
+#include "lib/threads.h"
#include <string.h>
#include <unistd.h>
static struct worker_pool asio_wpool;
static void
-asio_init(void)
+asio_init_unlocked(void)
{
if (asio_num_users++)
return;
DBG("ASIO: INIT");
asio_wpool.num_threads = 1;
- asio_wpool.stack_size = 65536;
worker_pool_init(&asio_wpool);
}
static void
-asio_cleanup(void)
+asio_cleanup_unlocked(void)
{
if (--asio_num_users)
return;
void
asio_init_queue(struct asio_queue *q)
{
- asio_init();
+ ucwlib_lock();
+ asio_init_unlocked();
+ ucwlib_unlock();
DBG("ASIO: New queue %p", q);
ASSERT(q->buffer_size);
q->allocated_requests = 0;
- q->allocated_writebacks = 0;
q->running_requests = 0;
+ q->running_writebacks = 0;
+ q->use_count = 0;
clist_init(&q->idle_list);
clist_init(&q->done_list);
work_queue_init(&asio_wpool, &q->queue);
{
DBG("ASIO: Removing queue %p", q);
ASSERT(!q->running_requests);
+ ASSERT(!q->running_writebacks);
ASSERT(!q->allocated_requests);
- ASSERT(!q->allocated_writebacks);
ASSERT(clist_empty(&q->done_list));
struct asio_request *r;
}
work_queue_cleanup(&q->queue);
- asio_cleanup();
+
+ ucwlib_lock();
+ asio_cleanup_unlocked();
+ ucwlib_unlock();
}
struct asio_request *
DBG("ASIO: Got %p", r);
}
r->op = ASIO_FREE;
+ r->fd = -1;
+ r->len = 0;
+ r->status = -1;
+ r->returned_errno = -1;
+ r->submitted = 0;
return r;
}
struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
if (!r)
return 0;
+ r->submitted = 0;
q->running_requests--;
if (r->op == ASIO_WRITE_BACK)
{
die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
if (r->status != (int)r->len)
die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
- q->allocated_writebacks--;
+ q->running_writebacks--;
asio_put(r);
}
else
return 1;
}
-struct asio_request *
-asio_get_writeback(struct asio_queue *q)
-{
- while (q->allocated_writebacks >= q->max_writebacks)
- {
- DBG("ASIO: Waiting for free writeback request");
- if (!asio_raw_wait(q))
- ASSERT(0);
- }
- q->allocated_writebacks++;
- struct asio_request *r = asio_get(q);
- r->op = ASIO_WRITE_BACK;
- return r;
-}
-
-void
-asio_turn_to_writeback(struct asio_request *r)
-{
- struct asio_queue *q = r->queue;
- ASSERT(r->op != ASIO_WRITE_BACK);
- while (q->allocated_writebacks >= q->max_writebacks)
- {
- DBG("ASIO: Waiting for free writeback request");
- if (!asio_raw_wait(q))
- ASSERT(0);
- }
- q->allocated_writebacks++;
- r->op = ASIO_WRITE_BACK;
-}
-
static void
asio_handler(struct worker_thread *t UNUSED, struct work *w)
{
struct asio_queue *q = r->queue;
DBG("ASIO: Submitting %p on queue %p", r, q);
ASSERT(r->op != ASIO_FREE);
+ ASSERT(!r->submitted);
+ if (r->op == ASIO_WRITE_BACK)
+ {
+ while (q->running_writebacks >= q->max_writebacks)
+ {
+ DBG("ASIO: Waiting for free writebacks");
+ if (!asio_raw_wait(q))
+ ASSERT(0);
+ }
+ q->running_writebacks++;
+ }
q->running_requests++;
+ r->submitted = 1;
r->work.go = asio_handler;
- r->work.returned = NULL;
+ r->work.priority = 0;
work_submit(&q->queue, &r->work);
}
{
struct asio_queue *q = r->queue;
DBG("ASIO: Put %p", r);
+ ASSERT(!r->submitted);
ASSERT(q->allocated_requests);
clist_add_tail(&q->idle_list, &r->work.n);
q->allocated_requests--;
asio_put(r);
break;
}
- asio_turn_to_writeback(r);
+ r->op = ASIO_WRITE_BACK;
r->fd = 1;
r->len = r->status;
asio_submit(r);
for (uns i=0; i<10; i++)
{
- r = asio_get_writeback(&q);
+ r = asio_get(&q);
+ r->op = ASIO_WRITE_BACK;
r->fd = 1;
r->len = 1;
r->buffer[0] = 'A' + i;