2 * UCW Library -- Fast Buffered I/O on O_DIRECT Files
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 * This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12 * the asynchronous I/O module. It's designed for use on large files
13 * which don't fit in the disk cache.
17 * - All operations with a single fbdirect handle must be done
18 * within a single thread, unless you provide a custom I/O queue
19 * and take care of locking.
21 * FIXME: what if the OS doesn't support O_DIRECT?
22 * FIXME: doc: don't mix threads
23 * FIXME: unaligned seeks and partial writes?
24 * FIXME: merge with other file-oriented fastbufs
30 #include "lib/fastbuf.h"
40 static uns fbdir_cheat;
41 static uns fbdir_buffer_size = 65536;
42 static uns fbdir_read_ahead = 1;
43 static uns fbdir_write_back = 1;
45 static struct cf_section fbdir_cf = {
47 CF_UNS("Cheat", &fbdir_cheat),
48 CF_UNS("BufferSize", &fbdir_buffer_size),
49 CF_UNS("ReadAhead", &fbdir_read_ahead),
50 CF_UNS("WriteBack", &fbdir_write_back),
55 #define FBDIR_ALIGN 512
57 static pthread_key_t fbdir_queue_key;
59 enum fbdir_mode { // Current operating mode
67 int fd; // File descriptor
68 int is_temp_file; // 0=normal file, 1=temporary file, delete on close, -1=shared FD
69 struct asio_queue *io_queue; // I/O queue to use
70 struct asio_queue *user_queue; // If io_queue was supplied by the user
71 struct asio_request *pending_read;
72 struct asio_request *done_read;
73 struct asio_request *active_buffer;
77 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
79 static void CONSTRUCTOR
80 fbdir_global_init(void)
82 cf_declare_section("FBDirect", &fbdir_cf, 0);
83 if (pthread_key_create(&fbdir_queue_key, NULL) < 0)
84 die("Cannot create fbdir_queue_key: %m");
88 fbdir_read_sync(struct fb_direct *F)
90 while (F->pending_read)
92 struct asio_request *r = asio_wait(F->io_queue);
94 struct fb_direct *G = r->user_data;
96 ASSERT(G->pending_read == r && !G->done_read);
97 G->pending_read = NULL;
103 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
107 DBG("FB-DIRECT: Switching mode to %d", mode);
113 fbdir_read_sync(F); // Wait for read-ahead requests to finish
114 if (F->done_read) // Return read-ahead requests if any
116 asio_put(F->done_read);
121 asio_sync(F->io_queue); // Wait for pending writebacks
124 if (F->active_buffer)
126 asio_put(F->active_buffer);
127 F->active_buffer = NULL;
133 fbdir_submit_read(struct fb_direct *F)
135 struct asio_request *r = asio_get(F->io_queue);
138 r->len = F->io_queue->buffer_size;
145 fbdir_refill(struct fastbuf *f)
147 struct fb_direct *F = FB_DIRECT(f);
149 DBG("FB-DIRECT: Refill");
153 if (!F->pending_read)
155 fbdir_change_mode(F, M_READ);
156 fbdir_submit_read(F);
159 ASSERT(F->done_read);
162 struct asio_request *r = F->done_read;
164 if (F->active_buffer)
165 asio_put(F->active_buffer);
166 F->active_buffer = r;
170 die("Error reading %s: %s", f->name, strerror(r->returned_errno));
171 f->bptr = f->buffer = r->buffer;
172 f->bstop = f->bufend = f->buffer + r->status;
175 fbdir_submit_read(F); // Read-ahead the next block
181 fbdir_spout(struct fastbuf *f)
183 struct fb_direct *F = FB_DIRECT(f);
184 struct asio_request *r;
186 DBG("FB-DIRECT: Spout");
188 fbdir_change_mode(F, M_WRITE);
189 r = F->active_buffer;
190 if (r && f->bptr > f->bstop)
192 r->op = ASIO_WRITE_BACK;
194 r->len = f->bptr - f->bstop;
195 ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
197 if (!fbdir_cheat && r->len % FBDIR_ALIGN) // Have to simulate incomplete writes
199 r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
201 asio_sync(F->io_queue);
202 DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
203 if (sh_ftruncate(F->fd, f->pos) < 0)
204 die("Error truncating %s: %m", f->name);
211 r = asio_get(F->io_queue);
212 f->bstop = f->bptr = f->buffer = r->buffer;
213 f->bufend = f->buffer + F->io_queue->buffer_size;
214 F->active_buffer = r;
218 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
220 DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
222 if (whence == SEEK_SET && pos == f->pos)
225 fbdir_change_mode(FB_DIRECT(f), M_NULL); // Wait for all async requests to finish
226 sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
228 die("lseek on %s: %m", f->name);
232 static struct asio_queue *
233 fbdir_get_io_queue(void)
235 struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
238 q = xmalloc_zero(sizeof(struct asio_queue));
239 q->buffer_size = fbdir_buffer_size;
240 q->max_writebacks = fbdir_write_back;
242 pthread_setspecific(fbdir_queue_key, q);
245 DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
250 fbdir_put_io_queue(void)
252 struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
254 DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
257 asio_cleanup_queue(q);
259 pthread_setspecific(fbdir_queue_key, NULL);
264 fbdir_close(struct fastbuf *f)
266 struct fb_direct *F = FB_DIRECT(f);
268 DBG("FB-DIRECT: Close");
270 fbdir_change_mode(F, M_NULL);
272 fbdir_put_io_queue();
274 switch (F->is_temp_file)
277 if (unlink(f->name) < 0)
278 log(L_ERROR, "unlink(%s): %m", f->name);
287 fbdir_config(struct fastbuf *f, uns item, int value)
291 case BCONFIG_IS_TEMP_FILE:
292 FB_DIRECT(f)->is_temp_file = value;
299 static struct fastbuf *
300 fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
302 int namelen = strlen(name) + 1;
303 struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
304 struct fastbuf *f = &F->fb;
306 DBG("FB-DIRECT: Open");
307 bzero(F, sizeof(*F));
309 memcpy(f->name, name, namelen);
312 F->io_queue = F->user_queue = q;
314 F->io_queue = fbdir_get_io_queue();
315 f->refill = fbdir_refill;
316 f->spout = fbdir_spout;
317 f->seek = fbdir_seek;
318 f->close = fbdir_close;
319 f->config = fbdir_config;
320 f->can_overwrite_buffer = 2;
325 fbdir_open_try(byte *name, uns mode, struct asio_queue *q)
329 int fd = sh_open(name, mode, 0666);
332 struct fastbuf *b = fbdir_open_internal(name, fd, q);
334 fbdir_seek(b, 0, SEEK_END);
339 fbdir_open(byte *name, uns mode, struct asio_queue *q)
341 struct fastbuf *b = fbdir_open_try(name, mode, q);
343 die("Unable to %s file %s: %m",
344 (mode & O_CREAT) ? "create" : "open", name);
349 fbdir_open_fd(int fd, struct asio_queue *q)
353 sprintf(x, "fd%d", fd);
354 if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
355 log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
356 return fbdir_open_internal(x, fd, q);
360 fbdir_open_tmp(struct asio_queue *q)
362 byte buf[TEMP_FILE_NAME_LEN];
366 f = fbdir_open(buf, O_RDWR | O_CREAT | O_TRUNC, q);
367 bconfig(f, BCONFIG_IS_TEMP_FILE, 1);
373 #include "lib/getopt.h"
375 int main(int argc, char **argv)
377 struct fastbuf *f, *t;
380 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
381 die("Hey, whaddya want?");
382 f = (optind < argc) ? fbdir_open(argv[optind++], O_RDONLY, NULL) : fbdir_open_fd(0, NULL);
383 t = (optind < argc) ? fbdir_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdir_open_fd(1, NULL);
386 ASSERT(btell(f) == btell(t));
388 #if 0 // This triggers unaligned write
395 ASSERT(btell(t) == 1);