]> mj.ucw.cz Git - misc.git/commitdiff
qhash: two-level parallel SHA-1
authorMartin Mares <mj@ucw.cz>
Fri, 3 Apr 2015 20:30:54 +0000 (22:30 +0200)
committerMartin Mares <mj@ucw.cz>
Fri, 3 Apr 2015 20:30:54 +0000 (22:30 +0200)
ucw/Makefile
ucw/qhash.c [new file with mode: 0644]

index a4145d573c697c2c3c95fd76d69b18d6e8f589c3..ded2342f3e9aac5ad80890d34e12e804552f7f53 100644 (file)
@@ -7,7 +7,7 @@ LD=gcc
 CFLAGS=-O2 -Wall -W -Wno-parentheses -Wstrict-prototypes -Wmissing-prototypes -Wundef -Wredundant-decls -std=gnu99 $(UCWCF) -ggdb
 LDLIBS+=$(UCWLF)
 
-all: songsack
+all: qhash
 
 clean:
        rm -f `find . -name "*~" -or -name "*.[oa]" -or -name "\#*\#" -or -name TAGS -or -name core -or -name .depend -or -name .#*`
diff --git a/ucw/qhash.c b/ucw/qhash.c
new file mode 100644 (file)
index 0000000..4c97ef6
--- /dev/null
@@ -0,0 +1,213 @@
+/* Two-level parallel SHA1 */
+/* find -type f -print0 | qhash */
+
+#undef LOCAL_DEBUG
+
+#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
+#include <ucw/sha1.h>
+#include <ucw/string.h>
+#include <ucw/workqueue.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#define BLOCK_SIZE 4096
+
+#define THREADS 4
+#define PIECES (2*THREADS)
+#define BLOCKS_PER_PIECE 2
+
+#ifdef THREADS
+
+static int threads_inited;
+static struct worker_pool thread_pool = {
+  .num_threads = THREADS,
+};
+static struct work_queue work_queue;
+
+struct hash_work {
+  struct work w;
+  byte *buffer;
+  byte out[BLOCKS_PER_PIECE][SHA1_SIZE];
+  uint len;
+  uint out_len;
+  uint running;
+};
+static struct hash_work works[PIECES];
+
+static void do_work(struct worker_thread *t UNUSED, struct work *w)
+{
+  struct hash_work *hw = (struct hash_work *) w;
+
+  int i = 0;
+  byte *in = hw->buffer;
+  uint remains = hw->len;
+  while (remains)
+    {
+      uint len = MIN(remains, BLOCK_SIZE);
+      sha1_hash_buffer(hw->out[i], in, len);
+      in += len;
+      remains -= len;
+      i++;
+    }
+  hw->out_len = i;
+}
+
+static void init_threads(void)
+{
+  if (threads_inited)
+    return;
+
+  worker_pool_init(&thread_pool);
+  work_queue_init(&thread_pool, &work_queue);
+
+  for (int i=0; i<PIECES; i++)
+    {
+      works[i].w.go = do_work;
+      works[i].buffer = big_alloc(BLOCK_SIZE * BLOCKS_PER_PIECE);
+    }
+  threads_inited = 1;
+}
+
+static void hash_file(char *name, char *buf)
+{
+  int fd = open(name, O_RDONLY);
+  if (fd < 0)
+    {
+      fprintf(stderr, "Cannot open %s: %m\n", name);
+      strcpy(buf, "???");
+      return;
+    }
+
+  sha1_context main_ctx;
+  sha1_init(&main_ctx);
+  init_threads();
+
+  uint current_piece = 0;
+  uint running_pieces = 0;
+  uint eof = 0;
+  uint seen_err = 0;
+  size_t total = 0;
+
+  while (!eof || running_pieces)
+    {
+      if (!eof && running_pieces < PIECES)
+       {
+         struct hash_work *hw = &works[(current_piece + running_pieces) % PIECES];
+         int r = read(fd, hw->buffer, BLOCK_SIZE * BLOCKS_PER_PIECE);
+         if (r < 0)
+           {
+             fprintf(stderr, "Error reading %s: %m\n", name);
+             eof = seen_err = 1;
+           }
+         else if (!r)
+           eof = 1;
+         else
+           {
+             total += r;
+             hw->len = r;
+             hw->running = 1;
+             work_submit(&work_queue, &hw->w);
+             running_pieces++;
+           }
+       }
+      else
+       {
+         struct hash_work *hw = (struct hash_work *) work_wait(&work_queue);
+         hw->running = 0;
+         while (running_pieces && !works[current_piece].running)
+           {
+             hw = &works[current_piece];
+             for (uint i=0; i < hw->out_len; i++)
+               {
+#ifdef LOCAL_DEBUG
+                 char xx[SHA1_HEX_SIZE];
+                 mem_to_hex(xx, hw->out[i], SHA1_SIZE, 0);
+                 DBG("\t\t%s", xx);
+#endif
+                 sha1_update(&main_ctx, hw->out[i], SHA1_SIZE);
+               }
+             current_piece = (current_piece + 1) % PIECES;
+             running_pieces--;
+           }
+       }
+    }
+
+  close(fd);
+
+  if (seen_err)
+    {
+      strcpy(buf, "???");
+      return;
+    }
+
+  byte *h = sha1_final(&main_ctx);
+  buf += sprintf(buf, "%jd\t", total);
+  mem_to_hex(buf, h, SHA1_SIZE, 0);
+}
+
+#else
+
+static void hash_file(char *name, char *buf)
+{
+  int fd = open(name, O_RDONLY);
+  if (fd < 0)
+    {
+      fprintf(stderr, "Cannot open %s: %m\n", name);
+      strcpy(buf, "???");
+      return;
+    }
+
+  static byte *hash_buffer;
+  if (!hash_buffer)
+    hash_buffer = big_alloc(BLOCK_SIZE);
+
+  sha1_context block_ctx, main_ctx;
+  sha1_init(&main_ctx);
+
+  int len;
+  size_t total = 0;
+  while ((len = read(fd, hash_buffer, BLOCK_SIZE)) > 0)
+    {
+      sha1_init(&block_ctx);
+      sha1_update(&block_ctx, hash_buffer, len);
+      byte *h = sha1_final(&block_ctx);
+      sha1_update(&main_ctx, h, SHA1_SIZE);
+      total += len;
+    }
+  if (len < 0)
+    {
+      fprintf(stderr, "Error reading %s: %m\n", name);
+      close(fd);
+      strcpy(buf, "???");
+      return;
+    }
+
+  close(fd);
+
+  byte *h = sha1_final(&main_ctx);
+  buf += sprintf(buf, "%jd\t", total);
+  mem_to_hex(buf, h, SHA1_SIZE, 0);
+}
+
+#endif
+
+int main(void)
+{
+  struct fastbuf *in = bopen_fd(0, NULL);
+  struct fastbuf *out = bopen_fd(1, NULL);
+
+  char line[4096];
+  while (bgets0(in, line, sizeof(line)))
+    {
+      char hash[256];
+      hash_file(line, hash);
+      bprintf(out, "%s\t%s\n", line, hash);
+    }
+
+  bclose(out);
+  bclose(in);
+  return 0;
+}