]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
Increase line buffer sizes to 4096 bytes. Current gatherd really can
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001 Martin Mares <mj@ucw.cz>
5  */
6
7 #include "lib/lib.h"
8 #include "lib/bucket.h"
9 #include "lib/fastbuf.h"
10 #include "lib/lfs.h"
11 #include "lib/conf.h"
12
13 #include <string.h>
14 #include <stdlib.h>
15 #include <fcntl.h>
16 #include <unistd.h>
17 #include <sys/file.h>
18
19 static int obuck_fd;
20 static unsigned int obuck_remains, obuck_check_pad;
21 static struct fastbuf *obuck_fb;
22 static struct obuck_header obuck_hdr;
23 static sh_off_t bucket_start;
24
25 /*** Configuration ***/
26
27 byte *obuck_name = "not/configured";
28 static int obuck_io_buflen = 65536;
29 static int obuck_shake_buflen = 1048576;
30
31 static struct cfitem obuck_config[] = {
32   { "Buckets",          CT_SECTION,     NULL },
33   { "BucketFile",       CT_STRING,      &obuck_name },
34   { "BufSize",          CT_INT,         &obuck_io_buflen },
35   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
36   { NULL,               CT_STOP,        NULL }
37 };
38
39 static void CONSTRUCTOR obuck_init_config(void)
40 {
41   cf_register(obuck_config);
42 }
43
44 /*** Internal operations ***/
45
46 static void
47 obuck_broken(char *msg)
48 {
49   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
50 }
51
52 /*
53  *  Unfortunately we cannot use flock() here since it happily permits
54  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
55  *  locks are very ugly and they don't support 64-bit offsets, but we
56  *  can work around the problem by always locking the first header
57  *  in the file.
58  */
59
60 static inline void
61 obuck_do_lock(int type)
62 {
63   struct flock fl;
64
65   fl.l_type = type;
66   fl.l_whence = SEEK_SET;
67   fl.l_start = 0;
68   fl.l_len = sizeof(struct obuck_header);
69   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
70     die("fcntl lock: %m");
71 }
72
73 static inline void
74 obuck_lock_read(void)
75 {
76   obuck_do_lock(F_RDLCK);
77 }
78
79 static inline void
80 obuck_lock_write(void)
81 {
82   obuck_do_lock(F_WRLCK);
83 }
84
85 static inline void
86 obuck_unlock(void)
87 {
88   obuck_do_lock(F_UNLCK);
89 }
90
91 /*** FastIO emulation ***/
92
93 /* We need to use pread/pwrite since we work on fd's shared between processes */
94
95 static int
96 obuck_fb_refill(struct fastbuf *f)
97 {
98   unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains;
99   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
100   int l;
101
102   if (!limit)
103     return 0;
104   l = sh_pread(f->fd, f->buffer, size, f->fdpos);
105   if (l < 0)
106     die("Error reading bucket: %m");
107   if ((unsigned) l != size)
108     obuck_broken("Short read");
109   f->bptr = f->buffer;
110   f->bstop = f->buffer + limit;
111   f->pos = f->fdpos;
112   f->fdpos += limit;
113   obuck_remains -= limit;
114   if (!obuck_remains)   /* Should check the trailer */
115     {
116       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
117         obuck_broken("Missing trailer");
118     }
119   return limit;
120 }
121
122 static void
123 obuck_fb_spout(struct fastbuf *f)
124 {
125   int l = f->bptr - f->buffer;
126   char *c = f->buffer;
127
128   while (l)
129     {
130       int z = sh_pwrite(f->fd, c, l, f->fdpos);
131       if (z <= 0)
132         die("Error writing bucket: %m");
133       f->fdpos += z;
134       l -= z;
135       c += z;
136     }
137   f->bptr = f->buffer;
138   f->pos = f->fdpos;
139 }
140
141 static void
142 obuck_fb_close(struct fastbuf *f)
143 {
144   close(f->fd);
145 }
146
147 /*** Exported functions ***/
148
149 void
150 obuck_init(int writeable)
151 {
152   struct fastbuf *b;
153   sh_off_t size;
154
155   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
156   if (obuck_fd < 0)
157     die("Unable to open bucket file %s: %m", obuck_name);
158   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
159   b->buflen = obuck_io_buflen;
160   b->buffer = (char *)(b+1);
161   b->bptr = b->bstop = b->buffer;
162   b->bufend = b->buffer + obuck_io_buflen;
163   b->name = "bucket";
164   b->fd = obuck_fd;
165   b->refill = obuck_fb_refill;
166   b->spout = obuck_fb_spout;
167   b->close = obuck_fb_close;
168   obuck_lock_read();
169   size = sh_seek(obuck_fd, 0, SEEK_END);
170   if (size)
171     {
172       /* If the bucket pool is not empty, check consistency of its end */
173       u32 check;
174       bucket_start = size - 4;  /* for error reporting */
175       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
176           check != OBUCK_TRAILER)
177         obuck_broken("Missing trailer of last object");
178     }
179   obuck_unlock();
180 }
181
182 void
183 obuck_cleanup(void)
184 {
185   bclose(obuck_fb);
186 }
187
188 void
189 obuck_sync(void)
190 {
191   bflush(obuck_fb);
192   fsync(obuck_fd);
193 }
194
195 static void
196 obuck_get(oid_t oid)
197 {
198   struct fastbuf *b = obuck_fb;
199
200   bucket_start = obuck_get_pos(oid);
201   bflush(b);
202   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
203     obuck_broken("Short header read");
204   b->fdpos = bucket_start + sizeof(obuck_hdr);
205   if (obuck_hdr.magic != OBUCK_MAGIC)
206     obuck_broken("Missing magic number");
207   if (obuck_hdr.oid == OBUCK_OID_DELETED)
208     obuck_broken("Access to deleted bucket");
209   if (obuck_hdr.oid != oid)
210     obuck_broken("Invalid backlink");
211 }
212
213 void
214 obuck_find_by_oid(struct obuck_header *hdrp)
215 {
216   oid_t oid = hdrp->oid;
217
218   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
219   obuck_lock_read();
220   obuck_get(oid);
221   obuck_unlock();
222   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
223 }
224
225 int
226 obuck_find_first(struct obuck_header *hdrp, int full)
227 {
228   bucket_start = 0;
229   obuck_hdr.magic = 0;
230   return obuck_find_next(hdrp, full);
231 }
232
233 int
234 obuck_find_next(struct obuck_header *hdrp, int full)
235 {
236   int c;
237   struct fastbuf *b = obuck_fb;
238
239   for(;;)
240     {
241       if (obuck_hdr.magic)
242         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
243                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
244       bflush(b);
245       obuck_lock_read();
246       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
247       obuck_unlock();
248       if (!c)
249         return 0;
250       if (c != sizeof(obuck_hdr))
251         obuck_broken("Short header read");
252       b->fdpos = bucket_start + sizeof(obuck_hdr);
253       if (obuck_hdr.magic != OBUCK_MAGIC)
254         obuck_broken("Missing magic number");
255       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
256         {
257           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
258           return 1;
259         }
260     }
261 }
262
263 struct fastbuf *
264 obuck_fetch(void)
265 {
266   obuck_remains = obuck_hdr.length;
267   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
268   return obuck_fb;
269 }
270
271 void
272 obuck_fetch_end(struct fastbuf *b UNUSED)
273 {
274 }
275
276 struct fastbuf *
277 obuck_create(void)
278 {
279   obuck_lock_write();
280   bflush(obuck_fb);
281   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
282   if (bucket_start & (OBUCK_ALIGN - 1))
283     obuck_broken("Misaligned file");
284   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
285   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
286   obuck_hdr.length = obuck_hdr.orig_length = 0;
287   obuck_fb->fdpos = obuck_fb->pos = bucket_start;
288   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
289   return obuck_fb;
290 }
291
292 void
293 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
294 {
295   int pad;
296   obuck_hdr.magic = OBUCK_MAGIC;
297   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr);
298   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
299   while (pad--)
300     bputc(obuck_fb, 0);
301   bputl(obuck_fb, OBUCK_TRAILER);
302   bflush(obuck_fb);
303   ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1)));
304   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
305   obuck_unlock();
306   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
307 }
308
309 void
310 obuck_delete(oid_t oid)
311 {
312   obuck_lock_write();
313   obuck_get(oid);
314   obuck_hdr.oid = OBUCK_OID_DELETED;
315   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
316   obuck_unlock();
317 }
318
319 /*** Shakedown ***/
320
321 void
322 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
323 {
324   byte *rbuf, *wbuf, *msg;
325   sh_off_t rstart, wstart, w_bucket_start;
326   int roff, woff, rsize, l;
327   struct obuck_header *rhdr, *whdr;
328
329   rbuf = xmalloc(obuck_shake_buflen);
330   wbuf = xmalloc(obuck_shake_buflen);
331   rstart = wstart = 0;
332   roff = woff = rsize = 0;
333
334   /* We need to be the only accessor, all the object ID's are becoming invalid */
335   obuck_lock_write();
336
337   for(;;)
338     {
339       bucket_start = rstart + roff;
340       w_bucket_start = wstart + woff;
341       if (rsize - roff < OBUCK_ALIGN)
342         goto reread;
343       rhdr = (struct obuck_header *)(rbuf + roff);
344       if (rhdr->magic != OBUCK_MAGIC ||
345           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (bucket_start >> OBUCK_SHIFT))
346         {
347           msg = "header mismatch";
348           goto broken;
349         }
350       l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
351       if (rsize - roff < l)
352         goto reread;
353       if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
354         {
355           msg = "missing trailer";
356           goto broken;
357         }
358       if (rhdr->oid != OBUCK_OID_DELETED)
359         {
360           if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
361             {
362               if (bucket_start == w_bucket_start)
363                 {
364                   /* No copying needed now nor ever in the past, hence woff==0 */
365                   wstart += l;
366                 }
367               else
368                 {
369                   if (obuck_shake_buflen - woff < l)
370                     {
371                       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
372                         die("obuck_shakedown write failed: %m");
373                       wstart += woff;
374                       woff = 0;
375                     }
376                   whdr = (struct obuck_header *)(wbuf+woff);
377                   memcpy(whdr, rhdr, l);
378                   whdr->oid = w_bucket_start >> OBUCK_SHIFT;
379                   woff += l;
380                 }
381             }
382         }
383       else
384         kibitz(rhdr, OBUCK_OID_DELETED, NULL);
385       roff += l;
386       continue;
387
388     reread:
389       if (roff)
390         {
391           memmove(rbuf, rbuf+roff, rsize-roff);
392           rsize -= roff;
393           rstart += roff;
394           roff = 0;
395         }
396       l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
397       if (l < 0)
398         die("obuck_shakedown read error: %m");
399       if (!l)
400         {
401           if (!rsize)
402             break;
403           msg = "unexpected EOF";
404           goto broken;
405         }
406       rsize += l;
407     }
408   if (woff)
409     {
410       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
411         die("obuck_shakedown write failed: %m");
412       wstart += woff;
413     }
414   sh_ftruncate(obuck_fd, wstart);
415
416   obuck_unlock();
417   xfree(rbuf);
418   xfree(wbuf);
419   return;
420
421  broken:
422   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld), gathering debris", msg, (long long) bucket_start);
423   if (woff)
424     {
425       sh_pwrite(obuck_fd, wbuf, woff, wstart);
426       wstart += woff;
427     }
428   while (wstart + OBUCK_ALIGN <= bucket_start)
429     {
430       u32 check = OBUCK_TRAILER;
431       obuck_hdr.magic = OBUCK_MAGIC;
432       obuck_hdr.oid = OBUCK_OID_DELETED;
433       if (bucket_start - wstart < 0x40000000)
434         obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
435       else
436         obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
437       obuck_hdr.orig_length = obuck_hdr.length;
438       sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
439       wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
440       sh_pwrite(obuck_fd, &check, 4, wstart-4);
441     }
442   die("Fatal error during object pool shakedown");
443 }
444
445 /*** Testing ***/
446
447 #ifdef TEST
448
449 #define COUNT 5000
450 #define MAXLEN 10000
451 #define KILLPERC 13
452 #define LEN(i) ((259309*(i))%MAXLEN)
453
454 int main(int argc, char **argv)
455 {
456   int ids[COUNT];
457   unsigned int i, j, cnt;
458   struct obuck_header h;
459   struct fastbuf *b;
460
461   log_init(NULL);
462   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
463       optind < argc)
464   {
465     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
466     exit(1);
467   }
468
469   unlink(obuck_name);
470   obuck_init(1);
471   for(j=0; j<COUNT; j++)
472     {
473       b = obuck_create();
474       for(i=0; i<LEN(j); i++)
475         bputc(b, (i+j) % 256);
476       obuck_create_end(b, &h);
477       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
478       ids[j] = h.oid;
479     }
480   for(j=0; j<COUNT; j++)
481     if (j % 100 < KILLPERC)
482       {
483         printf("Deleting %08x\n", ids[j]);
484         obuck_delete(ids[j]);
485       }
486   cnt = 0;
487   for(j=0; j<COUNT; j++)
488     if (j % 100 >= KILLPERC)
489       {
490         cnt++;
491         h.oid = ids[j];
492         obuck_find_by_oid(&h);
493         b = obuck_fetch();
494         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
495         if (h.orig_length != LEN(j))
496           die("Invalid length");
497         for(i=0; i<h.orig_length; i++)
498           if ((unsigned) bgetc(b) != (i+j) % 256)
499             die("Contents mismatch");
500         if (bgetc(b) != EOF)
501           die("EOF mismatch");
502         obuck_fetch_end(b);
503       }
504   if (obuck_find_first(&h, 0))
505     do
506       {
507         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
508         cnt--;
509       }
510     while (obuck_find_next(&h, 0));
511   if (cnt)
512     die("Walk mismatch");
513   obuck_cleanup();
514   return 0;
515 }
516
517 #endif