]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
Added license notices to all library files which are not specific
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/bucket.h"
12 #include "lib/fastbuf.h"
13 #include "lib/lfs.h"
14 #include "lib/conf.h"
15
16 #include <string.h>
17 #include <stdlib.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/file.h>
21
22 static int obuck_fd;
23 static unsigned int obuck_remains, obuck_check_pad;
24 static struct fastbuf *obuck_fb;
25 static struct obuck_header obuck_hdr;
26 static sh_off_t bucket_start;
27
28 /*** Configuration ***/
29
30 byte *obuck_name = "not/configured";
31 static int obuck_io_buflen = 65536;
32 static int obuck_shake_buflen = 1048576;
33
34 static struct cfitem obuck_config[] = {
35   { "Buckets",          CT_SECTION,     NULL },
36   { "BucketFile",       CT_STRING,      &obuck_name },
37   { "BufSize",          CT_INT,         &obuck_io_buflen },
38   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
39   { NULL,               CT_STOP,        NULL }
40 };
41
42 static void CONSTRUCTOR obuck_init_config(void)
43 {
44   cf_register(obuck_config);
45 }
46
47 /*** Internal operations ***/
48
49 static void
50 obuck_broken(char *msg)
51 {
52   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
53 }
54
55 /*
56  *  Unfortunately we cannot use flock() here since it happily permits
57  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
58  *  locks are very ugly and they don't support 64-bit offsets, but we
59  *  can work around the problem by always locking the first header
60  *  in the file.
61  */
62
63 static inline void
64 obuck_do_lock(int type)
65 {
66   struct flock fl;
67
68   fl.l_type = type;
69   fl.l_whence = SEEK_SET;
70   fl.l_start = 0;
71   fl.l_len = sizeof(struct obuck_header);
72   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
73     die("fcntl lock: %m");
74 }
75
76 static inline void
77 obuck_lock_read(void)
78 {
79   obuck_do_lock(F_RDLCK);
80 }
81
82 static inline void
83 obuck_lock_write(void)
84 {
85   obuck_do_lock(F_WRLCK);
86 }
87
88 static inline void
89 obuck_unlock(void)
90 {
91   obuck_do_lock(F_UNLCK);
92 }
93
94 /*** FastIO emulation ***/
95
96 /* We need to use pread/pwrite since we work on fd's shared between processes */
97
98 static int
99 obuck_fb_refill(struct fastbuf *f)
100 {
101   unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains;
102   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
103   int l;
104
105   if (!limit)
106     return 0;
107   l = sh_pread(f->fd, f->buffer, size, f->fdpos);
108   if (l < 0)
109     die("Error reading bucket: %m");
110   if ((unsigned) l != size)
111     obuck_broken("Short read");
112   f->bptr = f->buffer;
113   f->bstop = f->buffer + limit;
114   f->pos = f->fdpos;
115   f->fdpos += limit;
116   obuck_remains -= limit;
117   if (!obuck_remains)   /* Should check the trailer */
118     {
119       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
120         obuck_broken("Missing trailer");
121     }
122   return limit;
123 }
124
125 static void
126 obuck_fb_spout(struct fastbuf *f)
127 {
128   int l = f->bptr - f->buffer;
129   char *c = f->buffer;
130
131   while (l)
132     {
133       int z = sh_pwrite(f->fd, c, l, f->fdpos);
134       if (z <= 0)
135         die("Error writing bucket: %m");
136       f->fdpos += z;
137       l -= z;
138       c += z;
139     }
140   f->bptr = f->buffer;
141   f->pos = f->fdpos;
142 }
143
144 static void
145 obuck_fb_close(struct fastbuf *f)
146 {
147   close(f->fd);
148 }
149
150 /*** Exported functions ***/
151
152 void
153 obuck_init(int writeable)
154 {
155   struct fastbuf *b;
156   sh_off_t size;
157
158   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
159   if (obuck_fd < 0)
160     die("Unable to open bucket file %s: %m", obuck_name);
161   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
162   b->buflen = obuck_io_buflen;
163   b->buffer = (char *)(b+1);
164   b->bptr = b->bstop = b->buffer;
165   b->bufend = b->buffer + obuck_io_buflen;
166   b->name = "bucket";
167   b->fd = obuck_fd;
168   b->refill = obuck_fb_refill;
169   b->spout = obuck_fb_spout;
170   b->close = obuck_fb_close;
171   obuck_lock_read();
172   size = sh_seek(obuck_fd, 0, SEEK_END);
173   if (size)
174     {
175       /* If the bucket pool is not empty, check consistency of its end */
176       u32 check;
177       bucket_start = size - 4;  /* for error reporting */
178       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
179           check != OBUCK_TRAILER)
180         obuck_broken("Missing trailer of last object");
181     }
182   obuck_unlock();
183 }
184
185 void
186 obuck_cleanup(void)
187 {
188   bclose(obuck_fb);
189 }
190
191 void
192 obuck_sync(void)
193 {
194   bflush(obuck_fb);
195   fsync(obuck_fd);
196 }
197
198 static void
199 obuck_get(oid_t oid)
200 {
201   struct fastbuf *b = obuck_fb;
202
203   bucket_start = obuck_get_pos(oid);
204   bflush(b);
205   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
206     obuck_broken("Short header read");
207   b->fdpos = bucket_start + sizeof(obuck_hdr);
208   if (obuck_hdr.magic != OBUCK_MAGIC)
209     obuck_broken("Missing magic number");
210   if (obuck_hdr.oid == OBUCK_OID_DELETED)
211     obuck_broken("Access to deleted bucket");
212   if (obuck_hdr.oid != oid)
213     obuck_broken("Invalid backlink");
214 }
215
216 void
217 obuck_find_by_oid(struct obuck_header *hdrp)
218 {
219   oid_t oid = hdrp->oid;
220
221   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
222   obuck_lock_read();
223   obuck_get(oid);
224   obuck_unlock();
225   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
226 }
227
228 int
229 obuck_find_first(struct obuck_header *hdrp, int full)
230 {
231   bucket_start = 0;
232   obuck_hdr.magic = 0;
233   return obuck_find_next(hdrp, full);
234 }
235
236 int
237 obuck_find_next(struct obuck_header *hdrp, int full)
238 {
239   int c;
240   struct fastbuf *b = obuck_fb;
241
242   for(;;)
243     {
244       if (obuck_hdr.magic)
245         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
246                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
247       bflush(b);
248       obuck_lock_read();
249       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
250       obuck_unlock();
251       if (!c)
252         return 0;
253       if (c != sizeof(obuck_hdr))
254         obuck_broken("Short header read");
255       b->fdpos = bucket_start + sizeof(obuck_hdr);
256       if (obuck_hdr.magic != OBUCK_MAGIC)
257         obuck_broken("Missing magic number");
258       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
259         {
260           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
261           return 1;
262         }
263     }
264 }
265
266 struct fastbuf *
267 obuck_fetch(void)
268 {
269   obuck_remains = obuck_hdr.length;
270   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
271   return obuck_fb;
272 }
273
274 void
275 obuck_fetch_end(struct fastbuf *b UNUSED)
276 {
277 }
278
279 struct fastbuf *
280 obuck_create(void)
281 {
282   obuck_lock_write();
283   bflush(obuck_fb);
284   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
285   if (bucket_start & (OBUCK_ALIGN - 1))
286     obuck_broken("Misaligned file");
287   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
288   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
289   obuck_hdr.length = obuck_hdr.orig_length = 0;
290   obuck_fb->fdpos = obuck_fb->pos = bucket_start;
291   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
292   return obuck_fb;
293 }
294
295 void
296 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
297 {
298   int pad;
299   obuck_hdr.magic = OBUCK_MAGIC;
300   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr);
301   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
302   while (pad--)
303     bputc(obuck_fb, 0);
304   bputl(obuck_fb, OBUCK_TRAILER);
305   bflush(obuck_fb);
306   ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1)));
307   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
308   obuck_unlock();
309   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
310 }
311
312 void
313 obuck_delete(oid_t oid)
314 {
315   obuck_lock_write();
316   obuck_get(oid);
317   obuck_hdr.oid = OBUCK_OID_DELETED;
318   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
319   obuck_unlock();
320 }
321
322 /*** Shakedown ***/
323
324 void
325 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
326 {
327   byte *rbuf, *wbuf, *msg;
328   sh_off_t rstart, wstart, w_bucket_start;
329   int roff, woff, rsize, l;
330   struct obuck_header *rhdr, *whdr;
331
332   rbuf = xmalloc(obuck_shake_buflen);
333   wbuf = xmalloc(obuck_shake_buflen);
334   rstart = wstart = 0;
335   roff = woff = rsize = 0;
336
337   /* We need to be the only accessor, all the object ID's are becoming invalid */
338   obuck_lock_write();
339
340   for(;;)
341     {
342       bucket_start = rstart + roff;
343       w_bucket_start = wstart + woff;
344       if (rsize - roff < OBUCK_ALIGN)
345         goto reread;
346       rhdr = (struct obuck_header *)(rbuf + roff);
347       if (rhdr->magic != OBUCK_MAGIC ||
348           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (bucket_start >> OBUCK_SHIFT))
349         {
350           msg = "header mismatch";
351           goto broken;
352         }
353       l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
354       if (rsize - roff < l)
355         goto reread;
356       if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
357         {
358           msg = "missing trailer";
359           goto broken;
360         }
361       if (rhdr->oid != OBUCK_OID_DELETED)
362         {
363           if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
364             {
365               if (bucket_start == w_bucket_start)
366                 {
367                   /* No copying needed now nor ever in the past, hence woff==0 */
368                   wstart += l;
369                 }
370               else
371                 {
372                   if (obuck_shake_buflen - woff < l)
373                     {
374                       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
375                         die("obuck_shakedown write failed: %m");
376                       wstart += woff;
377                       woff = 0;
378                     }
379                   whdr = (struct obuck_header *)(wbuf+woff);
380                   memcpy(whdr, rhdr, l);
381                   whdr->oid = w_bucket_start >> OBUCK_SHIFT;
382                   woff += l;
383                 }
384             }
385         }
386       else
387         kibitz(rhdr, OBUCK_OID_DELETED, NULL);
388       roff += l;
389       continue;
390
391     reread:
392       if (roff)
393         {
394           memmove(rbuf, rbuf+roff, rsize-roff);
395           rsize -= roff;
396           rstart += roff;
397           roff = 0;
398         }
399       l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
400       if (l < 0)
401         die("obuck_shakedown read error: %m");
402       if (!l)
403         {
404           if (!rsize)
405             break;
406           msg = "unexpected EOF";
407           goto broken;
408         }
409       rsize += l;
410     }
411   if (woff)
412     {
413       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
414         die("obuck_shakedown write failed: %m");
415       wstart += woff;
416     }
417   sh_ftruncate(obuck_fd, wstart);
418
419   obuck_unlock();
420   xfree(rbuf);
421   xfree(wbuf);
422   return;
423
424  broken:
425   log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld), gathering debris", msg, (long long) bucket_start);
426   if (woff)
427     {
428       sh_pwrite(obuck_fd, wbuf, woff, wstart);
429       wstart += woff;
430     }
431   while (wstart + OBUCK_ALIGN <= bucket_start)
432     {
433       u32 check = OBUCK_TRAILER;
434       obuck_hdr.magic = OBUCK_MAGIC;
435       obuck_hdr.oid = OBUCK_OID_DELETED;
436       if (bucket_start - wstart < 0x40000000)
437         obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
438       else
439         obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
440       obuck_hdr.orig_length = obuck_hdr.length;
441       sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
442       wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4;
443       sh_pwrite(obuck_fd, &check, 4, wstart-4);
444     }
445   die("Fatal error during object pool shakedown");
446 }
447
448 /*** Testing ***/
449
450 #ifdef TEST
451
452 #define COUNT 5000
453 #define MAXLEN 10000
454 #define KILLPERC 13
455 #define LEN(i) ((259309*(i))%MAXLEN)
456
457 int main(int argc, char **argv)
458 {
459   int ids[COUNT];
460   unsigned int i, j, cnt;
461   struct obuck_header h;
462   struct fastbuf *b;
463
464   log_init(NULL);
465   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
466       optind < argc)
467   {
468     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
469     exit(1);
470   }
471
472   unlink(obuck_name);
473   obuck_init(1);
474   for(j=0; j<COUNT; j++)
475     {
476       b = obuck_create();
477       for(i=0; i<LEN(j); i++)
478         bputc(b, (i+j) % 256);
479       obuck_create_end(b, &h);
480       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
481       ids[j] = h.oid;
482     }
483   for(j=0; j<COUNT; j++)
484     if (j % 100 < KILLPERC)
485       {
486         printf("Deleting %08x\n", ids[j]);
487         obuck_delete(ids[j]);
488       }
489   cnt = 0;
490   for(j=0; j<COUNT; j++)
491     if (j % 100 >= KILLPERC)
492       {
493         cnt++;
494         h.oid = ids[j];
495         obuck_find_by_oid(&h);
496         b = obuck_fetch();
497         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
498         if (h.orig_length != LEN(j))
499           die("Invalid length");
500         for(i=0; i<h.orig_length; i++)
501           if ((unsigned) bgetc(b) != (i+j) % 256)
502             die("Contents mismatch");
503         if (bgetc(b) != EOF)
504           die("EOF mismatch");
505         obuck_fetch_end(b);
506       }
507   if (obuck_find_first(&h, 0))
508     do
509       {
510         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
511         cnt--;
512       }
513     while (obuck_find_next(&h, 0));
514   if (cnt)
515     die("Walk mismatch");
516   obuck_cleanup();
517   return 0;
518 }
519
520 #endif