]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
Introduced relative counterparts of url_canon_split() and url_auto_canonicalize().
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001--2004 Martin Mares <mj@ucw.cz>
5  *      (c) 2004 Robert Spalek <robert@ucw.cz>
6  *
7  *      This software may be freely distributed and used according to the terms
8  *      of the GNU Lesser General Public License.
9  */
10
11 #undef LOCAL_DEBUG
12
13 #include "lib/lib.h"
14 #include "lib/bucket.h"
15 #include "lib/fastbuf.h"
16 #include "lib/lfs.h"
17 #include "lib/conf.h"
18
19 #include <string.h>
20 #include <stdlib.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <sys/file.h>
24 #include <alloca.h>
25
26 static int obuck_fd;
27 static struct obuck_header obuck_hdr, obuck_create_hdr;
28 static sh_off_t bucket_find_pos;
29 static struct fastbuf *obuck_write_fb;
30
31 /*** Configuration ***/
32
33 byte *obuck_name = "not/configured";
34 static uns obuck_io_buflen = 65536;
35 static int obuck_shake_buflen = 1048576;
36 static uns obuck_shake_security;
37 static uns obuck_slurp_buflen = 65536;
38
39 static struct cfitem obuck_config[] = {
40   { "Buckets",          CT_SECTION,     NULL },
41   { "BucketFile",       CT_STRING,      &obuck_name },
42   { "BufSize",          CT_INT,         &obuck_io_buflen },
43   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
44   { "ShakeSecurity",    CT_INT,         &obuck_shake_security },
45   { "SlurpBufSize",     CT_INT,         &obuck_slurp_buflen },
46   { NULL,               CT_STOP,        NULL }
47 };
48
49 static void CONSTRUCTOR obuck_init_config(void)
50 {
51   cf_register(obuck_config);
52 }
53
54 /*** Internal operations ***/
55
56 static void
57 obuck_broken(char *msg, sh_off_t pos)
58 {
59   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
60 }
61
62 /*
63  *  We need several types of locks:
64  *
65  *      Read lock       reading parts of bucket file
66  *      Write lock      any write operations
67  *      Append lock     appending to the end of the file
68  *      Scan lock       reading parts which we are certain they exist
69  *
70  *  Multiple read and scan locks can co-exist together.
71  *  Scan locks can co-exist with an append lock.
72  *  There can be at most one write/append lock at a time.
73  *
74  *  These lock types map to a pair of normal read-write locks which
75  *  we represent as fcntl() locks on the first and second byte of the
76  *  bucket file. [We cannot use flock() since it happily permits
77  *  locking a shared fd (e.g., after fork()) multiple times at it also
78  *  doesn't offer multiple locks on a single file.]
79  *
80  *                      byte0           byte1
81  *      Read            <read>          <read>
82  *      Write           <write>         <write>
83  *      Append          <write>         -
84  *      Scan            -               <read>
85  */
86
87 static inline void
88 obuck_do_lock(int type, int start, int len)
89 {
90   struct flock fl;
91
92   fl.l_type = type;
93   fl.l_whence = SEEK_SET;
94   fl.l_start = start;
95   fl.l_len = len;
96   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
97     die("fcntl lock: %m");
98 }
99
100 inline void
101 obuck_lock_read(void)
102 {
103   obuck_do_lock(F_RDLCK, 0, 2);
104 }
105
106 inline void
107 obuck_lock_write(void)
108 {
109   obuck_do_lock(F_WRLCK, 0, 2);
110 }
111
112 static inline void
113 obuck_lock_append(void)
114 {
115   obuck_do_lock(F_WRLCK, 0, 1);
116 }
117
118 static inline void
119 obuck_lock_read_to_scan(void)
120 {
121   obuck_do_lock(F_UNLCK, 0, 1);
122 }
123
124 inline void
125 obuck_unlock(void)
126 {
127   obuck_do_lock(F_UNLCK, 0, 2);
128 }
129
130 /*** FastIO emulation ***/
131
132 struct fb_bucket {
133   struct fastbuf fb;
134   sh_off_t start_pos;
135   uns bucket_size;
136   byte buffer[0];
137 };
138 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
139
140 static int obuck_fb_count;
141
142 static void
143 obuck_fb_close(struct fastbuf *f)
144 {
145   obuck_fb_count--;
146   xfree(f);
147 }
148
149 /* We need to use pread/pwrite since we work on fd's shared between processes */
150
151 static int
152 obuck_fb_refill(struct fastbuf *f)
153 {
154   uns remains, bufsize, size, datasize;
155
156   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
157   if (!remains)
158     return 0;
159   f->buffer = FB_BUCKET(f)->buffer;     /* Could have been trimmed by bdirect_read_commit_modified() */
160   bufsize = f->bufend - f->buffer;
161   sh_off_t start = FB_BUCKET(f)->start_pos;
162   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
163   if (remains <= bufsize)
164     {
165       datasize = remains;
166       size = start + obuck_bucket_size(FB_BUCKET(f)->bucket_size) - pos;
167     }
168   else
169     size = datasize = bufsize;
170   int l = sh_pread(obuck_fd, f->buffer, size, pos);
171   if (l < 0)
172     die("Error reading bucket: %m");
173   if ((unsigned) l != size)
174     obuck_broken("Short read", FB_BUCKET(f)->start_pos);
175   f->bptr = f->buffer;
176   f->bstop = f->buffer + datasize;
177   f->pos += datasize;
178   if (datasize < size)
179     {
180       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
181         obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
182     }
183   return datasize;
184 }
185
186 static void
187 obuck_fb_seek(struct fastbuf *f, sh_off_t pos, int whence)
188 {
189   ASSERT(whence == SEEK_SET || whence == SEEK_END);
190   if (whence == SEEK_END)
191     pos += FB_BUCKET(f)->bucket_size;
192   ASSERT(pos >= 0 && pos <= FB_BUCKET(f)->bucket_size);
193   f->pos = pos;
194 }
195
196 static void
197 obuck_fb_spout(struct fastbuf *f)
198 {
199   int l = f->bptr - f->buffer;
200   char *c = f->buffer;
201
202   while (l)
203     {
204       int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
205       if (z <= 0)
206         die("Error writing bucket: %m");
207       f->pos += z;
208       l -= z;
209       c += z;
210     }
211   f->bptr = f->buffer;
212 }
213
214 /*** Exported functions ***/
215
216 void
217 obuck_init(int writeable)
218 {
219   sh_off_t size;
220
221   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
222   if (obuck_fd < 0)
223     die("Unable to open bucket file %s: %m", obuck_name);
224   obuck_lock_read();
225   size = sh_seek(obuck_fd, 0, SEEK_END);
226   if (size)
227     {
228       /* If the bucket pool is not empty, check consistency of its end */
229       u32 check;
230       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
231           check != OBUCK_TRAILER)
232         obuck_broken("Missing trailer of last object", size - 4);
233     }
234   obuck_unlock();
235 }
236
237 void
238 obuck_cleanup(void)
239 {
240   close(obuck_fd);
241   if (obuck_fb_count)
242     log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
243   if (obuck_write_fb)
244     log(L_ERROR, "Bug: Forgot to close bucket write stream");
245 }
246
247 void
248 obuck_sync(void)
249 {
250   if (obuck_write_fb)
251     bflush(obuck_write_fb);
252   fsync(obuck_fd);
253 }
254
255 static void
256 obuck_get(oid_t oid)
257 {
258   bucket_find_pos = obuck_get_pos(oid);
259   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
260     obuck_broken("Short header read", bucket_find_pos);
261   if (obuck_hdr.magic != OBUCK_MAGIC)
262     obuck_broken("Missing magic number", bucket_find_pos);
263   if (obuck_hdr.oid == OBUCK_OID_DELETED)
264     obuck_broken("Access to deleted bucket", bucket_find_pos);
265   if (obuck_hdr.oid != oid)
266     obuck_broken("Invalid backlink", bucket_find_pos);
267 }
268
269 void
270 obuck_find_by_oid(struct obuck_header *hdrp)
271 {
272   oid_t oid = hdrp->oid;
273
274   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
275   obuck_lock_read();
276   obuck_get(oid);
277   obuck_unlock();
278   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
279 }
280
281 int
282 obuck_find_first(struct obuck_header *hdrp, int full)
283 {
284   bucket_find_pos = 0;
285   obuck_hdr.magic = 0;
286   return obuck_find_next(hdrp, full);
287 }
288
289 int
290 obuck_find_next(struct obuck_header *hdrp, int full)
291 {
292   int c;
293
294   for(;;)
295     {
296       if (obuck_hdr.magic)
297         bucket_find_pos += obuck_bucket_size(obuck_hdr.length);
298       obuck_lock_read();
299       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
300       obuck_unlock();
301       if (!c)
302         return 0;
303       if (c != sizeof(obuck_hdr))
304         obuck_broken("Short header read", bucket_find_pos);
305       if (obuck_hdr.magic != OBUCK_MAGIC)
306         obuck_broken("Missing magic number", bucket_find_pos);
307       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
308         {
309           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
310           return 1;
311         }
312     }
313 }
314
315 struct fastbuf *
316 obuck_fetch(void)
317 {
318   struct fastbuf *b;
319   uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
320   uns real_buflen = official_buflen + OBUCK_ALIGN;
321
322   b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
323   b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
324   b->bufend = b->buffer + official_buflen;
325   b->name = "bucket-read";
326   b->pos = 0;
327   b->refill = obuck_fb_refill;
328   b->spout = NULL;
329   b->seek = obuck_fb_seek;
330   b->close = obuck_fb_close;
331   b->config = NULL;
332   b->can_overwrite_buffer = 2;
333   FB_BUCKET(b)->start_pos = bucket_find_pos;
334   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
335   obuck_fb_count++;
336   return b;
337 }
338
339 oid_t
340 obuck_predict_last_oid(void)
341 {
342   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
343   return (oid_t)(size >> OBUCK_SHIFT);
344 }
345
346 struct fastbuf *
347 obuck_create(u32 type)
348 {
349   ASSERT(!obuck_write_fb);
350
351   obuck_lock_append();
352   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
353   if (start & (OBUCK_ALIGN - 1))
354     obuck_broken("Misaligned file", start);
355   obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
356   obuck_create_hdr.oid = start >> OBUCK_SHIFT;
357   obuck_create_hdr.length = 0;
358   obuck_create_hdr.type = type;
359
360   struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
361   obuck_write_fb = b;
362   b->buffer = FB_BUCKET(b)->buffer;
363   b->bptr = b->bstop = b->buffer;
364   b->bufend = b->buffer + obuck_io_buflen;
365   b->pos = -(int)sizeof(obuck_create_hdr);
366   b->name = "bucket-write";
367   b->refill = NULL;
368   b->spout = obuck_fb_spout;
369   b->seek = NULL;
370   b->close = NULL;
371   b->config = NULL;
372   b->can_overwrite_buffer = 0;
373   FB_BUCKET(b)->start_pos = start;
374   FB_BUCKET(b)->bucket_size = 0;
375   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
376
377   return b;
378 }
379
380 void
381 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
382 {
383   ASSERT(b == obuck_write_fb);
384   obuck_write_fb = NULL;
385
386   obuck_create_hdr.magic = OBUCK_MAGIC;
387   obuck_create_hdr.length = btell(b);
388   int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
389   while (pad--)
390     bputc(b, 0);
391   bputl(b, OBUCK_TRAILER);
392   bflush(b);
393   ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
394   if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
395     die("Bucket header update failed: %m");
396   obuck_unlock();
397   memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
398   xfree(b);
399 }
400
401 void
402 obuck_delete(oid_t oid)
403 {
404   obuck_lock_write();
405   obuck_get(oid);
406   obuck_hdr.oid = OBUCK_OID_DELETED;
407   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
408   obuck_unlock();
409 }
410
411 /*** Fast reading of the whole pool ***/
412
413 static struct fastbuf *obuck_rpf;
414 static uns slurp_remains;
415 static sh_off_t slurp_start, slurp_current, slurp_end;
416
417 static int
418 obuck_slurp_refill(struct fastbuf *f)
419 {
420   if (!slurp_remains)
421     return 0;
422   uns l = bdirect_read_prepare(obuck_rpf, &f->buffer);
423   if (!l)
424     obuck_broken("Incomplete object", slurp_start);
425   l = MIN(l, slurp_remains);
426   /* XXX: This probably should be bdirect_read_commit_modified() in some cases,
427    *      but it doesn't hurt since we aren't going to seek.
428    */
429   bdirect_read_commit(obuck_rpf, f->buffer + l);
430   slurp_remains -= l;
431   f->bptr = f->buffer;
432   f->bufend = f->bstop = f->buffer + l;
433   f->pos += l;
434   return 1;
435 }
436
437 void
438 obuck_slurp_end(void)
439 {
440   if (obuck_rpf)
441     {
442       bclose(obuck_rpf);
443       obuck_rpf = NULL;
444       obuck_unlock();
445     }
446 }
447
448 struct fastbuf *
449 obuck_slurp_pool(struct obuck_header *hdrp, oid_t next_oid)
450 {
451   static struct fastbuf limiter;
452   uns l;
453
454   do
455     {
456       if (!obuck_rpf)
457         {
458           obuck_lock_read();
459           obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
460           slurp_end = bfilesize(obuck_rpf);
461           obuck_lock_read_to_scan();
462         }
463       else
464         {
465           bsetpos(obuck_rpf, slurp_current - 4);
466           if (bgetl(obuck_rpf) != OBUCK_TRAILER)
467             obuck_broken("Missing trailer", slurp_start);
468         }
469       if (next_oid == OBUCK_OID_ANY)
470         slurp_start = btell(obuck_rpf);
471       else
472         {
473           slurp_start = obuck_get_pos(next_oid);
474           bsetpos(obuck_rpf, slurp_start);
475         }
476       if (slurp_start < slurp_end)
477         l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
478       else
479         {
480           obuck_slurp_end();
481           return NULL;
482         }
483       if (l != sizeof(struct obuck_header))
484         obuck_broken("Short header read", slurp_start);
485       if (hdrp->magic != OBUCK_MAGIC)
486         obuck_broken("Missing magic number", slurp_start);
487       slurp_current = slurp_start + obuck_bucket_size(hdrp->length);
488     }
489   while (hdrp->oid == OBUCK_OID_DELETED);
490   if (obuck_get_pos(hdrp->oid) != slurp_start)
491     obuck_broken("Invalid backlink", slurp_start);
492   slurp_remains = hdrp->length;
493   limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
494   limiter.name = "Bucket";
495   limiter.pos = 0;
496   limiter.refill = obuck_slurp_refill;
497   limiter.can_overwrite_buffer = obuck_rpf->can_overwrite_buffer;
498   return &limiter;
499 }
500
501 /*** Shakedown ***/
502
503 static inline void
504 shake_write(void *addr, int len, sh_off_t pos)
505 {
506   int l = sh_pwrite(obuck_fd, addr, len, pos);
507   if (l != len)
508     {
509       if (l < 0)
510         die("obuck_shakedown write error: %m");
511       else
512         die("obuck_shakedown write error: disk full");
513     }
514 }
515
516 static inline void
517 shake_sync(void)
518 {
519   if (obuck_shake_security > 1)
520     fdatasync(obuck_fd);
521 }
522
523 static void
524 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
525 {
526   struct obuck_header *bhdr;
527   int boff = 0;
528   int l;
529   oid_t old_oid;
530
531   /* First of all, the "normal" part -- everything that will be written in this pass */
532   DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
533   while (boff < norm_size)
534     {
535       /* This needn't be optimized for speed. */
536       bhdr = (struct obuck_header *) (norm_buf + boff);
537       ASSERT(bhdr->magic == OBUCK_MAGIC);
538       l = obuck_bucket_size(bhdr->length);
539       old_oid = bhdr->oid;
540       bhdr->oid = bpos >> OBUCK_SHIFT;
541       shake_write(bhdr, l, bpos);
542       bhdr->oid = old_oid;
543       boff += l;
544       bpos += l;
545     }
546
547   /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
548   if (more_size)
549     {
550       DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
551
552       /* First the part we already have in the buffer */
553       bhdr = (struct obuck_header *) fragment;
554       ASSERT(bhdr->magic == OBUCK_MAGIC);
555       old_oid = bhdr->oid;
556       bhdr->oid = bpos >> OBUCK_SHIFT;
557       shake_write(bhdr, frag_size, bpos);
558       bhdr->oid = old_oid;
559       bpos += frag_size;
560
561       /* And then the rest, using a small 64K buffer */
562       byte *auxbuf = alloca(65536);
563       l = 0;
564       while (l < more_size)
565         {
566           int j = MIN(more_size-l, 65536);
567           if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
568             die("obuck_shakedown read error: %m");
569           shake_write(auxbuf, j, bpos);
570           bpos += j;
571           l += j;
572         }
573     }
574 }
575
576 static void
577 shake_erase(sh_off_t start, sh_off_t end)
578 {
579   if (start > end)
580     die("shake_erase called with negative length, that's a bug");
581   ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
582   while (start < end)
583     {
584       u32 check = OBUCK_TRAILER;
585       obuck_hdr.magic = OBUCK_MAGIC;
586       obuck_hdr.oid = OBUCK_OID_DELETED;
587       uns len = MIN(0x40000000, end-start);
588       obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
589       DBG("Erasing %08x bytes at %Lx", len, (long long) start);
590       shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
591       start += len;
592       shake_write(&check, 4, start-4);
593     }
594 }
595
596 void
597 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
598 {
599   byte *buf;                                            /* Shakedown buffer and its size */
600   int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
601   byte *msg;                                            /* Error message we will print */
602   sh_off_t rstart, wstart;                              /* Original and new position of buffer start */
603   sh_off_t r_bucket_start, w_bucket_start;              /* Original and new position of the current bucket */
604   int roff, woff;                                       /* Orig/new position of the current bucket relative to buffer start */
605   int rsize;                                            /* Number of original bytes in the buffer */
606   int l;                                                /* Raw size of the current bucket */
607   int changed = 0;                                      /* "Something has been altered" flag */
608   int wrote_anything = 0;                               /* We already did a write to the bucket file */
609   struct obuck_header *rhdr, *whdr;                     /* Original and new address of header of the current bucket */
610   sh_off_t r_file_size;                                 /* Original size of the bucket file */
611   int more;                                             /* How much does the last bucket overlap the buffer */
612
613   buf = xmalloc(buflen);
614   rstart = wstart = 0;
615   roff = woff = rsize = 0;
616
617   /* We need to be the only accessor, all the object ID's are becoming invalid */
618   obuck_lock_write();
619   r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
620   ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
621   if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
622     die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
623
624   DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
625
626   for(;;)
627     {
628       r_bucket_start = rstart + roff;
629       w_bucket_start = wstart + woff;
630       rhdr = (struct obuck_header *)(buf + roff);
631       whdr = (struct obuck_header *)(buf + woff);
632       if (roff == rsize)
633         {
634           more = 0;
635           goto next;
636         }
637       if (rhdr->magic != OBUCK_MAGIC ||
638           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
639         {
640           msg = "header mismatch";
641           goto broken;
642         }
643       l = obuck_bucket_size(rhdr->length);
644       if (l > buflen)
645         {
646           if (rhdr->oid != OBUCK_OID_DELETED)
647             {
648               msg = "bucket longer than ShakeBufSize";
649               goto broken;
650             }
651           /* Empty buckets are allowed to be large, but we need to handle them extra */
652           DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
653           rsize = roff + l;
654         }
655       else
656         {
657           if (rsize - roff < l)
658             {
659               more = l - (rsize - roff);
660               goto next;
661             }
662           if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
663             {
664               msg = "missing trailer";
665               goto broken;
666             }
667         }
668       if (rhdr->oid != OBUCK_OID_DELETED)
669         {
670           int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
671           if (status)
672             {
673               int lnew = l;
674               if (status > 1)
675                 {
676                   /* Changed! Reconstruct the trailer. */
677                   lnew = obuck_bucket_size(rhdr->length);
678                   ASSERT(lnew <= l);
679                   PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
680                   changed = 1;
681                 }
682               whdr = (struct obuck_header *)(buf+woff);
683               if (rhdr != whdr)
684                 memmove(whdr, rhdr, lnew);
685               whdr->oid = w_bucket_start >> OBUCK_SHIFT;
686               woff += lnew;
687             }
688           else
689             changed = 1;
690         }
691       else
692         {
693           kibitz(rhdr, OBUCK_OID_DELETED, NULL);
694           changed = 1;
695         }
696       roff += l;
697       continue;
698
699     next:
700       if (changed)
701         {
702           /* Write the new contents of the bucket file */
703           if (!wrote_anything)
704             {
705               if (obuck_shake_security)
706                 {
707                   /* But first write a backup at the end of the file to ensure nothing can be lost. */
708                   shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
709                   shake_sync();
710                 }
711               wrote_anything = 1;
712             }
713           if (woff)
714             {
715               DBG("Write %Lx %x", wstart, woff);
716               shake_write(buf, woff, wstart);
717               shake_sync();
718             }
719         }
720       else
721         ASSERT(wstart == rstart);
722
723       /* In any case, update the write position */
724       wstart += woff;
725       woff = 0;
726
727       /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
728       rstart += roff;
729       if (more)
730         {
731           memmove(buf, buf+roff, rsize-roff);
732           rsize = rsize-roff;
733         }
734       else
735         rsize = 0;
736
737       /* And refill the buffer */
738       r_bucket_start = rstart+rsize;    /* Also needed for error messages */
739       l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
740       DBG("Read  %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
741       if (l < 0)
742         die("obuck_shakedown read error: %m");
743       if (!l)
744         {
745           if (!more)
746             break;
747           msg = "unexpected EOF";
748           goto broken;
749         }
750       if (l & (OBUCK_ALIGN-1))
751         {
752           msg = "garbage at the end of file";
753           goto broken;
754         }
755       rsize += l;
756       roff = 0;
757     }
758
759   DBG("Finished at position %Lx", (long long) wstart);
760   sh_ftruncate(obuck_fd, wstart);
761   shake_sync();
762
763   obuck_unlock();
764   xfree(buf);
765   return;
766
767  broken:
768   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
769       msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
770   /*
771    * We can attempt to clean up the bucket file by erasing everything between the last
772    * byte written and the next byte to be read. If the secure mode is switched on, we can
773    * guarantee that no data are lost, only some might be duplicated.
774    */
775   shake_erase(wstart, rstart);
776   die("Fatal error during object pool shakedown");
777 }
778
779 /*** Testing ***/
780
781 #ifdef TEST
782
783 #define COUNT 5000
784 #define MAXLEN 10000
785 #define KILLPERC 13
786 #define LEN(i) ((259309*(i))%MAXLEN)
787
788 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
789 {
790   return 1;
791 }
792
793 int main(int argc, char **argv)
794 {
795   int ids[COUNT];
796   unsigned int i, j, cnt;
797   struct obuck_header h;
798   struct fastbuf *b;
799
800   log_init(NULL);
801   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
802       optind < argc)
803   {
804     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
805     exit(1);
806   }
807
808   unlink(obuck_name);
809   obuck_init(1);
810   for(j=0; j<COUNT; j++)
811     {
812       b = obuck_create(BUCKET_TYPE_PLAIN);
813       for(i=0; i<LEN(j); i++)
814         bputc(b, (i+j) % 256);
815       obuck_create_end(b, &h);
816       printf("Writing %08x %d\n", h.oid, h.length);
817       ids[j] = h.oid;
818     }
819   for(j=0; j<COUNT; j++)
820     if (j % 100 < KILLPERC)
821       {
822         printf("Deleting %08x\n", ids[j]);
823         obuck_delete(ids[j]);
824       }
825   cnt = 0;
826   for(j=0; j<COUNT; j++)
827     if (j % 100 >= KILLPERC)
828       {
829         cnt++;
830         h.oid = ids[j];
831         obuck_find_by_oid(&h);
832         b = obuck_fetch();
833         printf("Reading %08x %d\n", h.oid, h.length);
834         if (h.length != LEN(j))
835           die("Invalid length");
836         for(i=0; i<h.length; i++)
837           if ((unsigned) bgetc(b) != (i+j) % 256)
838             die("Contents mismatch");
839         if (bgetc(b) != EOF)
840           die("EOF mismatch");
841         bclose(b);
842       }
843   obuck_shakedown(test_kibitz);
844   if (obuck_find_first(&h, 0))
845     do
846       {
847         printf("<<< %08x\t%d\n", h.oid, h.length);
848         cnt--;
849       }
850     while (obuck_find_next(&h, 0));
851   if (cnt)
852     die("Walk mismatch");
853   obuck_cleanup();
854   return 0;
855 }
856
857 #endif