]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
implemented bconfig in the internal fastbuf
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001--2004 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/bucket.h"
14 #include "lib/fastbuf.h"
15 #include "lib/lfs.h"
16 #include "lib/conf.h"
17
18 #include <string.h>
19 #include <stdlib.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22 #include <sys/file.h>
23 #include <alloca.h>
24
25 static int obuck_fd;
26 static struct obuck_header obuck_hdr, obuck_create_hdr;
27 static sh_off_t bucket_find_pos;
28 static struct fastbuf *obuck_write_fb;
29
30 /*** Configuration ***/
31
32 byte *obuck_name = "not/configured";
33 static uns obuck_io_buflen = 65536;
34 static int obuck_shake_buflen = 1048576;
35 static uns obuck_shake_security;
36 static uns obuck_slurp_buflen = 65536;
37
38 static struct cfitem obuck_config[] = {
39   { "Buckets",          CT_SECTION,     NULL },
40   { "BucketFile",       CT_STRING,      &obuck_name },
41   { "BufSize",          CT_INT,         &obuck_io_buflen },
42   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
43   { "ShakeSecurity",    CT_INT,         &obuck_shake_security },
44   { "SlurpBufSize",     CT_INT,         &obuck_slurp_buflen },
45   { NULL,               CT_STOP,        NULL }
46 };
47
48 static void CONSTRUCTOR obuck_init_config(void)
49 {
50   cf_register(obuck_config);
51 }
52
53 /*** Internal operations ***/
54
55 static void
56 obuck_broken(char *msg, sh_off_t pos)
57 {
58   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
59 }
60
61 /*
62  *  We need several types of locks:
63  *
64  *      Read lock       reading parts of bucket file
65  *      Write lock      any write operations
66  *      Append lock     appending to the end of the file
67  *      Scan lock       reading parts which we are certain they exist
68  *
69  *  Multiple read and scan locks can co-exist together.
70  *  Scan locks can co-exist with an append lock.
71  *  There can be at most one write/append lock at a time.
72  *
73  *  These lock types map to a pair of normal read-write locks which
74  *  we represent as fcntl() locks on the first and second byte of the
75  *  bucket file. [We cannot use flock() since it happily permits
76  *  locking a shared fd (e.g., after fork()) multiple times at it also
77  *  doesn't offer multiple locks on a single file.]
78  *
79  *                      byte0           byte1
80  *      Read            <read>          <read>
81  *      Write           <write>         <write>
82  *      Append          <write>         -
83  *      Scan            -               <read>
84  */
85
86 static inline void
87 obuck_do_lock(int type, int start, int len)
88 {
89   struct flock fl;
90
91   fl.l_type = type;
92   fl.l_whence = SEEK_SET;
93   fl.l_start = start;
94   fl.l_len = len;
95   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
96     die("fcntl lock: %m");
97 }
98
99 inline void
100 obuck_lock_read(void)
101 {
102   obuck_do_lock(F_RDLCK, 0, 2);
103 }
104
105 inline void
106 obuck_lock_write(void)
107 {
108   obuck_do_lock(F_WRLCK, 0, 2);
109 }
110
111 static inline void
112 obuck_lock_append(void)
113 {
114   obuck_do_lock(F_WRLCK, 0, 1);
115 }
116
117 static inline void
118 obuck_lock_read_to_scan(void)
119 {
120   obuck_do_lock(F_UNLCK, 0, 1);
121 }
122
123 inline void
124 obuck_unlock(void)
125 {
126   obuck_do_lock(F_UNLCK, 0, 2);
127 }
128
129 /*** FastIO emulation ***/
130
131 struct fb_bucket {
132   struct fastbuf fb;
133   sh_off_t start_pos;
134   uns bucket_size;
135   byte buffer[0];
136 };
137 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
138
139 static int obuck_fb_count;
140
141 static void
142 obuck_fb_close(struct fastbuf *f)
143 {
144   obuck_fb_count--;
145   xfree(f);
146 }
147
148 /* We need to use pread/pwrite since we work on fd's shared between processes */
149
150 static int
151 obuck_fb_refill(struct fastbuf *f)
152 {
153   uns remains, bufsize, size, datasize;
154
155   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
156   bufsize = f->bufend - f->buffer;
157   if (!remains)
158     return 0;
159   sh_off_t start = FB_BUCKET(f)->start_pos;
160   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
161   if (remains <= bufsize)
162     {
163       datasize = remains;
164       size = start + obuck_bucket_size(FB_BUCKET(f)->bucket_size) - pos;
165     }
166   else
167     size = datasize = bufsize;
168   int l = sh_pread(obuck_fd, f->buffer, size, pos);
169   if (l < 0)
170     die("Error reading bucket: %m");
171   if ((unsigned) l != size)
172     obuck_broken("Short read", FB_BUCKET(f)->start_pos);
173   f->bptr = f->buffer;
174   f->bstop = f->buffer + datasize;
175   f->pos += datasize;
176   if (datasize < size)
177     {
178       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
179         obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
180     }
181   return datasize;
182 }
183
184 static void
185 obuck_fb_spout(struct fastbuf *f)
186 {
187   int l = f->bptr - f->buffer;
188   char *c = f->buffer;
189
190   while (l)
191     {
192       int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
193       if (z <= 0)
194         die("Error writing bucket: %m");
195       f->pos += z;
196       l -= z;
197       c += z;
198     }
199   f->bptr = f->buffer;
200 }
201
202 /*** Exported functions ***/
203
204 void
205 obuck_init(int writeable)
206 {
207   sh_off_t size;
208
209   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
210   if (obuck_fd < 0)
211     die("Unable to open bucket file %s: %m", obuck_name);
212   obuck_lock_read();
213   size = sh_seek(obuck_fd, 0, SEEK_END);
214   if (size)
215     {
216       /* If the bucket pool is not empty, check consistency of its end */
217       u32 check;
218       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
219           check != OBUCK_TRAILER)
220         obuck_broken("Missing trailer of last object", size - 4);
221     }
222   obuck_unlock();
223 }
224
225 void
226 obuck_cleanup(void)
227 {
228   close(obuck_fd);
229   if (obuck_fb_count)
230     log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
231   if (obuck_write_fb)
232     log(L_ERROR, "Bug: Forgot to close bucket write stream");
233 }
234
235 void
236 obuck_sync(void)
237 {
238   if (obuck_write_fb)
239     bflush(obuck_write_fb);
240   fsync(obuck_fd);
241 }
242
243 static void
244 obuck_get(oid_t oid)
245 {
246   bucket_find_pos = obuck_get_pos(oid);
247   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
248     obuck_broken("Short header read", bucket_find_pos);
249   if (obuck_hdr.magic != OBUCK_MAGIC)
250     obuck_broken("Missing magic number", bucket_find_pos);
251   if (obuck_hdr.oid == OBUCK_OID_DELETED)
252     obuck_broken("Access to deleted bucket", bucket_find_pos);
253   if (obuck_hdr.oid != oid)
254     obuck_broken("Invalid backlink", bucket_find_pos);
255 }
256
257 void
258 obuck_find_by_oid(struct obuck_header *hdrp)
259 {
260   oid_t oid = hdrp->oid;
261
262   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
263   obuck_lock_read();
264   obuck_get(oid);
265   obuck_unlock();
266   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
267 }
268
269 int
270 obuck_find_first(struct obuck_header *hdrp, int full)
271 {
272   bucket_find_pos = 0;
273   obuck_hdr.magic = 0;
274   return obuck_find_next(hdrp, full);
275 }
276
277 int
278 obuck_find_next(struct obuck_header *hdrp, int full)
279 {
280   int c;
281
282   for(;;)
283     {
284       if (obuck_hdr.magic)
285         bucket_find_pos += obuck_bucket_size(obuck_hdr.length);
286       obuck_lock_read();
287       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
288       obuck_unlock();
289       if (!c)
290         return 0;
291       if (c != sizeof(obuck_hdr))
292         obuck_broken("Short header read", bucket_find_pos);
293       if (obuck_hdr.magic != OBUCK_MAGIC)
294         obuck_broken("Missing magic number", bucket_find_pos);
295       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
296         {
297           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
298           return 1;
299         }
300     }
301 }
302
303 static int
304 obuck_bconfig(struct fastbuf *f UNUSED, uns item, int value UNUSED)
305 {
306   switch (item)
307     {
308     case BCONFIG_CAN_OVERWRITE:
309       return 2;
310     default:
311       return -1;
312     }
313 }
314
315 struct fastbuf *
316 obuck_fetch(void)
317 {
318   struct fastbuf *b;
319   uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
320   uns real_buflen = official_buflen + OBUCK_ALIGN;
321
322   b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
323   b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
324   b->bufend = b->buffer + official_buflen;
325   b->name = "bucket-read";
326   b->pos = 0;
327   b->refill = obuck_fb_refill;
328   b->spout = NULL;
329   b->seek = NULL;
330   b->close = obuck_fb_close;
331   b->config = obuck_bconfig;
332   FB_BUCKET(b)->start_pos = bucket_find_pos;
333   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
334   obuck_fb_count++;
335   return b;
336 }
337
338 oid_t
339 obuck_predict_last_oid(void)
340 {
341   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
342   return (oid_t)(size >> OBUCK_SHIFT);
343 }
344
345 struct fastbuf *
346 obuck_create(u32 type)
347 {
348   ASSERT(!obuck_write_fb);
349
350   obuck_lock_append();
351   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
352   if (start & (OBUCK_ALIGN - 1))
353     obuck_broken("Misaligned file", start);
354   obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
355   obuck_create_hdr.oid = start >> OBUCK_SHIFT;
356   obuck_create_hdr.length = 0;
357   obuck_create_hdr.type = type;
358
359   struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
360   obuck_write_fb = b;
361   b->buffer = FB_BUCKET(b)->buffer;
362   b->bptr = b->bstop = b->buffer;
363   b->bufend = b->buffer + obuck_io_buflen;
364   b->pos = -(int)sizeof(obuck_create_hdr);
365   b->name = "bucket-write";
366   b->refill = NULL;
367   b->spout = obuck_fb_spout;
368   b->seek = NULL;
369   b->close = NULL;
370   b->config = NULL;
371   FB_BUCKET(b)->start_pos = start;
372   FB_BUCKET(b)->bucket_size = 0;
373   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
374
375   return b;
376 }
377
378 void
379 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
380 {
381   ASSERT(b == obuck_write_fb);
382   obuck_write_fb = NULL;
383
384   obuck_create_hdr.magic = OBUCK_MAGIC;
385   obuck_create_hdr.length = btell(b);
386   int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
387   while (pad--)
388     bputc(b, 0);
389   bputl(b, OBUCK_TRAILER);
390   bflush(b);
391   ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
392   if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
393     die("Bucket header update failed: %m");
394   obuck_unlock();
395   memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
396   xfree(b);
397 }
398
399 void
400 obuck_delete(oid_t oid)
401 {
402   obuck_lock_write();
403   obuck_get(oid);
404   obuck_hdr.oid = OBUCK_OID_DELETED;
405   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
406   obuck_unlock();
407 }
408
409 /*** Fast reading of the whole pool ***/
410
411 static struct fastbuf *obuck_rpf;
412 static uns slurp_remains;
413 static sh_off_t slurp_start, slurp_current, slurp_end;
414
415 static int
416 obuck_slurp_refill(struct fastbuf *f)
417 {
418   uns l;
419
420   if (!slurp_remains)
421     return 0;
422   l = bdirect_read_prepare(obuck_rpf, &f->buffer);
423   if (!l)
424     obuck_broken("Incomplete object", slurp_start);
425   l = MIN(l, slurp_remains);
426   bdirect_read_commit(obuck_rpf, f->buffer + l);
427   slurp_remains -= l;
428   f->bptr = f->buffer;
429   f->bufend = f->bstop = f->buffer + l;
430   return 1;
431 }
432
433 struct fastbuf *
434 obuck_slurp_pool(struct obuck_header *hdrp)
435 {
436   static struct fastbuf limiter;
437   uns l;
438
439   do
440     {
441       if (!obuck_rpf)
442         {
443           obuck_lock_read();
444           obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
445           bseek(obuck_rpf, 0, SEEK_END);
446           slurp_end = btell(obuck_rpf);
447           bsetpos(obuck_rpf, 0);
448           obuck_lock_read_to_scan();
449         }
450       else
451         {
452           bsetpos(obuck_rpf, slurp_current - 4);
453           if (bgetl(obuck_rpf) != OBUCK_TRAILER)
454             obuck_broken("Missing trailer", slurp_start);
455         }
456       slurp_start = btell(obuck_rpf);
457       if (slurp_start < slurp_end)
458         l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
459       else
460         l = 0;
461       if (!l)
462         {
463           bclose(obuck_rpf);
464           obuck_rpf = NULL;
465           obuck_unlock();
466           return NULL;
467         }
468       if (l != sizeof(struct obuck_header))
469         obuck_broken("Short header read", slurp_start);
470       if (hdrp->magic != OBUCK_MAGIC)
471         obuck_broken("Missing magic number", slurp_start);
472       slurp_current = slurp_start + obuck_bucket_size(hdrp->length);
473     }
474   while (hdrp->oid == OBUCK_OID_DELETED);
475   if (obuck_get_pos(hdrp->oid) != slurp_start)
476     obuck_broken("Invalid backlink", slurp_start);
477   slurp_remains = hdrp->length;
478   limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
479   limiter.name = "Bucket";
480   limiter.pos = 0;
481   limiter.refill = obuck_slurp_refill;
482   return &limiter;
483 }
484
485 /*** Shakedown ***/
486
487 static inline void
488 shake_write(void *addr, int len, sh_off_t pos)
489 {
490   int l = sh_pwrite(obuck_fd, addr, len, pos);
491   if (l != len)
492     {
493       if (l < 0)
494         die("obuck_shakedown write error: %m");
495       else
496         die("obuck_shakedown write error: disk full");
497     }
498 }
499
500 static inline void
501 shake_sync(void)
502 {
503   if (obuck_shake_security > 1)
504     fdatasync(obuck_fd);
505 }
506
507 static void
508 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
509 {
510   struct obuck_header *bhdr;
511   int boff = 0;
512   int l;
513   oid_t old_oid;
514
515   /* First of all, the "normal" part -- everything that will be written in this pass */
516   DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
517   while (boff < norm_size)
518     {
519       /* This needn't be optimized for speed. */
520       bhdr = (struct obuck_header *) (norm_buf + boff);
521       ASSERT(bhdr->magic == OBUCK_MAGIC);
522       l = obuck_bucket_size(bhdr->length);
523       old_oid = bhdr->oid;
524       bhdr->oid = bpos >> OBUCK_SHIFT;
525       shake_write(bhdr, l, bpos);
526       bhdr->oid = old_oid;
527       boff += l;
528       bpos += l;
529     }
530
531   /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
532   if (more_size)
533     {
534       DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
535
536       /* First the part we already have in the buffer */
537       bhdr = (struct obuck_header *) fragment;
538       ASSERT(bhdr->magic == OBUCK_MAGIC);
539       old_oid = bhdr->oid;
540       bhdr->oid = bpos >> OBUCK_SHIFT;
541       shake_write(bhdr, frag_size, bpos);
542       bhdr->oid = old_oid;
543       bpos += frag_size;
544
545       /* And then the rest, using a small 64K buffer */
546       byte *auxbuf = alloca(65536);
547       l = 0;
548       while (l < more_size)
549         {
550           int j = MIN(more_size-l, 65536);
551           if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
552             die("obuck_shakedown read error: %m");
553           shake_write(auxbuf, j, bpos);
554           bpos += j;
555           l += j;
556         }
557     }
558 }
559
560 static void
561 shake_erase(sh_off_t start, sh_off_t end)
562 {
563   if (start > end)
564     die("shake_erase called with negative length, that's a bug");
565   ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
566   while (start < end)
567     {
568       u32 check = OBUCK_TRAILER;
569       obuck_hdr.magic = OBUCK_MAGIC;
570       obuck_hdr.oid = OBUCK_OID_DELETED;
571       uns len = MIN(0x40000000, end-start);
572       obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
573       DBG("Erasing %08x bytes at %Lx", len, (long long) start);
574       shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
575       start += len;
576       shake_write(&check, 4, start-4);
577     }
578 }
579
580 void
581 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
582 {
583   byte *buf;                                            /* Shakedown buffer and its size */
584   int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
585   byte *msg;                                            /* Error message we will print */
586   sh_off_t rstart, wstart;                              /* Original and new position of buffer start */
587   sh_off_t r_bucket_start, w_bucket_start;              /* Original and new position of the current bucket */
588   int roff, woff;                                       /* Orig/new position of the current bucket relative to buffer start */
589   int rsize;                                            /* Number of original bytes in the buffer */
590   int l;                                                /* Raw size of the current bucket */
591   int changed = 0;                                      /* "Something has been altered" flag */
592   int wrote_anything = 0;                               /* We already did a write to the bucket file */
593   struct obuck_header *rhdr, *whdr;                     /* Original and new address of header of the current bucket */
594   sh_off_t r_file_size;                                 /* Original size of the bucket file */
595   int more;                                             /* How much does the last bucket overlap the buffer */
596
597   buf = xmalloc(buflen);
598   rstart = wstart = 0;
599   roff = woff = rsize = 0;
600
601   /* We need to be the only accessor, all the object ID's are becoming invalid */
602   obuck_lock_write();
603   r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
604   ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
605   if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
606     die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
607
608   DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
609
610   for(;;)
611     {
612       r_bucket_start = rstart + roff;
613       w_bucket_start = wstart + woff;
614       rhdr = (struct obuck_header *)(buf + roff);
615       whdr = (struct obuck_header *)(buf + woff);
616       if (roff == rsize)
617         {
618           more = 0;
619           goto next;
620         }
621       if (rhdr->magic != OBUCK_MAGIC ||
622           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
623         {
624           msg = "header mismatch";
625           goto broken;
626         }
627       l = obuck_bucket_size(rhdr->length);
628       if (l > buflen)
629         {
630           if (rhdr->oid != OBUCK_OID_DELETED)
631             {
632               msg = "bucket longer than ShakeBufSize";
633               goto broken;
634             }
635           /* Empty buckets are allowed to be large, but we need to handle them extra */
636           DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
637           rsize = roff + l;
638         }
639       else
640         {
641           if (rsize - roff < l)
642             {
643               more = l - (rsize - roff);
644               goto next;
645             }
646           if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
647             {
648               msg = "missing trailer";
649               goto broken;
650             }
651         }
652       if (rhdr->oid != OBUCK_OID_DELETED)
653         {
654           int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
655           if (status)
656             {
657               int lnew = l;
658               if (status > 1)
659                 {
660                   /* Changed! Reconstruct the trailer. */
661                   lnew = obuck_bucket_size(rhdr->length);
662                   ASSERT(lnew <= l);
663                   PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
664                   changed = 1;
665                 }
666               whdr = (struct obuck_header *)(buf+woff);
667               if (rhdr != whdr)
668                 memmove(whdr, rhdr, lnew);
669               whdr->oid = w_bucket_start >> OBUCK_SHIFT;
670               woff += lnew;
671             }
672           else
673             changed = 1;
674         }
675       else
676         {
677           kibitz(rhdr, OBUCK_OID_DELETED, NULL);
678           changed = 1;
679         }
680       roff += l;
681       continue;
682
683     next:
684       if (changed)
685         {
686           /* Write the new contents of the bucket file */
687           if (!wrote_anything)
688             {
689               if (obuck_shake_security)
690                 {
691                   /* But first write a backup at the end of the file to ensure nothing can be lost. */
692                   shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
693                   shake_sync();
694                 }
695               wrote_anything = 1;
696             }
697           if (woff)
698             {
699               DBG("Write %Lx %x", wstart, woff);
700               shake_write(buf, woff, wstart);
701               shake_sync();
702             }
703         }
704       else
705         ASSERT(wstart == rstart);
706
707       /* In any case, update the write position */
708       wstart += woff;
709       woff = 0;
710
711       /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
712       rstart += roff;
713       if (more)
714         {
715           memmove(buf, buf+roff, rsize-roff);
716           rsize = rsize-roff;
717         }
718       else
719         rsize = 0;
720
721       /* And refill the buffer */
722       r_bucket_start = rstart+rsize;    /* Also needed for error messages */
723       l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
724       DBG("Read  %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
725       if (l < 0)
726         die("obuck_shakedown read error: %m");
727       if (!l)
728         {
729           if (!more)
730             break;
731           msg = "unexpected EOF";
732           goto broken;
733         }
734       if (l & (OBUCK_ALIGN-1))
735         {
736           msg = "garbage at the end of file";
737           goto broken;
738         }
739       rsize += l;
740       roff = 0;
741     }
742
743   DBG("Finished at position %Lx", (long long) wstart);
744   sh_ftruncate(obuck_fd, wstart);
745   shake_sync();
746
747   obuck_unlock();
748   xfree(buf);
749   return;
750
751  broken:
752   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
753       msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
754   /*
755    * We can attempt to clean up the bucket file by erasing everything between the last
756    * byte written and the next byte to be read. If the secure mode is switched on, we can
757    * guarantee that no data are lost, only some might be duplicated.
758    */
759   shake_erase(wstart, rstart);
760   die("Fatal error during object pool shakedown");
761 }
762
763 /*** Testing ***/
764
765 #ifdef TEST
766
767 #define COUNT 5000
768 #define MAXLEN 10000
769 #define KILLPERC 13
770 #define LEN(i) ((259309*(i))%MAXLEN)
771
772 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
773 {
774   return 1;
775 }
776
777 int main(int argc, char **argv)
778 {
779   int ids[COUNT];
780   unsigned int i, j, cnt;
781   struct obuck_header h;
782   struct fastbuf *b;
783
784   log_init(NULL);
785   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
786       optind < argc)
787   {
788     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
789     exit(1);
790   }
791
792   unlink(obuck_name);
793   obuck_init(1);
794   for(j=0; j<COUNT; j++)
795     {
796       b = obuck_create(BUCKET_TYPE_PLAIN);
797       for(i=0; i<LEN(j); i++)
798         bputc(b, (i+j) % 256);
799       obuck_create_end(b, &h);
800       printf("Writing %08x %d\n", h.oid, h.length);
801       ids[j] = h.oid;
802     }
803   for(j=0; j<COUNT; j++)
804     if (j % 100 < KILLPERC)
805       {
806         printf("Deleting %08x\n", ids[j]);
807         obuck_delete(ids[j]);
808       }
809   cnt = 0;
810   for(j=0; j<COUNT; j++)
811     if (j % 100 >= KILLPERC)
812       {
813         cnt++;
814         h.oid = ids[j];
815         obuck_find_by_oid(&h);
816         b = obuck_fetch();
817         printf("Reading %08x %d\n", h.oid, h.length);
818         if (h.length != LEN(j))
819           die("Invalid length");
820         for(i=0; i<h.length; i++)
821           if ((unsigned) bgetc(b) != (i+j) % 256)
822             die("Contents mismatch");
823         if (bgetc(b) != EOF)
824           die("EOF mismatch");
825         bclose(b);
826       }
827   obuck_shakedown(test_kibitz);
828   if (obuck_find_first(&h, 0))
829     do
830       {
831         printf("<<< %08x\t%d\n", h.oid, h.length);
832         cnt--;
833       }
834     while (obuck_find_next(&h, 0));
835   if (cnt)
836     die("Walk mismatch");
837   obuck_cleanup();
838   return 0;
839 }
840
841 #endif