From 0f2719c181f3a8a9ba88ef76901f2115dee88c48 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sat, 25 Nov 2006 23:35:55 +0100 Subject: [PATCH] Implemented an asynchronous I/O library module. It's built at the top of the work queue module. --- lib/Makefile | 9 +- lib/asio.c | 294 ++++++++++++++++++++++++++++++++++++++ lib/{asyncio.h => asio.h} | 32 ++--- lib/asio.t | 4 + 4 files changed, 320 insertions(+), 19 deletions(-) create mode 100644 lib/asio.c rename lib/{asyncio.h => asio.h} (57%) create mode 100644 lib/asio.t diff --git a/lib/Makefile b/lib/Makefile index 53343a17..d26b9c25 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -30,7 +30,7 @@ LIBUCW_MODS= \ qache \ string \ bbuf \ - workqueue + workqueue asio LIBUCW_INCLUDES= \ lib.h config.h math.h \ @@ -53,7 +53,8 @@ LIBUCW_INCLUDES= \ lizard.h \ md5.h \ base64.h base224.h \ - qache.h + qache.h \ + workqueue.h asio.h ifdef CONFIG_OWN_REGEX include $(s)/lib/regex/Makefile @@ -83,8 +84,9 @@ $(o)/lib/kmp-test: $(o)/lib/kmp-test.o $(LIBUCW) $(LIBCHARSET) $(o)/lib/ipaccess-test: $(o)/lib/ipaccess-test.o $(LIBUCW) $(o)/lib/workqueue-t: LIBS+=-lpthread # FIXME +$(o)/lib/asio-t: LIBS+=-lpthread # FIXME -TESTS+=$(addprefix $(o)/lib/,regex.test unicode-utf8.test hash-test.test mempool.test stkstring.test slists.test kmp-test.test bbuf.test) +TESTS+=$(addprefix $(o)/lib/,regex.test unicode-utf8.test hash-test.test mempool.test stkstring.test slists.test kmp-test.test bbuf.test asio.test) $(o)/lib/regex.test: $(o)/lib/regex-t $(o)/lib/unicode-utf8.test: $(o)/lib/unicode-utf8-t $(o)/lib/hash-test.test: $(o)/lib/hash-test @@ -94,6 +96,7 @@ $(o)/lib/bitops.test: $(o)/lib/bit-ffs-t $(o)/lib/bit-fls-t $(o)/lib/slists.test: $(o)/lib/slists-t $(o)/lib/kmp-test.test: $(o)/lib/kmp-test $(o)/lib/bbuf.test: $(o)/lib/bbuf-t +$(o)/lib/asio.test: $(o)/lib/asio-t INCLUDES+=$(o)/lib/.include-stamp $(o)/lib/.include-stamp: $(addprefix $(s)/lib/,$(LIBUCW_INCLUDES)) diff --git a/lib/asio.c b/lib/asio.c new file mode 100644 index 00000000..f1996aae --- /dev/null +++ b/lib/asio.c @@ -0,0 +1,294 @@ +/* + * UCW Library -- Asynchronous I/O + * + * (c) 2006 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#undef LOCAL_DEBUG + +#include "lib/lib.h" +#include "lib/asio.h" + +#include +#include +#include + +static uns asio_num_users; +static struct worker_pool asio_wpool; + +static void +asio_init(void) +{ + if (asio_num_users++) + return; + + DBG("ASIO: INIT"); + asio_wpool.num_threads = 1; + asio_wpool.stack_size = 65536; + worker_pool_init(&asio_wpool); +} + +static void +asio_cleanup(void) +{ + if (--asio_num_users) + return; + + DBG("ASIO: CLEANUP"); + worker_pool_cleanup(&asio_wpool); +} + +void +asio_init_queue(struct asio_queue *q) +{ + asio_init(); + + DBG("ASIO: New queue %p", q); + ASSERT(q->buffer_size); + q->allocated_requests = 0; + q->allocated_writebacks = 0; + q->running_requests = 0; + clist_init(&q->idle_list); + clist_init(&q->done_list); + work_queue_init(&asio_wpool, &q->queue); +} + +void +asio_cleanup_queue(struct asio_queue *q) +{ + DBG("ASIO: Removing queue %p", q); + ASSERT(!q->running_requests); + ASSERT(!q->allocated_requests); + ASSERT(!q->allocated_writebacks); + ASSERT(clist_empty(&q->done_list)); + + struct asio_request *r; + while (r = clist_head(&q->idle_list)) + { + clist_remove(&r->work.n); + big_free(r->buffer, q->buffer_size); + xfree(r); + } + + work_queue_cleanup(&q->queue); + asio_cleanup(); +} + +struct asio_request * +asio_get(struct asio_queue *q) +{ + q->allocated_requests++; + struct asio_request *r = clist_head(&q->idle_list); + if (!r) + { + r = xmalloc_zero(sizeof(*r)); + r->queue = q; + r->buffer = big_alloc(q->buffer_size); + DBG("ASIO: Got %p (new)", r); + } + else + { + clist_remove(&r->work.n); + DBG("ASIO: Got %p", r); + } + r->op = ASIO_FREE; + return r; +} + +static int +asio_raw_wait(struct asio_queue *q) +{ + struct asio_request *r = (struct asio_request *) work_wait(&q->queue); + if (!r) + return 0; + q->running_requests--; + if (r->op == ASIO_WRITE_BACK) + { + DBG("ASIO: Finished writeback %p", r); + if (r->status < 0) + die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno)); + if (r->status != (int)r->len) + die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len); + q->allocated_writebacks--; + asio_put(r); + } + else + clist_add_tail(&q->done_list, &r->work.n); + return 1; +} + +struct asio_request * +asio_get_writeback(struct asio_queue *q) +{ + while (q->allocated_writebacks >= q->max_writebacks) + { + DBG("ASIO: Waiting for free writeback request"); + if (!asio_raw_wait(q)) + ASSERT(0); + } + q->allocated_writebacks++; + struct asio_request *r = asio_get(q); + r->op = ASIO_WRITE_BACK; + return r; +} + +void +asio_turn_to_writeback(struct asio_request *r) +{ + struct asio_queue *q = r->queue; + ASSERT(r->op != ASIO_WRITE_BACK); + while (q->allocated_writebacks >= q->max_writebacks) + { + DBG("ASIO: Waiting for free writeback request"); + if (!asio_raw_wait(q)) + ASSERT(0); + } + q->allocated_writebacks++; + r->op = ASIO_WRITE_BACK; +} + +static void +asio_handler(struct worker_thread *t UNUSED, struct work *w) +{ + struct asio_request *r = (struct asio_request *) w; + + DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r, + (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len); + errno = 0; + switch (r->op) + { + case ASIO_READ: + r->status = read(r->fd, r->buffer, r->len); + break; + case ASIO_WRITE: + case ASIO_WRITE_BACK: + r->status = write(r->fd, r->buffer, r->len); + break; + default: + die("ASIO: Got unknown request type %d", r->op); + } + r->returned_errno = errno; + DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno); +} + +void +asio_submit(struct asio_request *r) +{ + struct asio_queue *q = r->queue; + DBG("ASIO: Submitting %p on queue %p", r, q); + ASSERT(r->op != ASIO_FREE); + q->running_requests++; + r->work.go = asio_handler; + r->work.returned = NULL; + work_submit(&q->queue, &r->work); +} + +struct asio_request * +asio_wait(struct asio_queue *q) +{ + struct asio_request *r; + while (!(r = clist_head(&q->done_list))) + { + DBG("ASIO: Waiting on queue %p", q); + if (!asio_raw_wait(q)) + return NULL; + } + clist_remove(&r->work.n); + DBG("ASIO: Done %p", r); + return r; +} + +void +asio_put(struct asio_request *r) +{ + struct asio_queue *q = r->queue; + DBG("ASIO: Put %p", r); + ASSERT(q->allocated_requests); + clist_add_tail(&q->idle_list, &r->work.n); + q->allocated_requests--; +} + +void +asio_sync(struct asio_queue *q) +{ + DBG("ASIO: Syncing queue %p", q); + while (q->running_requests) + if (!asio_raw_wait(q)) + ASSERT(0); +} + +#ifdef TEST + +int main(void) +{ + struct asio_queue q; + struct asio_request *r; + + q.buffer_size = 4096; + q.max_writebacks = 2; + asio_init_queue(&q); + +#if 0 + + for (;;) + { + r = asio_get(&q); + r->op = ASIO_READ; + r->fd = 0; + r->len = q.buffer_size; + asio_submit(r); + r = asio_wait(&q); + ASSERT(r); + if (r->status <= 0) + { + asio_put(r); + break; + } + asio_turn_to_writeback(r); + r->fd = 1; + r->len = r->status; + asio_submit(r); + } + asio_sync(&q); + +#else + + r = asio_get(&q); + r->op = ASIO_READ; + r->fd = 0; + r->len = 1; + asio_submit(r); + r = asio_wait(&q); + ASSERT(r); + asio_put(r); + + for (uns i=0; i<10; i++) + { + r = asio_get_writeback(&q); + r->fd = 1; + r->len = 1; + r->buffer[0] = 'A' + i; + asio_submit(r); + } + asio_sync(&q); + + r = asio_get(&q); + r->op = ASIO_WRITE; + r->fd = 1; + r->len = 1; + r->buffer[0] = '\n'; + asio_submit(r); + r = asio_wait(&q); + ASSERT(r); + asio_put(r); + +#endif + + asio_cleanup_queue(&q); + return 0; +} + +#endif diff --git a/lib/asyncio.h b/lib/asio.h similarity index 57% rename from lib/asyncio.h rename to lib/asio.h index d230f684..31c1f5ac 100644 --- a/lib/asyncio.h +++ b/lib/asio.h @@ -7,51 +7,51 @@ * of the GNU Lesser General Public License. */ -#ifndef _UCW_ASYNCIO_H -#define _UCW_ASYNCIO_H +#ifndef _UCW_ASIO_H +#define _UCW_ASIO_H -#include "lib/semaphore.h" +#include "lib/workqueue.h" #include "lib/clists.h" /* FIXME: Comment, especially on request ordering */ struct asio_queue { - uns max_requests; // Maximum number of requests allowed on this queue [user-settable] uns buffer_size; // How large buffers do we use [user-settable] + uns max_writebacks; // Maximum number of writeback requests active [user-settable] uns allocated_requests; + uns allocated_writebacks; uns running_requests; clist idle_list; // Recycled requests waiting for get - clist done_list; // Requests returned from the worker threads - sem_t *done_sem; // ... and how many of them - clist wait_list; // Requests available for wait() + clist done_list; // Finished requests + struct work_queue queue; }; enum asio_op { ASIO_FREE, ASIO_READ, ASIO_WRITE, - ASIO_BACKGROUND_WRITE, // Write with no success notification + ASIO_WRITE_BACK, // Write with no success notification }; struct asio_request { - cnode n; + struct work work; // asio_requests are internally just work nodes struct asio_queue *queue; byte *buffer; int fd; enum asio_op op; uns len; int status; + int returned_errno; }; -void asio_init(void); -void asio_cleanup(void); - void asio_init_queue(struct asio_queue *q); // Initialize a new queue -struct asio_request *asio_get(struct asio_queue *q); // Get an empty request +void asio_cleanup_queue(struct asio_queue *q); +struct asio_request *asio_get(struct asio_queue *q); // Get an empty request (not for writeback) +struct asio_request *asio_get_writeback(struct asio_queue *q); // Get an empty writeback request +void asio_turn_to_writeback(struct asio_request *r); // Convert a request allocated as non-writeback to writeback void asio_submit(struct asio_request *r); // Submit the request struct asio_request *asio_wait(struct asio_queue *q); // Wait for the first finished request, NULL if no more void asio_put(struct asio_request *r); // Return a finished request for recycling -struct asio_request *asio_get_bg(struct asio_queue *q); // Get and if there are no free requests, wait for background writes to finish -void asio_sync(struct asio_queue *q); // Wait for all requests to finish +void asio_sync(struct asio_queue *q); // Wait until all requests are finished -#endif /* !_UCW_ASYNCIO_H */ +#endif /* !_UCW_ASIO_H */ diff --git a/lib/asio.t b/lib/asio.t new file mode 100644 index 00000000..4382107f --- /dev/null +++ b/lib/asio.t @@ -0,0 +1,4 @@ +# Tests for asynchronous I/O + +Run: echo y | obj/lib/asio-t +Out: ABCDEFGHIJ -- 2.39.2