]> mj.ucw.cz Git - libucw.git/blob - lib/db.c
fd8a8c0e25b4808f2162173c817531fb43c24e3f
[libucw.git] / lib / db.c
1 /*
2  *      Sherlock Library -- Fast Database Management Routines
3  *
4  *      (c) 1999--2000 Martin Mares <mj@atrey.karlin.mff.cuni.cz>
5  */
6
7 /*
8  *  This library uses the standard algorithm for external hashing (page directory
9  *  mapping topmost K bits of hash value to page address, directory splits and
10  *  so on). Peculiarities of this implementation (aka design decisions):
11  *
12  *   o  We allow both fixed and variable length keys and values (this includes
13  *      zero size values for cases you want to represent only a set of keys).
14  *   o  We assume that key_size + val_size < page_size.
15  *   o  We never shrink the directory nor free empty pages. (The reason is that
16  *      if the database was once large, it's likely it will again become large soon.)
17  *   o  The only pages which can be freed are those of the directory (during
18  *      directory split), so we keep only a simple 32-entry free block list
19  *      and we assume it's sorted.
20  */
21
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27
28 #include "lib.h"
29 #include "lfs.h"
30 #include "pagecache.h"
31 #include "db.h"
32 #include "db_internal.h"
33
34 struct sdbm *
35 sdbm_open(struct sdbm_options *o)
36 {
37   struct sdbm *d;
38   struct sdbm_root root, *r;
39   uns cache_size = o->cache_size ? o->cache_size : 16;
40
41   d = xmalloc(sizeof(struct sdbm));
42   bzero(d, sizeof(*d));
43   d->flags = o->flags;
44   d->fd = sh_open(o->name, ((d->flags & SDBM_WRITE) ? O_RDWR : O_RDONLY), 0666);
45   if (d->fd >= 0)                       /* Already exists, let's check it */
46     {
47       if (read(d->fd, &root, sizeof(root)) != sizeof(root))
48         goto bad;
49       if (root.magic != SDBM_MAGIC || root.version != SDBM_VERSION)
50         goto bad;
51       d->file_size = sh_seek(d->fd, 0, SEEK_END);
52       d->page_size = 1 << root.page_order;
53       d->cache = pgc_open(d->page_size, cache_size);
54       d->root_page = pgc_read(d->cache, d->fd, 0);
55       d->root = (void *) d->root_page->data;
56     }
57   else if ((d->flags & SDBM_CREAT) && (d->fd = sh_open(o->name, O_RDWR | O_CREAT, 0666)) >= 0)
58     {
59       struct page *q;
60       uns page_order = o->page_order;
61       if (page_order < 10)
62         page_order = 10;
63       d->page_size = 1 << page_order;
64       d->cache = pgc_open(d->page_size, cache_size);
65       d->root_page = pgc_get_zero(d->cache, d->fd, 0);
66       r = d->root = (void *) d->root_page->data;                 /* Build root page */
67       r->magic = SDBM_MAGIC;
68       r->version = SDBM_VERSION;
69       r->page_order = page_order;
70       r->key_size = o->key_size;
71       r->val_size = o->val_size;
72       r->dir_start = d->page_size;
73       r->dir_order = 0;
74       d->file_size = 3*d->page_size;
75       q = pgc_get_zero(d->cache, d->fd, d->page_size);          /* Build page directory */
76       ((u32 *)q->data)[0] = 2*d->page_size;
77       pgc_put(d->cache, q);
78       q = pgc_get_zero(d->cache, d->fd, 2*d->page_size);        /* Build single data page */
79       pgc_put(d->cache, q);
80     }
81   else
82     goto bad;
83   d->dir_size = 1 << d->root->dir_order;
84   d->dir_shift = 32 - d->root->dir_order;
85   d->page_order = d->root->page_order;
86   d->page_mask = d->page_size - 1;
87   d->key_size = d->root->key_size;
88   d->val_size = d->root->val_size;
89   return d;
90
91 bad:
92   sdbm_close(d);
93   return NULL;
94 }
95
96 void
97 sdbm_close(struct sdbm *d)
98 {
99   if (d->root_page)
100     pgc_put(d->cache, d->root_page);
101   if (d->cache)
102     pgc_close(d->cache);
103   if (d->fd >= 0)
104     close(d->fd);
105   free(d);
106 }
107
108 static uns
109 sdbm_alloc_pages(struct sdbm *d, uns number)
110 {
111   uns where = d->file_size;
112   uns size = number << d->page_order;
113   if (d->file_size + size < d->file_size)       /* Wrap around? */
114     die("SDB: Database file too large, giving up");
115   d->file_size += size;
116   return where;
117 }
118
119 static uns
120 sdbm_alloc_page(struct sdbm *d)
121 {
122   uns pos;
123
124   if (!d->root->free_pool[0].count)
125     return sdbm_alloc_pages(d, 1);
126   pos = d->root->free_pool[0].first;
127   d->root->free_pool[0].first += d->page_size;
128   if (!--d->root->free_pool[0].count)
129     {
130       memmove(d->root->free_pool, d->root->free_pool+1, SDBM_NUM_FREE_PAGE_POOLS * sizeof(d->root->free_pool[0]));
131       d->root->free_pool[SDBM_NUM_FREE_PAGE_POOLS-1].count = 0;
132     }
133   pgc_mark_dirty(d->cache, d->root_page);
134   return pos;
135 }
136
137 static void
138 sdbm_free_pages(struct sdbm *d, uns start, uns number)
139 {
140   uns i = 0;
141
142   while (d->root->free_pool[i].count)
143     i++;
144   d->root->free_pool[i].first = start;
145   d->root->free_pool[i].count = number;
146   pgc_mark_dirty(d->cache, d->root_page);
147 }
148
149 static u32
150 sdbm_hash(byte *key, uns keylen)
151 {
152   /*
153    *  This used to be the same hash function as GDBM uses,
154    *  but it turned out that it tends to give the same results
155    *  on similar keys. Damn it.
156    */
157   u32 value = 0x238F13AF * keylen;
158   while (keylen--)
159     value = 37*value + *key++;
160   return (1103515243 * value + 12345);
161 }
162
163 static int
164 sdbm_get_entry(struct sdbm *d, byte *pos, byte **key, uns *keylen, byte **val, uns *vallen)
165 {
166   byte *p = pos;
167
168   if (d->key_size >= 0)
169     *keylen = d->key_size;
170   else
171     {
172       *keylen = (p[0] << 8) | p[1];
173       p += 2;
174     }
175   *key = p;
176   p += *keylen;
177   if (d->val_size >= 0)
178     *vallen = d->val_size;
179   else
180     {
181       *vallen = (p[0] << 8) | p[1];
182       p += 2;
183     }
184   *val = p;
185   p += *vallen;
186   return p - pos;
187 }
188
189 static int
190 sdbm_entry_len(struct sdbm *d, uns keylen, uns vallen)
191 {
192   uns len = keylen + vallen;
193   if (d->key_size < 0)
194     len += 2;
195   if (d->val_size < 0)
196     len += 2;
197   return len;
198 }
199
200 static void
201 sdbm_store_entry(struct sdbm *d, byte *pos, byte *key, uns keylen, byte *val, uns vallen)
202 {
203   if (d->key_size < 0)
204     {
205       *pos++ = keylen >> 8;
206       *pos++ = keylen;
207     }
208   memmove(pos, key, keylen);
209   pos += keylen;
210   if (d->val_size < 0)
211     {
212       *pos++ = vallen >> 8;
213       *pos++ = vallen;
214     }
215   memmove(pos, val, vallen);
216 }
217
218 static uns
219 sdbm_page_rank(struct sdbm *d, uns dirpos)
220 {
221   struct page *b;
222   u32 pg, x;
223   uns l, r;
224   uns pm = d->page_mask;
225
226   b = pgc_read(d->cache, d->fd, d->root->dir_start + (dirpos & ~pm));
227   pg = GET32(b->data, dirpos & pm);
228   l = dirpos;
229   while ((l & pm) && GET32(b->data, (l - 4) & pm) == pg)
230     l -= 4;
231   r = dirpos + 4;
232   /* We heavily depend on unused directory entries being zero */
233   while ((r & pm) && GET32(b->data, r & pm) == pg)
234     r += 4;
235   pgc_put(d->cache, b);
236
237   if (!(l & pm) && !(r & pm))
238     {
239       /* Note that if it spans page boundary, it must contain an integer number of pages */
240       while (l)
241         {
242           b = pgc_read(d->cache, d->fd, d->root->dir_start + ((l - 4) & ~pm));
243           x = GET32(b->data, 0);
244           pgc_put(d->cache, b);
245           if (x != pg)
246             break;
247           l -= d->page_size;
248         }
249       while (r < 4*d->dir_size)
250         {
251           b = pgc_read(d->cache, d->fd, d->root->dir_start + (r & ~pm));
252           x = GET32(b->data, 0);
253           pgc_put(d->cache, b);
254           if (x != pg)
255             break;
256           r += d->page_size;
257         }
258     }
259   return (r - l) >> 2;
260 }
261
262 static void
263 sdbm_expand_directory(struct sdbm *d)
264 {
265   struct page *b, *c;
266   int i, ent;
267   u32 *dir, *t;
268
269   if (4*d->dir_size < d->page_size)
270     {
271       /* It still fits within single page */
272       b = pgc_read(d->cache, d->fd, d->root->dir_start);
273       dir = (u32 *) b->data;
274       for(i=d->dir_size-1; i>=0; i--)
275         dir[2*i] = dir[2*i+1] = dir[i];
276       pgc_mark_dirty(d->cache, b);
277       pgc_put(d->cache, b);
278     }
279   else
280     {
281       uns old_dir = d->root->dir_start;
282       uns old_dir_pages = 1 << (d->root->dir_order + 2 - d->page_order);
283       uns page, new_dir;
284       new_dir = d->root->dir_start = sdbm_alloc_pages(d, 2*old_dir_pages);
285       ent = 1 << (d->page_order - 3);
286       for(page=0; page < old_dir_pages; page++)
287         {
288           b = pgc_read(d->cache, d->fd, old_dir + (page << d->page_order));
289           dir = (u32 *) b->data;
290           c = pgc_get(d->cache, d->fd, new_dir + (page << (d->page_order + 1)));
291           t = (u32 *) c->data;
292           for(i=0; i<ent; i++)
293             t[2*i] = t[2*i+1] = dir[i];
294           pgc_put(d->cache, c);
295           c = pgc_get(d->cache, d->fd, new_dir + (page << (d->page_order + 1)) + d->page_size);
296           t = (u32 *) c->data;
297           for(i=0; i<ent; i++)
298             t[2*i] = t[2*i+1] = dir[ent+i];
299           pgc_put(d->cache, c);
300           pgc_put(d->cache, b);
301         }
302       if (!(d->flags & SDBM_FAST))
303         {
304           /*
305            *  Unless in super-fast mode, fill old directory pages with zeroes.
306            *  This slows us down a bit, but allows database reconstruction after
307            *  the free list is lost.
308            */
309           for(page=0; page < old_dir_pages; page++)
310             {
311               b = pgc_get_zero(d->cache, d->fd, old_dir + (page << d->page_order));
312               pgc_put(d->cache, b);
313             }
314         }
315       sdbm_free_pages(d, old_dir, old_dir_pages);
316     }
317
318   d->root->dir_order++;
319   d->dir_size = 1 << d->root->dir_order;
320   d->dir_shift = 32 - d->root->dir_order;
321   pgc_mark_dirty(d->cache, d->root_page);
322   if (!(d->flags & SDBM_FAST))
323     sdbm_sync(d);
324 }
325
326 static void
327 sdbm_split_data(struct sdbm *d, struct sdbm_bucket *s, struct sdbm_bucket *d0, struct sdbm_bucket *d1, uns sigbit)
328 {
329   byte *sp = s->data;
330   byte *dp[2] = { d0->data, d1->data };
331   byte *K, *D;
332   uns Kl, Dl, sz, i;
333
334   while (sp < s->data + s->used)
335     {
336       sz = sdbm_get_entry(d, sp, &K, &Kl, &D, &Dl);
337       sp += sz;
338       i = (sdbm_hash(K, Kl) & (1 << sigbit)) ? 1 : 0;
339       sdbm_store_entry(d, dp[i], K, Kl, D, Dl);
340       dp[i] += sz;
341     }
342   d0->used = dp[0] - d0->data;
343   d1->used = dp[1] - d1->data;
344 }
345
346 static void
347 sdbm_split_dir(struct sdbm *d, uns dirpos, uns count, uns pos)
348 {
349   struct page *b;
350   uns i;
351
352   count *= 4;
353   while (count)
354     {
355       b = pgc_read(d->cache, d->fd, d->root->dir_start + (dirpos & ~d->page_mask));
356       i = d->page_size - (dirpos & d->page_mask);
357       if (i > count)
358         i = count;
359       count -= i;
360       while (i)
361         {
362           GET32(b->data, dirpos & d->page_mask) = pos;
363           dirpos += 4;
364           i -= 4;
365         }
366       pgc_mark_dirty(d->cache, b);
367       pgc_put(d->cache, b);
368     }
369 }
370
371 static inline uns
372 sdbm_dirpos(struct sdbm *d, uns hash)
373 {
374   if (d->dir_shift != 32)               /* avoid shifting by 32 bits */
375     return (hash >> d->dir_shift) << 2; /* offset in the directory */
376   else
377     return 0;
378 }
379
380 static struct page *
381 sdbm_split_page(struct sdbm *d, struct page *b, u32 hash)
382 {
383   struct page *p[2];
384   uns i, rank, sigbit, rank_log, dirpos;
385
386   dirpos = sdbm_dirpos(d, hash);
387   rank = sdbm_page_rank(d, dirpos);     /* rank = # of pointers to this page */
388   if (rank == 1)
389     {
390       sdbm_expand_directory(d);
391       rank = 2;
392       dirpos *= 2;
393     }
394   rank_log = 1;                         /* rank_log = log2(rank) */
395   while ((1U << rank_log) < rank)
396     rank_log++;
397   sigbit = d->dir_shift + rank_log - 1; /* sigbit = bit we split on */
398   p[0] = b;
399   p[1] = pgc_get(d->cache, d->fd, sdbm_alloc_page(d));
400   sdbm_split_data(d, (void *) b->data, (void *) p[0]->data, (void *) p[1]->data, sigbit);
401   sdbm_split_dir(d, (dirpos & ~(4*rank - 1))+2*rank, rank/2, p[1]->pos);
402   pgc_mark_dirty(d->cache, p[0]);
403   i = (hash & (1 << sigbit)) ? 1 : 0;
404   pgc_put(d->cache, p[!i]);
405   return p[i];
406 }
407
408 static int
409 sdbm_put_user(byte *D, uns Dl, byte *val, uns *vallen)
410 {
411   if (vallen)
412     {
413       if (*vallen < Dl)
414         return 1;
415       *vallen = Dl;
416     }
417   if (val)
418     memcpy(val, D, Dl);
419   return 0;
420 }
421
422 static int
423 sdbm_access(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen, uns mode)    /* 0=read, 1=store, 2=replace */
424 {
425   struct page *p, *q;
426   u32 hash, h, pos, size;
427   struct sdbm_bucket *b;
428   byte *c, *e;
429   int rc;
430
431   if ((d->key_size >= 0 && keylen != (uns) d->key_size) || keylen > 65535)
432     return SDBM_ERROR_BAD_KEY_SIZE;
433   if (val && ((d->val_size >= 0 && *vallen != (uns) d->val_size) || *vallen >= 65535) && mode)
434     return SDBM_ERROR_BAD_VAL_SIZE;
435   if (!mode && !(d->flags & SDBM_WRITE))
436     return SDBM_ERROR_READ_ONLY;
437   hash = sdbm_hash(key, keylen);
438   h = sdbm_dirpos(d, hash);
439   p = pgc_read(d->cache, d->fd, d->root->dir_start + (h & ~d->page_mask));
440   pos = GET32(p->data, h & d->page_mask);
441   pgc_put(d->cache, p);
442   q = pgc_read(d->cache, d->fd, pos);
443   b = (void *) q->data;
444   c = b->data;
445   e = c + b->used;
446   while (c < e)
447     {
448       byte *K, *D;
449       uns Kl, Dl, s;
450       s = sdbm_get_entry(d, c, &K, &Kl, &D, &Dl);
451       if (Kl == keylen && !memcmp(K, key, Kl))
452         {
453           /* Gotcha! */
454           switch (mode)
455             {
456             case 0:                     /* fetch: found */
457               rc = sdbm_put_user(D, Dl, val, vallen);
458               pgc_put(d->cache, q);
459               return rc ? SDBM_ERROR_TOO_LARGE : 1;
460             case 1:                     /* store: already present */
461               pgc_put(d->cache, q);
462               return 0;
463             default:                    /* replace: delete the old one */
464               memmove(c, c+s, e-(c+s));
465               b->used -= s;
466               goto insert;
467             }
468         }
469       c += s;
470     }
471   if (!mode || !val)            /* fetch or delete: no success */
472     {
473       pgc_put(d->cache, q);
474       return 0;
475     }
476
477 insert:
478   if (val)
479     {
480       size = sdbm_entry_len(d, keylen, *vallen);
481       while (b->used + size > d->page_size - sizeof(struct sdbm_bucket))
482         {
483           /* Page overflow, need to split */
484           if (size >= d->page_size - sizeof(struct sdbm_bucket))
485             {
486               pgc_put(d->cache, q);
487               return SDBM_ERROR_GIANT;
488             }
489           q = sdbm_split_page(d, q, hash);
490           b = (void *) q->data;
491         }
492       sdbm_store_entry(d, b->data + b->used, key, keylen, val, *vallen);
493       b->used += size;
494     }
495   pgc_mark_dirty(d->cache, q);
496   pgc_put(d->cache, q);
497   if (d->flags & SDBM_SYNC)
498     sdbm_sync(d);
499   return 1;
500 }
501
502 int
503 sdbm_store(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
504 {
505   return sdbm_access(d, key, keylen, val, &vallen, 1);
506 }
507
508 int
509 sdbm_replace(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
510 {
511   return sdbm_access(d, key, keylen, val, &vallen, 2);
512 }
513
514 int
515 sdbm_delete(struct sdbm *d, byte *key, uns keylen)
516 {
517   return sdbm_access(d, key, keylen, NULL, NULL, 2);
518 }
519
520 int
521 sdbm_fetch(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen)
522 {
523   return sdbm_access(d, key, keylen, val, vallen, 0);
524 }
525
526 void
527 sdbm_rewind(struct sdbm *d)
528 {
529   d->find_pos = d->page_size;
530   d->find_free_list = 0;
531 }
532
533 int
534 sdbm_get_next(struct sdbm *d, byte *key, uns *keylen, byte *val, uns *vallen)
535 {
536   uns pos = d->find_pos;
537   byte *K, *V;
538   uns c, Kl, Vl;
539   struct page *p;
540   struct sdbm_bucket *b;
541
542   for(;;)
543     {
544       c = pos & d->page_mask;
545       if (!c)
546         {
547           if (pos >= d->file_size)
548             break;
549           if (pos == d->root->dir_start)
550             pos += (4*d->dir_size + d->page_size - 1) & ~d->page_mask;
551           else if (pos == d->root->free_pool[d->find_free_list].first)
552             pos += d->root->free_pool[d->find_free_list++].count << d->page_order;
553           else
554             pos += 4;
555           continue;
556         }
557       p = pgc_read(d->cache, d->fd, pos & ~d->page_mask);
558       b = (void *) p->data;
559       if (c - 4 >= b->used)
560         {
561           pos = (pos & ~d->page_mask) + d->page_size;
562           pgc_put(d->cache, p);
563           continue;
564         }
565       c = sdbm_get_entry(d, p->data + c, &K, &Kl, &V, &Vl);
566       d->find_pos = pos + c;
567       c = sdbm_put_user(K, Kl, key, keylen) ||
568           sdbm_put_user(V, Vl, val, vallen);
569       pgc_put(d->cache, p);
570       return c ? SDBM_ERROR_TOO_LARGE : 1;
571     }
572   d->find_pos = pos;
573   return 0;
574 }
575
576 void
577 sdbm_sync(struct sdbm *d)
578 {
579   pgc_flush(d->cache);
580   if (d->flags & SDBM_FSYNC)
581     fsync(d->fd);
582 }