]> mj.ucw.cz Git - libucw.git/blobdiff - lib/sorter/govern.c
Cleaned up tracing levels. Array sorter now has its own control knob.
[libucw.git] / lib / sorter / govern.c
index 5173f2967071a44b0295b294bd9dac2e722f3e3a..7231a5b77a56ef48a93ecdd59038993e83c37f90 100644 (file)
@@ -48,15 +48,20 @@ sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_buc
   sorter_alloc_buf(ctx);
   if (in->flags & SBF_CUSTOM_PRESORT)
     {
   sorter_alloc_buf(ctx);
   if (in->flags & SBF_CUSTOM_PRESORT)
     {
+      /*
+       *  The trick with automatic joining, which we use for the normal presorter,
+       *  is not necessary with the custom presorter, because the custom presorter
+       *  is never called in the middle of the sorted data.
+       */
       struct fastbuf *f = sbuck_write(out);
       out->runs++;
       struct fastbuf *f = sbuck_write(out);
       out->runs++;
-      return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);  // FIXME: out_only optimization?
+      return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);
     }
   return ctx->internal_sort(ctx, in, out, out_only);
 }
 
     }
   return ctx->internal_sort(ctx, in, out, out_only);
 }
 
-static inline struct sort_bucket *
-sbuck_join_to(struct sort_bucket *b)
+static struct sort_bucket *
+sbuck_join_to(struct sort_bucket *b, sh_off_t *sizep)
 {
   if (sorter_debug & SORT_DEBUG_NO_JOIN)
     return NULL;
 {
   if (sorter_debug & SORT_DEBUG_NO_JOIN)
     return NULL;
@@ -65,9 +70,30 @@ sbuck_join_to(struct sort_bucket *b)
   if (!(out->flags & SBF_FINAL))
     return NULL;
   ASSERT(out->runs == 1);
   if (!(out->flags & SBF_FINAL))
     return NULL;
   ASSERT(out->runs == 1);
+  *sizep = sbuck_size(out);
   return out;
 }
 
   return out;
 }
 
+static sh_off_t
+sbuck_ins_or_join(struct sort_bucket *b, cnode *list_pos, struct sort_bucket *join, sh_off_t join_size)
+{
+  if (join)
+    {
+      if (b)
+       sbuck_drop(b);
+      ASSERT(join->runs == 2);
+      join->runs--;
+      return sbuck_size(join) - join_size;
+    }
+  else if (b)
+    {
+      clist_insert_after(&b->n, list_pos);
+      return sbuck_size(b);
+    }
+  else
+    return 0;
+}
+
 static void
 sorter_join(struct sort_bucket *b)
 {
 static void
 sorter_join(struct sort_bucket *b)
 {
@@ -79,7 +105,7 @@ sorter_join(struct sort_bucket *b)
     {
       // The final bucket doesn't have any file associated yet, so replace
       // it with the new bucket.
     {
       // The final bucket doesn't have any file associated yet, so replace
       // it with the new bucket.
-      SORT_XTRACE(2, "Replaced final bucket");
+      SORT_XTRACE(3, "Replaced final bucket");
       b->flags |= SBF_FINAL;
       sbuck_drop(join);
     }
       b->flags |= SBF_FINAL;
       sbuck_drop(join);
     }
@@ -98,7 +124,8 @@ sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
 {
   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
   cnode *list_pos = b->n.prev;
 {
   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
   cnode *list_pos = b->n.prev;
-  struct sort_bucket *join = sbuck_join_to(b);
+  sh_off_t join_size;
+  struct sort_bucket *join = sbuck_join_to(b, &join_size);
 
   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
     {
 
   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
     {
@@ -108,15 +135,8 @@ sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
        {
          sorter_stop_timer(ctx, &ctx->total_pre_time);
       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
        {
          sorter_stop_timer(ctx, &ctx->total_pre_time);
-         SORT_XTRACE(((b->flags & SBF_SOURCE) ? 1 : 2), "Sorted in memory");
-         if (join)
-           {
-             ASSERT(join->runs == 2);
-             join->runs--;
-             sbuck_drop(ins[0]);
-           }
-         else
-           clist_insert_after(&ins[0]->n, list_pos);
+         sh_off_t size = sbuck_ins_or_join(ins[0], list_pos, join, join_size);
+         SORT_XTRACE(((b->flags & SBF_SOURCE) ? 1 : 3), "Sorted in memory (%s, %dMB/s)", stk_fsize(size), sorter_speed(ctx, size));
          sbuck_drop(b);
          return;
        }
          sbuck_drop(b);
          return;
        }
@@ -143,18 +163,15 @@ sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
   do {
     ++pass;
     sorter_start_timer(ctx);
   do {
     ++pass;
     sorter_start_timer(ctx);
-    if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
+    if (ins[0]->runs <= 1 && ins[1]->runs <= 1 && join)
       {
        // This is guaranteed to produce a single run, so join if possible
       {
        // This is guaranteed to produce a single run, so join if possible
-       sh_off_t join_size = sbuck_size(join);
        outs[0] = join;
        outs[1] = NULL;
        ctx->twoway_merge(ctx, ins, outs);
        outs[0] = join;
        outs[1] = NULL;
        ctx->twoway_merge(ctx, ins, outs);
-       ASSERT(join->runs == 2);
-       join->runs--;
-       join_size = sbuck_size(join) - join_size;
+       sh_off_t size = sbuck_ins_or_join(NULL, NULL, join, join_size);
        sorter_stop_timer(ctx, &ctx->total_ext_time);
        sorter_stop_timer(ctx, &ctx->total_ext_time);
-       SORT_TRACE("Mergesort pass %d (final run, %s, %dMB/s)", pass, stk_fsize(join_size), sorter_speed(ctx, join_size));
+       SORT_TRACE("Mergesort pass %d (final run, %s, %dMB/s)", pass, stk_fsize(size), sorter_speed(ctx, size));
        sbuck_drop(ins[0]);
        sbuck_drop(ins[1]);
        return;
        sbuck_drop(ins[0]);
        sbuck_drop(ins[1]);
        return;
@@ -182,8 +199,9 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
 {
   clist parts;
   cnode *list_pos = b->n.prev;
 {
   clist parts;
   cnode *list_pos = b->n.prev;
-  struct sort_bucket *join = sbuck_join_to(b);
-  uns trace_level = (b->flags & SBF_SOURCE) ? 1 : 2;
+  sh_off_t join_size;
+  struct sort_bucket *join = sbuck_join_to(b, &join_size);
+  uns trace_level = (b->flags & SBF_SOURCE) ? 1 : 3;
 
   clist_init(&parts);
   ASSERT(!(sorter_debug & SORT_DEBUG_NO_PRESORT));
 
   clist_init(&parts);
   ASSERT(!(sorter_debug & SORT_DEBUG_NO_PRESORT));
@@ -211,22 +229,10 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
   sorter_free_buf(ctx);
   sbuck_drop(b);
 
   sorter_free_buf(ctx);
   sbuck_drop(b);
 
-  // FIXME: This is way too similar to the two-way case.
-  if (!part_cnt)
-    {
-      if (join)
-       {
-          SORT_XTRACE(trace_level, "Sorted in memory and joined");
-         ASSERT(join->runs == 2);
-         join->runs--;
-       }
-      return;
-    }
-  if (part_cnt == 1)
+  if (part_cnt <= 1)
     {
     {
-      struct sort_bucket *p = clist_head(&parts);
-      SORT_XTRACE(trace_level, "Sorted in memory");
-      clist_insert_after(&p->n, list_pos);
+      sh_off_t size = sbuck_ins_or_join(clist_head(&parts), list_pos, (part_cnt ? NULL : join), join_size);
+      SORT_XTRACE(trace_level, "Sorted in memory (%s, %dMB/s)", stk_fsize(size), sorter_speed(ctx, size));
       return;
     }
 
       return;
     }
 
@@ -234,7 +240,7 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
 
   uns max_ways = 1 << sorter_max_multiway_bits;
   struct sort_bucket *ways[max_ways+1];
 
   uns max_ways = 1 << sorter_max_multiway_bits;
   struct sort_bucket *ways[max_ways+1];
-  SORT_XTRACE(2, "Starting up to %d-way merge", max_ways);
+  SORT_XTRACE(3, "Starting up to %d-way merge", max_ways);
   for (;;)
     {
       uns n = 0;
   for (;;)
     {
       uns n = 0;
@@ -248,7 +254,10 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
       ASSERT(n > 1);
 
       struct sort_bucket *out;
       ASSERT(n > 1);
 
       struct sort_bucket *out;
-      out = sbuck_new(ctx);            // FIXME: No joining so far
+      if (clist_empty(&parts) && join)
+       out = join;
+      else
+       out = sbuck_new(ctx);
       sorter_start_timer(ctx);
       ctx->multiway_merge(ctx, ways, out);
       sorter_stop_timer(ctx, &ctx->total_ext_time);
       sorter_start_timer(ctx);
       ctx->multiway_merge(ctx, ways, out);
       sorter_stop_timer(ctx, &ctx->total_ext_time);
@@ -258,8 +267,8 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
 
       if (clist_empty(&parts))
        {
 
       if (clist_empty(&parts))
        {
-         clist_insert_after(&out->n, list_pos);
-         SORT_TRACE("Multi-way merge completed (%s, %dMB/s)", F_BSIZE(out), sorter_speed(ctx, sbuck_size(out)));
+         sh_off_t size = sbuck_ins_or_join((join ? NULL : out), list_pos, join, join_size);
+         SORT_TRACE("Multi-way merge completed (%d ways, %s, %dMB/s)", n, stk_fsize(size), sorter_speed(ctx, size));
          return;
        }
       else
          return;
        }
       else
@@ -274,8 +283,11 @@ sorter_multiway(struct sort_context *ctx, struct sort_bucket *b)
 static void
 sorter_radix(struct sort_context *ctx, struct sort_bucket *b, uns bits)
 {
 static void
 sorter_radix(struct sort_context *ctx, struct sort_bucket *b, uns bits)
 {
+  // Add more bits if requested and allowed.
+  bits = MIN(bits + sorter_add_radix_bits, sorter_max_radix_bits);
+
   uns nbuck = 1 << bits;
   uns nbuck = 1 << bits;
-  SORT_XTRACE(2, "Running radix split on %s with hash %d bits of %d (expecting %s buckets)",
+  SORT_XTRACE(3, "Running radix split on %s with hash %d bits of %d (expecting %s buckets)",
              F_BSIZE(b), bits, b->hash_bits, stk_fsize(sbuck_size(b) / nbuck));
   sorter_free_buf(ctx);
   sorter_start_timer(ctx);
              F_BSIZE(b), bits, b->hash_bits, stk_fsize(sbuck_size(b) / nbuck));
   sorter_free_buf(ctx);
   sorter_start_timer(ctx);
@@ -313,7 +325,7 @@ sorter_decide(struct sort_context *ctx, struct sort_bucket *b)
   // Drop empty buckets
   if (!sbuck_have(b))
     {
   // Drop empty buckets
   if (!sbuck_have(b))
     {
-      SORT_XTRACE(3, "Dropping empty bucket");
+      SORT_XTRACE(4, "Dropping empty bucket");
       sbuck_drop(b);
       return;
     }
       sbuck_drop(b);
       return;
     }
@@ -321,7 +333,7 @@ sorter_decide(struct sort_context *ctx, struct sort_bucket *b)
   // How many bits of bucket size we have to reduce before it fits in the RAM?
   // (this is insanely large if the input size is unknown, but it serves our purpose)
   u64 insize = sbuck_size(b);
   // How many bits of bucket size we have to reduce before it fits in the RAM?
   // (this is insanely large if the input size is unknown, but it serves our purpose)
   u64 insize = sbuck_size(b);
-  u64 mem = ctx->internal_estimate(ctx, b) * 0.8;      // FIXME: Magical factor for various non-uniformities
+  u64 mem = ctx->internal_estimate(ctx, b) * 0.8;      // Magical factor accounting for various non-uniformities
   uns bits = 0;
   while ((insize >> bits) > mem)
     bits++;
   uns bits = 0;
   while ((insize >> bits) > mem)
     bits++;
@@ -353,7 +365,7 @@ sorter_decide(struct sort_context *ctx, struct sort_bucket *b)
        multiway_bits = 0;
     }
 
        multiway_bits = 0;
     }
 
-  SORT_XTRACE(2, "Decisions: size=%s max=%s runs=%d bits=%d hash=%d -> radix=%d multi=%d",
+  SORT_XTRACE(3, "Decisions: size=%s max=%s runs=%d bits=%d hash=%d -> radix=%d multi=%d",
        stk_fsize(insize), stk_fsize(mem), b->runs, bits, b->hash_bits,
        radix_bits, multiway_bits);
 
        stk_fsize(insize), stk_fsize(mem), b->runs, bits, b->hash_bits,
        radix_bits, multiway_bits);
 
@@ -387,6 +399,7 @@ sorter_run(struct sort_context *ctx)
   ctx->pool = mp_new(4096);
   clist_init(&ctx->bucket_list);
   sorter_prepare_buf(ctx);
   ctx->pool = mp_new(4096);
   clist_init(&ctx->bucket_list);
   sorter_prepare_buf(ctx);
+  asort_start_threads(0);
 
   // Create bucket containing the source
   struct sort_bucket *bin = sbuck_new(ctx);
 
   // Create bucket containing the source
   struct sort_bucket *bin = sbuck_new(ctx);
@@ -400,6 +413,7 @@ sorter_run(struct sort_context *ctx)
   bin->hash_bits = ctx->hash_bits;
   clist_add_tail(&ctx->bucket_list, &bin->n);
   SORT_XTRACE(2, "Input size: %s, %d hash bits", F_BSIZE(bin), bin->hash_bits);
   bin->hash_bits = ctx->hash_bits;
   clist_add_tail(&ctx->bucket_list, &bin->n);
   SORT_XTRACE(2, "Input size: %s, %d hash bits", F_BSIZE(bin), bin->hash_bits);
+  ctx->fb_params = (bin->size < sorter_small_input) ? &sorter_small_fb_params : &sorter_fb_params;
 
   // Create bucket for the output
   struct sort_bucket *bout = sbuck_new(ctx);
 
   // Create bucket for the output
   struct sort_bucket *bout = sbuck_new(ctx);
@@ -415,6 +429,7 @@ sorter_run(struct sort_context *ctx)
   while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
     sorter_decide(ctx, b);
 
   while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
     sorter_decide(ctx, b);
 
+  asort_stop_threads();
   sorter_free_buf(ctx);
   sbuck_write(bout);           // Force empty bucket to a file
   SORT_XTRACE(2, "Final size: %s", F_BSIZE(bout));
   sorter_free_buf(ctx);
   sbuck_write(bout);           // Force empty bucket to a file
   SORT_XTRACE(2, "Final size: %s", F_BSIZE(bout));