2 * UCW Library -- Fast Buffered I/O on O_DIRECT Files
4 * (c) 2006--2007 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 * This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12 * the asynchronous I/O module. It's designed for use on large files
13 * which don't fit in the disk cache.
17 * - All operations with a single fbdirect handle must be done
18 * within a single thread, unless you provide a custom I/O queue
19 * and take care of locking.
21 * FIXME: what if the OS doesn't support O_DIRECT?
22 * FIXME: unaligned seeks and partial writes?
23 * FIXME: append to unaligned file
29 #include "ucw/fastbuf.h"
33 #include "ucw/threads.h"
// Alignment unit used by fbdir_spout(): the file position is asserted to be a multiple
// of it and write lengths are rounded up to it (unless fbdir_cheat is set).
// Presumably the O_DIRECT block size in bytes -- confirm.
40 #define FBDIR_ALIGN 512
// Values of this type are passed to fbdir_change_mode(); this file passes
// M_NULL (seek, close), M_READ (refill) and M_WRITE (spout).
42 enum fbdir_mode { // Current operating mode
50 int fd; // File descriptor
52 struct asio_queue *io_queue; // I/O queue to use
53 struct asio_queue *user_queue; // If io_queue was supplied by the user
// Read request that fbdir_read_sync() is still waiting for; NULL when none is in flight.
54 struct asio_request *pending_read;
// Read request that fbdir_refill() consumes next; returned to the queue by
// fbdir_change_mode() if it was never consumed.
55 struct asio_request *done_read;
// Request whose buffer is currently exposed through the fastbuf's buffer pointers.
56 struct asio_request *active_buffer;
// Convert a struct fastbuf pointer to the struct fb_direct it belongs to.
// The cast is applied to (f)->is_fastbuf; presumably that member sits at the very
// start of the fastbuf, which in turn is embedded in fb_direct as `fb` -- confirm.
60 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
// Configuration section of this backend (declared under the name "FBDirect").
// It exposes one unsigned item, "Cheat", stored in fbdir_cheat. When fbdir_cheat is
// non-zero, fbdir_spout() skips both its alignment assertion and its
// round-up-and-truncate handling of unaligned write lengths.
65 static struct cf_section fbdir_cf = {
67 CF_UNS("Cheat", &fbdir_cheat),
// Registers fbdir_cf as the configuration section named "FBDirect".
// Presumably run automatically at start-up through the CONSTRUCTOR macro -- confirm.
72 static void CONSTRUCTOR
73 fbdir_global_init(void)
75 cf_declare_section("FBDirect", &fbdir_cf, 0);
// Block until F has no read request in flight (F->pending_read becomes NULL).
80 fbdir_read_sync(struct fb_direct *F)
82 while (F->pending_read)
// Take the next finished request from F's queue.
84 struct asio_request *r = asio_wait(F->io_queue);
// The owner is looked up through user_data rather than assumed to be F, so a
// finished request may belong to a different handle using the same queue.
86 struct fb_direct *G = r->user_data;
// The finished request must be the owner's single pending read, and the owner
// must not already hold an unconsumed completed read.
88 ASSERT(G->pending_read == r && !G->done_read);
89 G->pending_read = NULL;
// Switch F to operating mode `mode`, settling outstanding asynchronous I/O first.
// NOTE(review): which of the steps below runs for which old/new mode depends on
// control flow to be confirmed against the complete function.
95 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
99 DBG("FB-DIRECT: Switching mode to %d", mode);
105 fbdir_read_sync(F); // Wait for read-ahead requests to finish
106 if (F->done_read) // Return read-ahead requests if any
108 asio_put(F->done_read);
113 asio_sync(F->io_queue); // Wait for pending writebacks
// Give the currently exposed buffer back to the queue and forget it.
116 if (F->active_buffer)
118 asio_put(F->active_buffer);
119 F->active_buffer = NULL;
// Prepare a read request for F: a request is taken from F's I/O queue and its
// length is set to one full queue buffer.
// Presumably the request is then submitted and recorded in F->pending_read -- confirm.
125 fbdir_submit_read(struct fb_direct *F)
127 struct asio_request *r = asio_get(F->io_queue);
130 r->len = F->io_queue->buffer_size;
// Refill callback (installed as f->refill by fbdir_open_fd_internal()): exposes the
// next completed read block to the fastbuf and starts reading the following one.
137 fbdir_refill(struct fastbuf *f)
139 struct fb_direct *F = FB_DIRECT(f);
141 DBG("FB-DIRECT: Refill");
// No read in flight: enter read mode and submit a read.
145 if (!F->pending_read)
147 fbdir_change_mode(F, M_READ);
148 fbdir_submit_read(F);
// A completed read has to be available at this point.
151 ASSERT(F->done_read);
154 struct asio_request *r = F->done_read;
// Return the previously exposed buffer to the queue; r becomes the active buffer.
156 if (F->active_buffer)
157 asio_put(F->active_buffer);
158 F->active_buffer = r;
// Read errors are reported through bthrow() using the errno stored in the request.
162 bthrow(f, "read", "Error reading %s: %s", f->name, strerror(r->returned_errno));
// Point the fastbuf at the request's buffer; r->status is used as the number of
// valid bytes, so both bstop and bufend end up right after the data.
163 f->bptr = f->buffer = r->buffer;
164 f->bstop = f->bufend = f->buffer + r->status;
167 fbdir_submit_read(F); // Read-ahead the next block
// Spout callback (installed as f->spout by fbdir_open_fd_internal()): turns the
// filled part of the active buffer into a write-back request and gives the
// fastbuf a fresh buffer from the queue.
173 fbdir_spout(struct fastbuf *f)
175 struct fb_direct *F = FB_DIRECT(f);
176 struct asio_request *r;
178 DBG("FB-DIRECT: Spout");
180 fbdir_change_mode(F, M_WRITE);
181 r = F->active_buffer;
// Write only if there is an active buffer and data was stored beyond bstop.
182 if (r && f->bptr > f->bstop)
184 r->op = ASIO_WRITE_BACK;
186 r->len = f->bptr - f->bstop;
// Outside cheat mode the file position has to be a multiple of FBDIR_ALIGN.
187 ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
// Length is not a multiple of FBDIR_ALIGN: round it up, wait for the queue to
// finish, then truncate the file back to f->pos.
189 if (!fbdir_cheat && r->len % FBDIR_ALIGN) // Have to simulate incomplete writes
191 r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
193 asio_sync(F->io_queue);
// NOTE(review): %llu expects an unsigned argument but the cast is to (long long);
// consider (unsigned long long) or %lld.
194 DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
195 if (ucw_ftruncate(F->fd, f->pos) < 0)
196 bthrow(f, "write", "Error truncating %s: %m", f->name);
// Take a new request from the queue and expose its (empty) buffer: bptr and bstop
// start at the beginning, bufend is one queue buffer further.
203 r = asio_get(F->io_queue);
204 f->bstop = f->bptr = f->buffer = r->buffer;
205 f->bufend = f->buffer + F->io_queue->buffer_size;
206 F->active_buffer = r;
// Seek callback (installed as f->seek by fbdir_open_fd_internal()).
210 fbdir_seek(struct fastbuf *f, ucw_off_t pos, int whence)
// NOTE(review): %llu expects an unsigned argument but the cast is to (long long);
// consider (unsigned long long) or %lld.
212 DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
// An absolute seek to the current position is special-cased
// (presumably an early return without touching the queue -- confirm).
214 if (whence == SEEK_SET && pos == f->pos)
217 fbdir_change_mode(FB_DIRECT(f), M_NULL); // Wait for all async requests to finish
// Perform the seek on the underlying descriptor; l receives ucw_seek()'s result.
218 ucw_off_t l = ucw_seek(FB_DIRECT(f)->fd, pos, whence);
// Get the I/O queue kept in the calling thread's ucwlib context.
// buffer_size and write_back are applied to a newly allocated queue.
// NOTE(review): presumably a queue is allocated only when the context has none yet,
// in which case the parameters of later callers would not take effect -- confirm.
225 static struct asio_queue *
226 fbdir_get_io_queue(uns buffer_size, uns write_back)
228 struct ucwlib_context *ctx = ucwlib_thread_context();
229 struct asio_queue *q = ctx->io_queue;
// Fresh zero-filled queue configured from the arguments.
232 q = xmalloc_zero(sizeof(struct asio_queue));
233 q->buffer_size = buffer_size;
234 q->max_writebacks = write_back;
239 DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
// Release the calling thread's I/O queue (counterpart of fbdir_get_io_queue()).
// Presumably the cleanup below happens only once use_count drops to zero -- confirm.
244 fbdir_put_io_queue(void)
246 struct ucwlib_context *ctx = ucwlib_thread_context();
247 struct asio_queue *q = ctx->io_queue;
249 DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
// Tear the queue down and clear the pointer stored in the thread context.
252 asio_cleanup_queue(q);
254 ctx->io_queue = NULL;
// Close callback (installed as f->close by fbdir_open_fd_internal()).
259 fbdir_close(struct fastbuf *f)
261 struct fb_direct *F = FB_DIRECT(f);
263 DBG("FB-DIRECT: Close");
// Leave read/write mode so that outstanding requests are settled and buffers returned.
265 fbdir_change_mode(F, M_NULL);
// Release the per-thread queue
// (presumably skipped when the queue was supplied by the user -- confirm).
267 fbdir_put_io_queue();
// Hand the descriptor, together with the temp-file flag, to the common close helper.
269 bclose_file_helper(f, F->fd, F->is_temp_file);
// Config callback (installed as f->config by fbdir_open_fd_internal()).
274 fbdir_config(struct fastbuf *f, uns item, int value)
// BCONFIG_IS_TEMP_FILE: remember the previous is_temp_file flag in orig and store
// the new value (presumably orig is what gets returned -- confirm).
280 case BCONFIG_IS_TEMP_FILE:
281 orig = FB_DIRECT(f)->is_temp_file;
282 FB_DIRECT(f)->is_temp_file = value;
// Build a direct-I/O fastbuf. Arguments visible here: name is copied into the new
// object, q is an optional user-supplied I/O queue, buffer_size and write_back
// configure the per-thread queue, and read_ahead is marked UNUSED.
290 fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
// Length of the name including its terminating NUL.
// NOTE(review): strlen() returns size_t but the result is stored in an int.
292 int namelen = strlen(name) + 1;
// A single allocation holds the fb_direct followed by room for the name.
293 struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
294 struct fastbuf *f = &F->fb;
296 DBG("FB-DIRECT: Open");
// Only the fixed-size part is zeroed; the name bytes are filled in by the memcpy below.
297 bzero(F, sizeof(*F));
299 memcpy(f->name, name, namelen);
// Queue selection: a user-supplied queue is stored in both io_queue and user_queue;
// the other assignment takes the calling thread's queue
// (presumably chosen by whether q is non-NULL -- confirm).
302 F->io_queue = F->user_queue = q;
304 F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
// Install the backend callbacks.
305 f->refill = fbdir_refill;
306 f->spout = fbdir_spout;
307 f->seek = fbdir_seek;
308 f->close = fbdir_close;
309 f->config = fbdir_config;
310 f->can_overwrite_buffer = 2;
316 #include "ucw/getopt.h"
// Test driver: opens a source and a destination fastbuf, both using FB_DIRECT
// parameters. (FB_DIRECT without an argument list is not expanded by the
// function-like macro of the same name, so it refers to a different identifier here.)
318 int main(int argc, char **argv)
320 struct fb_params par = { .type = FB_DIRECT };
321 struct fastbuf *f, *t;
// Any value >= 0 returned by cf_getopt() is treated as a usage error.
324 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
325 die("Hey, whaddya want?");
// Source: first remaining argument, or descriptor 0 when there is none.
326 f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
// Destination: next remaining argument (created/truncated), or descriptor 1.
327 t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
// Both streams are expected to report the same position here.
330 ASSERT(btell(f) == btell(t));
332 #if 0 // This triggers unaligned write
339 ASSERT(btell(t) == 1);