X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fbucket.c;h=f81f03e8d3f5444d0af9d3176edf932f0fe1b5a3;hb=e828732528e0ed88973dd18f2dee97a42c0b4e59;hp=a9422ca549e1af205988011ba8e9c8948559549c;hpb=43b1b86e0fb28a3b383df336acfadbb1baaa87aa;p=libucw.git diff --git a/lib/bucket.c b/lib/bucket.c index a9422ca5..f81f03e8 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -2,6 +2,7 @@ * Sherlock Library -- Object Buckets * * (c) 2001--2004 Martin Mares + * (c) 2004 Robert Spalek * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -59,22 +60,39 @@ obuck_broken(char *msg, sh_off_t pos) } /* - * Unfortunately we cannot use flock() here since it happily permits - * locking a shared fd (e.g., after fork()) multiple times. The fcntl - * locks are very ugly and they don't support 64-bit offsets, but we - * can work around the problem by always locking the first header - * in the file. + * We need several types of locks: + * + * Read lock reading parts of bucket file + * Write lock any write operations + * Append lock appending to the end of the file + * Scan lock reading parts which we are certain they exist + * + * Multiple read and scan locks can co-exist together. + * Scan locks can co-exist with an append lock. + * There can be at most one write/append lock at a time. + * + * These lock types map to a pair of normal read-write locks which + * we represent as fcntl() locks on the first and second byte of the + * bucket file. [We cannot use flock() since it happily permits + * locking a shared fd (e.g., after fork()) multiple times at it also + * doesn't offer multiple locks on a single file.] + * + * byte0 byte1 + * Read + * Write + * Append - + * Scan - */ static inline void -obuck_do_lock(int type) +obuck_do_lock(int type, int start, int len) { struct flock fl; fl.l_type = type; fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = sizeof(struct obuck_header); + fl.l_start = start; + fl.l_len = len; if (fcntl(obuck_fd, F_SETLKW, &fl) < 0) die("fcntl lock: %m"); } @@ -82,19 +100,31 @@ obuck_do_lock(int type) inline void obuck_lock_read(void) { - obuck_do_lock(F_RDLCK); + obuck_do_lock(F_RDLCK, 0, 2); } inline void obuck_lock_write(void) { - obuck_do_lock(F_WRLCK); + obuck_do_lock(F_WRLCK, 0, 2); +} + +static inline void +obuck_lock_append(void) +{ + obuck_do_lock(F_WRLCK, 0, 1); +} + +static inline void +obuck_lock_read_to_scan(void) +{ + obuck_do_lock(F_UNLCK, 0, 1); } inline void obuck_unlock(void) { - obuck_do_lock(F_UNLCK); + obuck_do_lock(F_UNLCK, 0, 2); } /*** FastIO emulation ***/ @@ -124,15 +154,16 @@ obuck_fb_refill(struct fastbuf *f) uns remains, bufsize, size, datasize; remains = FB_BUCKET(f)->bucket_size - (uns)f->pos; - bufsize = f->bufend - f->buffer; if (!remains) return 0; + f->buffer = FB_BUCKET(f)->buffer; /* Could have been trimmed by bdirect_read_commit_modified() */ + bufsize = f->bufend - f->buffer; sh_off_t start = FB_BUCKET(f)->start_pos; sh_off_t pos = start + sizeof(struct obuck_header) + f->pos; if (remains <= bufsize) { datasize = remains; - size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos; + size = start + obuck_bucket_size(FB_BUCKET(f)->bucket_size) - pos; } else size = datasize = bufsize; @@ -253,8 +284,7 @@ obuck_find_next(struct obuck_header *hdrp, int full) for(;;) { if (obuck_hdr.magic) - bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length + - 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); + bucket_find_pos += obuck_bucket_size(obuck_hdr.length); obuck_lock_read(); c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos); obuck_unlock(); @@ -289,6 +319,7 @@ obuck_fetch(void) b->seek = NULL; b->close = obuck_fb_close; b->config = NULL; + b->can_overwrite_buffer = 2; FB_BUCKET(b)->start_pos = bucket_find_pos; FB_BUCKET(b)->bucket_size = obuck_hdr.length; obuck_fb_count++; @@ -298,9 +329,8 @@ obuck_fetch(void) oid_t obuck_predict_last_oid(void) { - /* BEWARE: This is not fork-safe. */ sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END); - return size >> OBUCK_SHIFT; + return (oid_t)(size >> OBUCK_SHIFT); } struct fastbuf * @@ -308,7 +338,7 @@ obuck_create(u32 type) { ASSERT(!obuck_write_fb); - obuck_lock_write(); + obuck_lock_append(); sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END); if (start & (OBUCK_ALIGN - 1)) obuck_broken("Misaligned file", start); @@ -329,6 +359,7 @@ obuck_create(u32 type) b->seek = NULL; b->close = NULL; b->config = NULL; + b->can_overwrite_buffer = 0; FB_BUCKET(b)->start_pos = start; FB_BUCKET(b)->bucket_size = 0; bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr)); @@ -371,19 +402,20 @@ obuck_delete(oid_t oid) static struct fastbuf *obuck_rpf; static uns slurp_remains; -static sh_off_t slurp_start, slurp_current; +static sh_off_t slurp_start, slurp_current, slurp_end; static int obuck_slurp_refill(struct fastbuf *f) { - uns l; - if (!slurp_remains) return 0; - l = bdirect_read_prepare(obuck_rpf, &f->buffer); + uns l = bdirect_read_prepare(obuck_rpf, &f->buffer); if (!l) obuck_broken("Incomplete object", slurp_start); l = MIN(l, slurp_remains); + /* XXX: This probably should be bdirect_read_commit_modified() in some cases, + * but it doesn't hurt since we aren't going to seek. + */ bdirect_read_commit(obuck_rpf, f->buffer + l); slurp_remains -= l; f->bptr = f->buffer; @@ -403,6 +435,8 @@ obuck_slurp_pool(struct obuck_header *hdrp) { obuck_lock_read(); obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen); + slurp_end = bfilesize(obuck_rpf); + obuck_lock_read_to_scan(); } else { @@ -411,7 +445,10 @@ obuck_slurp_pool(struct obuck_header *hdrp) obuck_broken("Missing trailer", slurp_start); } slurp_start = btell(obuck_rpf); - l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + if (slurp_start < slurp_end) + l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + else + l = 0; if (!l) { bclose(obuck_rpf); @@ -423,8 +460,7 @@ obuck_slurp_pool(struct obuck_header *hdrp) obuck_broken("Short header read", slurp_start); if (hdrp->magic != OBUCK_MAGIC) obuck_broken("Missing magic number", slurp_start); - slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length + - 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); + slurp_current = slurp_start + obuck_bucket_size(hdrp->length); } while (hdrp->oid == OBUCK_OID_DELETED); if (obuck_get_pos(hdrp->oid) != slurp_start) @@ -434,6 +470,7 @@ obuck_slurp_pool(struct obuck_header *hdrp) limiter.name = "Bucket"; limiter.pos = 0; limiter.refill = obuck_slurp_refill; + limiter.can_overwrite_buffer = obuck_rpf->can_overwrite_buffer; return &limiter; } @@ -474,7 +511,7 @@ shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, /* This needn't be optimized for speed. */ bhdr = (struct obuck_header *) (norm_buf + boff); ASSERT(bhdr->magic == OBUCK_MAGIC); - l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN); + l = obuck_bucket_size(bhdr->length); old_oid = bhdr->oid; bhdr->oid = bpos >> OBUCK_SHIFT; shake_write(bhdr, l, bpos); @@ -579,7 +616,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) msg = "header mismatch"; goto broken; } - l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN); + l = obuck_bucket_size(rhdr->length); if (l > buflen) { if (rhdr->oid != OBUCK_OID_DELETED) @@ -609,20 +646,20 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)); if (status) { + int lnew = l; if (status > 1) { /* Changed! Reconstruct the trailer. */ - int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN); - ASSERT(l2 <= l); - PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER); - l = l2; + lnew = obuck_bucket_size(rhdr->length); + ASSERT(lnew <= l); + PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER); changed = 1; } whdr = (struct obuck_header *)(buf+woff); if (rhdr != whdr) - memmove(whdr, rhdr, l); + memmove(whdr, rhdr, lnew); whdr->oid = w_bucket_start >> OBUCK_SHIFT; - woff += l; + woff += lnew; } else changed = 1;