]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/govern.c
97942dee37bbc0d7604d3b2a6ed62f0e7fb631ca
[libucw.git] / lib / sorter / govern.c
1 /*
2  *      UCW Library -- Universal Sorter: Governing Routines
3  *
4  *      (c) 2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/fastbuf.h"
12 #include "lib/mempool.h"
13 #include "lib/sorter/common.h"
14
15 void *
16 sorter_alloc(struct sort_context *ctx, uns size)
17 {
18   return mp_alloc_zero(ctx->pool, size);
19 }
20
21 struct sort_bucket *
22 sbuck_new(struct sort_context *ctx)
23 {
24   return sorter_alloc(ctx, sizeof(struct sort_bucket));
25 }
26
27 void
28 sbuck_drop(struct sort_bucket *b)
29 {
30   if (b)
31     {
32       if (b->n.prev)
33         clist_remove(&b->n);
34       bclose(b->fb);
35       bzero(b, sizeof(*b));
36     }
37 }
38
39 int
40 sbuck_can_read(struct sort_bucket *b)
41 {
42   return b && b->size;
43 }
44
45 struct fastbuf *
46 sbuck_open_read(struct sort_bucket *b)
47 {
48   /* FIXME: These functions should handle buckets with no fb and only name. */
49   ASSERT(b->fb);
50   return b->fb;
51 }
52
53 struct fastbuf *
54 sbuck_open_write(struct sort_bucket *b)
55 {
56   if (!b->fb)
57     b->fb = bopen_tmp(sorter_stream_bufsize);
58   return b->fb;
59 }
60
61 void
62 sbuck_close_read(struct sort_bucket *b)
63 {
64   if (!b)
65     return;
66   ASSERT(b->fb);
67   bclose(b->fb);
68   b->fb = NULL;
69 }
70
71 void
72 sbuck_close_write(struct sort_bucket *b)
73 {
74   if (b->fb)
75     {
76       b->size = btell(b->fb);
77       brewind(b->fb);
78     }
79 }
80
81 void
82 sorter_alloc_buf(struct sort_context *ctx)
83 {
84   if (ctx->big_buf)
85     return;
86   u64 bs = MAX(sorter_bufsize/2, 1);
87   bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
88   ctx->big_buf = big_alloc(2*bs);
89   ctx->big_buf_size = 2*bs;
90   ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
91   ctx->big_buf_half_size = bs;
92   SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
93 }
94
95 void
96 sorter_free_buf(struct sort_context *ctx)
97 {
98   if (!ctx->big_buf)
99     return;
100   big_free(ctx->big_buf, ctx->big_buf_size);
101   ctx->big_buf = NULL;
102   SORT_XTRACE("Freed sorting buffer");
103 }
104
105 static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
106 {
107   /* FIXME: Mode with no presorting (mostly for debugging) */
108   sorter_alloc_buf(ctx);
109   if (in->flags & SBF_CUSTOM_PRESORT)
110     {
111       struct fastbuf *f = sbuck_open_write(out);
112       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);   // FIXME: out_only optimization?
113     }
114   return ctx->internal_sort(ctx, in, out, out_only);
115 }
116
117 static inline struct sort_bucket *
118 sbuck_join_to(struct sort_bucket *b)
119 {
120   struct sort_bucket *out = (struct sort_bucket *) b->n.prev;   // Such bucket is guaranteed to exist
121   return (out->flags & SBF_FINAL) ? out : NULL;
122 }
123
124 static void
125 sorter_join(struct sort_bucket *b)
126 {
127   struct sort_bucket *join = sbuck_join_to(b);
128   ASSERT(join);
129
130   // FIXME: What if the final bucket doesn't contain any file yet?
131
132   SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) b->size);
133   struct fastbuf *src = sbuck_open_read(b);
134   struct fastbuf *dest = sbuck_open_write(join);
135   bbcopy(src, dest, ~0U);
136   sbuck_drop(b);
137 }
138
139 static void
140 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
141 {
142   struct sort_bucket *ins[3], *outs[3];
143   struct sort_bucket *join = sbuck_join_to(b);
144
145   SORT_TRACE("Presorting");
146   ins[0] = sbuck_new(ctx);
147   sbuck_open_read(b);
148   if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
149     {
150       if (join)
151         sbuck_drop(ins[0]);
152       else
153         clist_insert_after(&ins[0]->n, &b->n);
154       sbuck_drop(b);
155       return;
156     }
157
158   ins[1] = sbuck_new(ctx);
159   ins[2] = NULL;
160   int i = 1;
161   while (sorter_presort(ctx, b, ins[i], ins[i]))
162     i = 1-i;
163   sbuck_close_read(b);
164   sbuck_close_write(ins[0]);
165   sbuck_close_write(ins[1]);
166
167   SORT_TRACE("Main sorting");
168   do {
169     if (ins[0]->runs == 1 && ins[1]->runs == 1 && join) // FIXME: Debug switch for disabling joining optimizations
170       {
171         // This is guaranteed to produce a single run, so join if possible
172         outs[0] = join;
173         outs[1] = NULL;
174         ctx->twoway_merge(ctx, ins, outs);
175         ASSERT(outs[0]->runs == 2);
176         outs[0]->runs--;
177         SORT_TRACE("Pass done (joined final run)");
178         sbuck_drop(b);
179         return;
180       }
181     outs[0] = sbuck_new(ctx);
182     outs[1] = sbuck_new(ctx);
183     outs[2] = NULL;
184     ctx->twoway_merge(ctx, ins, outs);
185     sbuck_close_write(outs[0]);
186     sbuck_close_write(outs[1]);
187     SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) outs[0]->size, (uintmax_t) outs[1]->size);
188     sbuck_drop(ins[0]);
189     sbuck_drop(ins[1]);
190     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
191   } while (ins[1]->size);
192
193   sbuck_drop(ins[1]);
194   clist_insert_after(&ins[0]->n, &b->n);
195   sbuck_drop(b);
196 }
197
198 void
199 sorter_run(struct sort_context *ctx)
200 {
201   ctx->pool = mp_new(4096);
202   clist_init(&ctx->bucket_list);
203
204   /* FIXME: There should be a way how to detect size of the input file */
205   /* FIXME: Remember to test sorting of empty files */
206
207   // Create bucket containing the source
208   struct sort_bucket *bin = sbuck_new(ctx);
209   bin->flags = SBF_SOURCE;
210   if (ctx->custom_presort)
211     bin->flags |= SBF_CUSTOM_PRESORT;
212   else
213     bin->fb = ctx->in_fb;
214   bin->ident = "in";
215   bin->size = ~(u64)0;
216   bin->hash_bits = ctx->hash_bits;
217   clist_add_tail(&ctx->bucket_list, &bin->n);
218
219   // Create bucket for the output
220   struct sort_bucket *bout = sbuck_new(ctx);
221   bout->flags = SBF_FINAL;
222   bout->fb = ctx->out_fb;
223   bout->ident = "out";
224   bout->runs = 1;
225   clist_add_head(&ctx->bucket_list, &bout->n);
226
227   struct sort_bucket *b;
228   while (b = clist_next(&ctx->bucket_list, &bout->n))
229     {
230       if (!b->size)
231         sbuck_drop(b);
232       else if (b->runs == 1)
233         sorter_join(b);
234       else
235         sorter_twoway(ctx, b);
236     }
237
238   sorter_free_buf(ctx);
239   sbuck_close_write(bout);
240   SORT_XTRACE("Final size: %jd", (uintmax_t) bout->size);
241   ctx->out_fb = sbuck_open_read(bout);
242 }