/*
* UCW Library -- Fast Buffered I/O on O_DIRECT Files
*
- * (c) 2006 Martin Mares <mj@ucw.cz>
+ * (c) 2006--2007 Martin Mares <mj@ucw.cz>
*
* This software may be freely distributed and used according to the terms
* of the GNU Lesser General Public License.
* and take care of locking.
*
* FIXME: what if the OS doesn't support O_DIRECT?
- * FIXME: doc: don't mix threads
* FIXME: unaligned seeks and partial writes?
- * FIXME: merge with other file-oriented fastbufs
+ * FIXME: append to unaligned file
*/
#undef LOCAL_DEBUG
#include "lib/lfs.h"
#include "lib/asio.h"
#include "lib/conf.h"
-#include "lib/getopt.h"
+#include "lib/threads.h"
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
-#include <pthread.h>
+#include <stdio.h>
-static uns fbdir_cheat;
-static uns fbdir_buffer_size = 65536;
-static uns fbdir_read_ahead = 1;
-static uns fbdir_write_back = 1;
+uns fbdir_cheat;
static struct cf_section fbdir_cf = {
CF_ITEMS {
CF_UNS("Cheat", &fbdir_cheat),
- CF_UNS("BufferSize", &fbdir_buffer_size),
- CF_UNS("ReadAhead", &fbdir_read_ahead),
- CF_UNS("WriteBack", &fbdir_write_back),
CF_END
}
};
#define FBDIR_ALIGN 512
-static pthread_key_t fbdir_queue_key;
-
enum fbdir_mode { // Current operating mode
M_NULL,
M_READ,
struct fb_direct {
struct fastbuf fb;
int fd; // File descriptor
- int is_temp_file; // 0=normal file, 1=temporary file, delete on close, -1=shared FD
+ int is_temp_file;
struct asio_queue *io_queue; // I/O queue to use
struct asio_queue *user_queue; // If io_queue was supplied by the user
struct asio_request *pending_read;
fbdir_global_init(void)
{
cf_declare_section("FBDirect", &fbdir_cf, 0);
- if (pthread_key_create(&fbdir_queue_key, NULL) < 0)
- die("Cannot create fbdir_queue_key: %m");
}
static void
r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
asio_submit(r);
asio_sync(F->io_queue);
- DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
+ DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
if (sh_ftruncate(F->fd, f->pos) < 0)
die("Error truncating %s: %m", f->name);
}
F->active_buffer = r;
}
-static void
+static int
fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
{
- DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
+ DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
if (whence == SEEK_SET && pos == f->pos)
- return;
+ return 1;
fbdir_change_mode(FB_DIRECT(f), M_NULL); // Wait for all async requests to finish
sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
if (l < 0)
- die("lseek on %s: %m", f->name);
+ return 0;
f->pos = l;
+ return 1;
}
static struct asio_queue *
-fbdir_get_io_queue(void)
+fbdir_get_io_queue(uns buffer_size, uns write_back)
{
- struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+ struct ucwlib_context *ctx = ucwlib_thread_context();
+ struct asio_queue *q = ctx->io_queue;
if (!q)
{
q = xmalloc_zero(sizeof(struct asio_queue));
- q->buffer_size = fbdir_buffer_size;
- q->max_writebacks = fbdir_write_back;
+ q->buffer_size = buffer_size;
+ q->max_writebacks = write_back;
asio_init_queue(q);
- pthread_setspecific(fbdir_queue_key, q);
+ ctx->io_queue = q;
}
q->use_count++;
DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
static void
fbdir_put_io_queue(void)
{
- struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+ struct ucwlib_context *ctx = ucwlib_thread_context();
+ struct asio_queue *q = ctx->io_queue;
ASSERT(q);
DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
if (!--q->use_count)
{
asio_cleanup_queue(q);
xfree(q);
- pthread_setspecific(fbdir_queue_key, NULL);
+ ctx->io_queue = NULL;
}
}
if (!F->user_queue)
fbdir_put_io_queue();
- switch (F->is_temp_file)
- {
- case 1:
- if (unlink(f->name) < 0)
- log(L_ERROR, "unlink(%s): %m", f->name);
- case 0:
- close(F->fd);
- }
-
+ bclose_file_helper(f, F->fd, F->is_temp_file);
xfree(f);
}
static int
fbdir_config(struct fastbuf *f, uns item, int value)
{
+ int orig;
+
switch (item)
{
case BCONFIG_IS_TEMP_FILE:
+ orig = FB_DIRECT(f)->is_temp_file;
FB_DIRECT(f)->is_temp_file = value;
- return 0;
+ return orig;
default:
return -1;
}
}
-static struct fastbuf *
-fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
+struct fastbuf *
+fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
{
int namelen = strlen(name) + 1;
struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
if (q)
F->io_queue = F->user_queue = q;
else
- F->io_queue = fbdir_get_io_queue();
+ F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
f->refill = fbdir_refill;
f->spout = fbdir_spout;
f->seek = fbdir_seek;
return f;
}
-struct fastbuf *
-fbdirect_open_try(byte *name, uns mode, struct asio_queue *q)
-{
- if (!fbdir_cheat)
- mode |= O_DIRECT;
- int fd = sh_open(name, mode, 0666);
- if (fd < 0)
- return NULL;
- struct fastbuf *b = fbdir_open_internal(name, fd, q);
- if (mode & O_APPEND)
- fbdir_seek(b, 0, SEEK_END);
- return b;
-}
-
-struct fastbuf *
-fbdirect_open(byte *name, uns mode, struct asio_queue *q)
-{
- struct fastbuf *b = fbdirect_open_try(name, mode, q);
- if (!b)
- die("Unable to %s file %s: %m",
- (mode & O_CREAT) ? "create" : "open", name);
- return b;
-}
-
-struct fastbuf *
-fbdirect_open_fd(int fd, struct asio_queue *q)
-{
- byte x[32];
-
- sprintf(x, "fd%d", fd);
- if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
- log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
- return fbdir_open_internal(x, fd, q);
-}
-
#ifdef TEST
+#include "lib/getopt.h"
+
int main(int argc, char **argv)
{
+ struct fb_params par = { .type = FB_DIRECT };
struct fastbuf *f, *t;
log_init(NULL);
if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
die("Hey, whaddya want?");
- f = (optind < argc) ? fbdirect_open(argv[optind++], O_RDONLY, NULL) : fbdirect_open_fd(0, NULL);
- t = (optind < argc) ? fbdirect_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdirect_open_fd(1, NULL);
+ f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
+ t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
bbcopy(f, t, ~0U);
ASSERT(btell(f) == btell(t));