+/*** Fast reading of the whole pool ***/
+
+static struct fastbuf *obuck_rpf;
+
+static int
+obuck_slurp_refill(struct fastbuf *f)
+{
+ uns l;
+
+ if (!obuck_remains)
+ return 0;
+ l = bdirect_read_prepare(obuck_rpf, &f->buffer);
+ if (!l)
+ obuck_broken("Incomplete object");
+ l = MIN(l, obuck_remains);
+ bdirect_read_commit(obuck_rpf, f->buffer + l);
+ obuck_remains -= l;
+ f->bptr = f->buffer;
+ f->bufend = f->bstop = f->buffer + l;
+ return 1;
+}
+
+struct fastbuf *
+obuck_slurp_pool(struct obuck_header *hdrp)
+{
+ static struct fastbuf limiter;
+ uns l;
+
+ do
+ {
+ if (!obuck_rpf)
+ {
+ obuck_lock_read();
+ obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
+ }
+ else
+ {
+ bsetpos(obuck_rpf, bucket_current - 4);
+ if (bgetl(obuck_rpf) != OBUCK_TRAILER)
+ obuck_broken("Missing trailer");
+ }
+ bucket_start = btell(obuck_rpf);
+ l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+ if (!l)
+ {
+ bclose(obuck_rpf);
+ obuck_rpf = NULL;
+ obuck_unlock();
+ return NULL;
+ }
+ if (l != sizeof(struct obuck_header))
+ obuck_broken("Short header read");
+ if (hdrp->magic != OBUCK_MAGIC)
+ obuck_broken("Missing magic number");
+ bucket_current = (bucket_start + sizeof(obuck_hdr) + hdrp->length +
+ 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
+ }
+ while (hdrp->oid == OBUCK_OID_DELETED);
+ if (obuck_get_pos(hdrp->oid) != bucket_start)
+ obuck_broken("Invalid backlink");
+ obuck_remains = hdrp->length;
+ limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
+ limiter.name = "Bucket";
+ limiter.pos = 0;
+ limiter.refill = obuck_slurp_refill;
+ return &limiter;
+}
+
+/*** Shakedown ***/
+
+void
+obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
+{
+ byte *rbuf, *wbuf, *msg;
+ sh_off_t rstart, wstart, w_bucket_start;
+ int roff, woff, rsize, l;
+ struct obuck_header *rhdr, *whdr;
+
+ rbuf = xmalloc(obuck_shake_buflen);
+ wbuf = xmalloc(obuck_shake_buflen);
+ rstart = wstart = 0;
+ roff = woff = rsize = 0;
+
+ /* We need to be the only accessor, all the object ID's are becoming invalid */
+ obuck_lock_write();
+
+ for(;;)
+ {
+ bucket_start = rstart + roff;
+ w_bucket_start = wstart + woff;
+ if (rsize - roff < OBUCK_ALIGN)
+ goto reread;
+ rhdr = (struct obuck_header *)(rbuf + roff);
+ if (rhdr->magic != OBUCK_MAGIC ||
+ rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(bucket_start >> OBUCK_SHIFT))
+ {
+ msg = "header mismatch";
+ goto broken;
+ }
+ l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
+ if (l > obuck_shake_buflen)
+ {
+ if (rhdr->oid != OBUCK_OID_DELETED)
+ {
+ msg = "bucket longer than ShakeBufSize";
+ goto broken;
+ }
+ rstart = bucket_start + l;
+ roff = 0;
+ rsize = 0;
+ goto reread;
+ }
+ if (rsize - roff < l)
+ goto reread;
+ if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
+ {
+ msg = "missing trailer";
+ goto broken;
+ }
+ if (rhdr->oid != OBUCK_OID_DELETED)
+ {
+ if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
+ {
+ if (bucket_start == w_bucket_start)
+ {
+ /* No copying needed now nor ever in the past, hence woff==0 */
+ wstart += l;
+ }
+ else
+ {
+ if (obuck_shake_buflen - woff < l)
+ {
+ if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
+ die("obuck_shakedown write failed: %m");
+ wstart += woff;
+ woff = 0;
+ }
+ whdr = (struct obuck_header *)(wbuf+woff);
+ memcpy(whdr, rhdr, l);
+ whdr->oid = w_bucket_start >> OBUCK_SHIFT;
+ woff += l;
+ }
+ }
+ }
+ else
+ kibitz(rhdr, OBUCK_OID_DELETED, NULL);
+ roff += l;
+ continue;
+
+ reread:
+ if (roff)
+ {
+ memmove(rbuf, rbuf+roff, rsize-roff);
+ rsize -= roff;
+ rstart += roff;
+ roff = 0;
+ }
+ l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
+ if (l < 0)
+ die("obuck_shakedown read error: %m");
+ if (!l)
+ {
+ if (!rsize)
+ break;
+ msg = "unexpected EOF";
+ goto broken;
+ }
+ rsize += l;
+ }
+ if (woff)
+ {
+ if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
+ die("obuck_shakedown write failed: %m");
+ wstart += woff;
+ }
+ sh_ftruncate(obuck_fd, wstart);
+
+ obuck_unlock();
+ xfree(rbuf);
+ xfree(wbuf);
+ return;
+
+ broken:
+ log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) bucket_start, (uns)(bucket_start >> OBUCK_SHIFT));
+ if (woff)
+ {
+ sh_pwrite(obuck_fd, wbuf, woff, wstart);
+ wstart += woff;
+ }
+ while (wstart + OBUCK_ALIGN <= bucket_start)
+ {
+ u32 check = OBUCK_TRAILER;
+ obuck_hdr.magic = OBUCK_MAGIC;
+ obuck_hdr.oid = OBUCK_OID_DELETED;
+ if (bucket_start - wstart < 0x40000000)
+ obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
+ else
+ obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
+ sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
+ wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
+ sh_pwrite(obuck_fd, &check, 4, wstart-4);
+ }
+ die("Fatal error during object pool shakedown");
+}
+