From 43b1b86e0fb28a3b383df336acfadbb1baaa87aa Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sun, 11 Jan 2004 19:03:21 +0000 Subject: [PATCH] Rewritten shake down of bucket file. o Replaced read and write buffers by a single shared buffer. This should be somewhat faster (with the same size of memory invested to buffers). o If ShakeSecurity is set to 2, shaking down should be reliable under all circumstances, including server reboots and broken bucket files. Buckettool -F still needs to be run after a failed shakedown and oid's need to be synchronized with the outside world, but no buckets will be lost (only some of them may be duplicated). o The callback function (`the kibitz') is now allowed not only to decide which buckets will be kept, but also to alter contents of the buckets provided that it won't enlarge the bucket. I tried to be very careful and tested the new routine thoroughly, but since it's a pretty critical place, I would be very happy if somebody checks it independently. --- lib/bucket.c | 304 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 228 insertions(+), 76 deletions(-) diff --git a/lib/bucket.c b/lib/bucket.c index a513c7cf..a9422ca5 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -1,12 +1,14 @@ /* * Sherlock Library -- Object Buckets * - * (c) 2001--2003 Martin Mares + * (c) 2001--2004 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. */ +#undef LOCAL_DEBUG + #include "lib/lib.h" #include "lib/bucket.h" #include "lib/fastbuf.h" @@ -18,6 +20,7 @@ #include #include #include +#include static int obuck_fd; static struct obuck_header obuck_hdr, obuck_create_hdr; @@ -29,6 +32,7 @@ static struct fastbuf *obuck_write_fb; byte *obuck_name = "not/configured"; static uns obuck_io_buflen = 65536; static int obuck_shake_buflen = 1048576; +static uns obuck_shake_security; static uns obuck_slurp_buflen = 65536; static struct cfitem obuck_config[] = { @@ -36,6 +40,7 @@ static struct cfitem obuck_config[] = { { "BucketFile", CT_STRING, &obuck_name }, { "BufSize", CT_INT, &obuck_io_buflen }, { "ShakeBufSize", CT_INT, &obuck_shake_buflen }, + { "ShakeSecurity", CT_INT, &obuck_shake_security }, { "SlurpBufSize", CT_INT, &obuck_slurp_buflen }, { NULL, CT_STOP, NULL } }; @@ -434,138 +439,279 @@ obuck_slurp_pool(struct obuck_header *hdrp) /*** Shakedown ***/ +static inline void +shake_write(void *addr, int len, sh_off_t pos) +{ + int l = sh_pwrite(obuck_fd, addr, len, pos); + if (l != len) + { + if (l < 0) + die("obuck_shakedown write error: %m"); + else + die("obuck_shakedown write error: disk full"); + } +} + +static inline void +shake_sync(void) +{ + if (obuck_shake_security > 1) + fdatasync(obuck_fd); +} + +static void +shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size) +{ + struct obuck_header *bhdr; + int boff = 0; + int l; + oid_t old_oid; + + /* First of all, the "normal" part -- everything that will be written in this pass */ + DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size); + while (boff < norm_size) + { + /* This needn't be optimized for speed. */ + bhdr = (struct obuck_header *) (norm_buf + boff); + ASSERT(bhdr->magic == OBUCK_MAGIC); + l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN); + old_oid = bhdr->oid; + bhdr->oid = bpos >> OBUCK_SHIFT; + shake_write(bhdr, l, bpos); + bhdr->oid = old_oid; + boff += l; + bpos += l; + } + + /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */ + if (more_size) + { + DBG("Backing up fragment of size %x and %x more", frag_size, more_size); + + /* First the part we already have in the buffer */ + bhdr = (struct obuck_header *) fragment; + ASSERT(bhdr->magic == OBUCK_MAGIC); + old_oid = bhdr->oid; + bhdr->oid = bpos >> OBUCK_SHIFT; + shake_write(bhdr, frag_size, bpos); + bhdr->oid = old_oid; + bpos += frag_size; + + /* And then the rest, using a small 64K buffer */ + byte *auxbuf = alloca(65536); + l = 0; + while (l < more_size) + { + int j = MIN(more_size-l, 65536); + if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j) + die("obuck_shakedown read error: %m"); + shake_write(auxbuf, j, bpos); + bpos += j; + l += j; + } + } +} + +static void +shake_erase(sh_off_t start, sh_off_t end) +{ + if (start > end) + die("shake_erase called with negative length, that's a bug"); + ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1))); + while (start < end) + { + u32 check = OBUCK_TRAILER; + obuck_hdr.magic = OBUCK_MAGIC; + obuck_hdr.oid = OBUCK_OID_DELETED; + uns len = MIN(0x40000000, end-start); + obuck_hdr.length = len - sizeof(obuck_hdr) - 4; + DBG("Erasing %08x bytes at %Lx", len, (long long) start); + shake_write(&obuck_hdr, sizeof(obuck_hdr), start); + start += len; + shake_write(&check, 4, start-4); + } +} + void obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) { - byte *rbuf, *wbuf, *msg; - sh_off_t rstart, wstart, r_bucket_start, w_bucket_start; - int roff, woff, rsize, l; - struct obuck_header *rhdr, *whdr; - - rbuf = xmalloc(obuck_shake_buflen); - wbuf = xmalloc(obuck_shake_buflen); + byte *buf; /* Shakedown buffer and its size */ + int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN); + byte *msg; /* Error message we will print */ + sh_off_t rstart, wstart; /* Original and new position of buffer start */ + sh_off_t r_bucket_start, w_bucket_start; /* Original and new position of the current bucket */ + int roff, woff; /* Orig/new position of the current bucket relative to buffer start */ + int rsize; /* Number of original bytes in the buffer */ + int l; /* Raw size of the current bucket */ + int changed = 0; /* "Something has been altered" flag */ + int wrote_anything = 0; /* We already did a write to the bucket file */ + struct obuck_header *rhdr, *whdr; /* Original and new address of header of the current bucket */ + sh_off_t r_file_size; /* Original size of the bucket file */ + int more; /* How much does the last bucket overlap the buffer */ + + buf = xmalloc(buflen); rstart = wstart = 0; roff = woff = rsize = 0; /* We need to be the only accessor, all the object ID's are becoming invalid */ obuck_lock_write(); + r_file_size = sh_seek(obuck_fd, 0, SEEK_END); + ASSERT(!(r_file_size & (OBUCK_ALIGN - 1))); + if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen) + die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work."); + + DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size); for(;;) { r_bucket_start = rstart + roff; w_bucket_start = wstart + woff; - if (rsize - roff < OBUCK_ALIGN) - goto reread; - rhdr = (struct obuck_header *)(rbuf + roff); + rhdr = (struct obuck_header *)(buf + roff); + whdr = (struct obuck_header *)(buf + woff); + if (roff == rsize) + { + more = 0; + goto next; + } if (rhdr->magic != OBUCK_MAGIC || rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT)) { msg = "header mismatch"; goto broken; } - l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1); - if (l > obuck_shake_buflen) + l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN); + if (l > buflen) { if (rhdr->oid != OBUCK_OID_DELETED) { msg = "bucket longer than ShakeBufSize"; goto broken; } - rstart = r_bucket_start + l; - roff = 0; - rsize = 0; - goto reread; + /* Empty buckets are allowed to be large, but we need to handle them extra */ + DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l); + rsize = roff + l; } - if (rsize - roff < l) - goto reread; - if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER) + else { - msg = "missing trailer"; - goto broken; + if (rsize - roff < l) + { + more = l - (rsize - roff); + goto next; + } + if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER) + { + msg = "missing trailer"; + goto broken; + } } if (rhdr->oid != OBUCK_OID_DELETED) { - if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1))) + int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)); + if (status) { - if (r_bucket_start == w_bucket_start) + if (status > 1) { - /* No copying needed now nor ever in the past, hence woff==0 */ - wstart += l; - } - else - { - if (obuck_shake_buflen - woff < l) - { - if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff) - die("obuck_shakedown write failed: %m"); - wstart += woff; - woff = 0; - } - whdr = (struct obuck_header *)(wbuf+woff); - memcpy(whdr, rhdr, l); - whdr->oid = w_bucket_start >> OBUCK_SHIFT; - woff += l; + /* Changed! Reconstruct the trailer. */ + int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN); + ASSERT(l2 <= l); + PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER); + l = l2; + changed = 1; } + whdr = (struct obuck_header *)(buf+woff); + if (rhdr != whdr) + memmove(whdr, rhdr, l); + whdr->oid = w_bucket_start >> OBUCK_SHIFT; + woff += l; } + else + changed = 1; } else - kibitz(rhdr, OBUCK_OID_DELETED, NULL); + { + kibitz(rhdr, OBUCK_OID_DELETED, NULL); + changed = 1; + } roff += l; continue; - reread: - if (roff) + next: + if (changed) { - memmove(rbuf, rbuf+roff, rsize-roff); - rsize -= roff; - rstart += roff; - roff = 0; + /* Write the new contents of the bucket file */ + if (!wrote_anything) + { + if (obuck_shake_security) + { + /* But first write a backup at the end of the file to ensure nothing can be lost. */ + shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more); + shake_sync(); + } + wrote_anything = 1; + } + if (woff) + { + DBG("Write %Lx %x", wstart, woff); + shake_write(buf, woff, wstart); + shake_sync(); + } + } + else + ASSERT(wstart == rstart); + + /* In any case, update the write position */ + wstart += woff; + woff = 0; + + /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */ + rstart += roff; + if (more) + { + memmove(buf, buf+roff, rsize-roff); + rsize = rsize-roff; } - l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize); + else + rsize = 0; + + /* And refill the buffer */ + r_bucket_start = rstart+rsize; /* Also needed for error messages */ + l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start); + DBG("Read %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize); if (l < 0) die("obuck_shakedown read error: %m"); if (!l) { - if (!rsize) + if (!more) break; msg = "unexpected EOF"; goto broken; } + if (l & (OBUCK_ALIGN-1)) + { + msg = "garbage at the end of file"; + goto broken; + } rsize += l; + roff = 0; } - if (woff) - { - if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff) - die("obuck_shakedown write failed: %m"); - wstart += woff; - } + + DBG("Finished at position %Lx", (long long) wstart); sh_ftruncate(obuck_fd, wstart); + shake_sync(); obuck_unlock(); - xfree(rbuf); - xfree(wbuf); + xfree(buf); return; broken: - log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT)); - if (woff) - { - sh_pwrite(obuck_fd, wbuf, woff, wstart); - wstart += woff; - } - while (wstart + OBUCK_ALIGN <= r_bucket_start) - { - u32 check = OBUCK_TRAILER; - obuck_hdr.magic = OBUCK_MAGIC; - obuck_hdr.oid = OBUCK_OID_DELETED; - if (r_bucket_start - wstart < 0x40000000) - obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4; - else - obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4; - sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart); - wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4; - sh_pwrite(obuck_fd, &check, 4, wstart-4); - } + log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", + msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT)); + /* + * We can attempt to clean up the bucket file by erasing everything between the last + * byte written and the next byte to be read. If the secure mode is switched on, we can + * guarantee that no data are lost, only some might be duplicated. + */ + shake_erase(wstart, rstart); die("Fatal error during object pool shakedown"); } @@ -578,6 +724,11 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) #define KILLPERC 13 #define LEN(i) ((259309*(i))%MAXLEN) +static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck) +{ + return 1; +} + int main(int argc, char **argv) { int ids[COUNT]; @@ -628,6 +779,7 @@ int main(int argc, char **argv) die("EOF mismatch"); bclose(b); } + obuck_shakedown(test_kibitz); if (obuck_find_first(&h, 0)) do { -- 2.39.2