]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
Improved and cleaned up the bucket library. The original "single operation
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001--2003 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/bucket.h"
12 #include "lib/fastbuf.h"
13 #include "lib/lfs.h"
14 #include "lib/conf.h"
15
16 #include <string.h>
17 #include <stdlib.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/file.h>
21
22 static int obuck_fd;
23 static struct obuck_header obuck_hdr, obuck_create_hdr;
24 static sh_off_t bucket_find_pos;
25 static struct fastbuf *obuck_write_fb;
26
27 /*** Configuration ***/
28
29 byte *obuck_name = "not/configured";
30 static uns obuck_io_buflen = 65536;
31 static int obuck_shake_buflen = 1048576;
32 static uns obuck_slurp_buflen = 65536;
33
34 static struct cfitem obuck_config[] = {
35   { "Buckets",          CT_SECTION,     NULL },
36   { "BucketFile",       CT_STRING,      &obuck_name },
37   { "BufSize",          CT_INT,         &obuck_io_buflen },
38   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
39   { "SlurpBufSize",     CT_INT,         &obuck_slurp_buflen },
40   { NULL,               CT_STOP,        NULL }
41 };
42
43 static void CONSTRUCTOR obuck_init_config(void)
44 {
45   cf_register(obuck_config);
46 }
47
48 /*** Internal operations ***/
49
50 static void
51 obuck_broken(char *msg, sh_off_t pos)
52 {
53   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
54 }
55
56 /*
57  *  Unfortunately we cannot use flock() here since it happily permits
58  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
59  *  locks are very ugly and they don't support 64-bit offsets, but we
60  *  can work around the problem by always locking the first header
61  *  in the file.
62  */
63
64 static inline void
65 obuck_do_lock(int type)
66 {
67   struct flock fl;
68
69   fl.l_type = type;
70   fl.l_whence = SEEK_SET;
71   fl.l_start = 0;
72   fl.l_len = sizeof(struct obuck_header);
73   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
74     die("fcntl lock: %m");
75 }
76
77 inline void
78 obuck_lock_read(void)
79 {
80   obuck_do_lock(F_RDLCK);
81 }
82
83 inline void
84 obuck_lock_write(void)
85 {
86   obuck_do_lock(F_WRLCK);
87 }
88
89 inline void
90 obuck_unlock(void)
91 {
92   obuck_do_lock(F_UNLCK);
93 }
94
95 /*** FastIO emulation ***/
96
97 struct fb_bucket {
98   struct fastbuf fb;
99   sh_off_t start_pos;
100   uns bucket_size;
101   byte buffer[0];
102 };
103 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
104
105 static int obuck_fb_count;
106
107 static void
108 obuck_fb_close(struct fastbuf *f)
109 {
110   obuck_fb_count--;
111   xfree(f);
112 }
113
114 /* We need to use pread/pwrite since we work on fd's shared between processes */
115
116 static int
117 obuck_fb_refill(struct fastbuf *f)
118 {
119   uns remains, bufsize, size, datasize;
120
121   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
122   bufsize = f->bufend - f->buffer;
123   if (!remains)
124     return 0;
125   sh_off_t start = FB_BUCKET(f)->start_pos;
126   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
127   if (remains <= bufsize)
128     {
129       datasize = remains;
130       size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
131     }
132   else
133     size = datasize = bufsize;
134   int l = sh_pread(obuck_fd, f->buffer, size, pos);
135   if (l < 0)
136     die("Error reading bucket: %m");
137   if ((unsigned) l != size)
138     obuck_broken("Short read", FB_BUCKET(f)->start_pos);
139   f->bptr = f->buffer;
140   f->bstop = f->buffer + datasize;
141   f->pos += datasize;
142   if (datasize < size)
143     {
144       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
145         obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
146     }
147   return datasize;
148 }
149
150 static void
151 obuck_fb_spout(struct fastbuf *f)
152 {
153   int l = f->bptr - f->buffer;
154   char *c = f->buffer;
155
156   while (l)
157     {
158       int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
159       if (z <= 0)
160         die("Error writing bucket: %m");
161       f->pos += z;
162       l -= z;
163       c += z;
164     }
165   f->bptr = f->buffer;
166 }
167
168 /*** Exported functions ***/
169
170 void
171 obuck_init(int writeable)
172 {
173   sh_off_t size;
174
175   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
176   if (obuck_fd < 0)
177     die("Unable to open bucket file %s: %m", obuck_name);
178   obuck_lock_read();
179   size = sh_seek(obuck_fd, 0, SEEK_END);
180   if (size)
181     {
182       /* If the bucket pool is not empty, check consistency of its end */
183       u32 check;
184       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
185           check != OBUCK_TRAILER)
186         obuck_broken("Missing trailer of last object", size - 4);
187     }
188   obuck_unlock();
189 }
190
191 void
192 obuck_cleanup(void)
193 {
194   close(obuck_fd);
195   if (obuck_fb_count)
196     log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
197   if (obuck_write_fb)
198     log(L_ERROR, "Bug: Forgot to close bucket write stream");
199 }
200
201 void
202 obuck_sync(void)
203 {
204   if (obuck_write_fb)
205     bflush(obuck_write_fb);
206   fsync(obuck_fd);
207 }
208
209 static void
210 obuck_get(oid_t oid)
211 {
212   bucket_find_pos = obuck_get_pos(oid);
213   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
214     obuck_broken("Short header read", bucket_find_pos);
215   if (obuck_hdr.magic != OBUCK_MAGIC)
216     obuck_broken("Missing magic number", bucket_find_pos);
217   if (obuck_hdr.oid == OBUCK_OID_DELETED)
218     obuck_broken("Access to deleted bucket", bucket_find_pos);
219   if (obuck_hdr.oid != oid)
220     obuck_broken("Invalid backlink", bucket_find_pos);
221 }
222
223 void
224 obuck_find_by_oid(struct obuck_header *hdrp)
225 {
226   oid_t oid = hdrp->oid;
227
228   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
229   obuck_lock_read();
230   obuck_get(oid);
231   obuck_unlock();
232   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
233 }
234
235 int
236 obuck_find_first(struct obuck_header *hdrp, int full)
237 {
238   bucket_find_pos = 0;
239   obuck_hdr.magic = 0;
240   return obuck_find_next(hdrp, full);
241 }
242
243 int
244 obuck_find_next(struct obuck_header *hdrp, int full)
245 {
246   int c;
247
248   for(;;)
249     {
250       if (obuck_hdr.magic)
251         bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
252                            4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
253       obuck_lock_read();
254       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
255       obuck_unlock();
256       if (!c)
257         return 0;
258       if (c != sizeof(obuck_hdr))
259         obuck_broken("Short header read", bucket_find_pos);
260       if (obuck_hdr.magic != OBUCK_MAGIC)
261         obuck_broken("Missing magic number", bucket_find_pos);
262       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
263         {
264           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
265           return 1;
266         }
267     }
268 }
269
270 struct fastbuf *
271 obuck_fetch(void)
272 {
273   struct fastbuf *b;
274   uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
275   uns real_buflen = official_buflen + OBUCK_ALIGN;
276
277   b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
278   b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
279   b->bufend = b->buffer + official_buflen;
280   b->name = "bucket-read";
281   b->pos = 0;
282   b->refill = obuck_fb_refill;
283   b->spout = NULL;
284   b->seek = NULL;
285   b->close = obuck_fb_close;
286   b->config = NULL;
287   FB_BUCKET(b)->start_pos = bucket_find_pos;
288   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
289   obuck_fb_count++;
290   return b;
291 }
292
293 oid_t
294 obuck_predict_last_oid(void)
295 {
296   /* BEWARE: This is not fork-safe. */
297   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
298   return size >> OBUCK_SHIFT;
299 }
300
301 struct fastbuf *
302 obuck_create(u32 type)
303 {
304   ASSERT(!obuck_write_fb);
305
306   obuck_lock_write();
307   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
308   if (start & (OBUCK_ALIGN - 1))
309     obuck_broken("Misaligned file", start);
310   obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
311   obuck_create_hdr.oid = start >> OBUCK_SHIFT;
312   obuck_create_hdr.length = 0;
313   obuck_create_hdr.type = type;
314
315   struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
316   obuck_write_fb = b;
317   b->buffer = FB_BUCKET(b)->buffer;
318   b->bptr = b->bstop = b->buffer;
319   b->bufend = b->buffer + obuck_io_buflen;
320   b->pos = -(int)sizeof(obuck_create_hdr);
321   b->name = "bucket-write";
322   b->refill = NULL;
323   b->spout = obuck_fb_spout;
324   b->seek = NULL;
325   b->close = NULL;
326   b->config = NULL;
327   FB_BUCKET(b)->start_pos = start;
328   FB_BUCKET(b)->bucket_size = 0;
329   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
330
331   return b;
332 }
333
334 void
335 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
336 {
337   ASSERT(b == obuck_write_fb);
338   obuck_write_fb = NULL;
339
340   obuck_create_hdr.magic = OBUCK_MAGIC;
341   obuck_create_hdr.length = btell(b);
342   int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
343   while (pad--)
344     bputc(b, 0);
345   bputl(b, OBUCK_TRAILER);
346   bflush(b);
347   ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
348   if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
349     die("Bucket header update failed: %m");
350   obuck_unlock();
351   memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
352   xfree(b);
353 }
354
355 void
356 obuck_delete(oid_t oid)
357 {
358   obuck_lock_write();
359   obuck_get(oid);
360   obuck_hdr.oid = OBUCK_OID_DELETED;
361   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
362   obuck_unlock();
363 }
364
365 /*** Fast reading of the whole pool ***/
366
367 static struct fastbuf *obuck_rpf;
368 static uns slurp_remains;
369 static sh_off_t slurp_start, slurp_current;
370
371 static int
372 obuck_slurp_refill(struct fastbuf *f)
373 {
374   uns l;
375
376   if (!slurp_remains)
377     return 0;
378   l = bdirect_read_prepare(obuck_rpf, &f->buffer);
379   if (!l)
380     obuck_broken("Incomplete object", slurp_start);
381   l = MIN(l, slurp_remains);
382   bdirect_read_commit(obuck_rpf, f->buffer + l);
383   slurp_remains -= l;
384   f->bptr = f->buffer;
385   f->bufend = f->bstop = f->buffer + l;
386   return 1;
387 }
388
389 struct fastbuf *
390 obuck_slurp_pool(struct obuck_header *hdrp)
391 {
392   static struct fastbuf limiter;
393   uns l;
394
395   do
396     {
397       if (!obuck_rpf)
398         {
399           obuck_lock_read();
400           obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
401         }
402       else
403         {
404           bsetpos(obuck_rpf, slurp_current - 4);
405           if (bgetl(obuck_rpf) != OBUCK_TRAILER)
406             obuck_broken("Missing trailer", slurp_start);
407         }
408       slurp_start = btell(obuck_rpf);
409       l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
410       if (!l)
411         {
412           bclose(obuck_rpf);
413           obuck_rpf = NULL;
414           obuck_unlock();
415           return NULL;
416         }
417       if (l != sizeof(struct obuck_header))
418         obuck_broken("Short header read", slurp_start);
419       if (hdrp->magic != OBUCK_MAGIC)
420         obuck_broken("Missing magic number", slurp_start);
421       slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
422                        4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
423     }
424   while (hdrp->oid == OBUCK_OID_DELETED);
425   if (obuck_get_pos(hdrp->oid) != slurp_start)
426     obuck_broken("Invalid backlink", slurp_start);
427   slurp_remains = hdrp->length;
428   limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
429   limiter.name = "Bucket";
430   limiter.pos = 0;
431   limiter.refill = obuck_slurp_refill;
432   return &limiter;
433 }
434
435 /*** Shakedown ***/
436
437 void
438 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
439 {
440   byte *rbuf, *wbuf, *msg;
441   sh_off_t rstart, wstart, r_bucket_start, w_bucket_start;
442   int roff, woff, rsize, l;
443   struct obuck_header *rhdr, *whdr;
444
445   rbuf = xmalloc(obuck_shake_buflen);
446   wbuf = xmalloc(obuck_shake_buflen);
447   rstart = wstart = 0;
448   roff = woff = rsize = 0;
449
450   /* We need to be the only accessor, all the object ID's are becoming invalid */
451   obuck_lock_write();
452
453   for(;;)
454     {
455       r_bucket_start = rstart + roff;
456       w_bucket_start = wstart + woff;
457       if (rsize - roff < OBUCK_ALIGN)
458         goto reread;
459       rhdr = (struct obuck_header *)(rbuf + roff);
460       if (rhdr->magic != OBUCK_MAGIC ||
461           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
462         {
463           msg = "header mismatch";
464           goto broken;
465         }
466       l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
467       if (l > obuck_shake_buflen)
468         {
469           if (rhdr->oid != OBUCK_OID_DELETED)
470             {
471               msg = "bucket longer than ShakeBufSize";
472               goto broken;
473             }
474           rstart = r_bucket_start + l;
475           roff = 0;
476           rsize = 0;
477           goto reread;
478         }
479       if (rsize - roff < l)
480         goto reread;
481       if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
482         {
483           msg = "missing trailer";
484           goto broken;
485         }
486       if (rhdr->oid != OBUCK_OID_DELETED)
487         {
488           if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
489             {
490               if (r_bucket_start == w_bucket_start)
491                 {
492                   /* No copying needed now nor ever in the past, hence woff==0 */
493                   wstart += l;
494                 }
495               else
496                 {
497                   if (obuck_shake_buflen - woff < l)
498                     {
499                       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
500                         die("obuck_shakedown write failed: %m");
501                       wstart += woff;
502                       woff = 0;
503                     }
504                   whdr = (struct obuck_header *)(wbuf+woff);
505                   memcpy(whdr, rhdr, l);
506                   whdr->oid = w_bucket_start >> OBUCK_SHIFT;
507                   woff += l;
508                 }
509             }
510         }
511       else
512         kibitz(rhdr, OBUCK_OID_DELETED, NULL);
513       roff += l;
514       continue;
515
516     reread:
517       if (roff)
518         {
519           memmove(rbuf, rbuf+roff, rsize-roff);
520           rsize -= roff;
521           rstart += roff;
522           roff = 0;
523         }
524       l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
525       if (l < 0)
526         die("obuck_shakedown read error: %m");
527       if (!l)
528         {
529           if (!rsize)
530             break;
531           msg = "unexpected EOF";
532           goto broken;
533         }
534       rsize += l;
535     }
536   if (woff)
537     {
538       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
539         die("obuck_shakedown write failed: %m");
540       wstart += woff;
541     }
542   sh_ftruncate(obuck_fd, wstart);
543
544   obuck_unlock();
545   xfree(rbuf);
546   xfree(wbuf);
547   return;
548
549  broken:
550   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
551   if (woff)
552     {
553       sh_pwrite(obuck_fd, wbuf, woff, wstart);
554       wstart += woff;
555     }
556   while (wstart + OBUCK_ALIGN <= r_bucket_start)
557     {
558       u32 check = OBUCK_TRAILER;
559       obuck_hdr.magic = OBUCK_MAGIC;
560       obuck_hdr.oid = OBUCK_OID_DELETED;
561       if (r_bucket_start - wstart < 0x40000000)
562         obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4;
563       else
564         obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
565       sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
566       wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
567       sh_pwrite(obuck_fd, &check, 4, wstart-4);
568     }
569   die("Fatal error during object pool shakedown");
570 }
571
572 /*** Testing ***/
573
574 #ifdef TEST
575
576 #define COUNT 5000
577 #define MAXLEN 10000
578 #define KILLPERC 13
579 #define LEN(i) ((259309*(i))%MAXLEN)
580
581 int main(int argc, char **argv)
582 {
583   int ids[COUNT];
584   unsigned int i, j, cnt;
585   struct obuck_header h;
586   struct fastbuf *b;
587
588   log_init(NULL);
589   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
590       optind < argc)
591   {
592     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
593     exit(1);
594   }
595
596   unlink(obuck_name);
597   obuck_init(1);
598   for(j=0; j<COUNT; j++)
599     {
600       b = obuck_create(BUCKET_TYPE_PLAIN);
601       for(i=0; i<LEN(j); i++)
602         bputc(b, (i+j) % 256);
603       obuck_create_end(b, &h);
604       printf("Writing %08x %d\n", h.oid, h.length);
605       ids[j] = h.oid;
606     }
607   for(j=0; j<COUNT; j++)
608     if (j % 100 < KILLPERC)
609       {
610         printf("Deleting %08x\n", ids[j]);
611         obuck_delete(ids[j]);
612       }
613   cnt = 0;
614   for(j=0; j<COUNT; j++)
615     if (j % 100 >= KILLPERC)
616       {
617         cnt++;
618         h.oid = ids[j];
619         obuck_find_by_oid(&h);
620         b = obuck_fetch();
621         printf("Reading %08x %d\n", h.oid, h.length);
622         if (h.length != LEN(j))
623           die("Invalid length");
624         for(i=0; i<h.length; i++)
625           if ((unsigned) bgetc(b) != (i+j) % 256)
626             die("Contents mismatch");
627         if (bgetc(b) != EOF)
628           die("EOF mismatch");
629         bclose(b);
630       }
631   if (obuck_find_first(&h, 0))
632     do
633       {
634         printf("<<< %08x\t%d\n", h.oid, h.length);
635         cnt--;
636       }
637     while (obuck_find_next(&h, 0));
638   if (cnt)
639     die("Walk mismatch");
640   obuck_cleanup();
641   return 0;
642 }
643
644 #endif