]> mj.ucw.cz Git - libucw.git/blob - lib/bucket.c
d5733c59160fc9abc890fc1a36887314d96d16e5
[libucw.git] / lib / bucket.c
1 /*
2  *      Sherlock Library -- Object Buckets
3  *
4  *      (c) 2001 Martin Mares <mj@ucw.cz>
5  */
6
7 #include "lib/lib.h"
8 #include "lib/bucket.h"
9 #include "lib/fastbuf.h"
10 #include "lib/lfs.h"
11 #include "lib/conf.h"
12
13 #include <string.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16 #include <sys/file.h>
17
18 static int obuck_fd;
19 static unsigned int obuck_remains, obuck_check_pad;
20 static struct fastbuf *obuck_fb;
21 static struct obuck_header obuck_hdr;
22 static sh_off_t bucket_start;
23
24 /*** Configuration ***/
25
26 byte *obuck_name = "not/configured";
27 static int obuck_io_buflen = 65536;
28
29 static struct cfitem obuck_config[] = {
30   { "Buckets",          CT_SECTION,     NULL },
31   { "BucketFile",       CT_STRING,      &obuck_name },
32   { "BufSize",          CT_INT,         &obuck_io_buflen },
33   { NULL,               CT_STOP,        NULL }
34 };
35
36 static void CONSTRUCTOR obuck_init_config(void)
37 {
38   cf_register(obuck_config);
39 }
40
41 /*** Internal operations ***/
42
43 static void
44 obuck_broken(char *msg)
45 {
46   die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);    /* FIXME */
47 }
48
49 static inline void
50 obuck_lock_read(void)
51 {
52   flock(obuck_fd, LOCK_SH);
53 }
54
55 static inline void
56 obuck_lock_write(void)
57 {
58   flock(obuck_fd, LOCK_EX);
59 }
60
61 static inline void
62 obuck_unlock(void)
63 {
64   flock(obuck_fd, LOCK_UN);
65 }
66
67 /*** FastIO emulation ***/
68
69 /* We need to use pread/pwrite since we work on fd's shared between processes */
70
71 static int
72 obuck_fb_refill(struct fastbuf *f)
73 {
74   unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains;
75   unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
76   int l;
77
78   if (!limit)
79     return 0;
80   l = sh_pread(f->fd, f->buffer, size, f->fdpos);
81   if (l < 0)
82     die("Error reading bucket: %m");
83   if ((unsigned) l != size)
84     obuck_broken("Short read");
85   f->bptr = f->buffer;
86   f->bstop = f->buffer + limit;
87   f->pos = f->fdpos;
88   f->fdpos += limit;
89   obuck_remains -= limit;
90   if (!obuck_remains)   /* Should check the trailer */
91     {
92       u32 check;
93       memcpy(&check, f->buffer + size - 4, 4);
94       if (check != OBUCK_TRAILER)
95         obuck_broken("Missing trailer");
96     }
97   return limit;
98 }
99
100 static void
101 obuck_fb_spout(struct fastbuf *f)
102 {
103   int l = f->bptr - f->buffer;
104   char *c = f->buffer;
105
106   while (l)
107     {
108       int z = sh_pwrite(f->fd, c, l, f->fdpos);
109       if (z <= 0)
110         die("Error writing bucket: %m");
111       f->fdpos += z;
112       l -= z;
113       c += z;
114     }
115   f->bptr = f->buffer;
116   f->pos = f->fdpos;
117 }
118
119 static void
120 obuck_fb_close(struct fastbuf *f)
121 {
122   close(f->fd);
123 }
124
125 /*** Exported functions ***/
126
127 void
128 obuck_init(int writeable)
129 {
130   struct fastbuf *b;
131   sh_off_t size;
132
133   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
134   if (obuck_fd < 0)
135     die("Unable to open bucket file %s: %m", obuck_name);
136   obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
137   b->buflen = obuck_io_buflen;
138   b->buffer = (char *)(b+1);
139   b->bptr = b->bstop = b->buffer;
140   b->bufend = b->buffer + obuck_io_buflen;
141   b->name = "bucket";
142   b->fd = obuck_fd;
143   b->refill = obuck_fb_refill;
144   b->spout = obuck_fb_spout;
145   b->close = obuck_fb_close;
146   obuck_lock_read();
147   size = sh_seek(obuck_fd, 0, SEEK_END);
148   if (size)
149     {
150       /* If the bucket pool is not empty, check consistency of its end */
151       u32 check;
152       bucket_start = size - 4;  /* for error reporting */
153       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
154           check != OBUCK_TRAILER)
155         obuck_broken("Missing trailer of last object");
156     }
157   obuck_unlock();
158 }
159
160 void
161 obuck_cleanup(void)
162 {
163   bclose(obuck_fb);
164 }
165
166 void                                    /* FIXME: Call somewhere :) */
167 obuck_sync(void)
168 {
169   bflush(obuck_fb);
170   fsync(obuck_fd);
171 }
172
173 static void
174 obuck_get(oid_t oid)
175 {
176   struct fastbuf *b = obuck_fb;
177
178   bucket_start = obuck_get_pos(oid);
179   bflush(b);
180   if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
181     obuck_broken("Short header read");
182   b->fdpos = bucket_start + sizeof(obuck_hdr);
183   if (obuck_hdr.magic != OBUCK_MAGIC)
184     obuck_broken("Missing magic number");
185   if (obuck_hdr.oid == OBUCK_OID_DELETED)
186     obuck_broken("Access to deleted bucket");
187   if (obuck_hdr.oid != oid)
188     obuck_broken("Invalid backlink");
189 }
190
191 void
192 obuck_find_by_oid(struct obuck_header *hdrp)
193 {
194   oid_t oid = hdrp->oid;
195
196   ASSERT(oid < OBUCK_OID_FIRST_SPECIAL);
197   obuck_lock_read();
198   obuck_get(oid);
199   obuck_unlock();
200   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
201 }
202
203 int
204 obuck_find_first(struct obuck_header *hdrp, int full)
205 {
206   bucket_start = 0;
207   obuck_hdr.magic = 0;
208   return obuck_find_next(hdrp, full);
209 }
210
211 int
212 obuck_find_next(struct obuck_header *hdrp, int full)
213 {
214   int c;
215   struct fastbuf *b = obuck_fb;
216
217   for(;;)
218     {
219       if (obuck_hdr.magic)
220         bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
221                         4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
222       bflush(b);
223       obuck_lock_read();
224       c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
225       obuck_unlock();
226       if (!c)
227         return 0;
228       if (c != sizeof(obuck_hdr))
229         obuck_broken("Short header read");
230       b->fdpos = bucket_start + sizeof(obuck_hdr);
231       if (obuck_hdr.magic != OBUCK_MAGIC)
232         obuck_broken("Missing magic number");
233       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
234         {
235           memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
236           return 1;
237         }
238     }
239 }
240
241 struct fastbuf *
242 obuck_fetch(void)
243 {
244   obuck_remains = obuck_hdr.length;
245   obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
246   return obuck_fb;
247 }
248
249 void
250 obuck_fetch_end(struct fastbuf *b UNUSED)
251 {
252 }
253
254 struct fastbuf *
255 obuck_create(void)
256 {
257   obuck_lock_write();
258   bflush(obuck_fb);
259   bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
260   if (bucket_start & (OBUCK_ALIGN - 1))
261     obuck_broken("Misaligned file");
262   obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
263   obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
264   obuck_hdr.length = obuck_hdr.orig_length = 0;
265   obuck_fb->fdpos = obuck_fb->pos = bucket_start;
266   bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
267   return obuck_fb;
268 }
269
270 void
271 obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
272 {
273   int pad;
274   obuck_hdr.magic = OBUCK_MAGIC;
275   obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr);
276   pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
277   while (pad--)
278     bputc(obuck_fb, 0);
279   bputl(obuck_fb, OBUCK_TRAILER);
280   bflush(obuck_fb);
281   ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1)));
282   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
283   obuck_unlock();
284   memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
285 }
286
287 void
288 obuck_delete(oid_t oid)
289 {
290   obuck_lock_write();
291   obuck_get(oid);
292   obuck_hdr.oid = OBUCK_OID_DELETED;
293   sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
294   obuck_unlock();
295 }
296
297 /*** Testing ***/
298
299 #ifdef TEST
300
301 #define COUNT 5000
302 #define MAXLEN 10000
303 #define KILLPERC 13
304 #define LEN(i) ((259309*(i))%MAXLEN)
305
306 int main(int argc, char **argv)
307 {
308   int ids[COUNT];
309   unsigned int i, j, cnt;
310   struct obuck_header h;
311   struct fastbuf *b;
312
313   log_init(NULL);
314   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 ||
315       optind < argc)
316     die("This program has no command-line arguments.");
317
318   unlink(obuck_name);
319   obuck_init(1);
320   for(j=0; j<COUNT; j++)
321     {
322       b = obuck_create();
323       for(i=0; i<LEN(j); i++)
324         bputc(b, (i+j) % 256);
325       obuck_create_end(b, &h);
326       printf("Writing %08x %d -> %d\n", h.oid, h.orig_length, h.length);
327       ids[j] = h.oid;
328     }
329   for(j=0; j<COUNT; j++)
330     if (j % 100 < KILLPERC)
331       {
332         printf("Deleting %08x\n", ids[j]);
333         obuck_delete(ids[j]);
334       }
335   cnt = 0;
336   for(j=0; j<COUNT; j++)
337     if (j % 100 >= KILLPERC)
338       {
339         cnt++;
340         h.oid = ids[j];
341         obuck_find_by_oid(&h);
342         b = obuck_fetch();
343         printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length);
344         if (h.orig_length != LEN(j))
345           die("Invalid length");
346         for(i=0; i<h.orig_length; i++)
347           if ((unsigned) bgetc(b) != (i+j) % 256)
348             die("Contents mismatch");
349         if (bgetc(b) != EOF)
350           die("EOF mismatch");
351         obuck_fetch_end(b);
352       }
353   if (obuck_find_first(&h, 0))
354     do
355       {
356         printf("<<< %08x\t%d\n", h.oid, h.orig_length);
357         cnt--;
358       }
359     while (obuck_find_next(&h, 0));
360   if (cnt)
361     die("Walk mismatch");
362   obuck_cleanup();
363   return 0;
364 }
365
366 #endif