]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
dmalloc and efence work again (ported from rel-2.1 branch).
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001 Martin Mares <mj@ucw.cz>
5  */
6
7 #include "lib/lib.h"
8 #include "lib/bucket.h"
9 #include "lib/fastbuf.h"
10 #include "lib/lfs.h"
11 #include "lib/conf.h"
12
13 #include <string.h>
14 #include <stdlib.h>
15 #include <fcntl.h>
16 #include <unistd.h>
17 #include <sys/file.h>
18
19 static int obuck_fd;
20 static unsigned int obuck_remains, obuck_check_pad;
21 static struct fastbuf *obuck_fb;
22 static struct obuck_header obuck_hdr;
23 static sh_off_t bucket_start;
24
25 /*** Configuration ***/
26
27 byte *obuck_name = "not/configured";
28 static int obuck_io_buflen = 65536;
29 static int obuck_shake_buflen = 1048576;
30
31 static struct cfitem obuck_config[] = {
32   { "Buckets",          CT_SECTION,     NULL },
33   { "BucketFile",       CT_STRING,      &obuck_name },
34   { "BufSize",          CT_INT,         &obuck_io_buflen },
35   { "ShakeBufSize",     CT_INT,         &obuck_shake_buflen },
36   { NULL,               CT_STOP,        NULL }
37 };
38
39 static void CONSTRUCTOR obuck_init_config(void)
40 {
41   cf_register(obuck_config);
42 }
43
44 /*** Internal operations ***/
45
46 static void
47 obuck_broken(char *msg)
48 {
49   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
50 }
51
52 /*
53  *  Unfortunately we cannot use flock() here since it happily permits
54  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
55  *  locks are very ugly and they don't support 64-bit offsets, but we
56  *  can work around the problem by always locking the first header
57  *  in the file.
58  */
59
60 static inline void
61 obuck_do_lock(int type)
62 {
63   struct flock fl;
64
65   fl.l_type = type;
66   fl.l_whence = SEEK_SET;
67   fl.l_start = 0;
68   fl.l_len = sizeof(struct obuck_header);
69   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
70     die("fcntl lock: %m");
71 }
72
73 static inline void
74 obuck_lock_read(void)
75 {
76   obuck_do_lock(F_RDLCK);
77 }
78
79 static inline void
80 obuck_lock_write(void)
81 {
82   obuck_do_lock(F_WRLCK);
83 }
84
85 static inline void
86 obuck_unlock(void)
87 {
88   obuck_do_lock(F_UNLCK);
89 }
90
91 /*** FastIO emulation ***/
92
93 /* We need to use pread/pwrite since we work on fd's shared between processes */
94
95 static int
96 obuck_fb_refill(struct fastbuf *f)
97 {
98   unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains;
99   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
100   int l;
101
102   if (!limit)
103     return 0;
104   l = sh_pread(f->fd, f->buffer, size, f->fdpos);
105   if (l < 0)
106     die("Error reading bucket: %m");
107   if ((unsigned) l != size)
108     obuck_broken("Short read");
109   f->bptr = f->buffer;
110   f->bstop = f->buffer + limit;
111   f->pos = f->fdpos;
112   f->fdpos += limit;
113   obuck_remains -= limit;
114   if (!obuck_remains)   /* Should check the trailer */
115     {
116       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
117         obuck_broken("Missing trailer");
118     }
119   return limit;
120 }
121
122 static void
123 obuck_fb_spout(struct fastbuf *f)
124 {
125   int l = f->bptr - f->buffer;
126   char *c = f->buffer;
127
128   while (l)
129     {
130       int z = sh_pwrite(f->fd, c, l, f->fdpos);
131       if (z <= 0)
132         die("Error writing bucket: %m");
133       f->fdpos += z;
134       l -= z;
135       c += z;
136     }
137   f->bptr = f->buffer;
138   f->pos = f->fdpos;
139 }
140
141 static void
142 obuck_fb_close(struct fastbuf *f)
143 {
144   close(f->fd);
145 }
146
147 /*** Exported functions ***/
148
149 void
150 obuck_init(int writeable)
151 {
152   struct fastbuf *b;
153   sh_off_t size;
154
155   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
156   if (obuck_fd < 0)
157     die("Unable to open bucket file %s: %m", obuck_name);
158   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
159   b->buflen = obuck_io_buflen;
160   b->buffer = (char *)(b+1);
161   b->bptr = b->bstop = b->buffer;
162   b->bufend = b->buffer + obuck_io_buflen;
163   b->name = "bucket";
164   b->fd = obuck_fd;
165   b->refill = obuck_fb_refill;
166   b->spout = obuck_fb_spout;
167   b->close = obuck_fb_close;
168   obuck_lock_read();
169   size = sh_seek(obuck_fd, 0, SEEK_END);
170   if (size)
171     {
172       /* If the bucket pool is not empty, check consistency of its end */
173       u32 check;
174       bucket_start = size - 4;  /* for error reporting */
175       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
176           check != OBUCK_TRAILER)
177         obuck_broken("Missing trailer of last object");
178     }
179   obuck_unlock();
180 }
181
182 void
183 obuck_cleanup(void)
184 {
185   bclose(obuck_fb);
186 }
187
188 void
189 obuck_sync(void)
190 {
191   bflush(obuck_fb);
192   fsync(obuck_fd);
193 }
194
195 static void
196 obuck_get(oid_t oid)
197 {
198   struct fastbuf *b = obuck_fb;
199
200   bucket_start = obuck_get_pos(oid);
201   bflush(b);
202   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
203     obuck_broken("Short header read");
204   b->fdpos = bucket_start + sizeof(obuck_hdr);
205   if (obuck_hdr.magic != OBUCK_MAGIC)
206     obuck_broken("Missing magic number");
207   if (obuck_hdr.oid == OBUCK_OID_DELETED)
208     obuck_broken("Access to deleted bucket");
209   if (obuck_hdr.oid != oid)
210     obuck_broken("Invalid backlink");
211 }
212
213 void
214 obuck_find_by_oid(struct obuck_header *hdrp)
215 {
216   oid_t oid = hdrp->oid;
217
218   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
219   obuck_lock_read();
220   obuck_get(oid);
221   obuck_unlock();
222   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
223 }
224
225 int
226 obuck_find_first(struct obuck_header *hdrp, int full)
227 {
228   bucket_start = 0;
229   obuck_hdr.magic = 0;
230   return obuck_find_next(hdrp, full);
231 }
232
233 int
234 obuck_find_next(struct obuck_header *hdrp, int full)
235 {
236   int c;
237   struct fastbuf *b = obuck_fb;
238
239   for(;;)
240     {
241       if (obuck_hdr.magic)
242         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
243                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
244       bflush(b);
245       obuck_lock_read();
246       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
247       obuck_unlock();
248       if (!c)
249         return 0;
250       if (c != sizeof(obuck_hdr))
251         obuck_broken("Short header read");
252       b->fdpos = bucket_start + sizeof(obuck_hdr);
253       if (obuck_hdr.magic != OBUCK_MAGIC)
254         obuck_broken("Missing magic number");
255       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
256         {
257           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
258           return 1;
259         }
260     }
261 }
262
263 struct fastbuf *
264 obuck_fetch(void)
265 {
266   obuck_remains = obuck_hdr.length;
267   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
268   return obuck_fb;
269 }
270
271 void
272 obuck_fetch_end(struct fastbuf *b UNUSED)
273 {
274 }
275
276 struct fastbuf *
277 obuck_create(void)
278 {
279   obuck_lock_write();
280   bflush(obuck_fb);
281   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
282   if (bucket_start & (OBUCK_ALIGN - 1))
283     obuck_broken("Misaligned file");
284   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
285   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
286   obuck_hdr.length = obuck_hdr.orig_length = 0;
287   obuck_fb->fdpos = obuck_fb->pos = bucket_start;
288   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
289   return obuck_fb;
290 }
291
292 void
293 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
294 {
295   int pad;
296   obuck_hdr.magic = OBUCK_MAGIC;
297   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr);
298   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
299   while (pad--)
300     bputc(obuck_fb, 0);
301   bputl(obuck_fb, OBUCK_TRAILER);
302   bflush(obuck_fb);
303   ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1)));
304   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
305   obuck_unlock();
306   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
307 }
308
309 void
310 obuck_delete(oid_t oid)
311 {
312   obuck_lock_write();
313   obuck_get(oid);
314   obuck_hdr.oid = OBUCK_OID_DELETED;
315   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
316   obuck_unlock();
317 }
318
319 /*** Shakedown ***/
320
321 void
322 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
323 {
324   byte *rbuf, *wbuf;
325   sh_off_t rstart, wstart, w_bucket_start;
326   int roff, woff, rsize, l;
327   struct obuck_header *rhdr, *whdr;
328
329   rbuf = xmalloc(obuck_shake_buflen);
330   wbuf = xmalloc(obuck_shake_buflen);
331   rstart = wstart = 0;
332   roff = woff = rsize = 0;
333
334   /* We need to be the only accessor, all the object ID's are becoming invalid */
335   obuck_lock_write();
336
337   for(;;)
338     {
339       bucket_start = rstart + roff;
340       w_bucket_start = wstart + woff;
341       if (rsize - roff < OBUCK_ALIGN)
342         goto reread;
343       rhdr = (struct obuck_header *)(rbuf + roff);
344       if (rhdr->magic != OBUCK_MAGIC ||
345           rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (bucket_start >> OBUCK_SHIFT))
346         obuck_broken("header mismatch during shakedown");
347       l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1);
348       if (rsize - roff < l)
349         goto reread;
350       if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER)
351         obuck_broken("missing trailer during shakedown");
352       if (rhdr->oid != OBUCK_OID_DELETED)
353         {
354           if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
355             {
356               if (bucket_start == w_bucket_start)
357                 {
358                   /* No copying needed now nor ever in the past, hence woff==0 */
359                   wstart += l;
360                 }
361               else
362                 {
363                   if (obuck_shake_buflen - woff < l)
364                     {
365                       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
366                         die("obuck_shakedown write failed: %m");
367                       wstart += woff;
368                       woff = 0;
369                     }
370                   whdr = (struct obuck_header *)(wbuf+woff);
371                   memcpy(whdr, rhdr, l);
372                   whdr->oid = w_bucket_start >> OBUCK_SHIFT;
373                   woff += l;
374                 }
375             }
376         }
377       else
378         kibitz(rhdr, OBUCK_OID_DELETED, NULL);
379       roff += l;
380       continue;
381
382     reread:
383       if (roff)
384         {
385           memmove(rbuf, rbuf+roff, rsize-roff);
386           rsize -= roff;
387           rstart += roff;
388           roff = 0;
389         }
390       l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize);
391       if (l < 0)
392         die("obuck_shakedown read error: %m");
393       if (!l)
394         {
395           if (!rsize)
396             break;
397           obuck_broken("unexpected EOF during shakedown");
398         }
399       rsize += l;
400     }
401   if (woff)
402     {
403       if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff)
404         die("obuck_shakedown write failed: %m");
405       wstart += woff;
406     }
407   sh_ftruncate(obuck_fd, wstart);
408
409   obuck_unlock();
410   xfree(rbuf);
411   xfree(wbuf);
412 }
413
414 /*** Testing ***/
415
416 #ifdef TEST
417
418 #define COUNT 5000
419 #define MAXLEN 10000
420 #define KILLPERC 13
421 #define LEN(i) ((259309*(i))%MAXLEN)
422
423 int main(int argc, char **argv)
424 {
425   int ids[COUNT];
426   unsigned int i, j, cnt;
427   struct obuck_header h;
428   struct fastbuf *b;
429
430   log_init(NULL);
431   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
432       optind < argc)
433   {
434     fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr);
435     exit(1);
436   }
437
438   unlink(obuck_name);
439   obuck_init(1);
440   for(j=0; j<COUNT; j++)
441     {
442       b = obuck_create();
443       for(i=0; i<LEN(j); i++)
444         bputc(b, (i+j) % 256);
445       obuck_create_end(b, &h);
446       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
447       ids[j] = h.oid;
448     }
449   for(j=0; j<COUNT; j++)
450     if (j % 100 < KILLPERC)
451       {
452         printf("Deleting %08x\n", ids[j]);
453         obuck_delete(ids[j]);
454       }
455   cnt = 0;
456   for(j=0; j<COUNT; j++)
457     if (j % 100 >= KILLPERC)
458       {
459         cnt++;
460         h.oid = ids[j];
461         obuck_find_by_oid(&h);
462         b = obuck_fetch();
463         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
464         if (h.orig_length != LEN(j))
465           die("Invalid length");
466         for(i=0; i<h.orig_length; i++)
467           if ((unsigned) bgetc(b) != (i+j) % 256)
468             die("Contents mismatch");
469         if (bgetc(b) != EOF)
470           die("EOF mismatch");
471         obuck_fetch_end(b);
472       }
473   if (obuck_find_first(&h, 0))
474     do
475       {
476         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
477         cnt--;
478       }
479     while (obuck_find_next(&h, 0));
480   if (cnt)
481     die("Walk mismatch");
482   obuck_cleanup();
483   return 0;
484 }
485
486 #endif