2 * Sherlock Library -- Object Buckets
4 * (c) 2001--2004 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include "lib/bucket.h"
14 #include "lib/fastbuf.h"
26 static struct obuck_header obuck_hdr, obuck_create_hdr;
27 static sh_off_t bucket_find_pos;
28 static struct fastbuf *obuck_write_fb;
30 /*** Configuration ***/
32 byte *obuck_name = "not/configured";
33 static uns obuck_io_buflen = 65536;
34 static int obuck_shake_buflen = 1048576;
35 static uns obuck_shake_security;
36 static uns obuck_slurp_buflen = 65536;
38 static struct cfitem obuck_config[] = {
39 { "Buckets", CT_SECTION, NULL },
40 { "BucketFile", CT_STRING, &obuck_name },
41 { "BufSize", CT_INT, &obuck_io_buflen },
42 { "ShakeBufSize", CT_INT, &obuck_shake_buflen },
43 { "ShakeSecurity", CT_INT, &obuck_shake_security },
44 { "SlurpBufSize", CT_INT, &obuck_slurp_buflen },
45 { NULL, CT_STOP, NULL }
48 static void CONSTRUCTOR obuck_init_config(void)
50 cf_register(obuck_config);
53 /*** Internal operations ***/
56 obuck_broken(char *msg, sh_off_t pos)
58 die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
62 * Unfortunately we cannot use flock() here since it happily permits
63 * locking a shared fd (e.g., after fork()) multiple times. The fcntl
64 * locks are very ugly and they don't support 64-bit offsets, but we
65 * can work around the problem by always locking the first header
70 obuck_do_lock(int type)
75 fl.l_whence = SEEK_SET;
77 fl.l_len = sizeof(struct obuck_header);
78 if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
79 die("fcntl lock: %m");
85 obuck_do_lock(F_RDLCK);
89 obuck_lock_write(void)
91 obuck_do_lock(F_WRLCK);
97 obuck_do_lock(F_UNLCK);
100 /*** FastIO emulation ***/
108 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
110 static int obuck_fb_count;
113 obuck_fb_close(struct fastbuf *f)
119 /* We need to use pread/pwrite since we work on fd's shared between processes */
122 obuck_fb_refill(struct fastbuf *f)
124 uns remains, bufsize, size, datasize;
126 remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
127 bufsize = f->bufend - f->buffer;
130 sh_off_t start = FB_BUCKET(f)->start_pos;
131 sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
132 if (remains <= bufsize)
135 size = start + obuck_bucket_size(FB_BUCKET(f)->bucket_size) - pos;
138 size = datasize = bufsize;
139 int l = sh_pread(obuck_fd, f->buffer, size, pos);
141 die("Error reading bucket: %m");
142 if ((unsigned) l != size)
143 obuck_broken("Short read", FB_BUCKET(f)->start_pos);
145 f->bstop = f->buffer + datasize;
149 if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
150 obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
156 obuck_fb_spout(struct fastbuf *f)
158 int l = f->bptr - f->buffer;
163 int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
165 die("Error writing bucket: %m");
173 /*** Exported functions ***/
176 obuck_init(int writeable)
180 obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
182 die("Unable to open bucket file %s: %m", obuck_name);
184 size = sh_seek(obuck_fd, 0, SEEK_END);
187 /* If the bucket pool is not empty, check consistency of its end */
189 if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
190 check != OBUCK_TRAILER)
191 obuck_broken("Missing trailer of last object", size - 4);
201 log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
203 log(L_ERROR, "Bug: Forgot to close bucket write stream");
210 bflush(obuck_write_fb);
217 bucket_find_pos = obuck_get_pos(oid);
218 if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
219 obuck_broken("Short header read", bucket_find_pos);
220 if (obuck_hdr.magic != OBUCK_MAGIC)
221 obuck_broken("Missing magic number", bucket_find_pos);
222 if (obuck_hdr.oid == OBUCK_OID_DELETED)
223 obuck_broken("Access to deleted bucket", bucket_find_pos);
224 if (obuck_hdr.oid != oid)
225 obuck_broken("Invalid backlink", bucket_find_pos);
229 obuck_find_by_oid(struct obuck_header *hdrp)
231 oid_t oid = hdrp->oid;
233 ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
237 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
241 obuck_find_first(struct obuck_header *hdrp, int full)
245 return obuck_find_next(hdrp, full);
249 obuck_find_next(struct obuck_header *hdrp, int full)
256 bucket_find_pos += obuck_bucket_size(obuck_hdr.length);
258 c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
262 if (c != sizeof(obuck_hdr))
263 obuck_broken("Short header read", bucket_find_pos);
264 if (obuck_hdr.magic != OBUCK_MAGIC)
265 obuck_broken("Missing magic number", bucket_find_pos);
266 if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
268 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
278 uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
279 uns real_buflen = official_buflen + OBUCK_ALIGN;
281 b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
282 b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
283 b->bufend = b->buffer + official_buflen;
284 b->name = "bucket-read";
286 b->refill = obuck_fb_refill;
289 b->close = obuck_fb_close;
291 FB_BUCKET(b)->start_pos = bucket_find_pos;
292 FB_BUCKET(b)->bucket_size = obuck_hdr.length;
298 obuck_predict_last_oid(void)
300 sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
301 return (oid_t)(size >> OBUCK_SHIFT);
305 obuck_create(u32 type)
307 ASSERT(!obuck_write_fb);
310 sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
311 if (start & (OBUCK_ALIGN - 1))
312 obuck_broken("Misaligned file", start);
313 obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
314 obuck_create_hdr.oid = start >> OBUCK_SHIFT;
315 obuck_create_hdr.length = 0;
316 obuck_create_hdr.type = type;
318 struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
320 b->buffer = FB_BUCKET(b)->buffer;
321 b->bptr = b->bstop = b->buffer;
322 b->bufend = b->buffer + obuck_io_buflen;
323 b->pos = -(int)sizeof(obuck_create_hdr);
324 b->name = "bucket-write";
326 b->spout = obuck_fb_spout;
330 FB_BUCKET(b)->start_pos = start;
331 FB_BUCKET(b)->bucket_size = 0;
332 bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
338 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
340 ASSERT(b == obuck_write_fb);
341 obuck_write_fb = NULL;
343 obuck_create_hdr.magic = OBUCK_MAGIC;
344 obuck_create_hdr.length = btell(b);
345 int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
348 bputl(b, OBUCK_TRAILER);
350 ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
351 if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
352 die("Bucket header update failed: %m");
354 memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
359 obuck_delete(oid_t oid)
363 obuck_hdr.oid = OBUCK_OID_DELETED;
364 sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
368 /*** Fast reading of the whole pool ***/
370 static struct fastbuf *obuck_rpf;
371 static uns slurp_remains;
372 static sh_off_t slurp_start, slurp_current;
375 obuck_slurp_refill(struct fastbuf *f)
381 l = bdirect_read_prepare(obuck_rpf, &f->buffer);
383 obuck_broken("Incomplete object", slurp_start);
384 l = MIN(l, slurp_remains);
385 bdirect_read_commit(obuck_rpf, f->buffer + l);
388 f->bufend = f->bstop = f->buffer + l;
393 obuck_slurp_pool(struct obuck_header *hdrp)
395 static struct fastbuf limiter;
403 obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
407 bsetpos(obuck_rpf, slurp_current - 4);
408 if (bgetl(obuck_rpf) != OBUCK_TRAILER)
409 obuck_broken("Missing trailer", slurp_start);
411 slurp_start = btell(obuck_rpf);
412 l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
420 if (l != sizeof(struct obuck_header))
421 obuck_broken("Short header read", slurp_start);
422 if (hdrp->magic != OBUCK_MAGIC)
423 obuck_broken("Missing magic number", slurp_start);
424 slurp_current = slurp_start + obuck_bucket_size(hdrp->length);
426 while (hdrp->oid == OBUCK_OID_DELETED);
427 if (obuck_get_pos(hdrp->oid) != slurp_start)
428 obuck_broken("Invalid backlink", slurp_start);
429 slurp_remains = hdrp->length;
430 limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
431 limiter.name = "Bucket";
433 limiter.refill = obuck_slurp_refill;
440 shake_write(void *addr, int len, sh_off_t pos)
442 int l = sh_pwrite(obuck_fd, addr, len, pos);
446 die("obuck_shakedown write error: %m");
448 die("obuck_shakedown write error: disk full");
455 if (obuck_shake_security > 1)
460 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
462 struct obuck_header *bhdr;
467 /* First of all, the "normal" part -- everything that will be written in this pass */
468 DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
469 while (boff < norm_size)
471 /* This needn't be optimized for speed. */
472 bhdr = (struct obuck_header *) (norm_buf + boff);
473 ASSERT(bhdr->magic == OBUCK_MAGIC);
474 l = obuck_bucket_size(bhdr->length);
476 bhdr->oid = bpos >> OBUCK_SHIFT;
477 shake_write(bhdr, l, bpos);
483 /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
486 DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
488 /* First the part we already have in the buffer */
489 bhdr = (struct obuck_header *) fragment;
490 ASSERT(bhdr->magic == OBUCK_MAGIC);
492 bhdr->oid = bpos >> OBUCK_SHIFT;
493 shake_write(bhdr, frag_size, bpos);
497 /* And then the rest, using a small 64K buffer */
498 byte *auxbuf = alloca(65536);
500 while (l < more_size)
502 int j = MIN(more_size-l, 65536);
503 if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
504 die("obuck_shakedown read error: %m");
505 shake_write(auxbuf, j, bpos);
513 shake_erase(sh_off_t start, sh_off_t end)
516 die("shake_erase called with negative length, that's a bug");
517 ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
520 u32 check = OBUCK_TRAILER;
521 obuck_hdr.magic = OBUCK_MAGIC;
522 obuck_hdr.oid = OBUCK_OID_DELETED;
523 uns len = MIN(0x40000000, end-start);
524 obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
525 DBG("Erasing %08x bytes at %Lx", len, (long long) start);
526 shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
528 shake_write(&check, 4, start-4);
533 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
535 byte *buf; /* Shakedown buffer and its size */
536 int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
537 byte *msg; /* Error message we will print */
538 sh_off_t rstart, wstart; /* Original and new position of buffer start */
539 sh_off_t r_bucket_start, w_bucket_start; /* Original and new position of the current bucket */
540 int roff, woff; /* Orig/new position of the current bucket relative to buffer start */
541 int rsize; /* Number of original bytes in the buffer */
542 int l; /* Raw size of the current bucket */
543 int changed = 0; /* "Something has been altered" flag */
544 int wrote_anything = 0; /* We already did a write to the bucket file */
545 struct obuck_header *rhdr, *whdr; /* Original and new address of header of the current bucket */
546 sh_off_t r_file_size; /* Original size of the bucket file */
547 int more; /* How much does the last bucket overlap the buffer */
549 buf = xmalloc(buflen);
551 roff = woff = rsize = 0;
553 /* We need to be the only accessor, all the object ID's are becoming invalid */
555 r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
556 ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
557 if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
558 die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
560 DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
564 r_bucket_start = rstart + roff;
565 w_bucket_start = wstart + woff;
566 rhdr = (struct obuck_header *)(buf + roff);
567 whdr = (struct obuck_header *)(buf + woff);
573 if (rhdr->magic != OBUCK_MAGIC ||
574 rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
576 msg = "header mismatch";
579 l = obuck_bucket_size(rhdr->length);
582 if (rhdr->oid != OBUCK_OID_DELETED)
584 msg = "bucket longer than ShakeBufSize";
587 /* Empty buckets are allowed to be large, but we need to handle them extra */
588 DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
593 if (rsize - roff < l)
595 more = l - (rsize - roff);
598 if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
600 msg = "missing trailer";
604 if (rhdr->oid != OBUCK_OID_DELETED)
606 int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
612 /* Changed! Reconstruct the trailer. */
613 lnew = obuck_bucket_size(rhdr->length);
615 PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
618 whdr = (struct obuck_header *)(buf+woff);
620 memmove(whdr, rhdr, lnew);
621 whdr->oid = w_bucket_start >> OBUCK_SHIFT;
629 kibitz(rhdr, OBUCK_OID_DELETED, NULL);
638 /* Write the new contents of the bucket file */
641 if (obuck_shake_security)
643 /* But first write a backup at the end of the file to ensure nothing can be lost. */
644 shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
651 DBG("Write %Lx %x", wstart, woff);
652 shake_write(buf, woff, wstart);
657 ASSERT(wstart == rstart);
659 /* In any case, update the write position */
663 /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
667 memmove(buf, buf+roff, rsize-roff);
673 /* And refill the buffer */
674 r_bucket_start = rstart+rsize; /* Also needed for error messages */
675 l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
676 DBG("Read %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
678 die("obuck_shakedown read error: %m");
683 msg = "unexpected EOF";
686 if (l & (OBUCK_ALIGN-1))
688 msg = "garbage at the end of file";
695 DBG("Finished at position %Lx", (long long) wstart);
696 sh_ftruncate(obuck_fd, wstart);
704 log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
705 msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
707 * We can attempt to clean up the bucket file by erasing everything between the last
708 * byte written and the next byte to be read. If the secure mode is switched on, we can
709 * guarantee that no data are lost, only some might be duplicated.
711 shake_erase(wstart, rstart);
712 die("Fatal error during object pool shakedown");
722 #define LEN(i) ((259309*(i))%MAXLEN)
724 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
729 int main(int argc, char **argv)
732 unsigned int i, j, cnt;
733 struct obuck_header h;
737 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
740 fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
746 for(j=0; j<COUNT; j++)
748 b = obuck_create(BUCKET_TYPE_PLAIN);
749 for(i=0; i<LEN(j); i++)
750 bputc(b, (i+j) % 256);
751 obuck_create_end(b, &h);
752 printf("Writing %08x %d\n", h.oid, h.length);
755 for(j=0; j<COUNT; j++)
756 if (j % 100 < KILLPERC)
758 printf("Deleting %08x\n", ids[j]);
759 obuck_delete(ids[j]);
762 for(j=0; j<COUNT; j++)
763 if (j % 100 >= KILLPERC)
767 obuck_find_by_oid(&h);
769 printf("Reading %08x %d\n", h.oid, h.length);
770 if (h.length != LEN(j))
771 die("Invalid length");
772 for(i=0; i<h.length; i++)
773 if ((unsigned) bgetc(b) != (i+j) % 256)
774 die("Contents mismatch");
779 obuck_shakedown(test_kibitz);
780 if (obuck_find_first(&h, 0))
783 printf("<<< %08x\t%d\n", h.oid, h.length);
786 while (obuck_find_next(&h, 0));
788 die("Walk mismatch");