2 * Sherlock Library -- Object Buckets
4 * (c) 2001--2004 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include "lib/bucket.h"
14 #include "lib/fastbuf.h"
26 static struct obuck_header obuck_hdr, obuck_create_hdr;
27 static sh_off_t bucket_find_pos;
28 static struct fastbuf *obuck_write_fb;
30 /*** Configuration ***/
32 byte *obuck_name = "not/configured";
33 static uns obuck_io_buflen = 65536;
34 static int obuck_shake_buflen = 1048576;
35 static uns obuck_shake_security;
36 static uns obuck_slurp_buflen = 65536;
/*
 * Configuration items of the "Buckets" section.  Each entry binds a config
 * key to one of the variables above.  The final CT_STOP entry ends the list.
 * The table is registered by obuck_init_config().
 */
38 static struct cfitem obuck_config[] = {
39 { "Buckets", CT_SECTION, NULL },
40 { "BucketFile", CT_STRING, &obuck_name },
41 { "BufSize", CT_INT, &obuck_io_buflen },
42 { "ShakeBufSize", CT_INT, &obuck_shake_buflen },
43 { "ShakeSecurity", CT_INT, &obuck_shake_security },
44 { "SlurpBufSize", CT_INT, &obuck_slurp_buflen },
45 { NULL, CT_STOP, NULL }
/*
 * Registers obuck_config with the configuration machinery (cf_register).
 * The CONSTRUCTOR marker presumably makes this run automatically before
 * main().
 */
48 static void CONSTRUCTOR obuck_init_config(void)
50 cf_register(obuck_config);
53 /*** Internal operations ***/
/*
 * Reports a corrupted object pool through die().  msg describes the problem
 * and pos is the file offset where it was detected (printed in hex).
 * NOTE(review): in ISO C, %Lx is the long double length modifier.  glibc also
 * accepts it for long long, but %llx is the portable spelling (assuming die()
 * formats via the C library's printf family).
 */
56 obuck_broken(char *msg, sh_off_t pos)
58 die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
62 * Unfortunately we cannot use flock() here since it happily permits
63 * locking a shared fd (e.g., after fork()) multiple times. The fcntl
64 * locks are very ugly and they don't support 64-bit offsets, but we
65 * can work around the problem by always locking the first header
/*
 * Applies an fcntl() record lock of the given type to a region of
 * sizeof(struct obuck_header) bytes measured from SEEK_SET.  The types are
 * F_RDLCK, F_WRLCK or F_UNLCK, as used by the lock wrappers.  Per the comment
 * above, the region is the first bucket header; the start offset is
 * presumably 0.  Any fcntl() failure goes to die().
 */
70 obuck_do_lock(int type)
75 fl.l_whence = SEEK_SET;
77 fl.l_len = sizeof(struct obuck_header);
/* F_SETLKW blocks until a conflicting lock held elsewhere is released. */
78 if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
79 die("fcntl lock: %m");
/* Read (shared) lock on the pool. */
85 obuck_do_lock(F_RDLCK);
/* Write (exclusive) lock on the pool. */
89 obuck_lock_write(void)
91 obuck_do_lock(F_WRLCK);
/* Drop the pool lock. */
97 obuck_do_lock(F_UNLCK);
100 /*** FastIO emulation ***/
/*
 * FB_BUCKET(f) gives the bucket-specific part of a bucket stream (start_pos,
 * bucket_size and the buffer space) by casting f->is_fastbuf to
 * struct fb_bucket *.  Bucket streams are allocated as
 * sizeof(struct fb_bucket) + buffer length in a single xmalloc().
 * NOTE(review): this presumably relies on struct fb_bucket embedding the
 * fastbuf as its first member, with is_fastbuf marking the start of
 * struct fastbuf.  Confirm against both struct definitions.
 */
108 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
/* Presumably the number of bucket streams currently open.  It is reported by
   the "Unbalanced bucket opens/closes" diagnostic. */
110 static int obuck_fb_count;
/*
 * close callback of bucket read streams.  It presumably frees the stream and
 * decrements obuck_fb_count.
 */
113 obuck_fb_close(struct fastbuf *f)
119 /* We need to use pread/pwrite since we work on fd's shared between processes */
/*
 * refill callback of bucket read streams.  Reads the next part of the
 * bucket's data into f->buffer with sh_pread(), starting at file offset
 * start_pos + sizeof(struct obuck_header) + f->pos.
 *
 * If the remaining data fits into the buffer (remains <= bufsize), the read is
 * extended to the end of the OBUCK_ALIGN-aligned bucket, so the padding and
 * the 4-byte trailer are read as well.  That can exceed bufsize, which is
 * presumably why read streams allocate OBUCK_ALIGN spare bytes past bufend.
 * Only datasize bytes are exposed to the reader (f->bstop); datasize is
 * presumably set to remains in this case.  The trailer at the end of what was
 * read is checked against OBUCK_TRAILER, presumably only in this case.
 * Otherwise a full buffer of data is read.
 *
 * A failing sh_pread() goes to die().  A short read or a bad trailer is
 * reported through obuck_broken().
 */
122 obuck_fb_refill(struct fastbuf *f)
124 uns remains, bufsize, size, datasize;
126 remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
127 bufsize = f->bufend - f->buffer;
130 sh_off_t start = FB_BUCKET(f)->start_pos;
/* Absolute file offset of the next unread data byte. */
131 sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
132 if (remains <= bufsize)
135 size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
138 size = datasize = bufsize;
139 int l = sh_pread(obuck_fd, f->buffer, size, pos);
141 die("Error reading bucket: %m");
142 if ((unsigned) l != size)
143 obuck_broken("Short read", FB_BUCKET(f)->start_pos);
145 f->bstop = f->buffer + datasize;
149 if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
150 obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
/*
 * spout callback of bucket write streams.  Writes the l = bptr - buffer
 * buffered bytes with sh_pwrite() at start_pos + sizeof(struct obuck_header)
 * + f->pos.  obuck_create() starts the stream at pos = -sizeof(header), so
 * the header, which is written first, lands exactly at start_pos.  A failed
 * write goes to die(), presumably when sh_pwrite() reports an error.
 */
156 obuck_fb_spout(struct fastbuf *f)
158 int l = f->bptr - f->buffer;
163 int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
165 die("Error writing bucket: %m");
173 /*** Exported functions ***/
/*
 * Opens the pool file obuck_name into obuck_fd.  When writeable is nonzero it
 * is opened read-write and created with mode 0666 if missing; otherwise it is
 * opened read-only.  Failure to open goes to die().  A non-empty pool must
 * end with OBUCK_TRAILER, otherwise obuck_broken() is called.
 */
176 obuck_init(int writeable)
180 obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
182 die("Unable to open bucket file %s: %m", obuck_name);
184 size = sh_seek(obuck_fd, 0, SEEK_END);
187 /* If the bucket pool is not empty, check consistency of its end */
189 if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
190 check != OBUCK_TRAILER)
191 obuck_broken("Missing trailer of last object", size - 4);
/* Diagnostics for streams left open: bucket streams still counted in
   obuck_fb_count (presumably the condition for the first message), and a
   write stream that was never finished. */
201 log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
203 log(L_ERROR, "Bug: Forgot to close bucket write stream");
/* Flush buffered data of the bucket currently being written (presumably only
   when obuck_write_fb is set). */
210 bflush(obuck_write_fb);
/*
 * Locates bucket oid.  Its header is read into obuck_hdr from the position
 * given by obuck_get_pos(), and that position is remembered in
 * bucket_find_pos.  The header must carry OBUCK_MAGIC; assuming
 * OBUCK_INCOMPLETE_MAGIC differs, this rejects buckets whose creation never
 * finished.  The bucket must not be deleted and must point back to oid.  Any
 * violation goes to obuck_broken().
 */
217 bucket_find_pos = obuck_get_pos(oid);
218 if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
219 obuck_broken("Short header read", bucket_find_pos);
220 if (obuck_hdr.magic != OBUCK_MAGIC)
221 obuck_broken("Missing magic number", bucket_find_pos);
222 if (obuck_hdr.oid == OBUCK_OID_DELETED)
223 obuck_broken("Access to deleted bucket", bucket_find_pos);
224 if (obuck_hdr.oid != oid)
225 obuck_broken("Invalid backlink", bucket_find_pos);
/*
 * Fills *hdrp with the header of the bucket whose oid is given in hdrp->oid.
 * Special oids (>= OBUCK_OID_FIRST_SPECIAL) trip the ASSERT.  The result is
 * copied from obuck_hdr, which is presumably filled by obuck_get() in between.
 */
229 obuck_find_by_oid(struct obuck_header *hdrp)
231 oid_t oid = hdrp->oid;
233 ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
237 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
/*
 * Starts a walk over the pool in file order and returns the first bucket via
 * obuck_find_next(hdrp, full).  full has the same meaning there: nonzero also
 * reports deleted buckets.  The walk state (bucket_find_pos, obuck_hdr) is
 * presumably reset just before the call.
 */
241 obuck_find_first(struct obuck_header *hdrp, int full)
245 return obuck_find_next(hdrp, full);
/*
 * Continues the walk.  bucket_find_pos is moved past the current bucket
 * (header, obuck_hdr.length data bytes and the 4-byte trailer, rounded up to
 * OBUCK_ALIGN), and the next header is read into obuck_hdr.  A partial header
 * or a wrong magic goes to obuck_broken(); a read of 0 bytes presumably marks
 * the end of the pool.  The header is copied to *hdrp for live buckets, and
 * for deleted ones too when full is nonzero.
 */
249 obuck_find_next(struct obuck_header *hdrp, int full)
256 bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
257 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
259 c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
263 if (c != sizeof(obuck_hdr))
264 obuck_broken("Short header read", bucket_find_pos);
265 if (obuck_hdr.magic != OBUCK_MAGIC)
266 obuck_broken("Missing magic number", bucket_find_pos);
/* Deleted buckets are reported only when the caller asked for them. */
267 if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
269 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
/*
 * Opens a read stream over the data of the bucket located last (obuck_hdr at
 * bucket_find_pos).  The exposed buffer holds the bucket length capped at
 * obuck_io_buflen, rounded up to OBUCK_ALIGN.  OBUCK_ALIGN extra bytes are
 * allocated past bufend because obuck_fb_refill() also reads the padding and
 * the trailer of the last chunk into the buffer.  The stream and its buffer
 * share a single allocation.
 */
279 uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
280 uns real_buflen = official_buflen + OBUCK_ALIGN;
282 b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
283 b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
284 b->bufend = b->buffer + official_buflen;
285 b->name = "bucket-read";
287 b->refill = obuck_fb_refill;
290 b->close = obuck_fb_close;
292 FB_BUCKET(b)->start_pos = bucket_find_pos;
293 FB_BUCKET(b)->bucket_size = obuck_hdr.length;
/*
 * Returns the oid that a bucket appended now would get: the current file
 * size >> OBUCK_SHIFT, the same rule obuck_create() uses.  The value reflects
 * only the file size at the time of the call.
 */
299 obuck_predict_last_oid(void)
301 /* BEWARE: This is not fork-safe. */
/* sh_seek() moves the file offset of obuck_fd, which forked processes
   presumably share. */
302 sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
303 return size >> OBUCK_SHIFT;
/*
 * Starts a new bucket of the given type at the end of the pool file and
 * returns the write stream for its data; finish it with obuck_create_end().
 * Only one bucket can be under construction at a time.
 *
 * The bucket's oid is its start offset >> OBUCK_SHIFT, so the file end must be
 * OBUCK_ALIGN-aligned.  The header written here is provisional
 * (OBUCK_INCOMPLETE_MAGIC, length 0); obuck_create_end() rewrites it in place
 * with OBUCK_MAGIC and the real length.
 */
307 obuck_create(u32 type)
309 ASSERT(!obuck_write_fb);
312 sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
313 if (start & (OBUCK_ALIGN - 1))
314 obuck_broken("Misaligned file", start);
/* Provisional header: marked incomplete until obuck_create_end(). */
315 obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
316 obuck_create_hdr.oid = start >> OBUCK_SHIFT;
317 obuck_create_hdr.length = 0;
318 obuck_create_hdr.type = type;
320 struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
322 b->buffer = FB_BUCKET(b)->buffer;
323 b->bptr = b->bstop = b->buffer;
324 b->bufend = b->buffer + obuck_io_buflen;
/* Start at -sizeof(header).  The header written by the bwrite() below then
   lands at start_pos, and btell() afterwards counts only the data bytes. */
325 b->pos = -(int)sizeof(obuck_create_hdr);
326 b->name = "bucket-write";
328 b->spout = obuck_fb_spout;
332 FB_BUCKET(b)->start_pos = start;
333 FB_BUCKET(b)->bucket_size = 0;
334 bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
/*
 * Finishes the bucket written through b, which must be the current write
 * stream:
 *  - records the data length (btell);
 *  - pads so that header + data + padding + 4-byte trailer ends on an
 *    OBUCK_ALIGN boundary, then appends OBUCK_TRAILER;
 *  - overwrites the provisional header at start_pos with the final one
 *    (OBUCK_MAGIC, real length) and copies it to *hdrp.
 * A failed header rewrite goes to die().
 */
340 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
342 ASSERT(b == obuck_write_fb);
343 obuck_write_fb = NULL;
345 obuck_create_hdr.magic = OBUCK_MAGIC;
346 obuck_create_hdr.length = btell(b);
/* Padding bytes needed before the trailer (presumably written out next).  The
   mask arithmetic requires OBUCK_ALIGN to be a power of two. */
347 int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
350 bputl(b, OBUCK_TRAILER);
352 ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
353 if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
354 die("Bucket header update failed: %m");
356 memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
/*
 * Marks bucket oid as deleted by rewriting its header in place with
 * oid = OBUCK_OID_DELETED.  The data stays in the file; obuck_shakedown()
 * deals with deleted buckets.  This relies on obuck_hdr and bucket_find_pos
 * describing that bucket, presumably loaded by obuck_get(oid) just before.
 */
361 obuck_delete(oid_t oid)
365 obuck_hdr.oid = OBUCK_OID_DELETED;
/* NOTE(review): the result of sh_pwrite() is ignored, so a failed or short
   write can silently leave the bucket undeleted on disk.  The other writers
   in this file die() on write errors. */
366 sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
370 /*** Fast reading of the whole pool ***/
372 static struct fastbuf *obuck_rpf;
373 static uns slurp_remains;
374 static sh_off_t slurp_start, slurp_current;
/*
 * refill callback of the stream returned by obuck_slurp_pool().  Rather than
 * copying, it points f->buffer directly into obuck_rpf's buffer
 * (bdirect_read_prepare/bdirect_read_commit).  It exposes at most
 * slurp_remains bytes, so the reader never runs past the current bucket's
 * data.  Running out of file data inside a bucket is reported as
 * "Incomplete object", presumably when bdirect_read_prepare() returns
 * nothing.
 */
377 obuck_slurp_refill(struct fastbuf *f)
383 l = bdirect_read_prepare(obuck_rpf, &f->buffer);
385 obuck_broken("Incomplete object", slurp_start);
/* Never hand out bytes beyond the current bucket. */
386 l = MIN(l, slurp_remains);
387 bdirect_read_commit(obuck_rpf, f->buffer + l);
390 f->bufend = f->bstop = f->buffer + l;
/*
 * Sequential, buffered walk over all live buckets.  Each call fills *hdrp
 * with the next non-deleted bucket's header and sets up the static `limiter`
 * stream, whose refill (obuck_slurp_refill) serves exactly that bucket's data.
 * Presumably &limiter is returned, and the end of the pool is signalled
 * separately (e.g. by NULL); confirm.  Because limiter is static, each call
 * invalidates the stream returned by the previous one.
 *
 * The first call opens the pool with bopen() using obuck_slurp_buflen,
 * presumably when obuck_rpf is still NULL.  Later calls seek to the end of the
 * previous bucket and verify its trailer, so the caller need not read a
 * bucket's data completely.  Short headers, bad magic numbers and broken
 * backlinks go to obuck_broken().
 */
395 obuck_slurp_pool(struct obuck_header *hdrp)
397 static struct fastbuf limiter;
405 obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
409 bsetpos(obuck_rpf, slurp_current - 4);
410 if (bgetl(obuck_rpf) != OBUCK_TRAILER)
411 obuck_broken("Missing trailer", slurp_start);
413 slurp_start = btell(obuck_rpf);
414 l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
422 if (l != sizeof(struct obuck_header))
423 obuck_broken("Short header read", slurp_start);
424 if (hdrp->magic != OBUCK_MAGIC)
425 obuck_broken("Missing magic number", slurp_start);
426 slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
427 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
429 while (hdrp->oid == OBUCK_OID_DELETED);
/* The oid must map back to the position the header was read from. */
430 if (obuck_get_pos(hdrp->oid) != slurp_start)
431 obuck_broken("Invalid backlink", slurp_start);
432 slurp_remains = hdrp->length;
433 limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
434 limiter.name = "Bucket";
436 limiter.refill = obuck_slurp_refill;
/*
 * Writes len bytes from addr at file offset pos for the shakedown code.
 * Presumably a negative result dies with the errno message, and any other
 * short write dies with "disk full".
 */
443 shake_write(void *addr, int len, sh_off_t pos)
445 int l = sh_pwrite(obuck_fd, addr, len, pos);
449 die("obuck_shakedown write error: %m");
451 die("obuck_shakedown write error: disk full");
/* Part of the next shakedown helper: its extra step applies only when
   Buckets.ShakeSecurity > 1. */
458 if (obuck_shake_security > 1)
/*
 * Secure-shakedown helper.  Before a pass overwrites part of the pool, it
 * writes a backup copy of the affected data starting at file offset bpos
 * (the caller passes the original end of the file):
 *  - norm_buf[0 .. norm_size): the compacted buckets of this pass, written one
 *    bucket at a time with each header's oid set to bpos >> OBUCK_SHIFT, so
 *    every backup is a valid bucket at its new position;
 *  - fragment[0 .. frag_size): an incomplete bucket at the end of the buffer,
 *    followed by its remaining more_size bytes, read from the file at
 *    frag_pos + frag_size through a 64 KiB stack buffer.
 * NOTE(review): the oid is rewritten inside norm_buf / fragment themselves,
 * and the caller reuses that memory afterwards.  It writes buf out, and it
 * moves the fragment to the buffer start for the next pass, where the oid is
 * checked.  The original oid is presumably restored after each shake_write();
 * confirm.
 */
463 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
465 struct obuck_header *bhdr;
470 /* First of all, the "normal" part -- everything that will be written in this pass */
471 DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
472 while (boff < norm_size)
474 /* This needn't be optimized for speed. */
475 bhdr = (struct obuck_header *) (norm_buf + boff);
476 ASSERT(bhdr->magic == OBUCK_MAGIC);
477 l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN);
479 bhdr->oid = bpos >> OBUCK_SHIFT;
480 shake_write(bhdr, l, bpos);
486 /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
489 DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
491 /* First the part we already have in the buffer */
492 bhdr = (struct obuck_header *) fragment;
493 ASSERT(bhdr->magic == OBUCK_MAGIC);
495 bhdr->oid = bpos >> OBUCK_SHIFT;
496 shake_write(bhdr, frag_size, bpos);
500 /* And then the rest, using a small 64K buffer */
501 byte *auxbuf = alloca(65536);
503 while (l < more_size)
505 int j = MIN(more_size-l, 65536);
506 if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
507 die("obuck_shakedown read error: %m");
508 shake_write(auxbuf, j, bpos);
/*
 * Turns the OBUCK_ALIGN-aligned range [start, end) into deleted buckets.
 * Each piece of at most 0x40000000 bytes gets a header with OBUCK_OID_DELETED
 * and length = piece size - header - 4 at its start, and OBUCK_TRAILER in its
 * last 4 bytes; start is presumably advanced by len in between.  Used to
 * leave a well-formed pool after a failed shakedown.  A reversed range
 * (presumably start > end) is a bug and dies.  Overwrites the global
 * obuck_hdr.
 */
516 shake_erase(sh_off_t start, sh_off_t end)
519 die("shake_erase called with negative length, that's a bug");
520 ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
523 u32 check = OBUCK_TRAILER;
524 obuck_hdr.magic = OBUCK_MAGIC;
525 obuck_hdr.oid = OBUCK_OID_DELETED;
/* Pieces are capped at 1 GiB (0x40000000), presumably so that the length fits
   the header's length field. */
526 uns len = MIN(0x40000000, end-start);
527 obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
528 DBG("Erasing %08x bytes at %Lx", len, (long long) start);
529 shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
531 shake_write(&check, 4, start-4);
/*
 * Compacts the bucket pool in place.  All object IDs change, so the caller
 * must be the only accessor.
 *
 * The file is processed in buffer-sized chunks (ShakeBufSize aligned to
 * OBUCK_ALIGN).  Each bucket in the buffer is validated: magic, oid equal to
 * its position >> OBUCK_SHIFT or OBUCK_OID_DELETED, and trailer.  Live buckets
 * are passed to kibitz() together with the oid they will get after compaction
 * and a pointer to their data.  Kept buckets (presumably depending on
 * kibitz()'s return value) are then moved towards the start of the buffer with
 * the new oid.  kibitz() may change the bucket's length, in which case the
 * trailer is rebuilt from the new length.  kibitz() is also called with
 * OBUCK_OID_DELETED and a NULL data pointer, presumably for buckets that
 * disappear.
 *
 * Each pass writes the compacted bytes back at wstart.  With
 * Buckets.ShakeSecurity set, a backup copy is first written after the original
 * end of file (shake_write_backup).  Finally the file is truncated to the
 * compacted size.  On error, the range between the last byte written and the
 * next byte to be read is turned into deleted buckets (shake_erase) before
 * dying.
 *
 * NOTE(review): DBG("Write %Lx %x", wstart, woff) passes wstart without the
 * (long long) cast used by the other DBG/log calls here.
 */
536 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
538 byte *buf; /* Shakedown buffer and its size */
539 int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
540 byte *msg; /* Error message we will print */
541 sh_off_t rstart, wstart; /* Original and new position of buffer start */
542 sh_off_t r_bucket_start, w_bucket_start; /* Original and new position of the current bucket */
543 int roff, woff; /* Orig/new position of the current bucket relative to buffer start */
544 int rsize; /* Number of original bytes in the buffer */
545 int l; /* Raw size of the current bucket */
546 int changed = 0; /* "Something has been altered" flag */
547 int wrote_anything = 0; /* We already did a write to the bucket file */
548 struct obuck_header *rhdr, *whdr; /* Original and new address of header of the current bucket */
549 sh_off_t r_file_size; /* Original size of the bucket file */
550 int more; /* How much does the last bucket overlap the buffer */
552 buf = xmalloc(buflen);
554 roff = woff = rsize = 0;
556 /* We need to be the only accessor, all the object ID's are becoming invalid */
558 r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
559 ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
/*
 * NOTE(review): this limit exists for the backup copies written past
 * r_file_size in secure mode.  Their oids are derived from positions up to
 * roughly r_file_size + buflen, presumably with a 32-bit oid_t.  Yet the
 * check is unconditional, even though the message says ShakeSecurity=0 will
 * still work, so it presumably should be guarded by obuck_shake_security.
 * The message also names the key "Bucket.ShakeSecurity", while the section
 * registered in obuck_config is "Buckets".
 */
560 if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
561 die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
563 DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
567 r_bucket_start = rstart + roff;
568 w_bucket_start = wstart + woff;
569 rhdr = (struct obuck_header *)(buf + roff);
570 whdr = (struct obuck_header *)(buf + woff);
576 if (rhdr->magic != OBUCK_MAGIC ||
577 rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
579 msg = "header mismatch";
582 l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
585 if (rhdr->oid != OBUCK_OID_DELETED)
587 msg = "bucket longer than ShakeBufSize";
590 /* Empty buckets are allowed to be large, but we need to handle them extra */
591 DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
596 if (rsize - roff < l)
598 more = l - (rsize - roff);
601 if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
603 msg = "missing trailer";
607 if (rhdr->oid != OBUCK_OID_DELETED)
609 int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
614 /* Changed! Reconstruct the trailer. */
615 int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
617 PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
621 whdr = (struct obuck_header *)(buf+woff);
623 memmove(whdr, rhdr, l);
624 whdr->oid = w_bucket_start >> OBUCK_SHIFT;
632 kibitz(rhdr, OBUCK_OID_DELETED, NULL);
641 /* Write the new contents of the bucket file */
644 if (obuck_shake_security)
646 /* But first write a backup at the end of the file to ensure nothing can be lost. */
647 shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
654 DBG("Write %Lx %x", wstart, woff);
655 shake_write(buf, woff, wstart);
660 ASSERT(wstart == rstart);
662 /* In any case, update the write position */
666 /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
670 memmove(buf, buf+roff, rsize-roff);
676 /* And refill the buffer */
677 r_bucket_start = rstart+rsize; /* Also needed for error messages */
678 l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
679 DBG("Read %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
681 die("obuck_shakedown read error: %m");
686 msg = "unexpected EOF";
689 if (l & (OBUCK_ALIGN-1))
691 msg = "garbage at the end of file";
698 DBG("Finished at position %Lx", (long long) wstart);
699 sh_ftruncate(obuck_fd, wstart);
707 log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
708 msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
710 * We can attempt to clean up the bucket file by erasing everything between the last
711 * byte written and the next byte to be read. If the secure mode is switched on, we can
712 * guarantee that no data are lost, only some might be duplicated.
714 shake_erase(wstart, rstart);
715 die("Fatal error during object pool shakedown");
/*
 * Self-test, presumably built only for testing:
 *  1. creates COUNT buckets, the j-th holding LEN(j) bytes of the pattern
 *     (i+j) % 256;
 *  2. deletes those with j % 100 < KILLPERC;
 *  3. reads the others back by oid and verifies length and contents;
 *  4. runs obuck_shakedown() with test_kibitz;
 *  5. walks the pool with obuck_find_first/obuck_find_next and dies with
 *     "Walk mismatch" if the walk disagrees with what is expected (presumably
 *     the number of surviving buckets, counted in cnt).
 *
 * LEN(i) yields a deterministic pseudo-random length in [0, MAXLEN).
 */
725 #define LEN(i) ((259309*(i))%MAXLEN)
/* kibitz callback passed to obuck_shakedown() by the self-test. */
727 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
732 int main(int argc, char **argv)
735 unsigned int i, j, cnt;
736 struct obuck_header h;
740 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
743 fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
749 for(j=0; j<COUNT; j++)
751 b = obuck_create(BUCKET_TYPE_PLAIN);
/* Fill bucket j with LEN(j) bytes of the pattern (i+j) % 256. */
752 for(i=0; i<LEN(j); i++)
753 bputc(b, (i+j) % 256);
754 obuck_create_end(b, &h);
755 printf("Writing %08x %d\n", h.oid, h.length);
758 for(j=0; j<COUNT; j++)
759 if (j % 100 < KILLPERC)
761 printf("Deleting %08x\n", ids[j]);
762 obuck_delete(ids[j]);
765 for(j=0; j<COUNT; j++)
766 if (j % 100 >= KILLPERC)
770 obuck_find_by_oid(&h);
772 printf("Reading %08x %d\n", h.oid, h.length);
/* Verify the length and byte pattern written in the first phase. */
773 if (h.length != LEN(j))
774 die("Invalid length");
775 for(i=0; i<h.length; i++)
776 if ((unsigned) bgetc(b) != (i+j) % 256)
777 die("Contents mismatch");
782 obuck_shakedown(test_kibitz);
783 if (obuck_find_first(&h, 0))
786 printf("<<< %08x\t%d\n", h.oid, h.length);
789 while (obuck_find_next(&h, 0));
791 die("Walk mismatch");