]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-direct.c
Logging: Improved log-syslog.
[libucw.git] / ucw / fb-direct.c
1 /*
2  *      UCW Library -- Fast Buffered I/O on O_DIRECT Files
3  *
4  *      (c) 2006--2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12  *      the asynchronous I/O module. It's designed for use on large files
13  *      which don't fit in the disk cache.
14  *
15  *      CAVEATS:
16  *
17  *        - All operations with a single fbdirect handle must be done
18  *          within a single thread, unless you provide a custom I/O queue
19  *          and take care of locking.
20  *
21  *      FIXME: what if the OS doesn't support O_DIRECT?
22  *      FIXME: unaligned seeks and partial writes?
23  *      FIXME: append to unaligned file
24  */
25
26 #undef LOCAL_DEBUG
27
28 #include "ucw/lib.h"
29 #include "ucw/fastbuf.h"
30 #include "ucw/lfs.h"
31 #include "ucw/asio.h"
32 #include "ucw/conf.h"
33 #include "ucw/threads.h"
34
35 #include <string.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38 #include <stdio.h>
39
40 #define FBDIR_ALIGN 512
41
42 enum fbdir_mode {                               // Current operating mode
43     M_NULL,
44     M_READ,
45     M_WRITE
46 };
47
48 struct fb_direct {
49   struct fastbuf fb;
50   int fd;                                       // File descriptor
51   int is_temp_file;
52   struct asio_queue *io_queue;                  // I/O queue to use
53   struct asio_queue *user_queue;                // If io_queue was supplied by the user
54   struct asio_request *pending_read;
55   struct asio_request *done_read;
56   struct asio_request *active_buffer;
57   enum fbdir_mode mode;
58   byte name[0];
59 };
60 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
61
62 #ifndef TEST
63 uns fbdir_cheat;
64
65 static struct cf_section fbdir_cf = {
66   CF_ITEMS {
67     CF_UNS("Cheat", &fbdir_cheat),
68     CF_END
69   }
70 };
71
72 static void CONSTRUCTOR
73 fbdir_global_init(void)
74 {
75   cf_declare_section("FBDirect", &fbdir_cf, 0);
76 }
77 #endif
78
79 static void
80 fbdir_read_sync(struct fb_direct *F)
81 {
82   while (F->pending_read)
83     {
84       struct asio_request *r = asio_wait(F->io_queue);
85       ASSERT(r);
86       struct fb_direct *G = r->user_data;
87       ASSERT(G);
88       ASSERT(G->pending_read == r && !G->done_read);
89       G->pending_read = NULL;
90       G->done_read = r;
91     }
92 }
93
94 static void
95 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
96 {
97   if (F->mode == mode)
98     return;
99   DBG("FB-DIRECT: Switching mode to %d", mode);
100   switch (F->mode)
101     {
102     case M_NULL:
103       break;
104     case M_READ:
105       fbdir_read_sync(F);                       // Wait for read-ahead requests to finish
106       if (F->done_read)                         // Return read-ahead requests if any
107         {
108           asio_put(F->done_read);
109           F->done_read = NULL;
110         }
111       break;
112     case M_WRITE:
113       asio_sync(F->io_queue);                   // Wait for pending writebacks
114       break;
115     }
116   if (F->active_buffer)
117     {
118       asio_put(F->active_buffer);
119       F->active_buffer = NULL;
120     }
121   F->mode = mode;
122 }
123
124 static void
125 fbdir_submit_read(struct fb_direct *F)
126 {
127   struct asio_request *r = asio_get(F->io_queue);
128   r->fd = F->fd;
129   r->op = ASIO_READ;
130   r->len = F->io_queue->buffer_size;
131   r->user_data = F;
132   asio_submit(r);
133   F->pending_read = r;
134 }
135
136 static int
137 fbdir_refill(struct fastbuf *f)
138 {
139   struct fb_direct *F = FB_DIRECT(f);
140
141   DBG("FB-DIRECT: Refill");
142
143   if (!F->done_read)
144     {
145       if (!F->pending_read)
146         {
147           fbdir_change_mode(F, M_READ);
148           fbdir_submit_read(F);
149         }
150       fbdir_read_sync(F);
151       ASSERT(F->done_read);
152     }
153
154   struct asio_request *r = F->done_read;
155   F->done_read = NULL;
156   if (F->active_buffer)
157     asio_put(F->active_buffer);
158   F->active_buffer = r;
159   if (!r->status)
160     return 0;
161   if (r->status < 0)
162     die("Error reading %s: %s", f->name, strerror(r->returned_errno));
163   f->bptr = f->buffer = r->buffer;
164   f->bstop = f->bufend = f->buffer + r->status;
165   f->pos += r->status;
166
167   fbdir_submit_read(F);                         // Read-ahead the next block
168
169   return r->status;
170 }
171
172 static void
173 fbdir_spout(struct fastbuf *f)
174 {
175   struct fb_direct *F = FB_DIRECT(f);
176   struct asio_request *r;
177
178   DBG("FB-DIRECT: Spout");
179
180   fbdir_change_mode(F, M_WRITE);
181   r = F->active_buffer;
182   if (r && f->bptr > f->bstop)
183     {
184       r->op = ASIO_WRITE_BACK;
185       r->fd = F->fd;
186       r->len = f->bptr - f->bstop;
187       ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
188       f->pos += r->len;
189       if (!fbdir_cheat && r->len % FBDIR_ALIGN)                 // Have to simulate incomplete writes
190         {
191           r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
192           asio_submit(r);
193           asio_sync(F->io_queue);
194           DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
195           if (ucw_ftruncate(F->fd, f->pos) < 0)
196             die("Error truncating %s: %m", f->name);
197         }
198       else
199         asio_submit(r);
200       r = NULL;
201     }
202   if (!r)
203     r = asio_get(F->io_queue);
204   f->bstop = f->bptr = f->buffer = r->buffer;
205   f->bufend = f->buffer + F->io_queue->buffer_size;
206   F->active_buffer = r;
207 }
208
209 static int
210 fbdir_seek(struct fastbuf *f, ucw_off_t pos, int whence)
211 {
212   DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
213
214   if (whence == SEEK_SET && pos == f->pos)
215     return 1;
216
217   fbdir_change_mode(FB_DIRECT(f), M_NULL);                      // Wait for all async requests to finish
218   ucw_off_t l = ucw_seek(FB_DIRECT(f)->fd, pos, whence);
219   if (l < 0)
220     return 0;
221   f->pos = l;
222   return 1;
223 }
224
225 static struct asio_queue *
226 fbdir_get_io_queue(uns buffer_size, uns write_back)
227 {
228   struct ucwlib_context *ctx = ucwlib_thread_context();
229   struct asio_queue *q = ctx->io_queue;
230   if (!q)
231     {
232       q = xmalloc_zero(sizeof(struct asio_queue));
233       q->buffer_size = buffer_size;
234       q->max_writebacks = write_back;
235       asio_init_queue(q);
236       ctx->io_queue = q;
237     }
238   q->use_count++;
239   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
240   return q;
241 }
242
243 static void
244 fbdir_put_io_queue(void)
245 {
246   struct ucwlib_context *ctx = ucwlib_thread_context();
247   struct asio_queue *q = ctx->io_queue;
248   ASSERT(q);
249   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
250   if (!--q->use_count)
251     {
252       asio_cleanup_queue(q);
253       xfree(q);
254       ctx->io_queue = NULL;
255     }
256 }
257
258 static void
259 fbdir_close(struct fastbuf *f)
260 {
261   struct fb_direct *F = FB_DIRECT(f);
262
263   DBG("FB-DIRECT: Close");
264
265   fbdir_change_mode(F, M_NULL);
266   if (!F->user_queue)
267     fbdir_put_io_queue();
268
269   bclose_file_helper(f, F->fd, F->is_temp_file);
270   xfree(f);
271 }
272
273 static int
274 fbdir_config(struct fastbuf *f, uns item, int value)
275 {
276   int orig;
277
278   switch (item)
279     {
280     case BCONFIG_IS_TEMP_FILE:
281       orig = FB_DIRECT(f)->is_temp_file;
282       FB_DIRECT(f)->is_temp_file = value;
283       return orig;
284     default:
285       return -1;
286     }
287 }
288
289 struct fastbuf *
290 fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
291 {
292   int namelen = strlen(name) + 1;
293   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
294   struct fastbuf *f = &F->fb;
295
296   DBG("FB-DIRECT: Open");
297   bzero(F, sizeof(*F));
298   f->name = F->name;
299   memcpy(f->name, name, namelen);
300   F->fd = fd;
301   if (q)
302     F->io_queue = F->user_queue = q;
303   else
304     F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
305   f->refill = fbdir_refill;
306   f->spout = fbdir_spout;
307   f->seek = fbdir_seek;
308   f->close = fbdir_close;
309   f->config = fbdir_config;
310   f->can_overwrite_buffer = 2;
311   return f;
312 }
313
314 #ifdef TEST
315
316 #include "ucw/getopt.h"
317
318 int main(int argc, char **argv)
319 {
320   struct fb_params par = { .type = FB_DIRECT };
321   struct fastbuf *f, *t;
322
323   log_init(NULL);
324   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
325     die("Hey, whaddya want?");
326   f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
327   t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
328
329   bbcopy(f, t, ~0U);
330   ASSERT(btell(f) == btell(t));
331
332 #if 0           // This triggers unaligned write
333   bflush(t);
334   bputc(t, '\n');
335 #endif
336
337   brewind(t);
338   bgetc(t);
339   ASSERT(btell(t) == 1);
340
341   bclose(f);
342   bclose(t);
343   return 0;
344 }
345
346 #endif