]> mj.ucw.cz Git - libucw.git/blob - lib/fb-direct.c
Added a direct I/O fastbuf backend.
[libucw.git] / lib / fb-direct.c
1 /*
2  *      UCW Library -- Fast Buffered I/O on O_DIRECT Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12  *      the asynchronous I/O module. It's designed for use on large files
13  *      which don't fit in the disk cache.
14  *
15  *      CAVEATS:
16  *
17  *        - All operations with a single fbdirect handle must be done
18  *          within a single thread, unless you provide a custom I/O queue
19  *          and take care of locking.
20  *
21  *      FIXME: what if the OS doesn't support O_DIRECT?
22  *      FIXME: doc: don't mix threads
23  *      FIXME: unaligned seeks and partial writes?
24  *      FIXME: merge with other file-oriented fastbufs
25  */
26
27 #undef LOCAL_DEBUG
28
29 #include "lib/lib.h"
30 #include "lib/fastbuf.h"
31 #include "lib/lfs.h"
32 #include "lib/asio.h"
33 #include "lib/conf.h"
34 #include "lib/getopt.h"
35
36 #include <string.h>
37 #include <fcntl.h>
38 #include <unistd.h>
39 #include <pthread.h>
40
41 static uns fbdir_cheat;
42 static uns fbdir_buffer_size = 65536;
43 static uns fbdir_read_ahead = 1;
44 static uns fbdir_write_back = 1;
45
46 static struct cf_section fbdir_cf = {
47   CF_ITEMS {
48     CF_UNS("Cheat", &fbdir_cheat),
49     CF_UNS("BufferSize", &fbdir_buffer_size),
50     CF_UNS("ReadAhead", &fbdir_read_ahead),
51     CF_UNS("WriteBack", &fbdir_write_back),
52     CF_END
53   }
54 };
55
56 #define FBDIR_ALIGN 512
57
58 static pthread_key_t fbdir_queue_key;
59
60 enum fbdir_mode {                               // Current operating mode
61     M_NULL,
62     M_READ,
63     M_WRITE
64 };
65
66 struct fb_direct {
67   struct fastbuf fb;
68   int fd;                                       // File descriptor
69   int is_temp_file;                             // 0=normal file, 1=temporary file, delete on close, -1=shared FD
70   struct asio_queue *io_queue;                  // I/O queue to use
71   struct asio_queue *user_queue;                // If io_queue was supplied by the user
72   struct asio_request *pending_read;
73   struct asio_request *done_read;
74   struct asio_request *active_buffer;
75   enum fbdir_mode mode;
76   byte name[0];
77 };
78 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
79
80 static void CONSTRUCTOR
81 fbdir_global_init(void)
82 {
83   cf_declare_section("FBDirect", &fbdir_cf, 0);
84   if (pthread_key_create(&fbdir_queue_key, NULL) < 0)
85     die("Cannot create fbdir_queue_key: %m");
86 }
87
88 static void
89 fbdir_read_sync(struct fb_direct *F)
90 {
91   while (F->pending_read)
92     {
93       struct asio_request *r = asio_wait(F->io_queue);
94       ASSERT(r);
95       struct fb_direct *G = r->user_data;
96       ASSERT(G);
97       ASSERT(G->pending_read == r && !G->done_read);
98       G->pending_read = NULL;
99       G->done_read = r;
100     }
101 }
102
103 static void
104 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
105 {
106   if (F->mode == mode)
107     return;
108   DBG("FB-DIRECT: Switching mode to %d", mode);
109   switch (F->mode)
110     {
111     case M_NULL:
112       break;
113     case M_READ:
114       fbdir_read_sync(F);                       // Wait for read-ahead requests to finish
115       if (F->done_read)                         // Return read-ahead requests if any
116         {
117           asio_put(F->done_read);
118           F->done_read = NULL;
119         }
120       break;
121     case M_WRITE:
122       asio_sync(F->io_queue);                   // Wait for pending writebacks
123       break;
124     }
125   if (F->active_buffer)
126     {
127       asio_put(F->active_buffer);
128       F->active_buffer = NULL;
129     }
130   F->mode = mode;
131 }
132
133 static void
134 fbdir_submit_read(struct fb_direct *F)
135 {
136   struct asio_request *r = asio_get(F->io_queue);
137   r->fd = F->fd;
138   r->op = ASIO_READ;
139   r->len = F->io_queue->buffer_size;
140   r->user_data = F;
141   asio_submit(r);
142   F->pending_read = r;
143 }
144
145 static int
146 fbdir_refill(struct fastbuf *f)
147 {
148   struct fb_direct *F = FB_DIRECT(f);
149
150   DBG("FB-DIRECT: Refill");
151
152   if (!F->done_read)
153     {
154       if (!F->pending_read)
155         {
156           fbdir_change_mode(F, M_READ);
157           fbdir_submit_read(F);
158         }
159       fbdir_read_sync(F);
160       ASSERT(F->done_read);
161     }
162
163   struct asio_request *r = F->done_read;
164   F->done_read = NULL;
165   if (F->active_buffer)
166     asio_put(F->active_buffer);
167   F->active_buffer = r;
168   if (!r->status)
169     return 0;
170   if (r->status < 0)
171     die("Error reading %s: %s", f->name, strerror(r->returned_errno));
172   f->bptr = f->buffer = r->buffer;
173   f->bstop = f->bufend = f->buffer + r->status;
174   f->pos += r->status;
175
176   fbdir_submit_read(F);                         // Read-ahead the next block
177
178   return r->status;
179 }
180
181 static void
182 fbdir_spout(struct fastbuf *f)
183 {
184   struct fb_direct *F = FB_DIRECT(f);
185   struct asio_request *r;
186
187   DBG("FB-DIRECT: Spout");
188
189   fbdir_change_mode(F, M_WRITE);
190   r = F->active_buffer;
191   if (r && f->bptr > f->bstop)
192     {
193       r->op = ASIO_WRITE_BACK;
194       r->fd = F->fd;
195       r->len = f->bptr - f->bstop;
196       ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
197       f->pos += r->len;
198       if (!fbdir_cheat && r->len % FBDIR_ALIGN)                 // Have to simulate incomplete writes
199         {
200           r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
201           asio_submit(r);
202           asio_sync(F->io_queue);
203           DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
204           if (sh_ftruncate(F->fd, f->pos) < 0)
205             die("Error truncating %s: %m", f->name);
206         }
207       else
208         asio_submit(r);
209       r = NULL;
210     }
211   if (!r)
212     r = asio_get(F->io_queue);
213   f->bstop = f->bptr = f->buffer = r->buffer;
214   f->bufend = f->buffer + F->io_queue->buffer_size;
215   F->active_buffer = r;
216 }
217
218 static void
219 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
220 {
221   DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
222
223   if (whence == SEEK_SET && pos == f->pos)
224     return;
225
226   fbdir_change_mode(FB_DIRECT(f), M_NULL);                      // Wait for all async requests to finish
227   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
228   if (l < 0)
229     die("lseek on %s: %m", f->name);
230   f->pos = l;
231 }
232
233 static struct asio_queue *
234 fbdir_get_io_queue(void)
235 {
236   struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
237   if (!q)
238     {
239       q = xmalloc_zero(sizeof(struct asio_queue));
240       q->buffer_size = fbdir_buffer_size;
241       q->max_writebacks = fbdir_write_back;
242       asio_init_queue(q);
243       pthread_setspecific(fbdir_queue_key, q);
244     }
245   q->use_count++;
246   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
247   return q;
248 }
249
250 static void
251 fbdir_put_io_queue(void)
252 {
253   struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
254   ASSERT(q);
255   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
256   if (!--q->use_count)
257     {
258       asio_cleanup_queue(q);
259       xfree(q);
260       pthread_setspecific(fbdir_queue_key, NULL);
261     }
262 }
263
264 static void
265 fbdir_close(struct fastbuf *f)
266 {
267   struct fb_direct *F = FB_DIRECT(f);
268
269   DBG("FB-DIRECT: Close");
270
271   fbdir_change_mode(F, M_NULL);
272   if (!F->user_queue)
273     fbdir_put_io_queue();
274
275   switch (F->is_temp_file)
276     {
277     case 1:
278       if (unlink(f->name) < 0)
279         log(L_ERROR, "unlink(%s): %m", f->name);
280     case 0:
281       close(F->fd);
282     }
283
284   xfree(f);
285 }
286
287 static int
288 fbdir_config(struct fastbuf *f, uns item, int value)
289 {
290   switch (item)
291     {
292     case BCONFIG_IS_TEMP_FILE:
293       FB_DIRECT(f)->is_temp_file = value;
294       return 0;
295     default:
296       return -1;
297     }
298 }
299
300 static struct fastbuf *
301 fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
302 {
303   int namelen = strlen(name) + 1;
304   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
305   struct fastbuf *f = &F->fb;
306
307   DBG("FB-DIRECT: Open");
308   bzero(F, sizeof(*F));
309   f->name = F->name;
310   memcpy(f->name, name, namelen);
311   F->fd = fd;
312   if (q)
313     F->io_queue = F->user_queue = q;
314   else
315     F->io_queue = fbdir_get_io_queue();
316   f->refill = fbdir_refill;
317   f->spout = fbdir_spout;
318   f->seek = fbdir_seek;
319   f->close = fbdir_close;
320   f->config = fbdir_config;
321   f->can_overwrite_buffer = 2;
322   return f;
323 }
324
325 struct fastbuf *
326 fbdirect_open_try(byte *name, uns mode, struct asio_queue *q)
327 {
328   if (!fbdir_cheat)
329     mode |= O_DIRECT;
330   int fd = sh_open(name, mode, 0666);
331   if (fd < 0)
332     return NULL;
333   struct fastbuf *b = fbdir_open_internal(name, fd, q);
334   if (mode & O_APPEND)
335     fbdir_seek(b, 0, SEEK_END);
336   return b;
337 }
338
339 struct fastbuf *
340 fbdirect_open(byte *name, uns mode, struct asio_queue *q)
341 {
342   struct fastbuf *b = fbdirect_open_try(name, mode, q);
343   if (!b)
344     die("Unable to %s file %s: %m",
345         (mode & O_CREAT) ? "create" : "open", name);
346   return b;
347 }
348
349 struct fastbuf *
350 fbdirect_open_fd(int fd, struct asio_queue *q)
351 {
352   byte x[32];
353
354   sprintf(x, "fd%d", fd);
355   if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
356     log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
357   return fbdir_open_internal(x, fd, q);
358 }
359
360 #ifdef TEST
361
362 int main(int argc, char **argv)
363 {
364   struct fastbuf *f, *t;
365
366   log_init(NULL);
367   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
368     die("Hey, whaddya want?");
369   f = (optind < argc) ? fbdirect_open(argv[optind++], O_RDONLY, NULL) : fbdirect_open_fd(0, NULL);
370   t = (optind < argc) ? fbdirect_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdirect_open_fd(1, NULL);
371
372   bbcopy(f, t, ~0U);
373   ASSERT(btell(f) == btell(t));
374
375 #if 0           // This triggers unaligned write
376   bflush(t);
377   bputc(t, '\n');
378 #endif
379
380   brewind(t);
381   bgetc(t);
382   ASSERT(btell(t) == 1);
383
384   bclose(f);
385   bclose(t);
386   return 0;
387 }
388
389 #endif