]> mj.ucw.cz Git - libucw.git/blob - lib/fb-direct.c
c3b74e1592edf698ce593e4cca6745afddee6c78
[libucw.git] / lib / fb-direct.c
1 /*
2  *      UCW Library -- Fast Buffered I/O on O_DIRECT Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12  *      the asynchronous I/O module. It's designed for use on large files
13  *      which don't fit in the disk cache.
14  *
15  *      CAVEATS:
16  *
17  *        - All operations with a single fbdirect handle must be done
18  *          within a single thread, unless you provide a custom I/O queue
19  *          and take care of locking.
20  *
21  *      FIXME: what if the OS doesn't support O_DIRECT?
22  *      FIXME: unaligned seeks and partial writes?
23  *      FIXME: merge with other file-oriented fastbufs
24  */
25
26 #undef LOCAL_DEBUG
27
28 #include "lib/lib.h"
29 #include "lib/fastbuf.h"
30 #include "lib/lfs.h"
31 #include "lib/asio.h"
32 #include "lib/conf.h"
33 #include "lib/threads.h"
34
35 #include <string.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38
39 static uns fbdir_cheat;
40 static uns fbdir_buffer_size = 65536;
41 static uns fbdir_read_ahead = 1;
42 static uns fbdir_write_back = 1;
43
44 static struct cf_section fbdir_cf = {
45   CF_ITEMS {
46     CF_UNS("Cheat", &fbdir_cheat),
47     CF_UNS("BufferSize", &fbdir_buffer_size),
48     CF_UNS("ReadAhead", &fbdir_read_ahead),
49     CF_UNS("WriteBack", &fbdir_write_back),
50     CF_END
51   }
52 };
53
54 #define FBDIR_ALIGN 512
55
56 enum fbdir_mode {                               // Current operating mode
57     M_NULL,
58     M_READ,
59     M_WRITE
60 };
61
62 struct fb_direct {
63   struct fastbuf fb;
64   int fd;                                       // File descriptor
65   int is_temp_file;                             // 0=normal file, 1=temporary file, delete on close, -1=shared FD
66   struct asio_queue *io_queue;                  // I/O queue to use
67   struct asio_queue *user_queue;                // If io_queue was supplied by the user
68   struct asio_request *pending_read;
69   struct asio_request *done_read;
70   struct asio_request *active_buffer;
71   enum fbdir_mode mode;
72   byte name[0];
73 };
74 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
75
76 static void CONSTRUCTOR
77 fbdir_global_init(void)
78 {
79   cf_declare_section("FBDirect", &fbdir_cf, 0);
80 }
81
82 static void
83 fbdir_read_sync(struct fb_direct *F)
84 {
85   while (F->pending_read)
86     {
87       struct asio_request *r = asio_wait(F->io_queue);
88       ASSERT(r);
89       struct fb_direct *G = r->user_data;
90       ASSERT(G);
91       ASSERT(G->pending_read == r && !G->done_read);
92       G->pending_read = NULL;
93       G->done_read = r;
94     }
95 }
96
97 static void
98 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
99 {
100   if (F->mode == mode)
101     return;
102   DBG("FB-DIRECT: Switching mode to %d", mode);
103   switch (F->mode)
104     {
105     case M_NULL:
106       break;
107     case M_READ:
108       fbdir_read_sync(F);                       // Wait for read-ahead requests to finish
109       if (F->done_read)                         // Return read-ahead requests if any
110         {
111           asio_put(F->done_read);
112           F->done_read = NULL;
113         }
114       break;
115     case M_WRITE:
116       asio_sync(F->io_queue);                   // Wait for pending writebacks
117       break;
118     }
119   if (F->active_buffer)
120     {
121       asio_put(F->active_buffer);
122       F->active_buffer = NULL;
123     }
124   F->mode = mode;
125 }
126
127 static void
128 fbdir_submit_read(struct fb_direct *F)
129 {
130   struct asio_request *r = asio_get(F->io_queue);
131   r->fd = F->fd;
132   r->op = ASIO_READ;
133   r->len = F->io_queue->buffer_size;
134   r->user_data = F;
135   asio_submit(r);
136   F->pending_read = r;
137 }
138
139 static int
140 fbdir_refill(struct fastbuf *f)
141 {
142   struct fb_direct *F = FB_DIRECT(f);
143
144   DBG("FB-DIRECT: Refill");
145
146   if (!F->done_read)
147     {
148       if (!F->pending_read)
149         {
150           fbdir_change_mode(F, M_READ);
151           fbdir_submit_read(F);
152         }
153       fbdir_read_sync(F);
154       ASSERT(F->done_read);
155     }
156
157   struct asio_request *r = F->done_read;
158   F->done_read = NULL;
159   if (F->active_buffer)
160     asio_put(F->active_buffer);
161   F->active_buffer = r;
162   if (!r->status)
163     return 0;
164   if (r->status < 0)
165     die("Error reading %s: %s", f->name, strerror(r->returned_errno));
166   f->bptr = f->buffer = r->buffer;
167   f->bstop = f->bufend = f->buffer + r->status;
168   f->pos += r->status;
169
170   fbdir_submit_read(F);                         // Read-ahead the next block
171
172   return r->status;
173 }
174
175 static void
176 fbdir_spout(struct fastbuf *f)
177 {
178   struct fb_direct *F = FB_DIRECT(f);
179   struct asio_request *r;
180
181   DBG("FB-DIRECT: Spout");
182
183   fbdir_change_mode(F, M_WRITE);
184   r = F->active_buffer;
185   if (r && f->bptr > f->bstop)
186     {
187       r->op = ASIO_WRITE_BACK;
188       r->fd = F->fd;
189       r->len = f->bptr - f->bstop;
190       ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
191       f->pos += r->len;
192       if (!fbdir_cheat && r->len % FBDIR_ALIGN)                 // Have to simulate incomplete writes
193         {
194           r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
195           asio_submit(r);
196           asio_sync(F->io_queue);
197           DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
198           if (sh_ftruncate(F->fd, f->pos) < 0)
199             die("Error truncating %s: %m", f->name);
200         }
201       else
202         asio_submit(r);
203       r = NULL;
204     }
205   if (!r)
206     r = asio_get(F->io_queue);
207   f->bstop = f->bptr = f->buffer = r->buffer;
208   f->bufend = f->buffer + F->io_queue->buffer_size;
209   F->active_buffer = r;
210 }
211
212 static void
213 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
214 {
215   DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
216
217   if (whence == SEEK_SET && pos == f->pos)
218     return;
219
220   fbdir_change_mode(FB_DIRECT(f), M_NULL);                      // Wait for all async requests to finish
221   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
222   if (l < 0)
223     die("lseek on %s: %m", f->name);
224   f->pos = l;
225 }
226
227 static struct asio_queue *
228 fbdir_get_io_queue(void)
229 {
230   struct ucwlib_context *ctx = ucwlib_thread_context();
231   struct asio_queue *q = ctx->io_queue;
232   if (!q)
233     {
234       q = xmalloc_zero(sizeof(struct asio_queue));
235       q->buffer_size = fbdir_buffer_size;
236       q->max_writebacks = fbdir_write_back;
237       asio_init_queue(q);
238       ctx->io_queue = q;
239     }
240   q->use_count++;
241   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
242   return q;
243 }
244
245 static void
246 fbdir_put_io_queue(void)
247 {
248   struct ucwlib_context *ctx = ucwlib_thread_context();
249   struct asio_queue *q = ctx->io_queue;
250   ASSERT(q);
251   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
252   if (!--q->use_count)
253     {
254       asio_cleanup_queue(q);
255       xfree(q);
256       ctx->io_queue = NULL;
257     }
258 }
259
260 static void
261 fbdir_close(struct fastbuf *f)
262 {
263   struct fb_direct *F = FB_DIRECT(f);
264
265   DBG("FB-DIRECT: Close");
266
267   fbdir_change_mode(F, M_NULL);
268   if (!F->user_queue)
269     fbdir_put_io_queue();
270
271   switch (F->is_temp_file)
272     {
273     case 1:
274       if (unlink(f->name) < 0)
275         log(L_ERROR, "unlink(%s): %m", f->name);
276     case 0:
277       close(F->fd);
278     }
279
280   xfree(f);
281 }
282
283 static int
284 fbdir_config(struct fastbuf *f, uns item, int value)
285 {
286   switch (item)
287     {
288     case BCONFIG_IS_TEMP_FILE:
289       FB_DIRECT(f)->is_temp_file = value;
290       return 0;
291     default:
292       return -1;
293     }
294 }
295
296 static struct fastbuf *
297 fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
298 {
299   int namelen = strlen(name) + 1;
300   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
301   struct fastbuf *f = &F->fb;
302
303   DBG("FB-DIRECT: Open");
304   bzero(F, sizeof(*F));
305   f->name = F->name;
306   memcpy(f->name, name, namelen);
307   F->fd = fd;
308   if (q)
309     F->io_queue = F->user_queue = q;
310   else
311     F->io_queue = fbdir_get_io_queue();
312   f->refill = fbdir_refill;
313   f->spout = fbdir_spout;
314   f->seek = fbdir_seek;
315   f->close = fbdir_close;
316   f->config = fbdir_config;
317   f->can_overwrite_buffer = 2;
318   return f;
319 }
320
321 struct fastbuf *
322 fbdir_open_try(byte *name, uns mode, struct asio_queue *q)
323 {
324   if (!fbdir_cheat)
325     mode |= O_DIRECT;
326   int fd = sh_open(name, mode, 0666);
327   if (fd < 0)
328     return NULL;
329   struct fastbuf *b = fbdir_open_internal(name, fd, q);
330   if (mode & O_APPEND)
331     fbdir_seek(b, 0, SEEK_END);
332   return b;
333 }
334
335 struct fastbuf *
336 fbdir_open(byte *name, uns mode, struct asio_queue *q)
337 {
338   struct fastbuf *b = fbdir_open_try(name, mode, q);
339   if (!b)
340     die("Unable to %s file %s: %m",
341         (mode & O_CREAT) ? "create" : "open", name);
342   return b;
343 }
344
345 struct fastbuf *
346 fbdir_open_fd(int fd, struct asio_queue *q)
347 {
348   byte x[32];
349
350   sprintf(x, "fd%d", fd);
351   if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
352     log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
353   return fbdir_open_internal(x, fd, q);
354 }
355
356 struct fastbuf *
357 fbdir_open_tmp(struct asio_queue *q)
358 {
359   byte buf[TEMP_FILE_NAME_LEN];
360   struct fastbuf *f;
361
362   temp_file_name(buf);
363   f = fbdir_open(buf, O_RDWR | O_CREAT | O_TRUNC, q);
364   bconfig(f, BCONFIG_IS_TEMP_FILE, 1);
365   return f;
366 }
367
368 #ifdef TEST
369
370 #include "lib/getopt.h"
371
372 int main(int argc, char **argv)
373 {
374   struct fastbuf *f, *t;
375
376   log_init(NULL);
377   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
378     die("Hey, whaddya want?");
379   f = (optind < argc) ? fbdir_open(argv[optind++], O_RDONLY, NULL) : fbdir_open_fd(0, NULL);
380   t = (optind < argc) ? fbdir_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdir_open_fd(1, NULL);
381
382   bbcopy(f, t, ~0U);
383   ASSERT(btell(f) == btell(t));
384
385 #if 0           // This triggers unaligned write
386   bflush(t);
387   bputc(t, '\n');
388 #endif
389
390   brewind(t);
391   bgetc(t);
392   ASSERT(btell(t) == 1);
393
394   bclose(f);
395   bclose(t);
396   return 0;
397 }
398
399 #endif