2 * Sherlock Library -- Object Buckets
4 * (c) 2001--2003 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include "lib/bucket.h"
12 #include "lib/fastbuf.h"
/* obuck_hdr: header most recently read by the lookup/iteration code;
 * obuck_create_hdr: header of the bucket being written by
 * obuck_create()/obuck_create_end(). */
23 static struct obuck_header obuck_hdr, obuck_create_hdr;
/* File offset of the header last loaded into obuck_hdr. */
24 static sh_off_t bucket_find_pos;
/* Write stream of the bucket being created: must be NULL when obuck_create()
 * starts and is reset to NULL by obuck_create_end(). */
25 static struct fastbuf *obuck_write_fb;
27 /*** Configuration ***/
/* Path of the bucket file (BucketFile option); the literal is only a default. */
29 byte *obuck_name = "not/configured";
/* Buffer size of per-bucket read and write streams (BufSize). */
30 static uns obuck_io_buflen = 65536;
/* Size of each of the two buffers allocated by obuck_shakedown() (ShakeBufSize). */
31 static int obuck_shake_buflen = 1048576;
/* Buffer size of the sequential stream opened by obuck_slurp_pool() (SlurpBufSize). */
32 static uns obuck_slurp_buflen = 65536;
/* Options of the "Buckets" configuration section.
 * NOTE(review): the uns variables above are registered as CT_INT; presumably
 * the parser stores an int-sized value, which only works for non-negative
 * settings -- confirm against the cf_* implementation. */
34 static struct cfitem obuck_config[] = {
35 { "Buckets", CT_SECTION, NULL },
36 { "BucketFile", CT_STRING, &obuck_name },
37 { "BufSize", CT_INT, &obuck_io_buflen },
38 { "ShakeBufSize", CT_INT, &obuck_shake_buflen },
39 { "SlurpBufSize", CT_INT, &obuck_slurp_buflen },
40 { NULL, CT_STOP, NULL }
/* Registers the obuck_config option table via cf_register().
 * CONSTRUCTOR presumably makes this run automatically at program start-up. */
43 static void CONSTRUCTOR obuck_init_config(void)
45 cf_register(obuck_config);
48 /*** Internal operations ***/
/* Reports a corrupted object pool through die(). msg describes the problem
 * and pos is the file offset where it was detected. */
51 obuck_broken(char *msg, sh_off_t pos)
/* NOTE(review): in ISO C the L length modifier is for long double; for this
 * long long argument %llx is the portable spelling (glibc also accepts %Lx).
 * This assumes die() takes printf-style formats. */
53 die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
57 * Unfortunately we cannot use flock() here since it happily permits
58 * locking a shared fd (e.g., after fork()) multiple times. The fcntl
59 * locks are very ugly and they don't support 64-bit offsets, but we
60 * can work around the problem by always locking the first header
/* Sets a blocking fcntl() record lock on obuck_fd. Callers pass type as
 * F_RDLCK, F_WRLCK or F_UNLCK. The locked region is
 * sizeof(struct obuck_header) bytes, with offsets relative to SEEK_SET
 * (l_start and l_type are assigned on lines not shown in this excerpt).
 * Dies if fcntl() fails. */
65 obuck_do_lock(int type)
70 fl.l_whence = SEEK_SET;
72 fl.l_len = sizeof(struct obuck_header);
/* F_SETLKW waits until the lock can be granted instead of failing. */
73 if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
74 die("fcntl lock: %m");
/* Shared (read) lock. */
80 obuck_do_lock(F_RDLCK);
/* Exclusive (write) lock. */
84 obuck_lock_write(void)
86 obuck_do_lock(F_WRLCK);
/* Release the lock. */
92 obuck_do_lock(F_UNLCK);
95 /*** FastIO emulation ***/
/* Maps a bucket fastbuf to its enclosing struct fb_bucket. Presumably
 * struct fb_bucket embeds the fastbuf and is_fastbuf marks its start --
 * confirm against the struct definition (not shown in this excerpt). */
103 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
/* Number of bucket streams still open. A nonzero count is reported as
 * "Unbalanced bucket opens/closes". */
105 static int obuck_fb_count;
/* Close callback installed on bucket read streams (body not shown in this excerpt). */
108 obuck_fb_close(struct fastbuf *f)
114 /* We need to use pread/pwrite since we work on fd's shared between processes */
/* Refill callback for bucket read streams. It reads the next part of the
 * bucket body into f->buffer with sh_pread(). When the rest of the body fits
 * in the buffer, the read is extended through the padding to the aligned end
 * of the bucket, so the 4-byte OBUCK_TRAILER can be checked. */
117 obuck_fb_refill(struct fastbuf *f)
119 uns remains, bufsize, size, datasize;
/* Body bytes not yet delivered to the reader. */
121 remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
122 bufsize = f->bufend - f->buffer;
125 sh_off_t start = FB_BUCKET(f)->start_pos;
/* File offset of the next body byte; the body follows the header. */
126 sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
127 if (remains <= bufsize)
/* Final chunk: read up to the aligned bucket end (body + padding + trailer).
 * This can exceed bufsize; the read-stream setup allocates OBUCK_ALIGN spare
 * bytes past bufend. datasize is presumably set to remains on a line not
 * shown here. */
130 size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
/* Otherwise fill the whole buffer with body data. */
133 size = datasize = bufsize;
134 int l = sh_pread(obuck_fd, f->buffer, size, pos);
136 die("Error reading bucket: %m");
137 if ((unsigned) l != size)
138 obuck_broken("Short read", FB_BUCKET(f)->start_pos);
/* The reader sees only datasize bytes, even if more were read. */
140 f->bstop = f->buffer + datasize;
/* The trailer occupies the last 4 bytes read (presumably checked only for the final chunk). */
144 if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
145 obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
/* Spout callback for the bucket write stream. It writes the l buffered bytes
 * with sh_pwrite() at start_pos + header size + f->pos, and dies on a write
 * error. obuck_create() starts f->pos at -sizeof(header), so the first flush
 * lands exactly at start_pos. The source pointer c is set up on lines not
 * shown in this excerpt. */
151 obuck_fb_spout(struct fastbuf *f)
153 int l = f->bptr - f->buffer;
158 int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
160 die("Error writing bucket: %m");
168 /*** Exported functions ***/
/* Opens the bucket file obuck_name. When writeable is nonzero it is opened
 * read-write and created if missing (mode 0666); otherwise it is opened
 * read-only. If the file is non-empty, its last 4 bytes must be
 * OBUCK_TRAILER. Dies if the file cannot be opened. */
171 obuck_init(int writeable)
175 obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
177 die("Unable to open bucket file %s: %m", obuck_name);
/* The file size is obtained by seeking to the end. */
179 size = sh_seek(obuck_fd, 0, SEEK_END);
182 /* If the bucket pool is not empty, check consistency of its end */
184 if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
185 check != OBUCK_TRAILER)
186 obuck_broken("Missing trailer of last object", size - 4);
/* Consistency checks on open streams. The enclosing function headers are not
 * shown in this excerpt; presumably this runs at shutdown. */
196 log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
198 log(L_ERROR, "Bug: Forgot to close bucket write stream");
/* Flush buffered data of the bucket currently being written. */
205 bflush(obuck_write_fb);
/* Loads and validates the header of the bucket with the given oid. Its
 * position comes from obuck_get_pos(oid) and the header is read into
 * obuck_hdr. It must carry OBUCK_MAGIC, must not be deleted, and its oid
 * must match the requested one (the "backlink"). Any failure is fatal via
 * obuck_broken(). */
212 bucket_find_pos = obuck_get_pos(oid);
213 if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
214 obuck_broken("Short header read", bucket_find_pos);
215 if (obuck_hdr.magic != OBUCK_MAGIC)
216 obuck_broken("Missing magic number", bucket_find_pos);
217 if (obuck_hdr.oid == OBUCK_OID_DELETED)
218 obuck_broken("Access to deleted bucket", bucket_find_pos);
219 if (obuck_hdr.oid != oid)
220 obuck_broken("Invalid backlink", bucket_find_pos);
/* Finds the bucket whose oid is given in hdrp->oid and copies its header
 * into *hdrp. Special OIDs (>= OBUCK_OID_FIRST_SPECIAL) fail the ASSERT. */
224 obuck_find_by_oid(struct obuck_header *hdrp)
226 oid_t oid = hdrp->oid;
228 ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
/* obuck_hdr holds the header found by the lookup (done on lines not shown). */
232 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
/* Starts a sequential walk over the pool and returns whatever
 * obuck_find_next() reports for the first bucket. The initialisation lines
 * are not shown in this excerpt. */
236 obuck_find_first(struct obuck_header *hdrp, int full)
240 return obuck_find_next(hdrp, full);
/* Advances from the bucket at bucket_find_pos to the next header and copies
 * it into *hdrp. Deleted buckets are copied out only when full is nonzero. */
244 obuck_find_next(struct obuck_header *hdrp, int full)
/* The next header starts at the OBUCK_ALIGN-aligned end of the current
 * bucket: header + body (length) + 4-byte trailer, rounded up. */
251 bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
252 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
254 c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
/* Presumably a zero-length read (end of file) ends the walk on lines not shown. */
258 if (c != sizeof(obuck_hdr))
259 obuck_broken("Short header read", bucket_find_pos);
260 if (obuck_hdr.magic != OBUCK_MAGIC)
261 obuck_broken("Missing magic number", bucket_find_pos);
262 if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
264 memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
/* Read-stream setup for the bucket described by obuck_hdr/bucket_find_pos.
 * The advertised buffer is min(length, obuck_io_buflen) rounded up to
 * OBUCK_ALIGN. OBUCK_ALIGN extra bytes are allocated past bufend, presumably
 * so obuck_fb_refill() can read the padding and trailer of the final chunk. */
274 uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
275 uns real_buflen = official_buflen + OBUCK_ALIGN;
/* The data buffer presumably is a trailing array member of struct fb_bucket,
 * sized by the extra bytes of this allocation. */
277 b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
278 b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
279 b->bufend = b->buffer + official_buflen;
280 b->name = "bucket-read";
282 b->refill = obuck_fb_refill;
285 b->close = obuck_fb_close;
287 FB_BUCKET(b)->start_pos = bucket_find_pos;
288 FB_BUCKET(b)->bucket_size = obuck_hdr.length;
/* Returns the oid that a bucket appended at the current end of file would
 * get: file size >> OBUCK_SHIFT, the same mapping obuck_create() uses.
 * The fork-safety warning below is presumably because sh_seek() moves the
 * file offset of a descriptor shared with forked processes. */
294 obuck_predict_last_oid(void)
296 /* BEWARE: This is not fork-safe. */
297 sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
298 return size >> OBUCK_SHIFT;
/* Starts a new bucket of the given type at the end of the file and sets up a
 * write stream for its body. A provisional header (OBUCK_INCOMPLETE_MAGIC,
 * length 0) is written through the stream; obuck_create_end() later rewrites
 * it with the final values. Only one write stream may be open at a time. */
302 obuck_create(u32 type)
304 ASSERT(!obuck_write_fb);
307 sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
/* Every bucket must start on an OBUCK_ALIGN boundary. */
308 if (start & (OBUCK_ALIGN - 1))
309 obuck_broken("Misaligned file", start);
310 obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
/* The oid is derived from the bucket's file offset. */
311 obuck_create_hdr.oid = start >> OBUCK_SHIFT;
312 obuck_create_hdr.length = 0;
313 obuck_create_hdr.type = type;
/* The stream buffer (obuck_io_buflen bytes) shares one allocation with struct fb_bucket. */
315 struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
317 b->buffer = FB_BUCKET(b)->buffer;
318 b->bptr = b->bstop = b->buffer;
319 b->bufend = b->buffer + obuck_io_buflen;
/* Negative starting position: the spout writes at start_pos + header + pos,
 * so the header itself lands at start_pos. Once the header is written, the
 * stream position (presumably what btell() reports) counts body bytes only. */
320 b->pos = -(int)sizeof(obuck_create_hdr);
321 b->name = "bucket-write";
323 b->spout = obuck_fb_spout;
327 FB_BUCKET(b)->start_pos = start;
328 FB_BUCKET(b)->bucket_size = 0;
329 bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
/* Finishes the bucket started by obuck_create(). It computes the alignment
 * padding (written on lines not shown), appends OBUCK_TRAILER, rewrites the
 * header in place with OBUCK_MAGIC and the final body length, and copies
 * that header to *hdrp. Dies if the header update fails. */
335 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
337 ASSERT(b == obuck_write_fb);
338 obuck_write_fb = NULL;
340 obuck_create_hdr.magic = OBUCK_MAGIC;
/* Body length: the stream position started at -sizeof(header) in obuck_create(). */
341 obuck_create_hdr.length = btell(b);
/* Padding that makes header + body + pad + 4-byte trailer a multiple of OBUCK_ALIGN. */
342 int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
345 bputl(b, OBUCK_TRAILER);
/* The bucket must now end exactly on an OBUCK_ALIGN boundary. */
347 ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
/* Overwrite the provisional header written by obuck_create(). */
348 if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
349 die("Bucket header update failed: %m");
351 memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
/* Marks the bucket with the given oid as deleted. It rewrites the header at
 * bucket_find_pos (presumably located by a lookup on lines not shown) with
 * oid = OBUCK_OID_DELETED. The body stays in the file; obuck_shakedown()
 * drops deleted buckets. */
356 obuck_delete(oid_t oid)
360 obuck_hdr.oid = OBUCK_OID_DELETED;
/* NOTE(review): the sh_pwrite() result is ignored, so a failed or short
 * write silently leaves the bucket live. obuck_create_end() calls die() in
 * the analogous case. */
361 sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
365 /*** Fast reading of the whole pool ***/
/* Sequential stream over the whole bucket file, opened by obuck_slurp_pool(). */
367 static struct fastbuf *obuck_rpf;
/* Body bytes of the current bucket still available to the reader. */
368 static uns slurp_remains;
/* slurp_start: offset of the current bucket's header.
 * slurp_current: aligned offset just past the current bucket. */
369 static sh_off_t slurp_start, slurp_current;
/* Refill callback of the per-bucket limiter stream. It hands out data
 * directly from obuck_rpf's buffer (bdirect_read_*), capped at
 * slurp_remains bytes. */
372 obuck_slurp_refill(struct fastbuf *f)
378 l = bdirect_read_prepare(obuck_rpf, &f->buffer);
/* Presumably reached when the file ends before the bucket body does. */
380 obuck_broken("Incomplete object", slurp_start);
381 l = MIN(l, slurp_remains);
/* Consume only the l bytes exposed through the limiter. */
382 bdirect_read_commit(obuck_rpf, f->buffer + l);
385 f->bufend = f->bstop = f->buffer + l;
/* Moves a sequential scan of the whole pool to the next non-deleted bucket.
 * Its header is read into *hdrp, and a limiter stream (refilled by
 * obuck_slurp_refill()) is prepared to deliver exactly hdrp->length body
 * bytes. The return statements are presumably on lines not shown. */
390 obuck_slurp_pool(struct obuck_header *hdrp)
/* A single static limiter fastbuf is reused for every bucket. */
392 static struct fastbuf limiter;
400 obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
/* Verify the trailer of the previous bucket, in the 4 bytes before slurp_current. */
404 bsetpos(obuck_rpf, slurp_current - 4);
405 if (bgetl(obuck_rpf) != OBUCK_TRAILER)
406 obuck_broken("Missing trailer", slurp_start);
408 slurp_start = btell(obuck_rpf);
409 l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
417 if (l != sizeof(struct obuck_header))
418 obuck_broken("Short header read", slurp_start);
419 if (hdrp->magic != OBUCK_MAGIC)
420 obuck_broken("Missing magic number", slurp_start);
/* Aligned start of the following bucket (header + body + trailer, rounded up). */
421 slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
422 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
/* Deleted buckets are skipped. */
424 while (hdrp->oid == OBUCK_OID_DELETED);
/* The oid must map back to the offset where the header was found. */
425 if (obuck_get_pos(hdrp->oid) != slurp_start)
426 obuck_broken("Invalid backlink", slurp_start);
427 slurp_remains = hdrp->length;
/* Start with an empty buffer, so the first read goes through obuck_slurp_refill(). */
428 limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
429 limiter.name = "Bucket";
431 limiter.refill = obuck_slurp_refill;
/* Compacts the pool in place. Buckets are read through rbuf; live ones are
 * copied into wbuf (each buffer is obuck_shake_buflen bytes) and written back
 * at lower offsets. Deleted buckets are dropped, and the file is then
 * truncated at the compacted end.
 * For every live bucket, kibitz(old header, new oid, body) is called and the
 * bucket is kept only if it returns nonzero. Presumably
 * kibitz(hdr, OBUCK_OID_DELETED, NULL) is the notification for dropped ones.
 * On a consistency error, the space not yet rewritten is filled with deleted
 * buckets (presumably so the file remains a valid bucket sequence) and the
 * program dies. */
438 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
440 byte *rbuf, *wbuf, *msg;
441 sh_off_t rstart, wstart, r_bucket_start, w_bucket_start;
/* roff/woff: offsets within rbuf/wbuf; rsize: valid bytes in rbuf. */
442 int roff, woff, rsize, l;
443 struct obuck_header *rhdr, *whdr;
445 rbuf = xmalloc(obuck_shake_buflen);
446 wbuf = xmalloc(obuck_shake_buflen);
448 roff = woff = rsize = 0;
450 /* We need to be the only accessor, all the object ID's are becoming invalid */
/* Source and destination file offsets of the current bucket. */
455 r_bucket_start = rstart + roff;
456 w_bucket_start = wstart + woff;
/* Less than OBUCK_ALIGN bytes buffered: more data has to be read first. */
457 if (rsize - roff < OBUCK_ALIGN)
459 rhdr = (struct obuck_header *)(rbuf + roff);
/* A valid header has OBUCK_MAGIC and is either deleted or has an oid matching its position. */
460 if (rhdr->magic != OBUCK_MAGIC ||
461 rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
463 msg = "header mismatch";
/* Total on-disk size of the bucket: header + body + trailer, aligned. */
466 l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
/* A bucket larger than the buffer is an error unless it is deleted;
 * a deleted one is presumably skipped by restarting the read after it. */
467 if (l > obuck_shake_buflen)
469 if (rhdr->oid != OBUCK_OID_DELETED)
471 msg = "bucket longer than ShakeBufSize";
474 rstart = r_bucket_start + l;
/* The whole bucket must be buffered before it is processed. */
479 if (rsize - roff < l)
481 if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
483 msg = "missing trailer";
486 if (rhdr->oid != OBUCK_OID_DELETED)
/* Ask the caller whether to keep this bucket under its new oid. */
488 if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
490 if (r_bucket_start == w_bucket_start)
492 /* No copying needed now nor ever in the past, hence woff==0 */
/* Flush wbuf when the bucket does not fit into its remaining space. */
497 if (obuck_shake_buflen - woff < l)
499 if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
500 die("obuck_shakedown write failed: %m");
/* Copy the whole bucket and renumber it for its new position. */
504 whdr = (struct obuck_header *)(wbuf+woff);
505 memcpy(whdr, rhdr, l);
506 whdr->oid = w_bucket_start >> OBUCK_SHIFT;
512 kibitz(rhdr, OBUCK_OID_DELETED, NULL);
/* Move the unprocessed tail of rbuf to its start before reading more. */
519 memmove(rbuf, rbuf+roff, rsize-roff);
524 l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
526 die("obuck_shakedown read error: %m");
531 msg = "unexpected EOF";
/* Write out what is left in wbuf, then cut the file at the compacted end. */
538 if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
539 die("obuck_shakedown write failed: %m");
542 sh_ftruncate(obuck_fd, wstart);
/* NOTE(review): %Ld is a glibc alias; %lld is the portable conversion for a
 * long long argument, assuming log() takes printf-style formats. */
550 log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
/* Error recovery: flush what was already copied (the result is not checked). */
553 sh_pwrite(obuck_fd, wbuf, woff, wstart);
/* Cover the gap up to r_bucket_start with deleted buckets, each at most
 * 0x40000000 bytes long, with header and trailer written explicitly. */
556 while (wstart + OBUCK_ALIGN <= r_bucket_start)
558 u32 check = OBUCK_TRAILER;
559 obuck_hdr.magic = OBUCK_MAGIC;
560 obuck_hdr.oid = OBUCK_OID_DELETED;
561 if (r_bucket_start - wstart < 0x40000000)
562 obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4;
564 obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
565 sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
566 wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
567 sh_pwrite(obuck_fd, &check, 4, wstart-4);
569 die("Fatal error during object pool shakedown");
/* Deterministic pseudo-random bucket length for bucket number i, in
 * [0, MAXLEN) for the unsigned indices used below. */
579 #define LEN(i) ((259309*(i))%MAXLEN)
/* Self-test. It creates COUNT buckets whose byte i of bucket j is
 * (i+j) % 256, deletes those with j % 100 < KILLPERC, reads back and checks
 * the rest, and finally walks the pool with obuck_find_first/next(). Any
 * mismatch is fatal. Options other than those in CF_USAGE are rejected. */
581 int main(int argc, char **argv)
584 unsigned int i, j, cnt;
585 struct obuck_header h;
589 if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
592 fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
/* Phase 1: create the buckets (ids[j] is presumably recorded on a line not shown). */
598 for(j=0; j<COUNT; j++)
600 b = obuck_create(BUCKET_TYPE_PLAIN);
601 for(i=0; i<LEN(j); i++)
602 bputc(b, (i+j) % 256);
603 obuck_create_end(b, &h);
604 printf("Writing %08x %d\n", h.oid, h.length);
/* Phase 2: delete KILLPERC percent of them. */
607 for(j=0; j<COUNT; j++)
608 if (j % 100 < KILLPERC)
610 printf("Deleting %08x\n", ids[j]);
611 obuck_delete(ids[j]);
/* Phase 3: verify the length and contents of every surviving bucket. */
614 for(j=0; j<COUNT; j++)
615 if (j % 100 >= KILLPERC)
619 obuck_find_by_oid(&h);
621 printf("Reading %08x %d\n", h.oid, h.length);
622 if (h.length != LEN(j))
623 die("Invalid length");
624 for(i=0; i<h.length; i++)
625 if ((unsigned) bgetc(b) != (i+j) % 256)
626 die("Contents mismatch");
/* Phase 4: walk the live buckets; the count (presumably cnt, compared on
 * lines not shown) must match the survivors. */
631 if (obuck_find_first(&h, 0))
634 printf("<<< %08x\t%d\n", h.oid, h.length);
637 while (obuck_find_next(&h, 0));
639 die("Walk mismatch");