2 * Sherlock Library -- Object Buckets
4 * (c) 2001--2004 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include "lib/bucket.h"
14 #include "lib/fastbuf.h"
/* Module-wide state: obuck_hdr receives bucket headers read by the lookup and
 * scan code, obuck_create_hdr is filled in while a new bucket is being written. */
26 static struct obuck_header obuck_hdr, obuck_create_hdr;
/* File position at which the header currently held in obuck_hdr was read. */
27 static sh_off_t bucket_find_pos;
/* The write stream currently open, or NULL when no bucket is being created. */
28 static struct fastbuf *obuck_write_fb;
30 /*** Configuration ***/
/* Path of the bucket file; the initial value is only a placeholder. */
32 byte *obuck_name = "not/configured";
/* Buffer size used when allocating per-bucket read and write streams. */
33 static uns obuck_io_buflen = 65536;
/* Size of the shakedown work buffer; it is rounded up to OBUCK_ALIGN before use. */
34 static int obuck_shake_buflen = 1048576;
/* Non-zero makes the shakedown write a backup copy before overwriting data;
 * a value above 1 is tested separately elsewhere in the shake code. */
35 static uns obuck_shake_security;
/* Buffer size passed to bopen() when the whole pool is slurped. */
36 static uns obuck_slurp_buflen = 65536;
/* Configuration table: section "Buckets" followed by one named entry per
 * tunable above, terminated by a CT_STOP item. */
38 static struct cfitem obuck_config[] = {
39 { "Buckets", CT_SECTION, NULL },
40 { "BucketFile", CT_STRING, &obuck_name },
41 { "BufSize", CT_INT, &obuck_io_buflen },
42 { "ShakeBufSize", CT_INT, &obuck_shake_buflen },
43 { "ShakeSecurity", CT_INT, &obuck_shake_security },
44 { "SlurpBufSize", CT_INT, &obuck_slurp_buflen },
45 { NULL, CT_STOP, NULL }
/* Hands the obuck_config table to cf_register().
 * NOTE(review): the CONSTRUCTOR marker presumably makes this run automatically
 * at program start -- confirm against its definition. */
48 static void CONSTRUCTOR obuck_init_config(void)
50 cf_register(obuck_config);
53 /*** Internal operations ***/
/* Fatal-error helper: reports a corrupted object pool through die(), printing
 * the caller's message and the file position in hexadecimal. */
56 obuck_broken(char *msg, sh_off_t pos)
58 die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
62 * Unfortunately we cannot use flock() here since it happily permits
63 * locking a shared fd (e.g., after fork()) multiple times. The fcntl
64 * locks are very ugly and they don't support 64-bit offsets, but we
65 * can work around the problem by always locking the first header
/*
 * Takes or releases an fcntl() record lock on obuck_fd.
 * type: lock type requested by the callers in this file (F_RDLCK, F_WRLCK
 *       or F_UNLCK).
 * F_SETLKW is used, so the call waits until the lock can be granted; any
 * fcntl() failure is fatal.
 */
70 obuck_do_lock(int type)
/* Offsets are relative to the start of the file; the region is one bucket header long. */
75 fl.l_whence = SEEK_SET;
77 fl.l_len = sizeof(struct obuck_header);
78 if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
79 die("fcntl lock: %m");
85 obuck_do_lock(F_RDLCK);
/* Acquires the pool lock in write mode by calling obuck_do_lock(F_WRLCK). */
89 obuck_lock_write(void)
91 obuck_do_lock(F_WRLCK);
97 obuck_do_lock(F_UNLCK);
100 /*** FastIO emulation ***/
/* Reinterprets the is_fastbuf member of fastbuf f as a pointer to the
 * struct fb_bucket that holds the bucket-specific stream state. */
108 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
/* Counter of bucket streams; a leftover count is logged elsewhere in this file
 * as unbalanced opens/closes. */
110 static int obuck_fb_count;
113 obuck_fb_close(struct fastbuf *f)
119 /* We need to use pread/pwrite since we work on fd's shared between processes */
/*
 * Refill callback of bucket read streams (installed as b->refill).
 * Reads the next part of the bucket from obuck_fd into f->buffer with
 * sh_pread(). A read error is fatal; a short read or a wrong trailer is
 * reported as pool corruption.
 */
122 obuck_fb_refill(struct fastbuf *f)
124 uns remains, bufsize, size, datasize;
/* Payload bytes of this bucket beyond the current stream position */
126 remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
/* Capacity of the stream buffer */
127 bufsize = f->bufend - f->buffer;
/* Absolute file offset to read from: bucket start + header + stream position */
130 sh_off_t start = FB_BUCKET(f)->start_pos;
131 sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
/* The rest fits into the buffer: read up to the aligned end of the bucket,
 * which covers the padding and the 4-byte trailer as well */
132 if (remains <= bufsize)
135 size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
/* Otherwise a full buffer of payload is read */
138 size = datasize = bufsize;
139 int l = sh_pread(obuck_fd, f->buffer, size, pos);
141 die("Error reading bucket: %m");
142 if ((unsigned) l != size)
143 obuck_broken("Short read", FB_BUCKET(f)->start_pos);
/* Only datasize bytes are exposed to the reader */
145 f->bstop = f->buffer + datasize;
/* The last four bytes read must be the trailer marker.
 * NOTE(review): the condition guarding this check is not visible here --
 * presumably it applies to the final chunk only. */
149 if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
150 obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
/*
 * Spout (flush) callback of the bucket write stream (installed as b->spout).
 * Writes buffered data to obuck_fd with sh_pwrite() at the offset derived
 * from the bucket start, the header size and the stream position; a write
 * error is fatal.
 */
156 obuck_fb_spout(struct fastbuf *f)
/* Number of bytes currently held in the buffer */
158 int l = f->bptr - f->buffer;
163 int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
165 die("Error writing bucket: %m");
173 /*** Exported functions ***/
/*
 * Opens the bucket file named by obuck_name and sanity-checks its end.
 * writeable: non-zero opens the file read-write and creates it if missing,
 *            zero opens it read-only.
 * Failure to open the file is fatal. The last four bytes of the file are
 * compared with OBUCK_TRAILER and a mismatch is reported as corruption.
 */
176 obuck_init(int writeable)
180 obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
182 die("Unable to open bucket file %s: %m", obuck_name);
/* Seeking to the end yields the current size of the pool */
184 size = sh_seek(obuck_fd, 0, SEEK_END);
187 /* If the bucket pool is not empty, check consistency of its end */
189 if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
190 check != OBUCK_TRAILER)
191 obuck_broken("Missing trailer of last object", size - 4);
201 log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
203 log(L_ERROR, "Bug: Forgot to close bucket write stream");
210 bflush(obuck_write_fb);
217 bucket_find_pos = obuck_get_pos(oid);
218 if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
219 obuck_broken("Short header read", bucket_find_pos);
220 if (obuck_hdr.magic != OBUCK_MAGIC)
221 obuck_broken("Missing magic number", bucket_find_pos);
222 if (obuck_hdr.oid == OBUCK_OID_DELETED)
223 obuck_broken("Access to deleted bucket", bucket_find_pos);
224 if (obuck_hdr.oid != oid)
225 obuck_broken("Invalid backlink", bucket_find_pos);
/*
 * Looks up the bucket whose object ID is passed in hdrp->oid.
 * The ID must be below OBUCK_OID_FIRST_SPECIAL (asserted). *hdrp is
 * overwritten with a copy of obuck_hdr.
 * NOTE(review): the statements that load obuck_hdr are not visible in this
 * excerpt.
 */
229 obuck_find_by_oid(struct obuck_header *hdrp)
231 oid_t oid = hdrp->oid;
233 ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
237 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
/*
 * Starts a scan of the pool and returns whatever obuck_find_next() returns
 * for the same hdrp and full arguments.
 * NOTE(review): the initialisation of the scan state is not visible here.
 */
241 obuck_find_first(struct obuck_header *hdrp, int full)
245 return obuck_find_next(hdrp, full);
/*
 * Advances the scan to a following bucket header.
 * hdrp: receives a copy of the header that was accepted.
 * full: non-zero makes deleted buckets (oid == OBUCK_OID_DELETED) acceptable.
 * A short header read or a wrong magic number is reported as corruption.
 */
249 obuck_find_next(struct obuck_header *hdrp, int full)
/* Step over the current bucket: header + payload + 4-byte trailer, rounded up to OBUCK_ALIGN */
256 bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
257 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
259 c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
263 if (c != sizeof(obuck_hdr))
264 obuck_broken("Short header read", bucket_find_pos);
265 if (obuck_hdr.magic != OBUCK_MAGIC)
266 obuck_broken("Missing magic number", bucket_find_pos);
/* Live buckets are always accepted, deleted ones only when full is set */
267 if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
269 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
279 uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
280 uns real_buflen = official_buflen + OBUCK_ALIGN;
282 b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
283 b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
284 b->bufend = b->buffer + official_buflen;
285 b->name = "bucket-read";
287 b->refill = obuck_fb_refill;
290 b->close = obuck_fb_close;
292 FB_BUCKET(b)->start_pos = bucket_find_pos;
293 FB_BUCKET(b)->bucket_size = obuck_hdr.length;
/* Derives an object ID from the current size of the bucket file: seeks to the
 * end of obuck_fd and shifts the resulting size right by OBUCK_SHIFT. */
299 obuck_predict_last_oid(void)
302 sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
303 oid_t ss = size >> OBUCK_SHIFT;
/*
 * Begins writing a new bucket at the end of the bucket file.
 * type: value stored in the type field of the new header.
 * No other write stream may be open (asserted). The end of the file must be
 * aligned to OBUCK_ALIGN, otherwise corruption is reported. A provisional
 * header carrying OBUCK_INCOMPLETE_MAGIC and zero length is written through
 * the freshly set up stream.
 */
309 obuck_create(u32 type)
311 ASSERT(!obuck_write_fb);
/* The bucket is appended; its object ID is the start offset shifted by OBUCK_SHIFT */
314 sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
315 if (start & (OBUCK_ALIGN - 1))
316 obuck_broken("Misaligned file", start);
317 obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
318 obuck_create_hdr.oid = start >> OBUCK_SHIFT;
319 obuck_create_hdr.length = 0;
320 obuck_create_hdr.type = type;
/* The stream descriptor and its I/O buffer come from a single allocation */
322 struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
324 b->buffer = FB_BUCKET(b)->buffer;
325 b->bptr = b->bstop = b->buffer;
326 b->bufend = b->buffer + obuck_io_buflen;
/* The position starts at minus the header size; presumably so that payload
 * offsets count from zero once the header below has been written -- confirm */
327 b->pos = -(int)sizeof(obuck_create_hdr);
328 b->name = "bucket-write";
330 b->spout = obuck_fb_spout;
334 FB_BUCKET(b)->start_pos = start;
335 FB_BUCKET(b)->bucket_size = 0;
336 bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
/*
 * Finishes the bucket started by obuck_create().
 * b:    the write stream; it must be the one currently open (asserted).
 * hdrp: receives a copy of the final header.
 * The header gets its final magic and the payload length taken from the
 * stream position, OBUCK_TRAILER is appended, and the header is rewritten in
 * place at the start of the bucket. A failed header write is fatal.
 */
342 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
344 ASSERT(b == obuck_write_fb);
345 obuck_write_fb = NULL;
347 obuck_create_hdr.magic = OBUCK_MAGIC;
348 obuck_create_hdr.length = btell(b);
/* Padding that makes header + payload + padding + 4-byte trailer a multiple of OBUCK_ALIGN */
349 int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
352 bputl(b, OBUCK_TRAILER);
/* The end of the finished bucket must fall on an aligned file offset */
354 ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
355 if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
356 die("Bucket header update failed: %m");
358 memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
/*
 * Marks the bucket with the given object ID as deleted.
 * The header held in obuck_hdr gets its oid replaced by OBUCK_OID_DELETED and
 * is written back at bucket_find_pos.
 * A failed or short header write is fatal, consistently with the header
 * update done in obuck_create_end(); the result of sh_pwrite() used to be
 * ignored here, so a deletion that never reached the disk went unnoticed.
 * NOTE(review): the locking and lookup statements of this function are not
 * visible in this excerpt.
 */
363 obuck_delete(oid_t oid)
367 obuck_hdr.oid = OBUCK_OID_DELETED;
368 if (sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr)) die("Bucket header update failed: %m");
372 /*** Fast reading of the whole pool ***/
/* Buffered reader over the whole bucket file, opened by obuck_slurp_pool(). */
374 static struct fastbuf *obuck_rpf;
/* Upper bound on the bytes handed out by the slurp refill callback; set to the
 * payload length of the current bucket. */
375 static uns slurp_remains;
/* File offset of the current bucket's header, and the aligned offset just past
 * that bucket, respectively. */
376 static sh_off_t slurp_start, slurp_current;
/*
 * Refill callback of the limiter stream (installed as limiter.refill).
 * Borrows data directly from the buffer of obuck_rpf, caps the amount at
 * slurp_remains, commits that many bytes in obuck_rpf and exposes them as the
 * whole buffer of f.
 * NOTE(review): the test that precedes the "Incomplete object" report is not
 * visible in this excerpt.
 */
379 obuck_slurp_refill(struct fastbuf *f)
385 l = bdirect_read_prepare(obuck_rpf, &f->buffer);
387 obuck_broken("Incomplete object", slurp_start);
/* Never hand out more than what is left of the current bucket */
388 l = MIN(l, slurp_remains);
389 bdirect_read_commit(obuck_rpf, f->buffer + l);
392 f->bufend = f->bstop = f->buffer + l;
/*
 * Walks the pool sequentially through the buffered reader obuck_rpf.
 * hdrp: receives the header of the bucket that was found.
 * Deleted buckets are skipped. A missing trailer, a short header read, a
 * wrong magic number or an object ID that does not map back to the header's
 * own position is reported as corruption. A static limiter fastbuf is set up
 * with obuck_slurp_refill() as its refill callback.
 * NOTE(review): the return statements and the end-of-pool handling are not
 * visible in this excerpt.
 */
397 obuck_slurp_pool(struct obuck_header *hdrp)
399 static struct fastbuf limiter;
407 obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
/* Verify the trailer that sits in the four bytes before the next bucket */
411 bsetpos(obuck_rpf, slurp_current - 4);
412 if (bgetl(obuck_rpf) != OBUCK_TRAILER)
413 obuck_broken("Missing trailer", slurp_start);
415 slurp_start = btell(obuck_rpf);
416 l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
424 if (l != sizeof(struct obuck_header))
425 obuck_broken("Short header read", slurp_start);
426 if (hdrp->magic != OBUCK_MAGIC)
427 obuck_broken("Missing magic number", slurp_start);
/* Offset just past this bucket: header + payload + trailer, rounded up to OBUCK_ALIGN */
428 slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
429 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
/* Keep going as long as the bucket found is a deleted one */
431 while (hdrp->oid == OBUCK_OID_DELETED);
432 if (obuck_get_pos(hdrp->oid) != slurp_start)
433 obuck_broken("Invalid backlink", slurp_start);
434 slurp_remains = hdrp->length;
/* The limiter starts with no buffer; data arrive through its refill callback */
435 limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
436 limiter.name = "Bucket";
438 limiter.refill = obuck_slurp_refill;
/*
 * Writes len bytes from addr to obuck_fd at file offset pos using sh_pwrite().
 * Two fatal outcomes are distinguished: a write error and a full disk.
 * NOTE(review): the conditions selecting them are not visible here --
 * presumably a negative result versus a short write.
 */
445 shake_write(void *addr, int len, sh_off_t pos)
447 int l = sh_pwrite(obuck_fd, addr, len, pos);
451 die("obuck_shakedown write error: %m");
453 die("obuck_shakedown write error: disk full");
460 if (obuck_shake_security > 1)
/*
 * Writes a backup copy of the data the shakedown is about to move.
 * bpos:      file offset at which the backup is written
 * norm_buf:  buffer holding the complete buckets of this pass
 * norm_size: number of bytes of complete buckets in norm_buf
 * fragment:  address of an incomplete bucket at the end of the buffer
 * frag_size: number of bytes of that bucket present in memory
 * frag_pos:  file offset the fragment was read from
 * more_size: number of bytes of that bucket which are still only in the file
 * Every header that is backed up gets its oid rewritten to match the backup
 * position. Read and write failures are fatal.
 */
465 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
467 struct obuck_header *bhdr;
472 /* First of all, the "normal" part -- everything that will be written in this pass */
473 DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
474 while (boff < norm_size)
476 /* This needn't be optimized for speed. */
477 bhdr = (struct obuck_header *) (norm_buf + boff);
478 ASSERT(bhdr->magic == OBUCK_MAGIC);
/* Raw size of the bucket: header + payload + trailer, rounded up to OBUCK_ALIGN */
479 l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN);
481 bhdr->oid = bpos >> OBUCK_SHIFT;
482 shake_write(bhdr, l, bpos);
488 /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
491 DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
493 /* First the part we already have in the buffer */
494 bhdr = (struct obuck_header *) fragment;
495 ASSERT(bhdr->magic == OBUCK_MAGIC);
497 bhdr->oid = bpos >> OBUCK_SHIFT;
498 shake_write(bhdr, frag_size, bpos);
502 /* And then the rest, using a small 64K buffer */
503 byte *auxbuf = alloca(65536);
505 while (l < more_size)
/* Copy at most one auxiliary buffer per iteration */
507 int j = MIN(more_size-l, 65536);
508 if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
509 die("obuck_shakedown read error: %m");
510 shake_write(auxbuf, j, bpos);
/*
 * Overwrites the part of the bucket file between start and end with deleted
 * buckets. Both offsets must be aligned to OBUCK_ALIGN (asserted); a negative
 * length is treated as a bug and is fatal.
 * Each header written has magic OBUCK_MAGIC and oid OBUCK_OID_DELETED and
 * covers at most 0x40000000 bytes, header and trailer included.
 */
518 shake_erase(sh_off_t start, sh_off_t end)
521 die("shake_erase called with negative length, that's a bug");
522 ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
525 u32 check = OBUCK_TRAILER;
526 obuck_hdr.magic = OBUCK_MAGIC;
527 obuck_hdr.oid = OBUCK_OID_DELETED;
/* A single deleted bucket is capped at 0x40000000 bytes */
528 uns len = MIN(0x40000000, end-start);
/* The payload length excludes the header and the 4-byte trailer */
529 obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
530 DBG("Erasing %08x bytes at %Lx", len, (long long) start);
531 shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
/* NOTE(review): presumably start has been advanced by len at this point, so
 * the trailer lands in the last four bytes of the erased bucket -- confirm */
533 shake_write(&check, 4, start-4);
/*
 * Compacts the bucket file: the pool is read through a work buffer bucket by
 * bucket, surviving buckets are moved towards the start of the buffer and
 * renumbered according to their new position, the buffer is written back and
 * the file is finally truncated at the last write position.
 *
 * kibitz: callback invoked for every live bucket with its old header, the new
 *         object ID and a pointer just past the header; it is also invoked
 *         with OBUCK_OID_DELETED and a NULL data pointer (see below).
 *         NOTE(review): the meaning of its return value is not visible in
 *         this excerpt.
 *
 * When an inconsistency is found, it is logged, the gap between the write and
 * the read position is filled by shake_erase() and the process dies.
 *
 * The "Write" debug message now casts wstart to long long like every other
 * message in this file; passing a bare sh_off_t to %Lx was a format/argument
 * mismatch.
 */
538 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
540 byte *buf; /* Shakedown buffer and its size */
541 int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
542 byte *msg; /* Error message we will print */
543 sh_off_t rstart, wstart; /* Original and new position of buffer start */
544 sh_off_t r_bucket_start, w_bucket_start; /* Original and new position of the current bucket */
545 int roff, woff; /* Orig/new position of the current bucket relative to buffer start */
546 int rsize; /* Number of original bytes in the buffer */
547 int l; /* Raw size of the current bucket */
548 int changed = 0; /* "Something has been altered" flag */
549 int wrote_anything = 0; /* We already did a write to the bucket file */
550 struct obuck_header *rhdr, *whdr; /* Original and new address of header of the current bucket */
551 sh_off_t r_file_size; /* Original size of the bucket file */
552 int more; /* How much does the last bucket overlap the buffer */
554 buf = xmalloc(buflen);
556 roff = woff = rsize = 0;
558 /* We need to be the only accessor, all the object ID's are becoming invalid */
560 r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
561 ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
562 if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
563 die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
565 DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
/* Old and new file positions and in-buffer addresses of the current bucket */
569 r_bucket_start = rstart + roff;
570 w_bucket_start = wstart + woff;
571 rhdr = (struct obuck_header *)(buf + roff);
572 whdr = (struct obuck_header *)(buf + woff);
/* A header must carry the magic and either be deleted or point back to its own position */
578 if (rhdr->magic != OBUCK_MAGIC ||
579 rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
581 msg = "header mismatch";
584 l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
587 if (rhdr->oid != OBUCK_OID_DELETED)
589 msg = "bucket longer than ShakeBufSize";
592 /* Empty buckets are allowed to be large, but we need to handle them extra */
593 DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
/* The bucket is not completely in the buffer: remember how many bytes are missing */
598 if (rsize - roff < l)
600 more = l - (rsize - roff);
603 if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
605 msg = "missing trailer";
609 if (rhdr->oid != OBUCK_OID_DELETED)
611 int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
616 /* Changed! Reconstruct the trailer. */
617 int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
619 PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
623 whdr = (struct obuck_header *)(buf+woff);
/* Move the bucket to its new place in the buffer and renumber it */
625 memmove(whdr, rhdr, l);
626 whdr->oid = w_bucket_start >> OBUCK_SHIFT;
634 kibitz(rhdr, OBUCK_OID_DELETED, NULL);
643 /* Write the new contents of the bucket file */
646 if (obuck_shake_security)
648 /* But first write a backup at the end of the file to ensure nothing can be lost. */
649 shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
656 DBG("Write %Lx %x", (long long) wstart, woff);
657 shake_write(buf, woff, wstart);
662 ASSERT(wstart == rstart);
664 /* In any case, update the write position */
668 /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
672 memmove(buf, buf+roff, rsize-roff);
678 /* And refill the buffer */
679 r_bucket_start = rstart+rsize; /* Also needed for error messages */
680 l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
681 DBG("Read %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
683 die("obuck_shakedown read error: %m");
688 msg = "unexpected EOF";
691 if (l & (OBUCK_ALIGN-1))
693 msg = "garbage at the end of file";
700 DBG("Finished at position %Lx", (long long) wstart);
/* Everything beyond the last write position is dropped */
701 sh_ftruncate(obuck_fd, wstart);
709 log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
710 msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
712 * We can attempt to clean up the bucket file by erasing everything between the last
713 * byte written and the next byte to be read. If the secure mode is switched on, we can
714 * guarantee that no data are lost, only some might be duplicated.
716 shake_erase(wstart, rstart);
717 die("Fatal error during object pool shakedown");
/* Deterministic length of test bucket number i: a fixed multiplier applied to
 * i, reduced modulo MAXLEN. */
727 #define LEN(i) ((259309*(i))%MAXLEN)
729 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
/*
 * Self-test driver. It writes COUNT buckets filled with a deterministic byte
 * pattern, deletes those whose index modulo 100 is below KILLPERC, reads the
 * remaining ones back while checking length and contents, runs a shakedown
 * with test_kibitz and finally walks the pool again.
 * Any command-line argument other than the configuration options makes it
 * print a usage message.
 */
734 int main(int argc, char **argv)
737 unsigned int i, j, cnt;
738 struct obuck_header h;
742 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
745 fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
/* Phase 1: create the buckets; byte i of bucket j is (i+j) % 256 */
751 for(j=0; j<COUNT; j++)
753 b = obuck_create(BUCKET_TYPE_PLAIN);
754 for(i=0; i<LEN(j); i++)
755 bputc(b, (i+j) % 256);
756 obuck_create_end(b, &h);
757 printf("Writing %08x %d\n", h.oid, h.length);
/* Phase 2: delete a fixed share of them */
760 for(j=0; j<COUNT; j++)
761 if (j % 100 < KILLPERC)
763 printf("Deleting %08x\n", ids[j]);
764 obuck_delete(ids[j]);
/* Phase 3: read the survivors back and verify them */
767 for(j=0; j<COUNT; j++)
768 if (j % 100 >= KILLPERC)
772 obuck_find_by_oid(&h);
774 printf("Reading %08x %d\n", h.oid, h.length);
775 if (h.length != LEN(j))
776 die("Invalid length");
777 for(i=0; i<h.length; i++)
778 if ((unsigned) bgetc(b) != (i+j) % 256)
779 die("Contents mismatch");
/* Phase 4: compact the pool and walk over what is left */
784 obuck_shakedown(test_kibitz);
785 if (obuck_find_first(&h, 0))
788 printf("<<< %08x\t%d\n", h.oid, h.length);
791 while (obuck_find_next(&h, 0));
793 die("Walk mismatch");