]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/govern.c
738186e34da8a6eebddb0384f6aa3781b4387fcb
[libucw.git] / lib / sorter / govern.c
1 /*
2  *      UCW Library -- Universal Sorter: Governing Routines
3  *
4  *      (c) 2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/fastbuf.h"
12 #include "lib/mempool.h"
13 #include "lib/sorter/common.h"
14
15 #include <string.h>
16
17 static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
18 {
19   sorter_alloc_buf(ctx);
20   if (in->flags & SBF_CUSTOM_PRESORT)
21     {
22       struct fastbuf *f = sbuck_write(out);
23       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);   // FIXME: out_only optimization?
24     }
25   return ctx->internal_sort(ctx, in, out, out_only);
26 }
27
28 static inline struct sort_bucket *
29 sbuck_join_to(struct sort_bucket *b)
30 {
31   if (sorter_debug & SORT_DEBUG_NO_JOIN)
32     return NULL;
33
34   struct sort_bucket *out = (struct sort_bucket *) b->n.prev;   // Such bucket is guaranteed to exist
35   return (out->flags & SBF_FINAL) ? out : NULL;
36 }
37
38 static void
39 sorter_join(struct sort_bucket *b)
40 {
41   struct sort_bucket *join = (struct sort_bucket *) b->n.prev;
42   ASSERT(join->flags & SBF_FINAL);
43   ASSERT(b->runs == 1);
44
45   if (!sbuck_has_file(join))
46     {
47       // The final bucket doesn't have any file associated yet, so replace
48       // it with the new bucket.
49       SORT_XTRACE("Replaced final bucket");
50       b->flags |= SBF_FINAL;
51       sbuck_drop(join);
52     }
53   else
54     {
55       SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) sbuck_size(b));
56       struct fastbuf *src = sbuck_read(b);
57       struct fastbuf *dest = sbuck_write(join);
58       bbcopy(src, dest, ~0U);
59       sbuck_drop(b);
60     }
61 }
62
63 static void
64 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
65 {
66   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
67   cnode *list_pos = b->n.prev;
68   struct sort_bucket *join = sbuck_join_to(b);
69
70   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
71     {
72       SORT_TRACE("Presorting");
73       ins[0] = sbuck_new(ctx);
74       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
75         {
76           SORT_XTRACE("Sorted in memory");
77           if (join)
78             sbuck_drop(ins[0]);
79           else
80             clist_insert_after(&ins[0]->n, list_pos);
81           sbuck_drop(b);
82           return;
83         }
84
85       ins[1] = sbuck_new(ctx);
86       int i = 1;
87       while (sorter_presort(ctx, b, ins[i], ins[i]))
88         i = 1-i;
89       sbuck_drop(b);
90     }
91   else
92     {
93       SORT_TRACE("Skipped presorting");
94       ins[0] = b;
95     }
96
97   SORT_TRACE("Main sorting");
98   do {
99     if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
100       {
101         // This is guaranteed to produce a single run, so join if possible
102         outs[0] = join;
103         outs[1] = NULL;
104         ctx->twoway_merge(ctx, ins, outs);
105         ASSERT(outs[0]->runs == 2);
106         outs[0]->runs--;
107         SORT_TRACE("Pass done (joined final run)");
108         sbuck_drop(ins[0]);
109         sbuck_drop(ins[1]);
110         return;
111       }
112     outs[0] = sbuck_new(ctx);
113     outs[1] = sbuck_new(ctx);
114     outs[2] = NULL;
115     ctx->twoway_merge(ctx, ins, outs);
116     SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) sbuck_size(outs[0]), (uintmax_t) sbuck_size(outs[1]));
117     sbuck_drop(ins[0]);
118     sbuck_drop(ins[1]);
119     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
120   } while (sbuck_have(ins[1]));
121
122   sbuck_drop(ins[1]);
123   clist_insert_after(&ins[0]->n, list_pos);
124 }
125
126 void
127 sorter_run(struct sort_context *ctx)
128 {
129   ctx->pool = mp_new(4096);
130   clist_init(&ctx->bucket_list);
131
132   /* FIXME: There should be a way how to detect size of the input file */
133   /* FIXME: Remember to test sorting of empty files */
134
135   // Create bucket containing the source
136   struct sort_bucket *bin = sbuck_new(ctx);
137   bin->flags = SBF_SOURCE | SBF_OPEN_READ;
138   if (ctx->custom_presort)
139     bin->flags |= SBF_CUSTOM_PRESORT;
140   else
141     bin->fb = ctx->in_fb;
142   bin->ident = "in";
143   bin->size = ~(u64)0;
144   bin->hash_bits = ctx->hash_bits;
145   clist_add_tail(&ctx->bucket_list, &bin->n);
146
147   // Create bucket for the output
148   struct sort_bucket *bout = sbuck_new(ctx);
149   bout->flags = SBF_FINAL;
150   bout->fb = ctx->out_fb;
151   bout->ident = "out";
152   bout->runs = 1;
153   clist_add_head(&ctx->bucket_list, &bout->n);
154
155   struct sort_bucket *b;
156   while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
157     {
158       if (!sbuck_have(b))
159         sbuck_drop(b);
160       else if (b->runs == 1)
161         sorter_join(b);
162       else
163         sorter_twoway(ctx, b);
164     }
165
166   sorter_free_buf(ctx);
167   sbuck_write(bout);            // Force empty bucket to a file
168   SORT_XTRACE("Final size: %jd", (uintmax_t) sbuck_size(bout));
169   ctx->out_fb = sbuck_read(bout);
170 }