]> mj.ucw.cz Git - libucw.git/commitdiff
Cleanup of bucket handling.
authorMartin Mares <mj@ucw.cz>
Fri, 2 Feb 2007 19:41:33 +0000 (20:41 +0100)
committerMartin Mares <mj@ucw.cz>
Fri, 2 Feb 2007 19:41:33 +0000 (20:41 +0100)
The bucket operations have much nicer semantics now.
Also added a couple of debug switches.

lib/sorter/common.h
lib/sorter/config.c
lib/sorter/govern.c
lib/sorter/s-internal.h
lib/sorter/s-twoway.h

index 15083663f3cb03aead02787bdba87d2b7ae4982b..6372b5f450f9f02114b9367b7f474f47ba7f1d1c 100644 (file)
 
 /* Configuration, some of the variables are used by the old sorter, too. */
 extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
+extern uns sorter_debug;
 extern u64 sorter_bufsize;
 
 #define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
 #define SORT_XTRACE(x...) do { if (sorter_trace > 1) log(L_DEBUG, x); } while(0)
 
+enum sort_debug {
+  SORT_DEBUG_NO_PRESORT = 1,
+  SORT_DEBUG_NO_JOIN = 2,
+};
+
 struct sort_bucket {
   cnode n;
   uns flags;
   struct fastbuf *fb;
   byte *name;
-  u64 size;                            // Size in bytes
+  u64 size;                            // Size in bytes (not valid when writing)
   uns runs;                            // Number of runs, 0 if not sorted
   uns hash_bits;                       // Remaining bits of the hash function
   byte *ident;                         // Identifier used in debug messages
@@ -34,6 +40,9 @@ enum sort_bucket_flags {
   SBF_FINAL = 1,                       // This bucket corresponds to the final output file (always 1 run)
   SBF_SOURCE = 2,                      // Contains the source file (always 0 runs)
   SBF_CUSTOM_PRESORT = 4,              // Contains source to read via custom presorter
+  SBF_OPEN_WRITE = 256,                        // We are currently writing to the fastbuf
+  SBF_OPEN_READ = 512,                 // We are reading from the fastbuf
+  SBF_DESTROYED = 1024,                        // Already done with, no further references allowed
 };
 
 struct sort_context {
@@ -68,10 +77,9 @@ void sorter_free_buf(struct sort_context *ctx);
 // Operations on buckets
 struct sort_bucket *sbuck_new(struct sort_context *ctx);
 void sbuck_drop(struct sort_bucket *b);
-int sbuck_can_read(struct sort_bucket *b);
-struct fastbuf *sbuck_open_read(struct sort_bucket *b);
-struct fastbuf *sbuck_open_write(struct sort_bucket *b);
-void sbuck_close_read(struct sort_bucket *b);
-void sbuck_close_write(struct sort_bucket *b);
+int sbuck_have(struct sort_bucket *b);
+sh_off_t sbuck_size(struct sort_bucket *b);
+struct fastbuf *sbuck_read(struct sort_bucket *b);
+struct fastbuf *sbuck_write(struct sort_bucket *b);
 
 #endif
index b1504047d5c576e1136ef05267cbaa3368812934..2658190253fc877399b0a273979447c1d7def29f 100644 (file)
@@ -16,6 +16,7 @@ uns sorter_trace;
 uns sorter_presort_bufsize = 65536;
 uns sorter_stream_bufsize = 65536;
 u64 sorter_bufsize = 65536;
+uns sorter_debug;
 
 static struct cf_section sorter_config = {
   CF_ITEMS {
@@ -23,6 +24,7 @@ static struct cf_section sorter_config = {
     CF_UNS("PresortBuffer", &sorter_presort_bufsize),
     CF_UNS("StreamBuffer", &sorter_stream_bufsize),
     CF_U64("SortBuffer", &sorter_bufsize),
+    CF_UNS("Debug", &sorter_debug),
     CF_END
   }
 };
index 97942dee37bbc0d7604d3b2a6ed62f0e7fb631ca..f962810b3ba0ce72a47d8d63c4f3f8b4b54329eb 100644 (file)
@@ -29,53 +29,60 @@ sbuck_drop(struct sort_bucket *b)
 {
   if (b)
     {
+      ASSERT(!(b->flags & SBF_DESTROYED));
       if (b->n.prev)
        clist_remove(&b->n);
       bclose(b->fb);
       bzero(b, sizeof(*b));
+      b->flags = SBF_DESTROYED;
     }
 }
 
-int
-sbuck_can_read(struct sort_bucket *b)
+sh_off_t
+sbuck_size(struct sort_bucket *b)
 {
-  return b && b->size;
+  if (b->flags & SBF_OPEN_WRITE)
+    return btell(b->fb);
+  else
+    return b->size;
 }
 
-struct fastbuf *
-sbuck_open_read(struct sort_bucket *b)
+int
+sbuck_have(struct sort_bucket *b)
 {
-  /* FIXME: These functions should handle buckets with no fb and only name. */
-  ASSERT(b->fb);
-  return b->fb;
+  return b && sbuck_size(b);
 }
 
 struct fastbuf *
-sbuck_open_write(struct sort_bucket *b)
+sbuck_read(struct sort_bucket *b)
 {
-  if (!b->fb)
-    b->fb = bopen_tmp(sorter_stream_bufsize);
-  return b->fb;
-}
-
-void
-sbuck_close_read(struct sort_bucket *b)
-{
-  if (!b)
-    return;
+  /* FIXME: These functions should handle buckets with no fb and only name. */
   ASSERT(b->fb);
-  bclose(b->fb);
-  b->fb = NULL;
+  if (b->flags & SBF_OPEN_READ)
+    return b->fb;
+  else if (b->flags & SBF_OPEN_WRITE)
+    {
+      b->size = btell(b->fb);
+      b->flags = (b->flags & ~SBF_OPEN_WRITE) | SBF_OPEN_READ;
+      brewind(b->fb);
+      return b->fb;
+    }
+  else
+    ASSERT(0);
 }
 
-void
-sbuck_close_write(struct sort_bucket *b)
+struct fastbuf *
+sbuck_write(struct sort_bucket *b)
 {
-  if (b->fb)
+  if (b->flags & SBF_OPEN_WRITE)
+    ASSERT(b->fb);
+  else
     {
-      b->size = btell(b->fb);
-      brewind(b->fb);
+      ASSERT(!(b->flags & (SBF_OPEN_READ | SBF_DESTROYED)));
+      b->fb = bopen_tmp(sorter_stream_bufsize);
+      b->flags |= SBF_OPEN_WRITE;
     }
+  return b->fb;
 }
 
 void
@@ -108,7 +115,7 @@ static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, stru
   sorter_alloc_buf(ctx);
   if (in->flags & SBF_CUSTOM_PRESORT)
     {
-      struct fastbuf *f = sbuck_open_write(out);
+      struct fastbuf *f = sbuck_write(out);
       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);  // FIXME: out_only optimization?
     }
   return ctx->internal_sort(ctx, in, out, out_only);
@@ -129,9 +136,9 @@ sorter_join(struct sort_bucket *b)
 
   // FIXME: What if the final bucket doesn't contain any file yet?
 
-  SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) b->size);
-  struct fastbuf *src = sbuck_open_read(b);
-  struct fastbuf *dest = sbuck_open_write(join);
+  SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) sbuck_size(b));
+  struct fastbuf *src = sbuck_read(b);
+  struct fastbuf *dest = sbuck_write(join);
   bbcopy(src, dest, ~0U);
   sbuck_drop(b);
 }
@@ -139,34 +146,42 @@ sorter_join(struct sort_bucket *b)
 static void
 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
 {
-  struct sort_bucket *ins[3], *outs[3];
+  struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
+  cnode *list_pos = b->n.prev;
   struct sort_bucket *join = sbuck_join_to(b);
+  if (sorter_debug & SORT_DEBUG_NO_JOIN)
+    join = NULL;
 
-  SORT_TRACE("Presorting");
-  ins[0] = sbuck_new(ctx);
-  sbuck_open_read(b);
-  if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
+  if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
     {
-      if (join)
-       sbuck_drop(ins[0]);
-      else
-       clist_insert_after(&ins[0]->n, &b->n);
+      SORT_TRACE("Presorting");
+      ins[0] = sbuck_new(ctx);
+      sbuck_read(b);
+      if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
+       {
+         if (join)
+           sbuck_drop(ins[0]);
+         else
+           clist_insert_after(&ins[0]->n, list_pos);
+         sbuck_drop(b);
+         return;
+       }
+
+      ins[1] = sbuck_new(ctx);
+      int i = 1;
+      while (sorter_presort(ctx, b, ins[i], ins[i]))
+       i = 1-i;
       sbuck_drop(b);
-      return;
     }
-
-  ins[1] = sbuck_new(ctx);
-  ins[2] = NULL;
-  int i = 1;
-  while (sorter_presort(ctx, b, ins[i], ins[i]))
-    i = 1-i;
-  sbuck_close_read(b);
-  sbuck_close_write(ins[0]);
-  sbuck_close_write(ins[1]);
+  else
+    {
+      SORT_TRACE("Skipped presorting");
+      ins[0] = b;
+    }
 
   SORT_TRACE("Main sorting");
   do {
-    if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)        // FIXME: Debug switch for disabling joining optimizations
+    if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
       {
        // This is guaranteed to produce a single run, so join if possible
        outs[0] = join;
@@ -175,24 +190,22 @@ sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
        ASSERT(outs[0]->runs == 2);
        outs[0]->runs--;
        SORT_TRACE("Pass done (joined final run)");
-       sbuck_drop(b);
+       sbuck_drop(ins[0]);
+       sbuck_drop(ins[1]);
        return;
       }
     outs[0] = sbuck_new(ctx);
     outs[1] = sbuck_new(ctx);
     outs[2] = NULL;
     ctx->twoway_merge(ctx, ins, outs);
-    sbuck_close_write(outs[0]);
-    sbuck_close_write(outs[1]);
-    SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) outs[0]->size, (uintmax_t) outs[1]->size);
+    SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) sbuck_size(outs[0]), (uintmax_t) sbuck_size(outs[1]));
     sbuck_drop(ins[0]);
     sbuck_drop(ins[1]);
     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
-  } while (ins[1]->size);
+  } while (sbuck_have(ins[1]));
 
   sbuck_drop(ins[1]);
-  clist_insert_after(&ins[0]->n, &b->n);
-  sbuck_drop(b);
+  clist_insert_after(&ins[0]->n, list_pos);
 }
 
 void
@@ -206,7 +219,7 @@ sorter_run(struct sort_context *ctx)
 
   // Create bucket containing the source
   struct sort_bucket *bin = sbuck_new(ctx);
-  bin->flags = SBF_SOURCE;
+  bin->flags = SBF_SOURCE | SBF_OPEN_READ;
   if (ctx->custom_presort)
     bin->flags |= SBF_CUSTOM_PRESORT;
   else
@@ -227,7 +240,7 @@ sorter_run(struct sort_context *ctx)
   struct sort_bucket *b;
   while (b = clist_next(&ctx->bucket_list, &bout->n))
     {
-      if (!b->size)
+      if (!sbuck_have(b))
        sbuck_drop(b);
       else if (b->runs == 1)
        sorter_join(b);
@@ -236,7 +249,6 @@ sorter_run(struct sort_context *ctx)
     }
 
   sorter_free_buf(ctx);
-  sbuck_close_write(bout);
-  SORT_XTRACE("Final size: %jd", (uintmax_t) bout->size);
-  ctx->out_fb = sbuck_open_read(bout);
+  SORT_XTRACE("Final size: %jd", (uintmax_t) sbuck_size(bout));
+  ctx->out_fb = sbuck_read(bout);
 }
index 7f779401f147ade11efdd383dde8aba4d202ba3b..c05ca077e574868bb7e27cd94c85ddc6cf24ee10 100644 (file)
@@ -87,7 +87,7 @@ static int P(internal)(struct sort_context *ctx, struct sort_bucket *bin, struct
   SORT_XTRACE("s-internal: Writing");
   if (!ctx->more_keys)
     bout = bout_only;
-  struct fastbuf *out = sbuck_open_write(bout);
+  struct fastbuf *out = sbuck_write(bout);
   bout->runs++;
   /* FIXME: No unification done yet */
   for (item = item_array; item < last_item; item++)
index 16a6720b10d75526f272c5193d2a43402003b0d8..51caf3d45887d1626a6b6f0ea8334f85b262c48e 100644 (file)
@@ -20,11 +20,11 @@ static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket
   int comp;
   uns run_count = 0;
 
-  fin1 = sbuck_open_read(ins[0]);
+  fin1 = sbuck_read(ins[0]);
   next1 = P(read_key)(fin1, kin1);
-  if (sbuck_can_read(ins[1]))
+  if (sbuck_have(ins[1]))
     {
-      fin2 = sbuck_open_read(ins[1]);
+      fin2 = sbuck_read(ins[1]);
       next2 = P(read_key)(fin2, kin2);
     }
   else
@@ -50,9 +50,9 @@ static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket
          if (unlikely(!fout1))
            {
              if (!fout2)
-               fout1 = sbuck_open_write(outs[0]);
+               fout1 = sbuck_write(outs[0]);
              else if (outs[1])
-               fout1 = sbuck_open_write(outs[1]);
+               fout1 = sbuck_write(outs[1]);
              else
                fout1 = fout2;
            }
@@ -100,8 +100,6 @@ static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket
        }
     }
 
-  sbuck_close_read(ins[0]);
-  sbuck_close_read(ins[1]);
   if (fout2 && fout2 != fout1)
     outs[1]->runs += run_count / 2;
   if (fout1)