--- /dev/null
+/* Two-level parallel SHA1: every file is hashed block by block and the block digests are hashed again */
+/* Usage: find -type f -print0 | qhash */
+
+#undef LOCAL_DEBUG
+
+#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
+#include <ucw/sha1.h>
+#include <ucw/string.h>
+#include <ucw/workqueue.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+/* Maximum number of bytes covered by one first-level SHA1 digest. */
+#define BLOCK_SIZE 4096
+
+/* Number of worker threads in the pool; when THREADS is not defined, the single-threaded hash_file() is compiled instead. */
+#define THREADS 4
+/* Number of hash_work slots (and piece buffers) that can be in flight at once. */
+#define PIECES (2*THREADS)
+/* Each piece buffer holds this many blocks, i.e. BLOCK_SIZE * BLOCKS_PER_PIECE bytes. */
+#define BLOCKS_PER_PIECE 2
+
+#ifdef THREADS
+
+/* Set to 1 by init_threads() after the pool, the queue and the piece buffers have been set up. */
+static int threads_inited;
+/* Worker pool configured for THREADS threads; initialized in init_threads(). */
+static struct worker_pool thread_pool = {
+ .num_threads = THREADS,
+};
+/* Queue on which hash_file() submits pieces and from which it collects finished ones. */
+static struct work_queue work_queue;
+
+/*
+ * One piece of a file handed to a worker thread.
+ * `w` must remain the first member: do_work() and hash_file() cast
+ * a `struct work *` back to `struct hash_work *`.
+ */
+struct hash_work {
+ struct work w;
+ /* Input data of the piece; allocated in init_threads() with room for BLOCKS_PER_PIECE blocks. */
+ byte *buffer;
+ /* Per-block SHA1 digests written by do_work(). */
+ byte out[BLOCKS_PER_PIECE][SHA1_SIZE];
+ /* Number of valid bytes in `buffer`, set by hash_file() before submitting. */
+ uint len;
+ /* Number of digests in `out` that do_work() filled in. */
+ uint out_len;
+ /* Set to 1 when the piece is submitted, cleared once work_wait() has returned it. */
+ uint running;
+};
+/* Ring of piece slots used by hash_file(); indices wrap modulo PIECES. */
+static struct hash_work works[PIECES];
+
+/*
+ * Work callback: hash one piece.
+ *
+ * `w` is the `struct work` embedded at the start of a `struct hash_work`.
+ * The piece's buffer is cut into chunks of at most BLOCK_SIZE bytes, the SHA1
+ * of every chunk is stored into consecutive rows of `out`, and the number of
+ * rows written is recorded in `out_len`. The thread argument is unused.
+ */
+static void do_work(struct worker_thread *t UNUSED, struct work *w)
+{
+  struct hash_work *piece = (struct hash_work *) w;
+  byte *pos = piece->buffer;
+  uint left = piece->len;
+  uint blocks = 0;
+
+  for (; left; blocks++)
+    {
+      /* The last chunk of a piece may be shorter than BLOCK_SIZE. */
+      uint chunk = MIN(left, BLOCK_SIZE);
+      sha1_hash_buffer(piece->out[blocks], pos, chunk);
+      pos += chunk;
+      left -= chunk;
+    }
+
+  piece->out_len = blocks;
+}
+
+/*
+ * One-time setup of the threaded hasher: initializes the worker pool and the
+ * work queue, then gives every slot in `works` its callback and a buffer of
+ * BLOCK_SIZE * BLOCKS_PER_PIECE bytes. Later calls do nothing.
+ */
+static void init_threads(void)
+{
+  if (!threads_inited)
+    {
+      worker_pool_init(&thread_pool);
+      work_queue_init(&thread_pool, &work_queue);
+
+      struct hash_work *end = works + PIECES;
+      for (struct hash_work *slot = works; slot < end; slot++)
+        {
+          slot->w.go = do_work;
+          slot->buffer = big_alloc(BLOCK_SIZE * BLOCKS_PER_PIECE);
+        }
+
+      threads_inited = 1;
+    }
+}
+
+/*
+ * Read from `fd` until `size` bytes have been stored in `buf` or end of file
+ * is reached. A single read() may return fewer bytes than requested; without
+ * refilling, the block boundaries (and therefore the two-level hash) would
+ * depend on how the data happened to arrive.
+ *
+ * Returns the number of bytes stored (less than `size` only at end of file),
+ * or -1 if read() failed; errno is left as set by read().
+ */
+static ssize_t read_full(int fd, byte *buf, size_t size)
+{
+  size_t got = 0;
+  while (got < size)
+    {
+      ssize_t r = read(fd, buf + got, size - got);
+      if (r < 0)
+        return -1;
+      if (!r)
+        break;
+      got += r;
+    }
+  return got;
+}
+
+/*
+ * Compute the two-level SHA1 of the file `name` using the worker pool.
+ *
+ * The file is read in pieces of BLOCK_SIZE * BLOCKS_PER_PIECE bytes; each
+ * piece is submitted to the work queue, where do_work() hashes its blocks.
+ * The block digests are then fed, in file order, into the top-level SHA1.
+ *
+ * On success `buf` receives "<size in bytes>\t<hex digest>"; if the file
+ * cannot be opened or read, a message goes to stderr and `buf` receives
+ * "???". `buf` must be large enough for either result.
+ */
+static void hash_file(char *name, char *buf)
+{
+  int fd = open(name, O_RDONLY);
+  if (fd < 0)
+    {
+      fprintf(stderr, "Cannot open %s: %m\n", name);
+      strcpy(buf, "???");
+      return;
+    }
+
+  sha1_context main_ctx;
+  sha1_init(&main_ctx);
+  init_threads();
+
+  /* works[] is used as a ring: current_piece is the oldest unconsumed slot,
+   * running_pieces counts slots submitted but not yet merged into main_ctx. */
+  uint current_piece = 0;
+  uint running_pieces = 0;
+  uint eof = 0;
+  uint seen_err = 0;
+  size_t total = 0;
+
+  while (!eof || running_pieces)
+    {
+      if (!eof && running_pieces < PIECES)
+        {
+          /* A slot is free: fill it with the next piece and submit it. */
+          struct hash_work *hw = &works[(current_piece + running_pieces) % PIECES];
+          ssize_t r = read_full(fd, hw->buffer, BLOCK_SIZE * BLOCKS_PER_PIECE);
+          if (r < 0)
+            {
+              fprintf(stderr, "Error reading %s: %m\n", name);
+              /* Stop reading, but keep looping to collect the pieces already submitted. */
+              eof = seen_err = 1;
+            }
+          else if (!r)
+            eof = 1;
+          else
+            {
+              total += r;
+              hw->len = r;
+              hw->running = 1;
+              work_submit(&work_queue, &hw->w);
+              running_pieces++;
+            }
+        }
+      else
+        {
+          /* All slots busy or input exhausted: collect one finished piece. */
+          struct hash_work *hw = (struct hash_work *) work_wait(&work_queue);
+          hw->running = 0;
+          /* Pieces may finish out of order; merge only the finished prefix of the ring. */
+          while (running_pieces && !works[current_piece].running)
+            {
+              hw = &works[current_piece];
+              for (uint i=0; i < hw->out_len; i++)
+                {
+#ifdef LOCAL_DEBUG
+                  char xx[SHA1_HEX_SIZE];
+                  mem_to_hex(xx, hw->out[i], SHA1_SIZE, 0);
+                  DBG("\t\t%s", xx);
+#endif
+                  sha1_update(&main_ctx, hw->out[i], SHA1_SIZE);
+                }
+              current_piece = (current_piece + 1) % PIECES;
+              running_pieces--;
+            }
+        }
+    }
+
+  close(fd);
+
+  if (seen_err)
+    {
+      strcpy(buf, "???");
+      return;
+    }
+
+  byte *h = sha1_final(&main_ctx);
+  /* total is a size_t, so it must be printed with %zu. */
+  buf += sprintf(buf, "%zu\t", total);
+  mem_to_hex(buf, h, SHA1_SIZE, 0);
+}
+
+#else
+
+/*
+ * Read from `fd` until `size` bytes have been stored in `buf` or end of file
+ * is reached. A single read() may return fewer bytes than requested; without
+ * refilling, the block boundaries (and therefore the two-level hash) would
+ * depend on how the data happened to arrive.
+ *
+ * Returns the number of bytes stored (less than `size` only at end of file),
+ * or -1 if read() failed; errno is left as set by read().
+ */
+static ssize_t read_full(int fd, byte *buf, size_t size)
+{
+  size_t got = 0;
+  while (got < size)
+    {
+      ssize_t r = read(fd, buf + got, size - got);
+      if (r < 0)
+        return -1;
+      if (!r)
+        break;
+      got += r;
+    }
+  return got;
+}
+
+/*
+ * Compute the two-level SHA1 of the file `name` in the calling thread.
+ *
+ * The file is read in blocks of BLOCK_SIZE bytes; the SHA1 of every block is
+ * fed into the top-level SHA1.
+ *
+ * On success `buf` receives "<size in bytes>\t<hex digest>"; if the file
+ * cannot be opened or read, a message goes to stderr and `buf` receives
+ * "???". `buf` must be large enough for either result.
+ */
+static void hash_file(char *name, char *buf)
+{
+  int fd = open(name, O_RDONLY);
+  if (fd < 0)
+    {
+      fprintf(stderr, "Cannot open %s: %m\n", name);
+      strcpy(buf, "???");
+      return;
+    }
+
+  /* The block buffer is allocated on first use and reused for all files. */
+  static byte *hash_buffer;
+  if (!hash_buffer)
+    hash_buffer = big_alloc(BLOCK_SIZE);
+
+  sha1_context block_ctx, main_ctx;
+  sha1_init(&main_ctx);
+
+  ssize_t len;
+  size_t total = 0;
+  while ((len = read_full(fd, hash_buffer, BLOCK_SIZE)) > 0)
+    {
+      sha1_init(&block_ctx);
+      sha1_update(&block_ctx, hash_buffer, len);
+      byte *h = sha1_final(&block_ctx);
+      sha1_update(&main_ctx, h, SHA1_SIZE);
+      total += len;
+    }
+  if (len < 0)
+    {
+      fprintf(stderr, "Error reading %s: %m\n", name);
+      close(fd);
+      strcpy(buf, "???");
+      return;
+    }
+
+  close(fd);
+
+  byte *h = sha1_final(&main_ctx);
+  /* total is a size_t, so it must be printed with %zu. */
+  buf += sprintf(buf, "%zu\t", total);
+  mem_to_hex(buf, h, SHA1_SIZE, 0);
+}
+
+#endif
+
+/*
+ * Reads file names from descriptor 0 with bgets0() and, for each one, writes
+ * "<name>\t<result of hash_file()>\n" to descriptor 1. Always returns 0;
+ * files that could not be hashed show up with "???" as their result.
+ */
+int main(void)
+{
+  struct fastbuf *in = bopen_fd(0, NULL);
+  struct fastbuf *out = bopen_fd(1, NULL);
+  char name[4096];
+  char result[256];
+
+  for (;;)
+    {
+      if (!bgets0(in, name, sizeof(name)))
+        break;
+      /* hash_file() fills `result` on every path, including failures. */
+      hash_file(name, result);
+      bprintf(out, "%s\t%s\n", name, result);
+    }
+
+  bclose(out);
+  bclose(in);
+  return 0;
+}