]> mj.ucw.cz Git - libucw.git/blob - lib/db.c
Added very simple functions for emulating a fastbuf stream over a static
[libucw.git] / lib / db.c
1 /*
2  *      Sherlock Library -- Fast Database Management Routines
3  *
4  *      (c) 1999--2001 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *  This library uses the standard algorithm for external hashing (page directory
12  *  mapping topmost K bits of hash value to page address, directory splits and
13  *  so on). Peculiarities of this implementation (aka design decisions):
14  *
15  *   o  We allow both fixed and variable length keys and values (this includes
16  *      zero size values for cases you want to represent only a set of keys).
17  *   o  We assume that key_size + val_size < page_size.
18  *   o  We never shrink the directory nor free empty pages. (The reason is that
19  *      if the database was once large, it's likely it will again become large soon.)
20  *   o  The only pages which can be freed are those of the directory (during
21  *      directory split), so we keep only a simple 32-entry free block list
22  *      and we assume it's sorted.
23  *   o  All pointers are always given in pages from start of the file.
24  *      This gives us page_size*2^32 limit for file size which should be enough.
25  */
26
27 #include "lib/lib.h"
28 #include "lib/lfs.h"
29 #include "lib/pagecache.h"
30 #include "lib/db.h"
31 #include "lib/db_internal.h"
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38
39 #define GET_PAGE(d,x) pgc_get((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
40 #define GET_ZERO_PAGE(d,x) pgc_get_zero((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
41 #define READ_PAGE(d,x) pgc_read((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
42 #define READ_DIR(d,off) pgc_read((d)->cache, (d)->fd, (((sh_off_t)(d)->root->dir_start) << (d)->page_order) + (off))
43
44 struct sdbm *
45 sdbm_open(struct sdbm_options *o)
46 {
47   struct sdbm *d;
48   struct sdbm_root root, *r;
49   uns cache_size = o->cache_size ? o->cache_size : 16;
50
51   d = xmalloc_zero(sizeof(struct sdbm));
52   d->flags = o->flags;
53   d->fd = sh_open(o->name, ((d->flags & SDBM_WRITE) ? O_RDWR : O_RDONLY), 0666);
54   if (d->fd >= 0)                       /* Already exists, let's check it */
55     {
56       if (read(d->fd, &root, sizeof(root)) != sizeof(root))
57         goto bad;
58       if (root.magic != SDBM_MAGIC || root.version != SDBM_VERSION)
59         goto bad;
60       d->file_size = sh_seek(d->fd, 0, SEEK_END) >> root.page_order;
61       d->page_order = root.page_order;
62       d->page_size = 1 << root.page_order;
63       d->cache = pgc_open(d->page_size, cache_size);
64       d->root_page = pgc_read(d->cache, d->fd, 0);
65       d->root = (void *) d->root_page->data;
66     }
67   else if ((d->flags & SDBM_CREAT) && (d->fd = sh_open(o->name, O_RDWR | O_CREAT, 0666)) >= 0)
68     {
69       struct page *q;
70       uns page_order = o->page_order;
71       if (page_order < 10)
72         page_order = 10;
73       d->page_size = 1 << page_order;
74       d->cache = pgc_open(d->page_size, cache_size);
75       d->root_page = GET_ZERO_PAGE(d, 0);
76       r = d->root = (void *) d->root_page->data;                 /* Build root page */
77       r->magic = SDBM_MAGIC;
78       r->version = SDBM_VERSION;
79       r->page_order = d->page_order = page_order;
80       r->key_size = o->key_size;
81       r->val_size = o->val_size;
82       r->dir_start = 1;
83       r->dir_order = 0;
84       d->file_size = 3;
85       q = GET_ZERO_PAGE(d, 1);                                  /* Build page directory */
86       GET32(q->data, 0) = 2;
87       pgc_put(d->cache, q);
88       q = GET_ZERO_PAGE(d, 2);                                  /* Build single data page */
89       pgc_put(d->cache, q);
90     }
91   else
92     goto bad;
93   d->dir_size = 1 << d->root->dir_order;
94   d->dir_shift = 32 - d->root->dir_order;
95   d->page_mask = d->page_size - 1;
96   d->key_size = d->root->key_size;
97   d->val_size = d->root->val_size;
98   return d;
99
100 bad:
101   sdbm_close(d);
102   return NULL;
103 }
104
105 void
106 sdbm_close(struct sdbm *d)
107 {
108   if (d->root_page)
109     pgc_put(d->cache, d->root_page);
110   if (d->cache)
111     pgc_close(d->cache);
112   if (d->fd >= 0)
113     close(d->fd);
114   xfree(d);
115 }
116
117 static uns
118 sdbm_alloc_pages(struct sdbm *d, uns number)
119 {
120   uns where = d->file_size;
121   if (where + number < where)   /* Wrap around? */
122     die("SDB: Database file too large, giving up");
123   d->file_size += number;
124   return where;
125 }
126
127 static uns
128 sdbm_alloc_page(struct sdbm *d)
129 {
130   uns pos;
131
132   if (!d->root->free_pool[0].count)
133     return sdbm_alloc_pages(d, 1);
134   pos = d->root->free_pool[0].first;
135   d->root->free_pool[0].first++;
136   if (!--d->root->free_pool[0].count)
137     {
138       memmove(d->root->free_pool, d->root->free_pool+1, (SDBM_NUM_FREE_PAGE_POOLS-1) * sizeof(d->root->free_pool[0]));
139       d->root->free_pool[SDBM_NUM_FREE_PAGE_POOLS-1].count = 0;
140     }
141   pgc_mark_dirty(d->cache, d->root_page);
142   return pos;
143 }
144
145 static void
146 sdbm_free_pages(struct sdbm *d, uns start, uns number)
147 {
148   uns i = 0;
149
150   while (d->root->free_pool[i].count)
151     i++;
152   ASSERT(i < SDBM_NUM_FREE_PAGE_POOLS);
153   d->root->free_pool[i].first = start;
154   d->root->free_pool[i].count = number;
155   pgc_mark_dirty(d->cache, d->root_page);
156 }
157
158 u32
159 sdbm_hash(byte *key, uns keylen)
160 {
161   /*
162    *  This used to be the same hash function as GDBM uses,
163    *  but it turned out that it tends to give the same results
164    *  on similar keys. Damn it.
165    */
166   u32 value = 0x238F13AF * keylen;
167   while (keylen--)
168     value = 37*value + *key++;
169   return (1103515243 * value + 12345);
170 }
171
172 static int
173 sdbm_get_entry(struct sdbm *d, byte *pos, byte **key, uns *keylen, byte **val, uns *vallen)
174 {
175   byte *p = pos;
176
177   if (d->key_size >= 0)
178     *keylen = d->key_size;
179   else
180     {
181       *keylen = (p[0] << 8) | p[1];
182       p += 2;
183     }
184   *key = p;
185   p += *keylen;
186   if (d->val_size >= 0)
187     *vallen = d->val_size;
188   else
189     {
190       *vallen = (p[0] << 8) | p[1];
191       p += 2;
192     }
193   *val = p;
194   p += *vallen;
195   return p - pos;
196 }
197
198 static int
199 sdbm_entry_len(struct sdbm *d, uns keylen, uns vallen)
200 {
201   uns len = keylen + vallen;
202   if (d->key_size < 0)
203     len += 2;
204   if (d->val_size < 0)
205     len += 2;
206   return len;
207 }
208
209 static void
210 sdbm_store_entry(struct sdbm *d, byte *pos, byte *key, uns keylen, byte *val, uns vallen)
211 {
212   if (d->key_size < 0)
213     {
214       *pos++ = keylen >> 8;
215       *pos++ = keylen;
216     }
217   memmove(pos, key, keylen);
218   pos += keylen;
219   if (d->val_size < 0)
220     {
221       *pos++ = vallen >> 8;
222       *pos++ = vallen;
223     }
224   memmove(pos, val, vallen);
225 }
226
227 static uns
228 sdbm_page_rank(struct sdbm *d, uns dirpos)
229 {
230   struct page *b;
231   u32 pg, x;
232   uns l, r;
233   uns pm = d->page_mask;
234
235   b = READ_DIR(d, dirpos & ~pm);
236   pg = GET32(b->data, dirpos & pm);
237   l = dirpos;
238   while ((l & pm) && GET32(b->data, (l - 4) & pm) == pg)
239     l -= 4;
240   r = dirpos + 4;
241   /* We heavily depend on unused directory entries being zero */
242   while ((r & pm) && GET32(b->data, r & pm) == pg)
243     r += 4;
244   pgc_put(d->cache, b);
245
246   if (!(l & pm) && !(r & pm))
247     {
248       /* Note that if it spans page boundary, it must contain an integer number of pages */
249       while (l)
250         {
251           b = READ_DIR(d, (l - 4) & ~pm);
252           x = GET32(b->data, 0);
253           pgc_put(d->cache, b);
254           if (x != pg)
255             break;
256           l -= d->page_size;
257         }
258       while (r < 4*d->dir_size)
259         {
260           b = READ_DIR(d, r & ~pm);
261           x = GET32(b->data, 0);
262           pgc_put(d->cache, b);
263           if (x != pg)
264             break;
265           r += d->page_size;
266         }
267     }
268   return (r - l) >> 2;
269 }
270
271 static void
272 sdbm_expand_directory(struct sdbm *d)
273 {
274   struct page *b, *c;
275   int i, ent;
276   u32 *dir, *t;
277
278   if (d->root->dir_order >= 31)
279     die("SDB: Database directory too large, giving up");
280
281   if (4*d->dir_size < d->page_size)
282     {
283       /* It still fits within single page */
284       b = READ_DIR(d, 0);
285       dir = (u32 *) b->data;
286       for(i=d->dir_size-1; i>=0; i--)
287         dir[2*i] = dir[2*i+1] = dir[i];
288       pgc_mark_dirty(d->cache, b);
289       pgc_put(d->cache, b);
290     }
291   else
292     {
293       uns old_dir = d->root->dir_start;
294       uns old_dir_pages = 1 << (d->root->dir_order + 2 - d->page_order);
295       uns page, new_dir;
296       new_dir = d->root->dir_start = sdbm_alloc_pages(d, 2*old_dir_pages);
297       ent = 1 << (d->page_order - 3);
298       for(page=0; page < old_dir_pages; page++)
299         {
300           b = READ_PAGE(d, old_dir + page);
301           dir = (u32 *) b->data;
302           c = GET_PAGE(d, new_dir + 2*page);
303           t = (u32 *) c->data;
304           for(i=0; i<ent; i++)
305             t[2*i] = t[2*i+1] = dir[i];
306           pgc_put(d->cache, c);
307           c = GET_PAGE(d, new_dir + 2*page + 1);
308           t = (u32 *) c->data;
309           for(i=0; i<ent; i++)
310             t[2*i] = t[2*i+1] = dir[ent+i];
311           pgc_put(d->cache, c);
312           pgc_put(d->cache, b);
313         }
314       if (!(d->flags & SDBM_FAST))
315         {
316           /*
317            *  Unless in super-fast mode, fill old directory pages with zeroes.
318            *  This slows us down a bit, but allows database reconstruction after
319            *  the free list is lost.
320            */
321           for(page=0; page < old_dir_pages; page++)
322             {
323               b = GET_ZERO_PAGE(d, old_dir + page);
324               pgc_put(d->cache, b);
325             }
326         }
327       sdbm_free_pages(d, old_dir, old_dir_pages);
328     }
329
330   d->root->dir_order++;
331   d->dir_size = 1 << d->root->dir_order;
332   d->dir_shift = 32 - d->root->dir_order;
333   pgc_mark_dirty(d->cache, d->root_page);
334   if (!(d->flags & SDBM_FAST))
335     sdbm_sync(d);
336 }
337
338 static void
339 sdbm_split_data(struct sdbm *d, struct sdbm_bucket *s, struct sdbm_bucket *d0, struct sdbm_bucket *d1, uns sigbit)
340 {
341   byte *sp = s->data;
342   byte *dp[2] = { d0->data, d1->data };
343   byte *K, *D;
344   uns Kl, Dl, sz, i;
345
346   while (sp < s->data + s->used)
347     {
348       sz = sdbm_get_entry(d, sp, &K, &Kl, &D, &Dl);
349       sp += sz;
350       i = (sdbm_hash(K, Kl) & (1 << sigbit)) ? 1 : 0;
351       sdbm_store_entry(d, dp[i], K, Kl, D, Dl);
352       dp[i] += sz;
353     }
354   d0->used = dp[0] - d0->data;
355   d1->used = dp[1] - d1->data;
356 }
357
358 static void
359 sdbm_split_dir(struct sdbm *d, uns dirpos, uns count, uns pos)
360 {
361   struct page *b;
362   uns i;
363
364   count *= 4;
365   while (count)
366     {
367       b = READ_DIR(d, dirpos & ~d->page_mask);
368       i = d->page_size - (dirpos & d->page_mask);
369       if (i > count)
370         i = count;
371       count -= i;
372       while (i)
373         {
374           GET32(b->data, dirpos & d->page_mask) = pos;
375           dirpos += 4;
376           i -= 4;
377         }
378       pgc_mark_dirty(d->cache, b);
379       pgc_put(d->cache, b);
380     }
381 }
382
383 static inline uns
384 sdbm_dirpos(struct sdbm *d, uns hash)
385 {
386   if (d->dir_shift != 32)               /* avoid shifting by 32 bits */
387     return (hash >> d->dir_shift) << 2; /* offset in the directory */
388   else
389     return 0;
390 }
391
392 static struct page *
393 sdbm_split_page(struct sdbm *d, struct page *b, u32 hash)
394 {
395   struct page *p[2];
396   uns i, rank, sigbit, rank_log, dirpos, newpg;
397
398   dirpos = sdbm_dirpos(d, hash);
399   rank = sdbm_page_rank(d, dirpos);     /* rank = # of pointers to this page */
400   if (rank == 1)
401     {
402       sdbm_expand_directory(d);
403       rank = 2;
404       dirpos *= 2;
405     }
406   rank_log = 1;                         /* rank_log = log2(rank) */
407   while ((1U << rank_log) < rank)
408     rank_log++;
409   sigbit = d->dir_shift + rank_log - 1; /* sigbit = bit we split on */
410   p[0] = b;
411   newpg = sdbm_alloc_page(d);
412   p[1] = GET_PAGE(d, newpg);
413   sdbm_split_data(d, (void *) b->data, (void *) p[0]->data, (void *) p[1]->data, sigbit);
414   sdbm_split_dir(d, (dirpos & ~(4*rank - 1))+2*rank, rank/2, newpg);
415   pgc_mark_dirty(d->cache, p[0]);
416   i = (hash & (1 << sigbit)) ? 1 : 0;
417   pgc_put(d->cache, p[!i]);
418   return p[i];
419 }
420
421 static int
422 sdbm_put_user(byte *D, uns Dl, byte *val, uns *vallen)
423 {
424   if (vallen)
425     {
426       if (*vallen < Dl)
427         return 1;
428       *vallen = Dl;
429     }
430   if (val)
431     memcpy(val, D, Dl);
432   return 0;
433 }
434
435 static int
436 sdbm_access(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen, uns mode)    /* 0=read, 1=store, 2=replace */
437 {
438   struct page *p, *q;
439   u32 hash, h, pos, size;
440   struct sdbm_bucket *b;
441   byte *c, *e;
442   int rc;
443
444   if ((d->key_size >= 0 && keylen != (uns) d->key_size) || keylen > 65535)
445     return SDBM_ERROR_BAD_KEY_SIZE;
446   if (val && ((d->val_size >= 0 && *vallen != (uns) d->val_size) || *vallen >= 65535) && mode)
447     return SDBM_ERROR_BAD_VAL_SIZE;
448   if (!mode && !(d->flags & SDBM_WRITE))
449     return SDBM_ERROR_READ_ONLY;
450   hash = sdbm_hash(key, keylen);
451   h = sdbm_dirpos(d, hash);
452   p = READ_DIR(d, h & ~d->page_mask);
453   pos = GET32(p->data, h & d->page_mask);
454   pgc_put(d->cache, p);
455   q = READ_PAGE(d, pos);
456   b = (void *) q->data;
457   c = b->data;
458   e = c + b->used;
459   while (c < e)
460     {
461       byte *K, *D;
462       uns Kl, Dl, s;
463       s = sdbm_get_entry(d, c, &K, &Kl, &D, &Dl);
464       if (Kl == keylen && !memcmp(K, key, Kl))
465         {
466           /* Gotcha! */
467           switch (mode)
468             {
469             case 0:                     /* fetch: found */
470               rc = sdbm_put_user(D, Dl, val, vallen);
471               pgc_put(d->cache, q);
472               return rc ? SDBM_ERROR_TOO_LARGE : 1;
473             case 1:                     /* store: already present */
474               pgc_put(d->cache, q);
475               return 0;
476             default:                    /* replace: delete the old one */
477               memmove(c, c+s, e-(c+s));
478               b->used -= s;
479               goto insert;
480             }
481         }
482       c += s;
483     }
484   if (!mode || !val)            /* fetch or delete: no success */
485     {
486       pgc_put(d->cache, q);
487       return 0;
488     }
489
490 insert:
491   if (val)
492     {
493       size = sdbm_entry_len(d, keylen, *vallen);
494       while (b->used + size > d->page_size - sizeof(struct sdbm_bucket))
495         {
496           /* Page overflow, need to split */
497           if (size >= d->page_size - sizeof(struct sdbm_bucket))
498             {
499               pgc_put(d->cache, q);
500               return SDBM_ERROR_GIANT;
501             }
502           q = sdbm_split_page(d, q, hash);
503           b = (void *) q->data;
504         }
505       sdbm_store_entry(d, b->data + b->used, key, keylen, val, *vallen);
506       b->used += size;
507     }
508   pgc_mark_dirty(d->cache, q);
509   pgc_put(d->cache, q);
510   if (d->flags & SDBM_SYNC)
511     sdbm_sync(d);
512   return 1;
513 }
514
515 int
516 sdbm_store(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
517 {
518   return sdbm_access(d, key, keylen, val, &vallen, 1);
519 }
520
521 int
522 sdbm_replace(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
523 {
524   return sdbm_access(d, key, keylen, val, &vallen, 2);
525 }
526
527 int
528 sdbm_delete(struct sdbm *d, byte *key, uns keylen)
529 {
530   return sdbm_access(d, key, keylen, NULL, NULL, 2);
531 }
532
533 int
534 sdbm_fetch(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen)
535 {
536   return sdbm_access(d, key, keylen, val, vallen, 0);
537 }
538
539 void
540 sdbm_rewind(struct sdbm *d)
541 {
542   d->find_page = 1;
543   d->find_pos = 0;
544   d->find_free_list = 0;
545 }
546
547 int
548 sdbm_get_next(struct sdbm *d, byte *key, uns *keylen, byte *val, uns *vallen)
549 {
550   uns page = d->find_page;
551   uns pos = d->find_pos;
552   byte *K, *V;
553   uns c, Kl, Vl;
554   struct page *p;
555   struct sdbm_bucket *b;
556
557   for(;;)
558     {
559       if (!pos)
560         {
561           if (page >= d->file_size)
562             break;
563           if (page == d->root->dir_start)
564             page += (4*d->dir_size + d->page_size - 1) >> d->page_order;
565           else if (page == d->root->free_pool[d->find_free_list].first)
566             page += d->root->free_pool[d->find_free_list++].count;
567           else
568             pos = 4;
569           continue;
570         }
571       p = READ_PAGE(d, page);
572       b = (void *) p->data;
573       if (pos - 4 >= b->used)
574         {
575           pos = 0;
576           page++;
577           pgc_put(d->cache, p);
578           continue;
579         }
580       c = sdbm_get_entry(d, p->data + pos, &K, &Kl, &V, &Vl);
581       d->find_page = page;
582       d->find_pos = pos + c;
583       c = sdbm_put_user(K, Kl, key, keylen) ||
584           sdbm_put_user(V, Vl, val, vallen);
585       pgc_put(d->cache, p);
586       return c ? SDBM_ERROR_TOO_LARGE : 1;
587     }
588   d->find_page = page;
589   d->find_pos = pos;
590   return 0;
591 }
592
593 void
594 sdbm_sync(struct sdbm *d)
595 {
596   pgc_flush(d->cache);
597   if (d->flags & SDBM_FSYNC)
598     fsync(d->fd);
599 }