]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
Oops, the card array was reversed!
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001--2002 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/bucket.h"
12 #include "lib/fastbuf.h"
13 #include "lib/lfs.h"
14 #include "lib/conf.h"
15
16 #include <string.h>
17 #include <stdlib.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/file.h>
21
22 static int obuck_fd;
23 static unsigned int obuck_remains, obuck_check_pad;
24 static struct fastbuf *obuck_fb;
25 static struct obuck_header obuck_hdr;
26 static sh_off_t bucket_start, bucket_current;
27
28 /*** Configuration ***/
29
30 byte *obuck_name = "not/configured";
31 static uns obuck_io_buflen = 65536;
32 static int obuck_shake_buflen = 1048576;
33
34 static struct cfitem obuck_config[] = {
35   { "Buckets",          CT_SECTION,     NULL },
36   { "BucketFile",       CT_STRING,      &obuck_name },
37   { "BufSize",          CT_INT,         &obuck_io_buflen },
38   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
39   { NULL,               CT_STOP,        NULL }
40 };
41
42 static void CONSTRUCTOR obuck_init_config(void)
43 {
44   cf_register(obuck_config);
45 }
46
47 /*** Internal operations ***/
48
49 static void
50 obuck_broken(char *msg)
51 {
52   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
53 }
54
55 /*
56  *  Unfortunately we cannot use flock() here since it happily permits
57  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
58  *  locks are very ugly and they don't support 64-bit offsets, but we
59  *  can work around the problem by always locking the first header
60  *  in the file.
61  */
62
63 static inline void
64 obuck_do_lock(int type)
65 {
66   struct flock fl;
67
68   fl.l_type = type;
69   fl.l_whence = SEEK_SET;
70   fl.l_start = 0;
71   fl.l_len = sizeof(struct obuck_header);
72   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
73     die("fcntl lock: %m");
74 }
75
76 inline void
77 obuck_lock_read(void)
78 {
79   obuck_do_lock(F_RDLCK);
80 }
81
82 inline void
83 obuck_lock_write(void)
84 {
85   obuck_do_lock(F_WRLCK);
86 }
87
88 inline void
89 obuck_unlock(void)
90 {
91   obuck_do_lock(F_UNLCK);
92 }
93
94 /*** FastIO emulation ***/
95
96 /* We need to use pread/pwrite since we work on fd's shared between processes */
97
98 static int
99 obuck_fb_refill(struct fastbuf *f)
100 {
101   unsigned limit = (obuck_io_buflen < obuck_remains) ? obuck_io_buflen : obuck_remains;
102   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
103   int l;
104
105   if (!limit)
106     return 0;
107   l = sh_pread(obuck_fd, f->buffer, size, bucket_current);
108   if (l < 0)
109     die("Error reading bucket: %m");
110   if ((unsigned) l != size)
111     obuck_broken("Short read");
112   f->bptr = f->buffer;
113   f->bstop = f->buffer + limit;
114   bucket_current += limit;
115   f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
116   obuck_remains -= limit;
117   if (!obuck_remains)   /* Should check the trailer */
118     {
119       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
120         obuck_broken("Missing trailer");
121     }
122   return limit;
123 }
124
125 static void
126 obuck_fb_spout(struct fastbuf *f)
127 {
128   int l = f->bptr - f->buffer;
129   char *c = f->buffer;
130
131   while (l)
132     {
133       int z = sh_pwrite(obuck_fd, c, l, bucket_current);
134       if (z <= 0)
135         die("Error writing bucket: %m");
136       bucket_current += z;
137       l -= z;
138       c += z;
139     }
140   f->bptr = f->buffer;
141   f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
142 }
143
144 /*** Exported functions ***/
145
146 void
147 obuck_init(int writeable)
148 {
149   struct fastbuf *b;
150   sh_off_t size;
151
152   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
153   if (obuck_fd < 0)
154     die("Unable to open bucket file %s: %m", obuck_name);
155   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
156   b->buffer = (char *)(b+1);
157   b->bptr = b->bstop = b->buffer;
158   b->bufend = b->buffer + obuck_io_buflen;
159   b->name = "bucket";
160   b->refill = obuck_fb_refill;
161   b->spout = obuck_fb_spout;
162   obuck_lock_read();
163   size = sh_seek(obuck_fd, 0, SEEK_END);
164   if (size)
165     {
166       /* If the bucket pool is not empty, check consistency of its end */
167       u32 check;
168       bucket_start = size - 4;  /* for error reporting */
169       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
170           check != OBUCK_TRAILER)
171         obuck_broken("Missing trailer of last object");
172     }
173   obuck_unlock();
174 }
175
176 void
177 obuck_cleanup(void)
178 {
179   bclose(obuck_fb);
180   close(obuck_fd);
181   xfree(obuck_fb);
182 }
183
184 void
185 obuck_sync(void)
186 {
187   bflush(obuck_fb);
188   fsync(obuck_fd);
189 }
190
191 static void
192 obuck_get(oid_t oid)
193 {
194   struct fastbuf *b = obuck_fb;
195
196   bucket_start = obuck_get_pos(oid);
197   bflush(b);
198   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
199     obuck_broken("Short header read");
200   bucket_current = bucket_start + sizeof(obuck_hdr);
201   if (obuck_hdr.magic != OBUCK_MAGIC)
202     obuck_broken("Missing magic number");
203   if (obuck_hdr.oid == OBUCK_OID_DELETED)
204     obuck_broken("Access to deleted bucket");
205   if (obuck_hdr.oid != oid)
206     obuck_broken("Invalid backlink");
207 }
208
209 void
210 obuck_find_by_oid(struct obuck_header *hdrp)
211 {
212   oid_t oid = hdrp->oid;
213
214   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
215   obuck_lock_read();
216   obuck_get(oid);
217   obuck_unlock();
218   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
219 }
220
221 int
222 obuck_find_first(struct obuck_header *hdrp, int full)
223 {
224   bucket_start = 0;
225   obuck_hdr.magic = 0;
226   return obuck_find_next(hdrp, full);
227 }
228
229 int
230 obuck_find_next(struct obuck_header *hdrp, int full)
231 {
232   int c;
233   struct fastbuf *b = obuck_fb;
234
235   for(;;)
236     {
237       if (obuck_hdr.magic)
238         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
239                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
240       bflush(b);
241       obuck_lock_read();
242       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
243       obuck_unlock();
244       if (!c)
245         return 0;
246       if (c != sizeof(obuck_hdr))
247         obuck_broken("Short header read");
248       bucket_current = bucket_start + sizeof(obuck_hdr);
249       if (obuck_hdr.magic != OBUCK_MAGIC)
250         obuck_broken("Missing magic number");
251       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
252         {
253           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
254           return 1;
255         }
256     }
257 }
258
259 struct fastbuf *
260 obuck_fetch(void)
261 {
262   obuck_fb->pos = 0;
263   obuck_remains = obuck_hdr.length;
264   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
265   return obuck_fb;
266 }
267
268 void
269 obuck_fetch_end(struct fastbuf *b UNUSED)
270 {
271 }
272
273 struct fastbuf *
274 obuck_create(void)
275 {
276   obuck_lock_write();
277   bflush(obuck_fb);
278   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
279   if (bucket_start & (OBUCK_ALIGN - 1))
280     obuck_broken("Misaligned file");
281   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
282   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
283   obuck_hdr.length = obuck_hdr.orig_length = 0;
284   bucket_current = bucket_start;
285   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
286   obuck_fb->pos = -sizeof(obuck_hdr);
287   return obuck_fb;
288 }
289
290 void
291 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
292 {
293   int pad;
294   obuck_hdr.magic = OBUCK_MAGIC;
295   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb);
296   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
297   while (pad--)
298     bputc(obuck_fb, 0);
299   bputl(obuck_fb, OBUCK_TRAILER);
300   bflush(obuck_fb);
301   ASSERT(!(bucket_current & (OBUCK_ALIGN - 1)));
302   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
303   obuck_unlock();
304   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
305 }
306
307 void
308 obuck_delete(oid_t oid)
309 {
310   obuck_lock_write();
311   obuck_get(oid);
312   obuck_hdr.oid = OBUCK_OID_DELETED;
313   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
314   obuck_unlock();
315 }
316
317 /*** Shakedown ***/
318
319 void
320 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
321 {
322   byte *rbuf, *wbuf, *msg;
323   sh_off_t rstart, wstart, w_bucket_start;
324   int roff, woff, rsize, l;
325   struct obuck_header *rhdr, *whdr;
326
327   rbuf = xmalloc(obuck_shake_buflen);
328   wbuf = xmalloc(obuck_shake_buflen);
329   rstart = wstart = 0;
330   roff = woff = rsize = 0;
331
332   /* We need to be the only accessor, all the object ID's are becoming invalid */
333   obuck_lock_write();
334
335   for(;;)
336     {
337       bucket_start = rstart + roff;
338       w_bucket_start = wstart + woff;
339       if (rsize - roff < OBUCK_ALIGN)
340         goto reread;
341       rhdr = (struct obuck_header *)(rbuf + roff);
342       if (rhdr->magic != OBUCK_MAGIC ||
343           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(bucket_start >> OBUCK_SHIFT))
344         {
345           msg = "header mismatch";
346           goto broken;
347         }
348       l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
349       if (rsize - roff < l)
350         goto reread;
351       if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
352         {
353           msg = "missing trailer";
354           goto broken;
355         }
356       if (rhdr->oid != OBUCK_OID_DELETED)
357         {
358           if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
359             {
360               if (bucket_start == w_bucket_start)
361                 {
362                   /* No copying needed now nor ever in the past, hence woff==0 */
363                   wstart += l;
364                 }
365               else
366                 {
367                   if (obuck_shake_buflen - woff < l)
368                     {
369                       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
370                         die("obuck_shakedown write failed: %m");
371                       wstart += woff;
372                       woff = 0;
373                     }
374                   whdr = (struct obuck_header *)(wbuf+woff);
375                   memcpy(whdr, rhdr, l);
376                   whdr->oid = w_bucket_start >> OBUCK_SHIFT;
377                   woff += l;
378                 }
379             }
380         }
381       else
382         kibitz(rhdr, OBUCK_OID_DELETED, NULL);
383       roff += l;
384       continue;
385
386     reread:
387       if (roff)
388         {
389           memmove(rbuf, rbuf+roff, rsize-roff);
390           rsize -= roff;
391           rstart += roff;
392           roff = 0;
393         }
394       l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
395       if (l < 0)
396         die("obuck_shakedown read error: %m");
397       if (!l)
398         {
399           if (!rsize)
400             break;
401           msg = "unexpected EOF";
402           goto broken;
403         }
404       rsize += l;
405     }
406   if (woff)
407     {
408       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
409         die("obuck_shakedown write failed: %m");
410       wstart += woff;
411     }
412   sh_ftruncate(obuck_fd, wstart);
413
414   obuck_unlock();
415   xfree(rbuf);
416   xfree(wbuf);
417   return;
418
419  broken:
420   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld), gathering debris", msg, (long long) bucket_start);
421   if (woff)
422     {
423       sh_pwrite(obuck_fd, wbuf, woff, wstart);
424       wstart += woff;
425     }
426   while (wstart + OBUCK_ALIGN <= bucket_start)
427     {
428       u32 check = OBUCK_TRAILER;
429       obuck_hdr.magic = OBUCK_MAGIC;
430       obuck_hdr.oid = OBUCK_OID_DELETED;
431       if (bucket_start - wstart < 0x40000000)
432         obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
433       else
434         obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
435       obuck_hdr.orig_length = obuck_hdr.length;
436       sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
437       wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
438       sh_pwrite(obuck_fd, &check, 4, wstart-4);
439     }
440   die("Fatal error during object pool shakedown");
441 }
442
443 /*** Testing ***/
444
445 #ifdef TEST
446
447 #define COUNT 5000
448 #define MAXLEN 10000
449 #define KILLPERC 13
450 #define LEN(i) ((259309*(i))%MAXLEN)
451
452 int main(int argc, char **argv)
453 {
454   int ids[COUNT];
455   unsigned int i, j, cnt;
456   struct obuck_header h;
457   struct fastbuf *b;
458
459   log_init(NULL);
460   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
461       optind < argc)
462   {
463     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
464     exit(1);
465   }
466
467   unlink(obuck_name);
468   obuck_init(1);
469   for(j=0; j<COUNT; j++)
470     {
471       b = obuck_create();
472       for(i=0; i<LEN(j); i++)
473         bputc(b, (i+j) % 256);
474       obuck_create_end(b, &h);
475       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
476       ids[j] = h.oid;
477     }
478   for(j=0; j<COUNT; j++)
479     if (j % 100 < KILLPERC)
480       {
481         printf("Deleting %08x\n", ids[j]);
482         obuck_delete(ids[j]);
483       }
484   cnt = 0;
485   for(j=0; j<COUNT; j++)
486     if (j % 100 >= KILLPERC)
487       {
488         cnt++;
489         h.oid = ids[j];
490         obuck_find_by_oid(&h);
491         b = obuck_fetch();
492         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
493         if (h.orig_length != LEN(j))
494           die("Invalid length");
495         for(i=0; i<h.orig_length; i++)
496           if ((unsigned) bgetc(b) != (i+j) % 256)
497             die("Contents mismatch");
498         if (bgetc(b) != EOF)
499           die("EOF mismatch");
500         obuck_fetch_end(b);
501       }
502   if (obuck_find_first(&h, 0))
503     do
504       {
505         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
506         cnt--;
507       }
508     while (obuck_find_next(&h, 0));
509   if (cnt)
510     die("Walk mismatch");
511   obuck_cleanup();
512   return 0;
513 }
514
515 #endif