]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
abe97672246d1ac84da46b790f872744d9a5ea5f
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001 Martin Mares <mj@ucw.cz>
5  */
6
7 #include "lib/lib.h"
8 #include "lib/bucket.h"
9 #include "lib/fastbuf.h"
10 #include "lib/lfs.h"
11 #include "lib/conf.h"
12
13 #include <string.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16 #include <sys/file.h>
17
18 static int obuck_fd;
19 static unsigned int obuck_remains, obuck_check_pad;
20 static struct fastbuf *obuck_fb;
21 static struct obuck_header obuck_hdr;
22 static sh_off_t bucket_start;
23
24 /*** Configuration ***/
25
26 byte *obuck_name = "not/configured";
27 static int obuck_io_buflen = 65536;
28
29 static struct cfitem obuck_config[] = {
30   { "Buckets",          CT_SECTION,     NULL },
31   { "BucketFile",       CT_STRING,      &obuck_name },
32   { "BufSize",          CT_INT,         &obuck_io_buflen },
33   { NULL,               CT_STOP,        NULL }
34 };
35
36 static void CONSTRUCTOR obuck_init_config(void)
37 {
38   cf_register(obuck_config);
39 }
40
41 /*** Internal operations ***/
42
43 static void
44 obuck_broken(char *msg)
45 {
46   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
47 }
48
49 /*
50  *  Unfortunately we cannot use flock() here since it happily permits
51  *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
52  *  locks are very ugly and they don't support 64-bit offsets, but we
53  *  can work around the problem by always locking the first header
54  *  in the file.
55  */
56
57 static inline void
58 obuck_do_lock(int type)
59 {
60   struct flock fl;
61
62   fl.l_type = type;
63   fl.l_whence = SEEK_SET;
64   fl.l_start = 0;
65   fl.l_len = sizeof(struct obuck_header);
66   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
67     die("fcntl lock: %m");
68 }
69
70 static inline void
71 obuck_lock_read(void)
72 {
73   obuck_do_lock(F_RDLCK);
74 }
75
76 static inline void
77 obuck_lock_write(void)
78 {
79   obuck_do_lock(F_WRLCK);
80 }
81
82 static inline void
83 obuck_unlock(void)
84 {
85   obuck_do_lock(F_UNLCK);
86 }
87
88 /*** FastIO emulation ***/
89
90 /* We need to use pread/pwrite since we work on fd's shared between processes */
91
92 static int
93 obuck_fb_refill(struct fastbuf *f)
94 {
95   unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains;
96   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
97   int l;
98
99   if (!limit)
100     return 0;
101   l = sh_pread(f->fd, f->buffer, size, f->fdpos);
102   if (l < 0)
103     die("Error reading bucket: %m");
104   if ((unsigned) l != size)
105     obuck_broken("Short read");
106   f->bptr = f->buffer;
107   f->bstop = f->buffer + limit;
108   f->pos = f->fdpos;
109   f->fdpos += limit;
110   obuck_remains -= limit;
111   if (!obuck_remains)   /* Should check the trailer */
112     {
113       u32 check;
114       memcpy(&check, f->buffer + size - 4, 4);
115       if (check != OBUCK_TRAILER)
116         obuck_broken("Missing trailer");
117     }
118   return limit;
119 }
120
121 static void
122 obuck_fb_spout(struct fastbuf *f)
123 {
124   int l = f->bptr - f->buffer;
125   char *c = f->buffer;
126
127   while (l)
128     {
129       int z = sh_pwrite(f->fd, c, l, f->fdpos);
130       if (z <= 0)
131         die("Error writing bucket: %m");
132       f->fdpos += z;
133       l -= z;
134       c += z;
135     }
136   f->bptr = f->buffer;
137   f->pos = f->fdpos;
138 }
139
140 static void
141 obuck_fb_close(struct fastbuf *f)
142 {
143   close(f->fd);
144 }
145
146 /*** Exported functions ***/
147
148 void
149 obuck_init(int writeable)
150 {
151   struct fastbuf *b;
152   sh_off_t size;
153
154   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
155   if (obuck_fd < 0)
156     die("Unable to open bucket file %s: %m", obuck_name);
157   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
158   b->buflen = obuck_io_buflen;
159   b->buffer = (char *)(b+1);
160   b->bptr = b->bstop = b->buffer;
161   b->bufend = b->buffer + obuck_io_buflen;
162   b->name = "bucket";
163   b->fd = obuck_fd;
164   b->refill = obuck_fb_refill;
165   b->spout = obuck_fb_spout;
166   b->close = obuck_fb_close;
167   obuck_lock_read();
168   size = sh_seek(obuck_fd, 0, SEEK_END);
169   if (size)
170     {
171       /* If the bucket pool is not empty, check consistency of its end */
172       u32 check;
173       bucket_start = size - 4;  /* for error reporting */
174       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
175           check != OBUCK_TRAILER)
176         obuck_broken("Missing trailer of last object");
177     }
178   obuck_unlock();
179 }
180
181 void
182 obuck_cleanup(void)
183 {
184   bclose(obuck_fb);
185 }
186
187 void
188 obuck_sync(void)
189 {
190   bflush(obuck_fb);
191   fsync(obuck_fd);
192 }
193
194 static void
195 obuck_get(oid_t oid)
196 {
197   struct fastbuf *b = obuck_fb;
198
199   bucket_start = obuck_get_pos(oid);
200   bflush(b);
201   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
202     obuck_broken("Short header read");
203   b->fdpos = bucket_start + sizeof(obuck_hdr);
204   if (obuck_hdr.magic != OBUCK_MAGIC)
205     obuck_broken("Missing magic number");
206   if (obuck_hdr.oid == OBUCK_OID_DELETED)
207     obuck_broken("Access to deleted bucket");
208   if (obuck_hdr.oid != oid)
209     obuck_broken("Invalid backlink");
210 }
211
212 void
213 obuck_find_by_oid(struct obuck_header *hdrp)
214 {
215   oid_t oid = hdrp->oid;
216
217   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
218   obuck_lock_read();
219   obuck_get(oid);
220   obuck_unlock();
221   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
222 }
223
224 int
225 obuck_find_first(struct obuck_header *hdrp, int full)
226 {
227   bucket_start = 0;
228   obuck_hdr.magic = 0;
229   return obuck_find_next(hdrp, full);
230 }
231
232 int
233 obuck_find_next(struct obuck_header *hdrp, int full)
234 {
235   int c;
236   struct fastbuf *b = obuck_fb;
237
238   for(;;)
239     {
240       if (obuck_hdr.magic)
241         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
242                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
243       bflush(b);
244       obuck_lock_read();
245       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
246       obuck_unlock();
247       if (!c)
248         return 0;
249       if (c != sizeof(obuck_hdr))
250         obuck_broken("Short header read");
251       b->fdpos = bucket_start + sizeof(obuck_hdr);
252       if (obuck_hdr.magic != OBUCK_MAGIC)
253         obuck_broken("Missing magic number");
254       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
255         {
256           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
257           return 1;
258         }
259     }
260 }
261
262 struct fastbuf *
263 obuck_fetch(void)
264 {
265   obuck_remains = obuck_hdr.length;
266   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
267   return obuck_fb;
268 }
269
270 void
271 obuck_fetch_end(struct fastbuf *b UNUSED)
272 {
273 }
274
275 struct fastbuf *
276 obuck_create(void)
277 {
278   obuck_lock_write();
279   bflush(obuck_fb);
280   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
281   if (bucket_start & (OBUCK_ALIGN - 1))
282     obuck_broken("Misaligned file");
283   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
284   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
285   obuck_hdr.length = obuck_hdr.orig_length = 0;
286   obuck_fb->fdpos = obuck_fb->pos = bucket_start;
287   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
288   return obuck_fb;
289 }
290
291 void
292 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
293 {
294   int pad;
295   obuck_hdr.magic = OBUCK_MAGIC;
296   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr);
297   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
298   while (pad--)
299     bputc(obuck_fb, 0);
300   bputl(obuck_fb, OBUCK_TRAILER);
301   bflush(obuck_fb);
302   ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1)));
303   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
304   obuck_unlock();
305   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
306 }
307
308 void
309 obuck_delete(oid_t oid)
310 {
311   obuck_lock_write();
312   obuck_get(oid);
313   obuck_hdr.oid = OBUCK_OID_DELETED;
314   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
315   obuck_unlock();
316 }
317
318 /*** Testing ***/
319
320 #ifdef TEST
321
322 #define COUNT 5000
323 #define MAXLEN 10000
324 #define KILLPERC 13
325 #define LEN(i) ((259309*(i))%MAXLEN)
326
327 int main(int argc, char **argv)
328 {
329   int ids[COUNT];
330   unsigned int i, j, cnt;
331   struct obuck_header h;
332   struct fastbuf *b;
333
334   log_init(NULL);
335   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
336       optind < argc)
337     die("This program has no command-line arguments.");
338
339   unlink(obuck_name);
340   obuck_init(1);
341   for(j=0; j<COUNT; j++)
342     {
343       b = obuck_create();
344       for(i=0; i<LEN(j); i++)
345         bputc(b, (i+j) % 256);
346       obuck_create_end(b, &h);
347       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
348       ids[j] = h.oid;
349     }
350   for(j=0; j<COUNT; j++)
351     if (j % 100 < KILLPERC)
352       {
353         printf("Deleting %08x\n", ids[j]);
354         obuck_delete(ids[j]);
355       }
356   cnt = 0;
357   for(j=0; j<COUNT; j++)
358     if (j % 100 >= KILLPERC)
359       {
360         cnt++;
361         h.oid = ids[j];
362         obuck_find_by_oid(&h);
363         b = obuck_fetch();
364         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
365         if (h.orig_length != LEN(j))
366           die("Invalid length");
367         for(i=0; i<h.orig_length; i++)
368           if ((unsigned) bgetc(b) != (i+j) % 256)
369             die("Contents mismatch");
370         if (bgetc(b) != EOF)
371           die("EOF mismatch");
372         obuck_fetch_end(b);
373       }
374   if (obuck_find_first(&h, 0))
375     do
376       {
377         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
378         cnt--;
379       }
380     while (obuck_find_next(&h, 0));
381   if (cnt)
382     die("Walk mismatch");
383   obuck_cleanup();
384   return 0;
385 }
386
387 #endif