]> mj.ucw.cz Git - libucw.git/blob - lib/fb-direct.c
Cleanup of file fastbufs.
[libucw.git] / lib / fb-direct.c
1 /*
2  *      UCW Library -- Fast Buffered I/O on O_DIRECT Files
3  *
4  *      (c) 2006--2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12  *      the asynchronous I/O module. It's designed for use on large files
13  *      which don't fit in the disk cache.
14  *
15  *      CAVEATS:
16  *
17  *        - All operations with a single fbdirect handle must be done
18  *          within a single thread, unless you provide a custom I/O queue
19  *          and take care of locking.
20  *
21  *      FIXME: what if the OS doesn't support O_DIRECT?
22  *      FIXME: unaligned seeks and partial writes?
23  */
24
25 #undef LOCAL_DEBUG
26
27 #include "lib/lib.h"
28 #include "lib/fastbuf.h"
29 #include "lib/lfs.h"
30 #include "lib/asio.h"
31 #include "lib/conf.h"
32 #include "lib/threads.h"
33
34 #include <string.h>
35 #include <fcntl.h>
36 #include <unistd.h>
37 #include <stdio.h>
38
39 uns fbdir_cheat;
40
41 static struct cf_section fbdir_cf = {
42   CF_ITEMS {
43     CF_UNS("Cheat", &fbdir_cheat),
44     CF_END
45   }
46 };
47
48 #define FBDIR_ALIGN 512
49
50 enum fbdir_mode {                               // Current operating mode
51     M_NULL,
52     M_READ,
53     M_WRITE
54 };
55
56 struct fb_direct {
57   struct fastbuf fb;
58   int fd;                                       // File descriptor
59   int is_temp_file;
60   struct asio_queue *io_queue;                  // I/O queue to use
61   struct asio_queue *user_queue;                // If io_queue was supplied by the user
62   struct asio_request *pending_read;
63   struct asio_request *done_read;
64   struct asio_request *active_buffer;
65   enum fbdir_mode mode;
66   byte name[0];
67 };
68 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
69
70 static void CONSTRUCTOR
71 fbdir_global_init(void)
72 {
73   cf_declare_section("FBDirect", &fbdir_cf, 0);
74 }
75
76 static void
77 fbdir_read_sync(struct fb_direct *F)
78 {
79   while (F->pending_read)
80     {
81       struct asio_request *r = asio_wait(F->io_queue);
82       ASSERT(r);
83       struct fb_direct *G = r->user_data;
84       ASSERT(G);
85       ASSERT(G->pending_read == r && !G->done_read);
86       G->pending_read = NULL;
87       G->done_read = r;
88     }
89 }
90
91 static void
92 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
93 {
94   if (F->mode == mode)
95     return;
96   DBG("FB-DIRECT: Switching mode to %d", mode);
97   switch (F->mode)
98     {
99     case M_NULL:
100       break;
101     case M_READ:
102       fbdir_read_sync(F);                       // Wait for read-ahead requests to finish
103       if (F->done_read)                         // Return read-ahead requests if any
104         {
105           asio_put(F->done_read);
106           F->done_read = NULL;
107         }
108       break;
109     case M_WRITE:
110       asio_sync(F->io_queue);                   // Wait for pending writebacks
111       break;
112     }
113   if (F->active_buffer)
114     {
115       asio_put(F->active_buffer);
116       F->active_buffer = NULL;
117     }
118   F->mode = mode;
119 }
120
121 static void
122 fbdir_submit_read(struct fb_direct *F)
123 {
124   struct asio_request *r = asio_get(F->io_queue);
125   r->fd = F->fd;
126   r->op = ASIO_READ;
127   r->len = F->io_queue->buffer_size;
128   r->user_data = F;
129   asio_submit(r);
130   F->pending_read = r;
131 }
132
133 static int
134 fbdir_refill(struct fastbuf *f)
135 {
136   struct fb_direct *F = FB_DIRECT(f);
137
138   DBG("FB-DIRECT: Refill");
139
140   if (!F->done_read)
141     {
142       if (!F->pending_read)
143         {
144           fbdir_change_mode(F, M_READ);
145           fbdir_submit_read(F);
146         }
147       fbdir_read_sync(F);
148       ASSERT(F->done_read);
149     }
150
151   struct asio_request *r = F->done_read;
152   F->done_read = NULL;
153   if (F->active_buffer)
154     asio_put(F->active_buffer);
155   F->active_buffer = r;
156   if (!r->status)
157     return 0;
158   if (r->status < 0)
159     die("Error reading %s: %s", f->name, strerror(r->returned_errno));
160   f->bptr = f->buffer = r->buffer;
161   f->bstop = f->bufend = f->buffer + r->status;
162   f->pos += r->status;
163
164   fbdir_submit_read(F);                         // Read-ahead the next block
165
166   return r->status;
167 }
168
169 static void
170 fbdir_spout(struct fastbuf *f)
171 {
172   struct fb_direct *F = FB_DIRECT(f);
173   struct asio_request *r;
174
175   DBG("FB-DIRECT: Spout");
176
177   fbdir_change_mode(F, M_WRITE);
178   r = F->active_buffer;
179   if (r && f->bptr > f->bstop)
180     {
181       r->op = ASIO_WRITE_BACK;
182       r->fd = F->fd;
183       r->len = f->bptr - f->bstop;
184       ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
185       f->pos += r->len;
186       if (!fbdir_cheat && r->len % FBDIR_ALIGN)                 // Have to simulate incomplete writes
187         {
188           r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
189           asio_submit(r);
190           asio_sync(F->io_queue);
191           DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
192           if (sh_ftruncate(F->fd, f->pos) < 0)
193             die("Error truncating %s: %m", f->name);
194         }
195       else
196         asio_submit(r);
197       r = NULL;
198     }
199   if (!r)
200     r = asio_get(F->io_queue);
201   f->bstop = f->bptr = f->buffer = r->buffer;
202   f->bufend = f->buffer + F->io_queue->buffer_size;
203   F->active_buffer = r;
204 }
205
206 static int
207 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
208 {
209   DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
210
211   if (whence == SEEK_SET && pos == f->pos)
212     return 1;
213
214   fbdir_change_mode(FB_DIRECT(f), M_NULL);                      // Wait for all async requests to finish
215   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
216   if (l < 0)
217     return 0;
218   f->pos = l;
219   return 1;
220 }
221
222 static struct asio_queue *
223 fbdir_get_io_queue(uns buffer_size, uns write_back)
224 {
225   struct ucwlib_context *ctx = ucwlib_thread_context();
226   struct asio_queue *q = ctx->io_queue;
227   if (!q)
228     {
229       q = xmalloc_zero(sizeof(struct asio_queue));
230       q->buffer_size = buffer_size;
231       q->max_writebacks = write_back;
232       asio_init_queue(q);
233       ctx->io_queue = q;
234     }
235   q->use_count++;
236   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
237   return q;
238 }
239
240 static void
241 fbdir_put_io_queue(void)
242 {
243   struct ucwlib_context *ctx = ucwlib_thread_context();
244   struct asio_queue *q = ctx->io_queue;
245   ASSERT(q);
246   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
247   if (!--q->use_count)
248     {
249       asio_cleanup_queue(q);
250       xfree(q);
251       ctx->io_queue = NULL;
252     }
253 }
254
255 static void
256 fbdir_close(struct fastbuf *f)
257 {
258   struct fb_direct *F = FB_DIRECT(f);
259
260   DBG("FB-DIRECT: Close");
261
262   fbdir_change_mode(F, M_NULL);
263   if (!F->user_queue)
264     fbdir_put_io_queue();
265
266   bclose_file_helper(f, F->fd, F->is_temp_file);
267   xfree(f);
268 }
269
270 static int
271 fbdir_config(struct fastbuf *f, uns item, int value)
272 {
273   int orig;
274
275   switch (item)
276     {
277     case BCONFIG_IS_TEMP_FILE:
278       orig = FB_DIRECT(f)->is_temp_file;
279       FB_DIRECT(f)->is_temp_file = value;
280       return orig;
281     default:
282       return -1;
283     }
284 }
285
286 struct fastbuf *
287 fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
288 {
289   int namelen = strlen(name) + 1;
290   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
291   struct fastbuf *f = &F->fb;
292
293   DBG("FB-DIRECT: Open");
294   bzero(F, sizeof(*F));
295   f->name = F->name;
296   memcpy(f->name, name, namelen);
297   F->fd = fd;
298   if (q)
299     F->io_queue = F->user_queue = q;
300   else
301     F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
302   f->refill = fbdir_refill;
303   f->spout = fbdir_spout;
304   f->seek = fbdir_seek;
305   f->close = fbdir_close;
306   f->config = fbdir_config;
307   f->can_overwrite_buffer = 2;
308   return f;
309 }
310
311 #ifdef TEST
312
313 #include "lib/getopt.h"
314
315 int main(int argc, char **argv)
316 {
317   struct fb_params par = { .type = FB_DIRECT };
318   struct fastbuf *f, *t;
319
320   log_init(NULL);
321   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
322     die("Hey, whaddya want?");
323   f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
324   t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
325
326   bbcopy(f, t, ~0U);
327   ASSERT(btell(f) == btell(t));
328
329 #if 0           // This triggers unaligned write
330   bflush(t);
331   bputc(t, '\n');
332 #endif
333
334   brewind(t);
335   bgetc(t);
336   ASSERT(btell(t) == 1);
337
338   bclose(f);
339   bclose(t);
340   return 0;
341 }
342
343 #endif