]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
obuck_predict_last_oid() can be made safe easily.
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001--2004 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/bucket.h"
14 #include "lib/fastbuf.h"
15 #include "lib/lfs.h"
16 #include "lib/conf.h"
17
18 #include <string.h>
19 #include <stdlib.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22 #include <sys/file.h>
23 #include <alloca.h>
24
25 static int obuck_fd;
26 static struct obuck_header obuck_hdr, obuck_create_hdr;
27 static sh_off_t bucket_find_pos;
28 static struct fastbuf *obuck_write_fb;
29
30 /*** Configuration ***/
31
32 byte *obuck_name = "not/configured";
33 static uns obuck_io_buflen = 65536;
34 static int obuck_shake_buflen = 1048576;
35 static uns obuck_shake_security;
36 static uns obuck_slurp_buflen = 65536;
37
38 static struct cfitem obuck_config[] = {
39   { "Buckets",          CT_SECTION,     NULL },
40   { "BucketFile",       CT_STRING,      &obuck_name },
41   { "BufSize",          CT_INT,         &obuck_io_buflen },
42   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
43   { "ShakeSecurity",    CT_INT,         &obuck_shake_security },
44   { "SlurpBufSize",     CT_INT,         &obuck_slurp_buflen },
45   { NULL,               CT_STOP,        NULL }
46 };
47
48 static void CONSTRUCTOR obuck_init_config(void)
49 {
50   cf_register(obuck_config);
51 }
52
53 /*** Internal operations ***/
54
55 static void
56 obuck_broken(char *msg, sh_off_t pos)
57 {
58   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
59 }
60
61 /*
62  *  Unfortunately we cannot use flock() here since it happily permits
63  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
64  *  locks are very ugly and they don't support 64-bit offsets, but we
65  *  can work around the problem by always locking the first header
66  *  in the file.
67  */
68
69 static inline void
70 obuck_do_lock(int type)
71 {
72   struct flock fl;
73
74   fl.l_type = type;
75   fl.l_whence = SEEK_SET;
76   fl.l_start = 0;
77   fl.l_len = sizeof(struct obuck_header);
78   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
79     die("fcntl lock: %m");
80 }
81
82 inline void
83 obuck_lock_read(void)
84 {
85   obuck_do_lock(F_RDLCK);
86 }
87
88 inline void
89 obuck_lock_write(void)
90 {
91   obuck_do_lock(F_WRLCK);
92 }
93
94 inline void
95 obuck_unlock(void)
96 {
97   obuck_do_lock(F_UNLCK);
98 }
99
100 /*** FastIO emulation ***/
101
102 struct fb_bucket {
103   struct fastbuf fb;
104   sh_off_t start_pos;
105   uns bucket_size;
106   byte buffer[0];
107 };
108 #define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
109
110 static int obuck_fb_count;
111
112 static void
113 obuck_fb_close(struct fastbuf *f)
114 {
115   obuck_fb_count--;
116   xfree(f);
117 }
118
119 /* We need to use pread/pwrite since we work on fd's shared between processes */
120
121 static int
122 obuck_fb_refill(struct fastbuf *f)
123 {
124   uns remains, bufsize, size, datasize;
125
126   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
127   bufsize = f->bufend - f->buffer;
128   if (!remains)
129     return 0;
130   sh_off_t start = FB_BUCKET(f)->start_pos;
131   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
132   if (remains <= bufsize)
133     {
134       datasize = remains;
135       size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
136     }
137   else
138     size = datasize = bufsize;
139   int l = sh_pread(obuck_fd, f->buffer, size, pos);
140   if (l < 0)
141     die("Error reading bucket: %m");
142   if ((unsigned) l != size)
143     obuck_broken("Short read", FB_BUCKET(f)->start_pos);
144   f->bptr = f->buffer;
145   f->bstop = f->buffer + datasize;
146   f->pos += datasize;
147   if (datasize < size)
148     {
149       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
150         obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
151     }
152   return datasize;
153 }
154
155 static void
156 obuck_fb_spout(struct fastbuf *f)
157 {
158   int l = f->bptr - f->buffer;
159   char *c = f->buffer;
160
161   while (l)
162     {
163       int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
164       if (z <= 0)
165         die("Error writing bucket: %m");
166       f->pos += z;
167       l -= z;
168       c += z;
169     }
170   f->bptr = f->buffer;
171 }
172
173 /*** Exported functions ***/
174
175 void
176 obuck_init(int writeable)
177 {
178   sh_off_t size;
179
180   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
181   if (obuck_fd < 0)
182     die("Unable to open bucket file %s: %m", obuck_name);
183   obuck_lock_read();
184   size = sh_seek(obuck_fd, 0, SEEK_END);
185   if (size)
186     {
187       /* If the bucket pool is not empty, check consistency of its end */
188       u32 check;
189       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
190           check != OBUCK_TRAILER)
191         obuck_broken("Missing trailer of last object", size - 4);
192     }
193   obuck_unlock();
194 }
195
196 void
197 obuck_cleanup(void)
198 {
199   close(obuck_fd);
200   if (obuck_fb_count)
201     log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
202   if (obuck_write_fb)
203     log(L_ERROR, "Bug: Forgot to close bucket write stream");
204 }
205
206 void
207 obuck_sync(void)
208 {
209   if (obuck_write_fb)
210     bflush(obuck_write_fb);
211   fsync(obuck_fd);
212 }
213
214 static void
215 obuck_get(oid_t oid)
216 {
217   bucket_find_pos = obuck_get_pos(oid);
218   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
219     obuck_broken("Short header read", bucket_find_pos);
220   if (obuck_hdr.magic != OBUCK_MAGIC)
221     obuck_broken("Missing magic number", bucket_find_pos);
222   if (obuck_hdr.oid == OBUCK_OID_DELETED)
223     obuck_broken("Access to deleted bucket", bucket_find_pos);
224   if (obuck_hdr.oid != oid)
225     obuck_broken("Invalid backlink", bucket_find_pos);
226 }
227
228 void
229 obuck_find_by_oid(struct obuck_header *hdrp)
230 {
231   oid_t oid = hdrp->oid;
232
233   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
234   obuck_lock_read();
235   obuck_get(oid);
236   obuck_unlock();
237   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
238 }
239
240 int
241 obuck_find_first(struct obuck_header *hdrp, int full)
242 {
243   bucket_find_pos = 0;
244   obuck_hdr.magic = 0;
245   return obuck_find_next(hdrp, full);
246 }
247
248 int
249 obuck_find_next(struct obuck_header *hdrp, int full)
250 {
251   int c;
252
253   for(;;)
254     {
255       if (obuck_hdr.magic)
256         bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
257                            4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
258       obuck_lock_read();
259       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
260       obuck_unlock();
261       if (!c)
262         return 0;
263       if (c != sizeof(obuck_hdr))
264         obuck_broken("Short header read", bucket_find_pos);
265       if (obuck_hdr.magic != OBUCK_MAGIC)
266         obuck_broken("Missing magic number", bucket_find_pos);
267       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
268         {
269           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
270           return 1;
271         }
272     }
273 }
274
275 struct fastbuf *
276 obuck_fetch(void)
277 {
278   struct fastbuf *b;
279   uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
280   uns real_buflen = official_buflen + OBUCK_ALIGN;
281
282   b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
283   b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
284   b->bufend = b->buffer + official_buflen;
285   b->name = "bucket-read";
286   b->pos = 0;
287   b->refill = obuck_fb_refill;
288   b->spout = NULL;
289   b->seek = NULL;
290   b->close = obuck_fb_close;
291   b->config = NULL;
292   FB_BUCKET(b)->start_pos = bucket_find_pos;
293   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
294   obuck_fb_count++;
295   return b;
296 }
297
298 oid_t
299 obuck_predict_last_oid(void)
300 {
301   obuck_lock_write();
302   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
303   oid_t ss = size >> OBUCK_SHIFT;
304   obuck_unlock();
305   return ss;
306 }
307
308 struct fastbuf *
309 obuck_create(u32 type)
310 {
311   ASSERT(!obuck_write_fb);
312
313   obuck_lock_write();
314   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
315   if (start & (OBUCK_ALIGN - 1))
316     obuck_broken("Misaligned file", start);
317   obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
318   obuck_create_hdr.oid = start >> OBUCK_SHIFT;
319   obuck_create_hdr.length = 0;
320   obuck_create_hdr.type = type;
321
322   struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
323   obuck_write_fb = b;
324   b->buffer = FB_BUCKET(b)->buffer;
325   b->bptr = b->bstop = b->buffer;
326   b->bufend = b->buffer + obuck_io_buflen;
327   b->pos = -(int)sizeof(obuck_create_hdr);
328   b->name = "bucket-write";
329   b->refill = NULL;
330   b->spout = obuck_fb_spout;
331   b->seek = NULL;
332   b->close = NULL;
333   b->config = NULL;
334   FB_BUCKET(b)->start_pos = start;
335   FB_BUCKET(b)->bucket_size = 0;
336   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
337
338   return b;
339 }
340
341 void
342 obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
343 {
344   ASSERT(b == obuck_write_fb);
345   obuck_write_fb = NULL;
346
347   obuck_create_hdr.magic = OBUCK_MAGIC;
348   obuck_create_hdr.length = btell(b);
349   int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
350   while (pad--)
351     bputc(b, 0);
352   bputl(b, OBUCK_TRAILER);
353   bflush(b);
354   ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
355   if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
356     die("Bucket header update failed: %m");
357   obuck_unlock();
358   memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
359   xfree(b);
360 }
361
362 void
363 obuck_delete(oid_t oid)
364 {
365   obuck_lock_write();
366   obuck_get(oid);
367   obuck_hdr.oid = OBUCK_OID_DELETED;
368   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
369   obuck_unlock();
370 }
371
372 /*** Fast reading of the whole pool ***/
373
374 static struct fastbuf *obuck_rpf;
375 static uns slurp_remains;
376 static sh_off_t slurp_start, slurp_current;
377
378 static int
379 obuck_slurp_refill(struct fastbuf *f)
380 {
381   uns l;
382
383   if (!slurp_remains)
384     return 0;
385   l = bdirect_read_prepare(obuck_rpf, &f->buffer);
386   if (!l)
387     obuck_broken("Incomplete object", slurp_start);
388   l = MIN(l, slurp_remains);
389   bdirect_read_commit(obuck_rpf, f->buffer + l);
390   slurp_remains -= l;
391   f->bptr = f->buffer;
392   f->bufend = f->bstop = f->buffer + l;
393   return 1;
394 }
395
396 struct fastbuf *
397 obuck_slurp_pool(struct obuck_header *hdrp)
398 {
399   static struct fastbuf limiter;
400   uns l;
401
402   do
403     {
404       if (!obuck_rpf)
405         {
406           obuck_lock_read();
407           obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
408         }
409       else
410         {
411           bsetpos(obuck_rpf, slurp_current - 4);
412           if (bgetl(obuck_rpf) != OBUCK_TRAILER)
413             obuck_broken("Missing trailer", slurp_start);
414         }
415       slurp_start = btell(obuck_rpf);
416       l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
417       if (!l)
418         {
419           bclose(obuck_rpf);
420           obuck_rpf = NULL;
421           obuck_unlock();
422           return NULL;
423         }
424       if (l != sizeof(struct obuck_header))
425         obuck_broken("Short header read", slurp_start);
426       if (hdrp->magic != OBUCK_MAGIC)
427         obuck_broken("Missing magic number", slurp_start);
428       slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
429                        4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
430     }
431   while (hdrp->oid == OBUCK_OID_DELETED);
432   if (obuck_get_pos(hdrp->oid) != slurp_start)
433     obuck_broken("Invalid backlink", slurp_start);
434   slurp_remains = hdrp->length;
435   limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
436   limiter.name = "Bucket";
437   limiter.pos = 0;
438   limiter.refill = obuck_slurp_refill;
439   return &limiter;
440 }
441
442 /*** Shakedown ***/
443
444 static inline void
445 shake_write(void *addr, int len, sh_off_t pos)
446 {
447   int l = sh_pwrite(obuck_fd, addr, len, pos);
448   if (l != len)
449     {
450       if (l < 0)
451         die("obuck_shakedown write error: %m");
452       else
453         die("obuck_shakedown write error: disk full");
454     }
455 }
456
457 static inline void
458 shake_sync(void)
459 {
460   if (obuck_shake_security > 1)
461     fdatasync(obuck_fd);
462 }
463
464 static void
465 shake_write_backup(sh_off_t bpos, byte *norm_buf, int norm_size, byte *fragment, int frag_size, sh_off_t frag_pos, int more_size)
466 {
467   struct obuck_header *bhdr;
468   int boff = 0;
469   int l;
470   oid_t old_oid;
471
472   /* First of all, the "normal" part -- everything that will be written in this pass */
473   DBG("Backing up first round of changes at position %Lx + %x", (long long) bpos, norm_size);
474   while (boff < norm_size)
475     {
476       /* This needn't be optimized for speed. */
477       bhdr = (struct obuck_header *) (norm_buf + boff);
478       ASSERT(bhdr->magic == OBUCK_MAGIC);
479       l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN);
480       old_oid = bhdr->oid;
481       bhdr->oid = bpos >> OBUCK_SHIFT;
482       shake_write(bhdr, l, bpos);
483       bhdr->oid = old_oid;
484       boff += l;
485       bpos += l;
486     }
487
488   /* If we have an incomplete bucket at the end of the buffer, we must copy it as well. */
489   if (more_size)
490     {
491       DBG("Backing up fragment of size %x and %x more", frag_size, more_size);
492
493       /* First the part we already have in the buffer */
494       bhdr = (struct obuck_header *) fragment;
495       ASSERT(bhdr->magic == OBUCK_MAGIC);
496       old_oid = bhdr->oid;
497       bhdr->oid = bpos >> OBUCK_SHIFT;
498       shake_write(bhdr, frag_size, bpos);
499       bhdr->oid = old_oid;
500       bpos += frag_size;
501
502       /* And then the rest, using a small 64K buffer */
503       byte *auxbuf = alloca(65536);
504       l = 0;
505       while (l < more_size)
506         {
507           int j = MIN(more_size-l, 65536);
508           if (sh_pread(obuck_fd, auxbuf, j, frag_pos + frag_size + l) != j)
509             die("obuck_shakedown read error: %m");
510           shake_write(auxbuf, j, bpos);
511           bpos += j;
512           l += j;
513         }
514     }
515 }
516
517 static void
518 shake_erase(sh_off_t start, sh_off_t end)
519 {
520   if (start > end)
521     die("shake_erase called with negative length, that's a bug");
522   ASSERT(!(start & (OBUCK_ALIGN-1)) && !(end & (OBUCK_ALIGN-1)));
523   while (start < end)
524     {
525       u32 check = OBUCK_TRAILER;
526       obuck_hdr.magic = OBUCK_MAGIC;
527       obuck_hdr.oid = OBUCK_OID_DELETED;
528       uns len = MIN(0x40000000, end-start);
529       obuck_hdr.length = len - sizeof(obuck_hdr) - 4;
530       DBG("Erasing %08x bytes at %Lx", len, (long long) start);
531       shake_write(&obuck_hdr, sizeof(obuck_hdr), start);
532       start += len;
533       shake_write(&check, 4, start-4);
534     }
535 }
536
537 void
538 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
539 {
540   byte *buf;                                            /* Shakedown buffer and its size */
541   int buflen = ALIGN(obuck_shake_buflen, OBUCK_ALIGN);
542   byte *msg;                                            /* Error message we will print */
543   sh_off_t rstart, wstart;                              /* Original and new position of buffer start */
544   sh_off_t r_bucket_start, w_bucket_start;              /* Original and new position of the current bucket */
545   int roff, woff;                                       /* Orig/new position of the current bucket relative to buffer start */
546   int rsize;                                            /* Number of original bytes in the buffer */
547   int l;                                                /* Raw size of the current bucket */
548   int changed = 0;                                      /* "Something has been altered" flag */
549   int wrote_anything = 0;                               /* We already did a write to the bucket file */
550   struct obuck_header *rhdr, *whdr;                     /* Original and new address of header of the current bucket */
551   sh_off_t r_file_size;                                 /* Original size of the bucket file */
552   int more;                                             /* How much does the last bucket overlap the buffer */
553
554   buf = xmalloc(buflen);
555   rstart = wstart = 0;
556   roff = woff = rsize = 0;
557
558   /* We need to be the only accessor, all the object ID's are becoming invalid */
559   obuck_lock_write();
560   r_file_size = sh_seek(obuck_fd, 0, SEEK_END);
561   ASSERT(!(r_file_size & (OBUCK_ALIGN - 1)));
562   if (r_file_size >= (0x100000000 << OBUCK_SHIFT) - buflen)
563     die("Bucket file is too large for safe shakedown. Shaking down with Bucket.ShakeSecurity=0 will still work.");
564
565   DBG("Starting shakedown. Buffer size is %d, original length %Lx", buflen, (long long) r_file_size);
566
567   for(;;)
568     {
569       r_bucket_start = rstart + roff;
570       w_bucket_start = wstart + woff;
571       rhdr = (struct obuck_header *)(buf + roff);
572       whdr = (struct obuck_header *)(buf + woff);
573       if (roff == rsize)
574         {
575           more = 0;
576           goto next;
577         }
578       if (rhdr->magic != OBUCK_MAGIC ||
579           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
580         {
581           msg = "header mismatch";
582           goto broken;
583         }
584       l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
585       if (l > buflen)
586         {
587           if (rhdr->oid != OBUCK_OID_DELETED)
588             {
589               msg = "bucket longer than ShakeBufSize";
590               goto broken;
591             }
592           /* Empty buckets are allowed to be large, but we need to handle them extra */
593           DBG("Tricking around an extra-large empty bucket at %Lx + %x", (long long)r_bucket_start, l);
594           rsize = roff + l;
595         }
596       else
597         {
598           if (rsize - roff < l)
599             {
600               more = l - (rsize - roff);
601               goto next;
602             }
603           if (GET_U32((byte *)rhdr + l - 4) != OBUCK_TRAILER)
604             {
605               msg = "missing trailer";
606               goto broken;
607             }
608         }
609       if (rhdr->oid != OBUCK_OID_DELETED)
610         {
611           int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
612           if (status)
613             {
614               if (status > 1)
615                 {
616                   /* Changed! Reconstruct the trailer. */
617                   int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
618                   ASSERT(l2 <= l);
619                   PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
620                   l = l2;
621                   changed = 1;
622                 }
623               whdr = (struct obuck_header *)(buf+woff);
624               if (rhdr != whdr)
625                 memmove(whdr, rhdr, l);
626               whdr->oid = w_bucket_start >> OBUCK_SHIFT;
627               woff += l;
628             }
629           else
630             changed = 1;
631         }
632       else
633         {
634           kibitz(rhdr, OBUCK_OID_DELETED, NULL);
635           changed = 1;
636         }
637       roff += l;
638       continue;
639
640     next:
641       if (changed)
642         {
643           /* Write the new contents of the bucket file */
644           if (!wrote_anything)
645             {
646               if (obuck_shake_security)
647                 {
648                   /* But first write a backup at the end of the file to ensure nothing can be lost. */
649                   shake_write_backup(r_file_size, buf, woff, buf+roff, rsize-roff, rstart+roff, more);
650                   shake_sync();
651                 }
652               wrote_anything = 1;
653             }
654           if (woff)
655             {
656               DBG("Write %Lx %x", wstart, woff);
657               shake_write(buf, woff, wstart);
658               shake_sync();
659             }
660         }
661       else
662         ASSERT(wstart == rstart);
663
664       /* In any case, update the write position */
665       wstart += woff;
666       woff = 0;
667
668       /* Skip what's been read and if there is any fragment at the end of the buffer, move it to the start */
669       rstart += roff;
670       if (more)
671         {
672           memmove(buf, buf+roff, rsize-roff);
673           rsize = rsize-roff;
674         }
675       else
676         rsize = 0;
677
678       /* And refill the buffer */
679       r_bucket_start = rstart+rsize;    /* Also needed for error messages */
680       l = sh_pread(obuck_fd, buf+rsize, MIN(buflen-rsize, r_file_size - r_bucket_start), r_bucket_start);
681       DBG("Read  %Lx %x (%x inherited)", (long long)r_bucket_start, l, rsize);
682       if (l < 0)
683         die("obuck_shakedown read error: %m");
684       if (!l)
685         {
686           if (!more)
687             break;
688           msg = "unexpected EOF";
689           goto broken;
690         }
691       if (l & (OBUCK_ALIGN-1))
692         {
693           msg = "garbage at the end of file";
694           goto broken;
695         }
696       rsize += l;
697       roff = 0;
698     }
699
700   DBG("Finished at position %Lx", (long long) wstart);
701   sh_ftruncate(obuck_fd, wstart);
702   shake_sync();
703
704   obuck_unlock();
705   xfree(buf);
706   return;
707
708  broken:
709   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris",
710       msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
711   /*
712    * We can attempt to clean up the bucket file by erasing everything between the last
713    * byte written and the next byte to be read. If the secure mode is switched on, we can
714    * guarantee that no data are lost, only some might be duplicated.
715    */
716   shake_erase(wstart, rstart);
717   die("Fatal error during object pool shakedown");
718 }
719
720 /*** Testing ***/
721
722 #ifdef TEST
723
724 #define COUNT 5000
725 #define MAXLEN 10000
726 #define KILLPERC 13
727 #define LEN(i) ((259309*(i))%MAXLEN)
728
729 static int test_kibitz(struct obuck_header *h, oid_t new, byte *buck)
730 {
731   return 1;
732 }
733
734 int main(int argc, char **argv)
735 {
736   int ids[COUNT];
737   unsigned int i, j, cnt;
738   struct obuck_header h;
739   struct fastbuf *b;
740
741   log_init(NULL);
742   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
743       optind < argc)
744   {
745     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
746     exit(1);
747   }
748
749   unlink(obuck_name);
750   obuck_init(1);
751   for(j=0; j<COUNT; j++)
752     {
753       b = obuck_create(BUCKET_TYPE_PLAIN);
754       for(i=0; i<LEN(j); i++)
755         bputc(b, (i+j) % 256);
756       obuck_create_end(b, &h);
757       printf("Writing %08x %d\n", h.oid, h.length);
758       ids[j] = h.oid;
759     }
760   for(j=0; j<COUNT; j++)
761     if (j % 100 < KILLPERC)
762       {
763         printf("Deleting %08x\n", ids[j]);
764         obuck_delete(ids[j]);
765       }
766   cnt = 0;
767   for(j=0; j<COUNT; j++)
768     if (j % 100 >= KILLPERC)
769       {
770         cnt++;
771         h.oid = ids[j];
772         obuck_find_by_oid(&h);
773         b = obuck_fetch();
774         printf("Reading %08x %d\n", h.oid, h.length);
775         if (h.length != LEN(j))
776           die("Invalid length");
777         for(i=0; i<h.length; i++)
778           if ((unsigned) bgetc(b) != (i+j) % 256)
779             die("Contents mismatch");
780         if (bgetc(b) != EOF)
781           die("EOF mismatch");
782         bclose(b);
783       }
784   obuck_shakedown(test_kibitz);
785   if (obuck_find_first(&h, 0))
786     do
787       {
788         printf("<<< %08x\t%d\n", h.oid, h.length);
789         cnt--;
790       }
791     while (obuck_find_next(&h, 0));
792   if (cnt)
793     die("Walk mismatch");
794   obuck_cleanup();
795   return 0;
796 }
797
798 #endif