]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-direct.c
The big move. Step #1: Move whole lib/ to ucw/.
[libucw.git] / ucw / fb-direct.c
1 /*
2  *      UCW Library -- Fast Buffered I/O on O_DIRECT Files
3  *
4  *      (c) 2006--2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This is a fastbuf backend for fast streaming I/O using O_DIRECT and
12  *      the asynchronous I/O module. It's designed for use on large files
13  *      which don't fit in the disk cache.
14  *
15  *      CAVEATS:
16  *
17  *        - All operations with a single fbdirect handle must be done
18  *          within a single thread, unless you provide a custom I/O queue
19  *          and take care of locking.
20  *
21  *      FIXME: what if the OS doesn't support O_DIRECT?
22  *      FIXME: unaligned seeks and partial writes?
23  *      FIXME: append to unaligned file
24  */
25
26 #undef LOCAL_DEBUG
27
28 #include "ucw/lib.h"
29 #include "ucw/fastbuf.h"
30 #include "ucw/lfs.h"
31 #include "ucw/asio.h"
32 #include "ucw/conf.h"
33 #include "ucw/threads.h"
34
35 #include <string.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38 #include <stdio.h>
39
40 uns fbdir_cheat;
41
42 static struct cf_section fbdir_cf = {
43   CF_ITEMS {
44     CF_UNS("Cheat", &fbdir_cheat),
45     CF_END
46   }
47 };
48
49 #define FBDIR_ALIGN 512
50
51 enum fbdir_mode {                               // Current operating mode
52     M_NULL,
53     M_READ,
54     M_WRITE
55 };
56
57 struct fb_direct {
58   struct fastbuf fb;
59   int fd;                                       // File descriptor
60   int is_temp_file;
61   struct asio_queue *io_queue;                  // I/O queue to use
62   struct asio_queue *user_queue;                // If io_queue was supplied by the user
63   struct asio_request *pending_read;
64   struct asio_request *done_read;
65   struct asio_request *active_buffer;
66   enum fbdir_mode mode;
67   byte name[0];
68 };
69 #define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
70
71 static void CONSTRUCTOR
72 fbdir_global_init(void)
73 {
74   cf_declare_section("FBDirect", &fbdir_cf, 0);
75 }
76
77 static void
78 fbdir_read_sync(struct fb_direct *F)
79 {
80   while (F->pending_read)
81     {
82       struct asio_request *r = asio_wait(F->io_queue);
83       ASSERT(r);
84       struct fb_direct *G = r->user_data;
85       ASSERT(G);
86       ASSERT(G->pending_read == r && !G->done_read);
87       G->pending_read = NULL;
88       G->done_read = r;
89     }
90 }
91
92 static void
93 fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
94 {
95   if (F->mode == mode)
96     return;
97   DBG("FB-DIRECT: Switching mode to %d", mode);
98   switch (F->mode)
99     {
100     case M_NULL:
101       break;
102     case M_READ:
103       fbdir_read_sync(F);                       // Wait for read-ahead requests to finish
104       if (F->done_read)                         // Return read-ahead requests if any
105         {
106           asio_put(F->done_read);
107           F->done_read = NULL;
108         }
109       break;
110     case M_WRITE:
111       asio_sync(F->io_queue);                   // Wait for pending writebacks
112       break;
113     }
114   if (F->active_buffer)
115     {
116       asio_put(F->active_buffer);
117       F->active_buffer = NULL;
118     }
119   F->mode = mode;
120 }
121
122 static void
123 fbdir_submit_read(struct fb_direct *F)
124 {
125   struct asio_request *r = asio_get(F->io_queue);
126   r->fd = F->fd;
127   r->op = ASIO_READ;
128   r->len = F->io_queue->buffer_size;
129   r->user_data = F;
130   asio_submit(r);
131   F->pending_read = r;
132 }
133
134 static int
135 fbdir_refill(struct fastbuf *f)
136 {
137   struct fb_direct *F = FB_DIRECT(f);
138
139   DBG("FB-DIRECT: Refill");
140
141   if (!F->done_read)
142     {
143       if (!F->pending_read)
144         {
145           fbdir_change_mode(F, M_READ);
146           fbdir_submit_read(F);
147         }
148       fbdir_read_sync(F);
149       ASSERT(F->done_read);
150     }
151
152   struct asio_request *r = F->done_read;
153   F->done_read = NULL;
154   if (F->active_buffer)
155     asio_put(F->active_buffer);
156   F->active_buffer = r;
157   if (!r->status)
158     return 0;
159   if (r->status < 0)
160     die("Error reading %s: %s", f->name, strerror(r->returned_errno));
161   f->bptr = f->buffer = r->buffer;
162   f->bstop = f->bufend = f->buffer + r->status;
163   f->pos += r->status;
164
165   fbdir_submit_read(F);                         // Read-ahead the next block
166
167   return r->status;
168 }
169
170 static void
171 fbdir_spout(struct fastbuf *f)
172 {
173   struct fb_direct *F = FB_DIRECT(f);
174   struct asio_request *r;
175
176   DBG("FB-DIRECT: Spout");
177
178   fbdir_change_mode(F, M_WRITE);
179   r = F->active_buffer;
180   if (r && f->bptr > f->bstop)
181     {
182       r->op = ASIO_WRITE_BACK;
183       r->fd = F->fd;
184       r->len = f->bptr - f->bstop;
185       ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
186       f->pos += r->len;
187       if (!fbdir_cheat && r->len % FBDIR_ALIGN)                 // Have to simulate incomplete writes
188         {
189           r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
190           asio_submit(r);
191           asio_sync(F->io_queue);
192           DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
193           if (sh_ftruncate(F->fd, f->pos) < 0)
194             die("Error truncating %s: %m", f->name);
195         }
196       else
197         asio_submit(r);
198       r = NULL;
199     }
200   if (!r)
201     r = asio_get(F->io_queue);
202   f->bstop = f->bptr = f->buffer = r->buffer;
203   f->bufend = f->buffer + F->io_queue->buffer_size;
204   F->active_buffer = r;
205 }
206
207 static int
208 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
209 {
210   DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
211
212   if (whence == SEEK_SET && pos == f->pos)
213     return 1;
214
215   fbdir_change_mode(FB_DIRECT(f), M_NULL);                      // Wait for all async requests to finish
216   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
217   if (l < 0)
218     return 0;
219   f->pos = l;
220   return 1;
221 }
222
223 static struct asio_queue *
224 fbdir_get_io_queue(uns buffer_size, uns write_back)
225 {
226   struct ucwlib_context *ctx = ucwlib_thread_context();
227   struct asio_queue *q = ctx->io_queue;
228   if (!q)
229     {
230       q = xmalloc_zero(sizeof(struct asio_queue));
231       q->buffer_size = buffer_size;
232       q->max_writebacks = write_back;
233       asio_init_queue(q);
234       ctx->io_queue = q;
235     }
236   q->use_count++;
237   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
238   return q;
239 }
240
241 static void
242 fbdir_put_io_queue(void)
243 {
244   struct ucwlib_context *ctx = ucwlib_thread_context();
245   struct asio_queue *q = ctx->io_queue;
246   ASSERT(q);
247   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
248   if (!--q->use_count)
249     {
250       asio_cleanup_queue(q);
251       xfree(q);
252       ctx->io_queue = NULL;
253     }
254 }
255
256 static void
257 fbdir_close(struct fastbuf *f)
258 {
259   struct fb_direct *F = FB_DIRECT(f);
260
261   DBG("FB-DIRECT: Close");
262
263   fbdir_change_mode(F, M_NULL);
264   if (!F->user_queue)
265     fbdir_put_io_queue();
266
267   bclose_file_helper(f, F->fd, F->is_temp_file);
268   xfree(f);
269 }
270
271 static int
272 fbdir_config(struct fastbuf *f, uns item, int value)
273 {
274   int orig;
275
276   switch (item)
277     {
278     case BCONFIG_IS_TEMP_FILE:
279       orig = FB_DIRECT(f)->is_temp_file;
280       FB_DIRECT(f)->is_temp_file = value;
281       return orig;
282     default:
283       return -1;
284     }
285 }
286
287 struct fastbuf *
288 fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
289 {
290   int namelen = strlen(name) + 1;
291   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
292   struct fastbuf *f = &F->fb;
293
294   DBG("FB-DIRECT: Open");
295   bzero(F, sizeof(*F));
296   f->name = F->name;
297   memcpy(f->name, name, namelen);
298   F->fd = fd;
299   if (q)
300     F->io_queue = F->user_queue = q;
301   else
302     F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
303   f->refill = fbdir_refill;
304   f->spout = fbdir_spout;
305   f->seek = fbdir_seek;
306   f->close = fbdir_close;
307   f->config = fbdir_config;
308   f->can_overwrite_buffer = 2;
309   return f;
310 }
311
312 #ifdef TEST
313
314 #include "ucw/getopt.h"
315
316 int main(int argc, char **argv)
317 {
318   struct fb_params par = { .type = FB_DIRECT };
319   struct fastbuf *f, *t;
320
321   log_init(NULL);
322   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
323     die("Hey, whaddya want?");
324   f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
325   t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
326
327   bbcopy(f, t, ~0U);
328   ASSERT(btell(f) == btell(t));
329
330 #if 0           // This triggers unaligned write
331   bflush(t);
332   bputc(t, '\n');
333 #endif
334
335   brewind(t);
336   bgetc(t);
337   ASSERT(btell(t) == 1);
338
339   bclose(f);
340   bclose(t);
341   return 0;
342 }
343
344 #endif