]> mj.ucw.cz Git - bex.git/commitdiff
The big move -- introduced subcommands
authorMartin Mares <mj@ucw.cz>
Wed, 15 Feb 2012 20:22:16 +0000 (21:22 +0100)
committerMartin Mares <mj@ucw.cz>
Wed, 15 Feb 2012 20:22:16 +0000 (21:22 +0100)
27 files changed:
BEX.cf [deleted file]
BEX/config [new file with mode: 0644]
BEX/epilog [new file with mode: 0644]
BEX/prolog [new file with mode: 0644]
benq [deleted file]
bex [new file with mode: 0755]
bjob [deleted file]
bls [deleted file]
bprun [deleted file]
bq [deleted file]
brun [deleted file]
epilog [deleted file]
lib/BEX.pm [deleted file]
lib/BEX/Config.pm [deleted file]
lib/BEX/Job.pm [deleted file]
lib/BEX/Queue.pm [deleted file]
lib/bin/add [new file with mode: 0755]
lib/bin/job [new file with mode: 0755]
lib/bin/ls [new file with mode: 0755]
lib/bin/prun [new file with mode: 0755]
lib/bin/queue [new file with mode: 0755]
lib/bin/run [new file with mode: 0755]
lib/perl/BEX.pm [new file with mode: 0644]
lib/perl/BEX/Config.pm [new file with mode: 0644]
lib/perl/BEX/Job.pm [new file with mode: 0644]
lib/perl/BEX/Queue.pm [new file with mode: 0644]
prolog [deleted file]

diff --git a/BEX.cf b/BEX.cf
deleted file mode 100644 (file)
index 09c1b1b..0000000
--- a/BEX.cf
+++ /dev/null
@@ -1,12 +0,0 @@
-# Configuration of the Batch EXecutor
-# This is a Perl script, which can set anything in the BEX::Config package
-
-package BEX::Config;
-
-%machines = (
-       'albireo' => { 'Host' => 'albireo.burrow.ucw.cz' },
-       'localhost' => {},
-       'home' => ['albireo', 'localhost'],
-);
-
-42;
diff --git a/BEX/config b/BEX/config
new file mode 100644 (file)
index 0000000..09c1b1b
--- /dev/null
@@ -0,0 +1,12 @@
+# Configuration of the Batch EXecutor
+# This is a Perl script, which can set anything in the BEX::Config package
+
+package BEX::Config;
+
+%machines = (
+       'albireo' => { 'Host' => 'albireo.burrow.ucw.cz' },
+       'localhost' => {},
+       'home' => ['albireo', 'localhost'],
+);
+
+42;
diff --git a/BEX/epilog b/BEX/epilog
new file mode 100644 (file)
index 0000000..bc2da7a
--- /dev/null
@@ -0,0 +1 @@
+# BEX epilog
diff --git a/BEX/prolog b/BEX/prolog
new file mode 100644 (file)
index 0000000..3350a91
--- /dev/null
@@ -0,0 +1,3 @@
+#!/bin/sh
+# BEX prolog
+set -e
diff --git a/benq b/benq
deleted file mode 100755 (executable)
index 1f92e94..0000000
--- a/benq
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- Insert to Queue
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use File::stat;
-
-use lib 'lib';
-use BEX;
-
-my $given_body;
-my $given_go;
-my $given_id;
-my $queue_name;
-my $requeue_id;
-my $given_subject;
-my $given_template;
-
-GetOptions(
-       "b|body=s" => \$given_body,
-       "g|go!" => \$given_go,
-       "i|id=s" => \$given_id,
-       "q|queue=s" => \$queue_name,
-       "r|requeue=s" => \$requeue_id,
-       "s|subject=s" => \$given_subject,
-       "t|template=s" => \$given_template,
-) or die <<AMEN ;
-Usage: benq [<options>] [!]<machine-or-class> ...
-
-Options:
--b, --body=<file>      Load job body from the given file
--g, --go               Do not run editor, go enqueue the job immediately
--i, --id=<id>          Set job ID of the new job
--q, --queue=<name>     Insert new jobs to the given queue
--r, --requeue=<id>     Re-queue an existing job instead of creating a new one
--s, --subject=<subj>   Set subject of the new job
--t, --template=<file>  Load job template (headers and body) from the given file
-AMEN
-
-# Prepare machine set
-@ARGV or die "No machines specified\n";
-my @machines = BEX::Config::parse_machine_list(@ARGV);
-@machines or die "No machines match\n";
-
-my $queue = BEX::Queue->new($queue_name);
-my $job;
-my $tmp_fn;
-
-if (defined $requeue_id) {
-       # When requeueing, just fetch the existing job
-       if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) {
-               die "Parameters of a requeued job cannot be changed\n";
-       }
-       my $fn = $queue->job_file($requeue_id);
-       -f $fn or die "Job $requeue_id not known\n";
-       $job = BEX::Job->new_from_file($fn);
-} else {
-       # Create job template
-       if (defined $given_template) {
-               $job = BEX::Job->new_from_file($given_template);
-       } else {
-               $job = BEX::Job->new;
-       }
-       $job->attr('ID', $given_id) if defined $given_id;
-       $job->attr('Subject', $given_subject) if defined $given_subject;
-       if (defined $given_body) {
-               open B, '<', $given_body or die "Cannot open $given_body: $!\n";
-               local $/;
-               $job->attr('body', <B>);
-               close B;
-       }
-
-       # Let the user edit the template
-       if (!$given_go) {
-               $tmp_fn = $job->save;
-               my $orig_stat = stat($tmp_fn) or die;
-               system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n";
-               my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n";
-               if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) {
-                       unlink $tmp_fn;
-                       die "Cancelled\n";
-               }
-               $job = BEX::Job->new_from_file($tmp_fn);
-       }
-}
-
-# Put the job to the queue
-print "New job ", $job->id, "\n";
-for my $m (@machines) {
-       if ($queue->enqueue($m, $job)) {
-               $queue->update_job_status($m, $job->id, 'NEW');
-               print "\t$m\n";
-       } else {
-               $queue->log($m, $job->id, 'REQUEUE');
-               print "\t$m (already queued)\n";
-       }
-}
-
-# Remove the temporary file if there's any
-unlink $tmp_fn if defined $tmp_fn;
diff --git a/bex b/bex
new file mode 100755 (executable)
index 0000000..ac89512
--- /dev/null
+++ b/bex
@@ -0,0 +1,51 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Master Program
+# (c) 2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+
+my $bex_home = $ENV{"BEX_HOME"} // ".";
+my $bex_lib = $ENV{"BEX_LIB"} // "lib";
+
+Getopt::Long::Configure('require_order');
+GetOptions(
+       "home=s" => \$bex_home,
+       "lib=s" => \$bex_lib,
+       "help" => sub {
+                       print "Usage: brum\n";
+                       exit 0;
+               },
+       "version" => sub {
+                       print "BEX 3.0 (c) 2011-2012 Martin Mares <mj\@ucw.cz>\n";
+               },
+) or die "Try `bex --help' for more information.\n";
+Getopt::Long::Configure('default');
+
+if (!-d $bex_home) {
+       die "BEX home directory $bex_home does not exist.\n";
+}
+if (!-d "$bex_home/BEX") {
+       die "BEX home directory $bex_home does not contain the BEX subdirectory.\n";
+}
+
+@ARGV or die "Missing subcommand.\n";
+my $sub = shift @ARGV;
+$sub =~ /^[0-9a-zA-Z]+$/ or die "Invalid subcommand $sub\n";
+
+my %aliases = (
+       'a' => 'add',
+       'q' => 'queue',
+       'r' => 'run',
+);
+if (defined $aliases{$sub}) { $sub = $aliases{$sub}; }
+
+my $sub_path = "$bex_lib/bin/$sub";
+-x $sub_path or die "Unknown subcommand $sub\n";
+
+$ENV{"BEX_HOME"} = $bex_home;
+$ENV{"BEX_LIB"} = $bex_lib;
+$ENV{"PERL5LIB"} = join(":", $bex_lib . "/perl", $ENV{"PERL5LIB"} // ());
+exec $sub_path, @ARGV;
+die "Cannot execute $sub_path: $!\n";
diff --git a/bjob b/bjob
deleted file mode 100755 (executable)
index 0d6f468..0000000
--- a/bjob
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- Operations on a Job
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-
-use lib 'lib';
-use BEX;
-
-my $edit;
-my $queue_name;
-
-GetOptions(
-       "e|edit!" => \$edit,
-       "q|queue=s" => \$queue_name,
-) && @ARGV == 1 or die <<AMEN ;
-Usage: bjob [<options>] <job-id>
-
-Options:
--e, --edit             Run editor on the given job (no locking)
--q, --queue=<name>     Act on the given queue
-AMEN
-
-my $queue = BEX::Queue->new($queue_name);
-my $fn = $queue->job_file($ARGV[0]);
--f $fn or die "No such job " . $ARGV[0] . "\n";
-
-if ($edit) {
-       system "editor", $fn;
-} else {
-       system "cat", $fn;
-}
diff --git a/bls b/bls
deleted file mode 100755 (executable)
index be558d3..0000000
--- a/bls
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- List Machines and Groups
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-
-use lib 'lib';
-use BEX;
-
-my $edit;
-my $queue_name;
-
-GetOptions(
-) && @ARGV == 0 or die <<AMEN ;
-Usage: bjob [<options>]
-
-Options:
-None defined so far.
-AMEN
-
-my $machines = \%BEX::Config::machines;
-
-print "# Hosts:\n";
-for my $h (sort keys %$machines) {
-       my $m = $machines->{$h};
-       ref $m eq 'HASH' or next;
-       print "$h\n";
-}
-
-print "\n# Groups:\n";
-for my $h (sort keys %$machines) {
-       my $m = $machines->{$h};
-       ref $m eq 'ARRAY' or next;
-       print "$h = ", join(" ",
-               map {
-                       my $x = $machines->{$_};
-                       !defined($x) ? "$_?" :
-                       ref $x eq 'HASH' ? $_ :
-                       ref $x eq 'ARRAY' ? "\@$_" :
-                       "$_???"
-               } @$m), "\n";
-}
diff --git a/bprun b/bprun
deleted file mode 100755 (executable)
index 7a66f0b..0000000
--- a/bprun
+++ /dev/null
@@ -1,344 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- Parallel Execution Using Screen
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use feature 'switch';
-
-use Getopt::Long;
-use POSIX;
-
-use lib 'lib';
-use BEX;
-
-my $queue_name;
-my $screen_session = 'BEX';
-my $text_mode;
-
-GetOptions(
-       "q|queue=s" => \$queue_name,
-       "session=s" => \$screen_session,
-       "text!" => \$text_mode,
-) or die <<AMEN ;
-Usage: bprun [<options>] [[!]<machine-or-class> ...]
-
-Options:
--q, --queue=<name>     Run jobs in the given queue
-    --session=<name>   Job windows should be opened within the given screen
-                       session (default: BEX)
-    --text             Use textual user interface instead of curses
-AMEN
-
-system 'screen', '-S', $screen_session, '-X', 'select', '.';
-!$? or die "Screen session $screen_session not found\n";
-
-my $queue = BEX::Queue->new($queue_name);
-my $fifo_name = $queue->{'Name'} . '/status-fifo';
-unlink $fifo_name;
-mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
-open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
-
-my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
-
-my @machines = ();
-for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
-       my @jobs = $queue->scan($mach);
-       @jobs or next;
-       push @machines, $mach;
-       for (@jobs) { $ui->update($mach, $_, 'READY'); }
-}
-
-my %running = ();
-my $max = $BEX::Config::max_parallel_jobs;
-
-while (keys %running || @machines) {
-       if (@machines && keys %running < $max) {
-               my $mach = shift @machines;
-               $ui->update($mach, undef, 'START');
-               my @scr = ('screen', '-t', $mach);
-               push @scr, '-S', $screen_session if defined $screen_session;
-               push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
-               system @scr;
-               !$? or $ui->update($mach, undef, 'INTERR');
-               $running{$mach} = 'START';
-               next;
-       }
-       $_ = <FIFO>;
-       chomp;
-       my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
-       if (!defined $stat) {
-               $ui->err("Received invalid status message <$_>");
-               next;
-       }
-       if (!defined $running{$mach}) {
-               $ui->err("Received status message <$_> for a machine which does not run");
-               next;
-       }
-       $running{$mach} = $stat;
-       $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
-       if ($stat eq 'DONE') {
-               delete $running{$mach};
-       }
-}
-
-close FIFO;
-unlink $fifo_name;
-$ui->done;
-
-package BEX::bprun::text;
-
-sub new($) {
-       return bless {};
-}
-
-sub done($) {
-}
-
-sub update($$$$) {
-       my ($ui, $mach, $jid, $stat) = @_;
-       print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
-}
-
-sub err($$) {
-       my ($ui, $msg) = @_;
-       print STDERR "ERROR: $msg\n";
-}
-
-package BEX::bprun::curses;
-
-use Curses;
-
-my $C;
-
-my $nrows;
-my @by_row = ();
-my %by_host = ();
-
-my %host_state;
-my %host_cnt;
-
-my %job_state;
-my %job_cnt;
-
-my %host_last_fail_job;
-my %host_last_fail_stat;
-
-sub new($) {
-       $C = new Curses;
-       start_color;
-       has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
-       cbreak; noecho;
-       $C->intrflush(0);
-       $C->keypad(1);
-       $C->meta(1);
-       $C->clear;
-       init_pair(1, COLOR_YELLOW, COLOR_BLUE);
-       init_pair(2, COLOR_YELLOW, COLOR_RED);
-       init_pair(3, COLOR_YELLOW, COLOR_BLACK);
-       init_pair(4, COLOR_RED, COLOR_BLACK);
-
-       $nrows = $C->getmaxy - 2;
-       if ($BEX::Config::max_parallel_jobs > $nrows) {
-               $BEX::Config::max_parallel_jobs = $nrows;
-       }
-
-       %host_state = %host_cnt = ();
-       %job_state = %job_cnt = ();
-       for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
-               $host_cnt{$s} = 0;
-               $job_cnt{'*'}{$s} = 0;
-       }
-
-       my $ui = bless {};
-       $ui->refresh_status;
-       return $ui;
-}
-
-sub done($)
-{
-       $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
-       $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
-       $C->clrtoeol;
-       $C->getch;
-       endwin;
-}
-
-sub err($$) {
-       my ($ui, $msg) = @_;
-       $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
-       $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
-       $C->clrtoeol;
-       $C->refresh;
-}
-
-sub set_host_status($$$) {
-       my ($ui, $mach, $stat) = @_;
-       my $prev_stat = $host_state{$mach};
-       if (defined $prev_stat) {
-               $host_cnt{$prev_stat}--;
-       } else {
-               for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
-       }
-       $host_state{$mach} = $stat;
-       $host_cnt{$stat}++;
-}
-
-sub set_job_status($$$$) {
-       my ($ui, $mach, $jid, $stat) = @_;
-       my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
-       $job_cnt{$mach}{$prev_stat}--;
-       $job_cnt{'*'}{$prev_stat}--;
-       $job_state{$mach}{$jid} = $stat;
-       $job_cnt{$mach}{$stat}++;
-       $job_cnt{'*'}{$stat}++;
-}
-
-sub refresh_status($) {
-       $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
-       $C->addnstr(0, 0,
-               sprintf("BEX  Hosts: %dR %dD %dE %dW  Jobs: %dR %dD %dE %dW",
-                       $host_cnt{'running'},
-                       $host_cnt{'done'},
-                       $host_cnt{'failed'},
-                       $host_cnt{'ready'},
-                       $job_cnt{'*'}{'running'},
-                       $job_cnt{'*'}{'done'},
-                       $job_cnt{'*'}{'failed'},
-                       $job_cnt{'*'}{'ready'},
-               ), $C->getmaxx);
-       $C->clrtoeol;
-       $C->refresh;
-}
-
-sub get_slot($) {
-       my ($mach) = @_;
-       my $s;
-       if (defined ($s = $by_host{$mach})) {
-               delete $s->{'Gone'};
-       } else {
-               my ($best, $besti);
-               for my $i (0..$nrows-1) {
-                       my $r = $by_row[$i];
-                       if (!defined $r) {
-                               $besti = $i;
-                               $best = undef;
-                               last;
-                       } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
-                               $besti = $i;
-                               $best = $r;
-                       }
-               }
-               if ($best) {
-                       delete $by_host{$best->{'Host'}};
-               }
-               $s->{'Host'} = $mach;
-               $s->{'Row'} = $besti;
-               $by_host{$mach} = $s;
-               $by_row[$besti] = $s;
-       }
-       return $s;
-}
-
-my $gone_counter = 1;
-sub delete_slot($) {
-       my ($s) = @_;
-       $s->{'Gone'} = $gone_counter++;
-}
-
-sub redraw_slot($) {
-       my ($s) = @_;
-       my $mach = $s->{'Host'};
-       my $stat = $s->{'Status'} // "?";
-       my $jid = $s->{'Job'} // "";
-       my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
-       my $jcnt = $job_cnt{$mach};
-       if ($jcnt->{'running'}) {
-               if ($jcnt->{'failed'}) {
-                       $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
-               } else {
-                       $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
-               }
-       } else {
-               if ($jcnt->{'failed'}) {
-                       $C->bkgdset(COLOR_PAIR(4));
-               } else {
-                       $C->bkgdset(0);
-               }
-       }
-       my $r = $s->{'Row'} + 1;
-       $C->addstr($r, 0, sprintf("%-20.20s", $mach));
-       if ($jcnt->{'failed'}) {
-               $C->bkgdset(COLOR_PAIR(4));
-               $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
-       } else {
-               $C->bkgdset(0);
-               $C->addstr("     ");
-       }
-       $C->bkgdset(0);
-       $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
-       if ($stat eq 'DONE') {
-               if (defined $host_last_fail_stat{$mach}) {
-                       $C->bkgdset(COLOR_PAIR(4));
-                       $C->addstr(sprintf("  %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
-               }
-       } else {
-               my $text = sprintf("  %-8s %s", $stat, $jname);
-               $C->addstr($text);
-       }
-       $C->clrtoeol;
-       $C->refresh;
-}
-
-sub update($$$$) {
-       my ($ui, $mach, $jid, $stat) = @_;
-       my $s = get_slot($mach);
-       given ($stat) {
-               when ('READY') {
-                       # Pseudo-state generated internally
-                       $ui->set_host_status($mach, 'ready');
-                       $ui->set_job_status($mach, $jid, 'ready');
-               }
-               when ('OK') {
-                       $ui->set_job_status($mach, $jid, 'done');
-               }
-               when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
-                       $ui->set_job_status($mach, $jid, 'failed');
-                       $host_last_fail_job{$mach} = $jid;
-                       $host_last_fail_stat{$mach} = $stat;
-               }
-               when ('DONE') {
-                       if ($job_cnt{$mach}{'failed'}) {
-                               $ui->set_host_status($mach, 'failed');
-                       } else {
-                               $ui->set_host_status($mach, 'done');
-                       }
-               }
-               when ('INIT') {
-                       $ui->set_host_status($mach, 'running');
-                       $ui->set_job_status($mach, $jid, 'running') if defined $jid;
-               }
-               when ('LOCKED') {
-                       if (defined $jid) {
-                               $ui->set_job_status($mach, $jid, 'failed');
-                       } else {
-                               for my $j (keys %{$job_state{$mach}}) {
-                                       $ui->set_job_status($mach, $jid, 'failed');
-                               }
-                               $ui->set_host_status($mach, 'failed');
-                               $host_last_fail_job{$mach} = $jid;
-                               $host_last_fail_stat{$mach} = $stat;
-                       }
-               }
-               when (['START', 'PING', 'SEND', 'RUN']) {
-               }
-               default {
-                       $ui->err("Received unknown job status $stat");
-               }
-       }
-       $s->{'Job'} = $jid;
-       $s->{'Status'} = $stat;
-       redraw_slot($s);
-       if ($stat eq 'DONE') { delete_slot($s); }
-       $ui->refresh_status;
-}
diff --git a/bq b/bq
deleted file mode 100755 (executable)
index 6fb7a17..0000000
--- a/bq
+++ /dev/null
@@ -1,158 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- Show Queued Jobs
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use POSIX;
-
-use lib 'lib';
-use BEX;
-
-my $op_by_job;
-my $op_by_host;
-my $op_rm;
-my $op_move_to;
-
-my $queue_name;
-my $given_job;
-
-GetOptions(
-       "by-job!" => \$op_by_job,
-       "h|by-host!" => \$op_by_host,
-       "rm!" => \$op_rm,
-       "move-to=s" => \$op_move_to,
-       "j|job=s" => \$given_job,
-       "q|queue=s" => \$queue_name,
-) or die <<AMEN ;
-Usage: bq [<options and actions>] [[!]<machine-or-class> ...]
-
-Actions:
-    --by-job           Show jobs sorted by job ID (default)
--h, --by-host          Show jobs sorted by host
-    --rm               Remove jobs from the queue
-    --move-to=<queue>  Move jobs to a different queue
-
-Options:
--j, --job=<id>         Act on the specified job (default: on all)
--q, --queue=<name>     Act on the given queue
-AMEN
-
-my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
-my $queue = BEX::Queue->new($queue_name);
-
-# Select jobs
-my %jobs = ();
-my %machs = ();
-for my $m (@machines) {
-       for my $j ($queue->scan($m)) {
-               if (defined $given_job) {
-                       next if $j ne $given_job;
-               }
-               push @{$jobs{$j}}, $m;
-               push @{$machs{$m}}, $j;
-       }
-}
-
-sub do_ls();
-sub do_rm();
-sub do_move_to();
-
-my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to);
-if ($ops > 1) { die "Multiple actions are not allowed\n"; }
-
-if ($op_rm) { do_rm(); }
-elsif (defined $op_move_to) { do_move_to(); }
-else { do_ls(); }
-exit 0;
-
-sub do_ls()
-{
-       my %stat = ();
-       my %mach_locked = ();
-       for my $m (keys %machs) {
-               $mach_locked{$m} = $queue->is_locked($m, undef);
-               for my $j (@{$machs{$m}}) {
-                       my $st = $queue->read_job_status($m, $j);
-                       if (defined($st->{'Time'}) && defined($st->{'Status'})) {
-                               $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
-                                               POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']';
-                       } else {
-                               $stat{$m}{$j} = '';
-                       }
-                       if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
-                               $stat{$m}{$j} .= ' [LOCKED]';
-                       }
-               }
-       }
-
-       if ($queue->is_locked(undef, undef)) {
-               print "### Queue lock present\n\n";
-       }
-
-       if ($op_by_host) {
-               for my $m (sort keys %machs) {
-                       print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n";
-                       for my $j (@{$machs{$m}}) {
-                               print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n";
-                       }
-               }
-       } else {
-               for my $j (sort keys %jobs) {
-                       print $queue->job_name($j), "\n";
-                       for my $m (sort @{$jobs{$j}}) {
-                               print "\t$m", $stat{$m}{$j}, "\n";
-                       }
-               }
-       }
-}
-
-sub do_rm()
-{
-       my $err = 0;
-       for my $m (sort keys %machs) {
-               for my $j (sort @{$machs{$m}}) {
-                       if (!$queue->lock($m, $j)) {
-                               print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n";
-                               $err = 1;
-                       } else {
-                               $queue->update_job_status($m, $j, 'REMOVED');
-                               $queue->remove($m, $j);
-                               print "Removed $m:", $queue->job_name($j), "\n";
-                       }
-               }
-       }
-       $queue->unlock;
-       exit $err;
-}
-
-sub do_move_to()
-{
-       my $err = 0;
-       my $dest = BEX::Queue->new($op_move_to);
-       $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n";
-       for my $j (sort keys %jobs) {
-               my $job = BEX::Job->new_from_file($queue->job_file($j));
-               for my $m (sort @{$jobs{$j}}) {
-                       if (!$queue->lock($m, $j)) {
-                               print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n";
-                               $err = 1;
-                       } else {
-                               my $enq = $dest->enqueue($m, $job);
-                               if ($enq) {
-                                       $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue');
-                               } else {
-                                       $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue');
-                               }
-                               $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue');
-                               $queue->remove($m, $j);
-                               print "Moved $m:", $dest->job_name($j);
-                               print " (already queued)" if !$enq;
-                               print "\n";
-                       }
-               }
-       }
-       $queue->unlock;
-       exit $err;
-}
diff --git a/brun b/brun
deleted file mode 100755 (executable)
index 040b6ba..0000000
--- a/brun
+++ /dev/null
@@ -1,194 +0,0 @@
-#!/usr/bin/perl
-# Batch EXecutor 2.0 -- Run Queued Jobs
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-
-use lib 'lib';
-use BEX;
-
-my $given_job;
-my $queue_name;
-my $status_fifo;
-
-GetOptions(
-       "j|job=s" => \$given_job,
-       "q|queue=s" => \$queue_name,
-       "s|status-fifo=s" => \$status_fifo,
-) or die <<AMEN ;
-Usage: brun [<options>] [[!]<machine-or-class> ...]
-
-Options:
--j, --job=<id>         Run only the specified job
--q, --queue=<name>     Select job queue
-    --status-fifo=<f>  Send status updates to the given named pipe
-AMEN
-
-my $status_fd;
-if (defined $status_fifo) {
-       open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
-       autoflush $status_fd, 1;
-}
-
-sub update_status($$$$;$) {
-       my ($mach, $job, $status, $log_on_queue, $msg) = @_;
-       if ($status_fd) {
-               print $status_fd "! $mach $job $status\n";
-       }
-       if ($log_on_queue) {
-               $log_on_queue->update_job_status($mach, $job, $status, $msg);
-       }
-}
-
-my %pings;
-
-sub ping_machine($) {
-       my ($mach) = @_;
-       if (!defined $pings{$mach}) {
-               if ($BEX::Config::ping_hosts) {
-                       update_status($mach, '-', 'PING', undef);
-                       my $host = BEX::Config::host_name($mach);
-                       `ping -c1 -n $host >/dev/null 2>/dev/null`;
-                       $pings{$mach} = !$?;
-               } else {
-                       $pings{$mach} = 1;
-               }
-       }
-       if ($pings{$mach}) {
-               return ('OK', undef);
-       } else {
-               return ('NOPING', 'Does not ping');
-       }
-}
-
-sub exit_status($) {
-       my ($s) = @_;
-       if ($s >> 8) {
-               return "with exit code " . ($s >> 8);
-       } else {
-               return "on fatal signal " . ($s & 127);
-       }
-}
-
-sub run_job_prep($$$) {
-       my ($job, $queue, $mach) = @_;
-       my $prep = $job->attr('Prep');
-       defined($prep) && $prep !~ /^\s*$/ or return 'OK';
-
-       my $jid = $job->id;
-       update_status($mach, $jid, 'PREP', $queue);
-       my $lf = $queue->log_file($mach, $jid);
-       $ENV{'HOST'} = BEX::Config::host_name($mach);
-       system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
-       delete $ENV{'HOST'};
-       if ($?) {
-               return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
-       } else {
-               return 'OK';
-       }
-}
-
-sub run_job_body($$$) {
-       my ($job, $queue, $mach) = @_;
-
-       if ($job->attr('body') =~ /^\s*$/s) {
-               # Shortcut if the body is empty
-               return 'OK'
-       }
-
-       my $host = BEX::Config::host_name($mach);
-       my $jid = $job->id;
-
-       my $tmp = $queue->temp_file($mach, $jid);
-       open T, '>', $tmp or die;
-       if (defined $BEX::Config::job_prolog) {
-               open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
-               while (<P>) { print T; }
-               close P;
-       } else {
-               print T "#!/bin/sh\n";
-       }
-       print T "# BEX job ", $jid, "\n";
-       print T $job->attr('body');
-       if (defined $BEX::Config::job_epilog) {
-               open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
-               while (<E>) { print T; }
-               close E;
-       }
-       close T;
-
-       update_status($mach, $jid, 'SEND', undef);
-       my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
-       my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
-       !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
-       chomp $rtmp;
-
-       update_status($mach, $jid, 'RUN', $queue);
-       my $lf = $queue->log_file($mach, $jid);
-       system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
-       if ($?) {
-               return ('FAILED', 'Job failed ' . exit_status($?));
-       } else {
-               return 'OK';
-       }
-}
-
-sub run_job($$$) {
-       my ($job, $queue, $mach) = @_;
-       my ($stat, $msg);
-
-       ($stat, $msg) = ping_machine($mach);
-       $stat eq 'OK' or return ($stat, $msg);
-
-       ($stat, $msg) = run_job_prep($job, $queue, $mach);
-       $stat eq 'OK' or return ($stat, $msg);
-
-       return run_job_body($job, $queue, $mach);
-}
-
-my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
-my $queue = BEX::Queue->new($queue_name);
-
-$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
-
-for my $mach (@machines) {
-       my @q = $queue->scan($mach) or next;
-       if (!$queue->lock($mach, undef)) {
-               print "### Machine $mach is locked by another brun, skipping...\n";
-               update_status($mach, '-', 'LOCKED', undef);
-               update_status($mach, '-', 'DONE', undef);
-               next;
-       }
-       update_status($mach, '-', 'INIT', undef);
-       while (my $jid = shift @q) {
-               if (defined $given_job) {
-                       $jid eq $given_job or next;
-               }
-               my $job = BEX::Job->new_from_file($queue->job_file($jid));
-               update_status($mach, $jid, 'INIT', undef);
-               if (!$queue->lock($mach, $jid)) {
-                       print "### Skipping locked $jid on $mach ###\n";
-                       update_status($mach, $jid, 'LOCKED', undef);
-                       next;
-               }
-               print "### Running ", $job->name, " on $mach ###\n";
-               my ($s, $msg) = run_job($job, $queue, $mach);
-               update_status($mach, $jid, $s, $queue, $msg);
-
-               if ($s eq 'OK') {
-                       print "+++ OK\n";
-                       $queue->remove($mach, $jid);
-               } else {
-                       print "--- $s: $msg\n";
-                       if ($BEX::Config::skip_on_fail) {
-                               print "### Skipping other jobs on the same host ###\n" if @q;
-                               last;
-                       }
-               }
-       }
-} continue {
-       update_status($mach, '-', 'DONE', undef);
-}
-$queue->unlock;
diff --git a/epilog b/epilog
deleted file mode 100644 (file)
index bc2da7a..0000000
--- a/epilog
+++ /dev/null
@@ -1 +0,0 @@
-# BEX epilog
diff --git a/lib/BEX.pm b/lib/BEX.pm
deleted file mode 100644 (file)
index 579eff2..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-# Batch EXecutor 2.0
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-
-package BEX;
-
-use BEX::Config;
-use BEX::Job;
-use BEX::Queue;
-
-42;
diff --git a/lib/BEX/Config.pm b/lib/BEX/Config.pm
deleted file mode 100644 (file)
index a5ee67e..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-# Batch EXecutor 2.0 -- Configuration
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-
-package BEX::Config;
-
-# This file specifies default values, which can be overridden in BEX.cf
-
-# A hash of all known machines and groups
-# 'name' => { Option => ... }  for a machine; options:
-#      Host => 'name'                  Host name to use in ssh, ping, ...
-# 'name' => ['a','b','c']      for a group containing specified machines/subgroups
-our %machines = (
-);
-
-
-# A file whose contents should be prepended before the job. Should start with the "#!" line.
-our $job_prolog = 'prolog';
-
-# A file whose contents should be appended to the job
-our $job_epilog = 'epilog';
-
-# Keep history of successfully completed jobs
-our $keep_history = 1;
-
-# Before we try to connect to a host, ping it to check if it's alive
-our $ping_hosts = 1;
-
-# Whenever we want to run a job on a machine, we must obtain a lock.
-# Available locking schemes are:
-#      none - no locking takes place (dangerous!)
-#      job - obtain exclusive access to the job, other jobs on the same
-#            host can run in parallel
-#      host - obtain exclusive access to the host, other hosts can run
-#      queue - obtain exclusive access to the whole queue
-our $locking_scheme = 'host';
-
-# Maximum number of simultaneously running jobs in `bprun'
-our $max_parallel_jobs = 5;
-
-# When a job fails, skip all other jobs on the same host
-# (however, when locking_scheme is set to `job', another instance of `brun'
-# still could run such jobs in parallel)
-our $skip_on_fail = 0;
-
-# How we run ssh (including options)
-our $ssh_command = "ssh";
-
-# Various utility functions
-
-sub parse_machine_list(@);
-
-sub parse_machine_list(@) {
-       my %set = ();
-       for my $m (@_) {
-               if ($m eq '*') {
-                       for my $mm (keys %machines) {
-                               if (ref($machines{$mm}) eq 'HASH') {
-                                       $set{$mm} = 1;
-                               }
-                       }
-                       next;
-               }
-               my $op = 1;
-               if ($m =~ s{^!}{}) { $op = 0; }
-               my $v = $machines{$m};
-               if (!defined $v) {
-                       die "Unknown machine or class: $m\n";
-               } elsif (ref($v) eq 'HASH') {
-                       $set{$m} = $op;
-               } elsif (ref($v) eq 'ARRAY') {
-                       for my $mm (parse_machine_list(@$v)) {
-                               $set{$mm} = $op;
-                       }
-               }
-       }
-       return sort grep { $set{$_} } keys %set;
-}
-
-sub host_name($) {
-       my ($mach) = @_;
-       return $machines{$mach}->{'Host'} // $mach;
-}
-
-require 'BEX.cf';
-
-42;
diff --git a/lib/BEX/Job.pm b/lib/BEX/Job.pm
deleted file mode 100644 (file)
index d040f1a..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-# Batch EXecutor 2.0 -- Jobs
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-
-package BEX::Job;
-
-use POSIX ();
-
-our $job_cnt = 0;
-
-sub check_id($) {
-       my ($id) = @_;
-       return $id =~ /^([0-9A-Za-z-]+)$/;
-}
-
-sub new($;$) {
-       my ($class, $id) = @_;
-       my $job = { };
-       bless $job;
-       if (defined $id) {
-               check_id($id) or die "Invalid job ID";
-               $job->{'ID'} = $id;
-       } else {
-               $job_cnt++;
-               $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
-       }
-       $job->{'Subject'} = '';
-       return $job;
-}
-
-sub new_from_file($$;$) {
-       my ($class, $file, $header_only) = @_;
-       my $job = { };
-       open T, '<', $file or die "Cannot open $file: $!";
-       while (<T>) {
-               chomp;
-               /^$/ and last;
-               /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
-               !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
-               $job->{$1} = $2;
-       }
-       if (!$header_only) {
-               my @cmds = <T>;
-               $job->{'body'} = join("", @cmds);
-       }
-       close T;
-       $job->{'Subject'} //= '';
-       $job->{'ID'} or die "Cannot load $file: Missing ID";
-       check_id($job->{'ID'}) or die "Cannot load $file: Invalid ID syntax";
-       return bless $job;
-}
-
-sub id($) {
-       return $_[0]->{'ID'};
-}
-
-sub name($) {
-       my ($job) = @_;
-       my $name = $job->{'ID'};
-       my $subj = $job->{'Subject'} // "";
-       $name .= " ($subj)" if $subj !~ /^\s*$/;
-       return $name;
-}
-
-sub attr($$;$) {
-       my ($job, $attr, $val) = @_;
-       $job->{$attr} = $val if defined $val;
-       return $job->{$attr};
-}
-
-sub dump($) {
-       my ($job) = @_;
-       for my $k (sort keys %$job) {
-               print "$k: ", $job->{$k}, "\n";
-       }
-}
-
-sub save($;$) {
-       my ($job, $fn) = @_;
-       -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
-       $fn //= 'tmp/' . $job->id;
-       open T, '>', $fn or die "Cannot create $fn: $!";
-       for my $k (sort grep { /^[A-Z]/ } keys %$job) {
-               print T "$k: ", $job->{$k}, "\n";
-       }
-       print T "\n";
-       print T $job->{'body'} if defined $job->{'body'};
-       close T;
-       return $fn;
-}
-
-42;
diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm
deleted file mode 100644 (file)
index 3fb94f3..0000000
+++ /dev/null
@@ -1,249 +0,0 @@
-# Batch EXecutor 2.0 -- Queues
-# (c) 2011 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use feature 'switch';
-
-package BEX::Queue;
-
-use IO::File;
-use File::Path;
-use Fcntl qw(:flock);
-use POSIX ();
-
-sub new($;$) {
-       my ($class, $name) = @_;
-       $name //= 'queue';
-       -d $name or die "Queue directory $name does not exist\n";
-       for my $d ("hosts", "jobs") {
-               -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
-       }
-       my $queue = {
-               'Name' => $name,
-               'MetaCache' => {},
-       };
-       return bless $queue;
-}
-
-sub log_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.log';
-}
-
-# Most actions have to be logged by the caller
-sub log($$$$;$) {
-       my ($queue, $mach, $jid, $stat, $msg) = @_;
-       my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
-       my $m = join(" ", $t, $mach, $jid, $stat);
-       $m .= " $msg" if defined $msg;
-
-       my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
-       print $fh "$m\n";
-
-       # Append to the per-job log file
-       if (open L, '>>', $queue->log_file($mach, $jid)) {
-               print L "### $m\n";
-               close L;
-       }
-}
-
-sub host_dir($$) {
-       my ($queue, $machine) = @_;
-       return $queue->{'Name'} . '/hosts/' . $machine;
-}
-
-sub queue_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.job';
-}
-
-sub status_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.stat';
-}
-
-sub temp_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.tmp';
-}
-
-sub job_file($$) {
-       my ($queue, $jid) = @_;
-       return $queue->{'Name'} . '/jobs/' . $jid. '.job';
-}
-
-sub enqueue($$$) {
-       my ($queue, $machine, $job) = @_;
-       my $qf = $queue->queue_file($machine, $job->id);
-       if (-f $qf) {
-               return 0;
-       }
-       my $fn = $queue->job_file($job->id);
-       -f $fn or $job->save($fn);
-       my $dir = $queue->host_dir($machine);
-       -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
-       symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
-       return 1;
-}
-
-sub scan($$) {
-       my ($queue, $machine) = @_;
-       my @list = ();
-       if (opendir D, $queue->host_dir($machine)) {
-               while ($_ = readdir D) {
-                       /^\./ and next;
-                       s{\.job}{} or next;
-                       push @list, $_;
-               }
-               closedir D;
-       }
-       return sort @list;
-}
-
-sub remove($$;$) {
-       my ($queue, $machine, $jid, $force_remove) = @_;
-       if ($BEX::Config::keep_history && !$force_remove) {
-               my $s = $queue->{'Name'} . '/hosts/' . $machine;
-               my $d = $queue->{'Name'} . '/history/' . $machine;
-               File::Path::mkpath($d);
-               for my $suff ('job', 'stat', 'log') {
-                       my $src = "$s/$jid.$suff";
-                       my $dst = "$d/$jid.$suff";
-                       if (-f $src) {
-                               rename $src, $dst or die "Cannot rename $src to $dst: $!";
-                       } else {
-                               # Might be present from the previous incarnation of the same job
-                               unlink $dst;
-                       }
-               }
-       } else {
-               unlink $queue->queue_file($machine, $jid);
-               unlink $queue->status_file($machine, $jid);
-               unlink $queue->log_file($machine, $jid);
-       }
-       unlink $queue->temp_file($machine, $jid);
-}
-
-sub job_metadata($$) {
-       my ($queue, $jid) = @_;
-       my $cache = $queue->{'MetaCache'};
-       if (!defined $cache->{$jid}) {
-               $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
-       }
-       return $cache->{$jid};
-}
-
-sub job_name($$) {
-       my ($queue, $jid) = @_;
-       return $queue->job_metadata($jid)->name;
-}
-
-sub read_job_status($$$) {
-       my ($queue, $machine, $jid) = @_;
-       my %s = ();
-       my $sf = $queue->status_file($machine, $jid);
-       if (open S, '<', $sf) {
-               while (<S>) {
-                       chomp;
-                       /^(\w+):\s*(.*)/ or die "Parse error in $sf";
-                       $s{$1} = $2;
-               }
-               close S;
-       }
-       return \%s;
-}
-
-sub write_job_status($$$$) {
-       my ($queue, $machine, $jid, $stat) = @_;
-       my $sf = $queue->status_file($machine, $jid);
-       open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
-       for my $k (sort keys %$stat) {
-               print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
-       }
-       close S;
-       rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
-}
-
-sub update_job_status($$$$;$) {
-       my ($queue, $machine, $jid, $stat, $msg) = @_;
-       my $s = {
-               'Time' => time,
-               'Status' => $stat,
-               'Message' => $msg,
-       };
-       $queue->write_job_status($machine, $jid, $s);
-       $queue->log($machine, $jid, $stat, $msg);
-}
-
-sub lock_name($$$) {
-       my ($queue, $machine, $jid) = @_;
-       my $lock = $queue->{'Name'};
-       if (defined $jid) {
-               $lock .= "/hosts/$machine/$jid.lock";
-       } elsif (defined $machine) {
-               $lock .= "/hosts/$machine/lock";
-       } else {
-               $lock .= '/lock';
-       }
-}
-
-# Whenever we want to run a job on a machine, we must obtain a lock;
-# at most one lock can be held at a time by a single BEX::Queue object.
-# See the description of locking schemes in BEX::Config.
-sub lock($$$) {
-       my ($queue, $machine, $jid) = @_;
-       my $lock;
-       given ($BEX::Config::locking_scheme) {
-               when ('queue') {
-                       $lock = lock_name($queue, undef, undef);
-               }
-               when ('host') {
-                       defined($machine) or return 1;
-                       $lock = lock_name($queue, $machine, undef);
-               }
-               when ('job') {
-                       defined($machine) && defined($jid) or return 1;
-                       $lock = lock_name($queue, $machine, $jid);
-               }
-               when ('none') { return 1; }
-               default { die "Invalid BEX::Config::locking_scheme"; }
-       }
-       if (defined($queue->{'LockName'})) {
-               return 1 if ($queue->{'LockName'} eq $lock);
-               $queue->unlock;
-       }
-       open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
-       if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
-               close $queue->{'LockHandle'};
-               delete $queue->{'LockHandle'};
-               return 0;
-       }
-       $queue->{'LockName'} = $lock;
-       return 1;
-}
-
-sub unlock($) {
-       my ($queue) = @_;
-       defined $queue->{'LockName'} or return;
-       unlink $queue->{'LockName'};
-       flock $queue->{'LockHandle'}, LOCK_UN;
-       close $queue->{'LockHandle'};
-       delete $queue->{'LockHandle'};
-       delete $queue->{'LockName'};
-}
-
-# Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
-sub is_locked($$$) {
-       my ($queue, $machine, $jid) = @_;
-       given ($BEX::Config::locking_scheme) {
-               # Shortcuts
-               when ('host') { return unless defined $machine; }
-               when ('jid') { return unless defined $jid; }
-               when ('none') { return; }
-       }
-       my $lock = lock_name($queue, $machine, $jid);
-       return -f $lock;
-}
-
-42;
diff --git a/lib/bin/add b/lib/bin/add
new file mode 100755 (executable)
index 0000000..95fbbb8
--- /dev/null
@@ -0,0 +1,100 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use File::stat;
+use BEX;
+
+my $given_body;
+my $given_go;
+my $given_id;
+my $queue_name;
+my $requeue_id;
+my $given_subject;
+my $given_template;
+
+GetOptions(
+       "b|body=s" => \$given_body,
+       "g|go!" => \$given_go,
+       "i|id=s" => \$given_id,
+       "q|queue=s" => \$queue_name,
+       "r|requeue=s" => \$requeue_id,
+       "s|subject=s" => \$given_subject,
+       "t|template=s" => \$given_template,
+) or die <<AMEN ;
+Usage: benq [<options>] [!]<machine-or-class> ...
+
+Options:
+-b, --body=<file>      Load job body from the given file
+-g, --go               Do not run editor, go enqueue the job immediately
+-i, --id=<id>          Set job ID of the new job
+-q, --queue=<name>     Insert new jobs to the given queue
+-r, --requeue=<id>     Re-queue an existing job instead of creating a new one
+-s, --subject=<subj>   Set subject of the new job
+-t, --template=<file>  Load job template (headers and body) from the given file
+AMEN
+
+# Prepare machine set
+@ARGV or die "No machines specified\n";
+my @machines = BEX::Config::parse_machine_list(@ARGV);
+@machines or die "No machines match\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $job;
+my $tmp_fn;
+
+if (defined $requeue_id) {
+       # When requeueing, just fetch the existing job
+       if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) {
+               die "Parameters of a requeued job cannot be changed\n";
+       }
+       my $fn = $queue->job_file($requeue_id);
+       -f $fn or die "Job $requeue_id not known\n";
+       $job = BEX::Job->new_from_file($fn);
+} else {
+       # Create job template
+       if (defined $given_template) {
+               $job = BEX::Job->new_from_file($given_template);
+       } else {
+               $job = BEX::Job->new;
+       }
+       $job->attr('ID', $given_id) if defined $given_id;
+       $job->attr('Subject', $given_subject) if defined $given_subject;
+       if (defined $given_body) {
+               open B, '<', $given_body or die "Cannot open $given_body: $!\n";
+               local $/;
+               $job->attr('body', <B>);
+               close B;
+       }
+
+       # Let the user edit the template
+       if (!$given_go) {
+               $tmp_fn = $job->save;
+               my $orig_stat = stat($tmp_fn) or die;
+               system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n";
+               my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n";
+               if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) {
+                       unlink $tmp_fn;
+                       die "Cancelled\n";
+               }
+               $job = BEX::Job->new_from_file($tmp_fn);
+       }
+}
+
+# Put the job to the queue
+print "New job ", $job->id, "\n";
+for my $m (@machines) {
+       if ($queue->enqueue($m, $job)) {
+               $queue->update_job_status($m, $job->id, 'NEW');
+               print "\t$m\n";
+       } else {
+               $queue->log($m, $job->id, 'REQUEUE');
+               print "\t$m (already queued)\n";
+       }
+}
+
+# Remove the temporary file if there's any
+unlink $tmp_fn if defined $tmp_fn;
diff --git a/lib/bin/job b/lib/bin/job
new file mode 100755 (executable)
index 0000000..bfd2f31
--- /dev/null
@@ -0,0 +1,32 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $edit;
+my $queue_name;
+
+GetOptions(
+       "e|edit!" => \$edit,
+       "q|queue=s" => \$queue_name,
+) && @ARGV == 1 or die <<AMEN ;
+Usage: bjob [<options>] <job-id>
+
+Options:
+-e, --edit             Run editor on the given job (no locking)
+-q, --queue=<name>     Act on the given queue
+AMEN
+
+my $queue = BEX::Queue->new($queue_name);
+my $fn = $queue->job_file($ARGV[0]);
+-f $fn or die "No such job " . $ARGV[0] . "\n";
+
+if ($edit) {
+       system "editor", $fn;
+} else {
+       system "cat", $fn;
+}
diff --git a/lib/bin/ls b/lib/bin/ls
new file mode 100755 (executable)
index 0000000..5da501e
--- /dev/null
@@ -0,0 +1,42 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $edit;
+my $queue_name;
+
+GetOptions(
+) && @ARGV == 0 or die <<AMEN ;
+Usage: bjob [<options>]
+
+Options:
+None defined so far.
+AMEN
+
+my $machines = \%BEX::Config::machines;
+
+print "# Hosts:\n";
+for my $h (sort keys %$machines) {
+       my $m = $machines->{$h};
+       ref $m eq 'HASH' or next;
+       print "$h\n";
+}
+
+print "\n# Groups:\n";
+for my $h (sort keys %$machines) {
+       my $m = $machines->{$h};
+       ref $m eq 'ARRAY' or next;
+       print "$h = ", join(" ",
+               map {
+                       my $x = $machines->{$_};
+                       !defined($x) ? "$_?" :
+                       ref $x eq 'HASH' ? $_ :
+                       ref $x eq 'ARRAY' ? "\@$_" :
+                       "$_???"
+               } @$m), "\n";
+}
diff --git a/lib/bin/prun b/lib/bin/prun
new file mode 100755 (executable)
index 0000000..d8b9872
--- /dev/null
@@ -0,0 +1,341 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use feature 'switch';
+use Getopt::Long;
+use POSIX;
+use BEX;
+
+my $queue_name;
+my $screen_session = 'BEX';
+my $text_mode;
+
+GetOptions(
+       "q|queue=s" => \$queue_name,
+       "session=s" => \$screen_session,
+       "text!" => \$text_mode,
+) or die <<AMEN ;
+Usage: bprun [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-q, --queue=<name>     Run jobs in the given queue
+    --session=<name>   Job windows should be opened within the given screen
+                       session (default: BEX)
+    --text             Use textual user interface instead of curses
+AMEN
+
+system 'screen', '-S', $screen_session, '-X', 'select', '.';
+!$? or die "Screen session $screen_session not found\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $fifo_name = $queue->{'Name'} . '/status-fifo';
+unlink $fifo_name;
+mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
+open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
+
+my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
+
+my @machines = ();
+for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
+       my @jobs = $queue->scan($mach);
+       @jobs or next;
+       push @machines, $mach;
+       for (@jobs) { $ui->update($mach, $_, 'READY'); }
+}
+
+my %running = ();
+my $max = $BEX::Config::max_parallel_jobs;
+
+while (keys %running || @machines) {
+       if (@machines && keys %running < $max) {
+               my $mach = shift @machines;
+               $ui->update($mach, undef, 'START');
+               my @scr = ('screen', '-t', $mach);
+               push @scr, '-S', $screen_session if defined $screen_session;
+               push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
+               system @scr;
+               !$? or $ui->update($mach, undef, 'INTERR');
+               $running{$mach} = 'START';
+               next;
+       }
+       $_ = <FIFO>;
+       chomp;
+       my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
+       if (!defined $stat) {
+               $ui->err("Received invalid status message <$_>");
+               next;
+       }
+       if (!defined $running{$mach}) {
+               $ui->err("Received status message <$_> for a machine which does not run");
+               next;
+       }
+       $running{$mach} = $stat;
+       $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
+       if ($stat eq 'DONE') {
+               delete $running{$mach};
+       }
+}
+
+close FIFO;
+unlink $fifo_name;
+$ui->done;
+
+package BEX::bprun::text;
+
+sub new($) {
+       return bless {};
+}
+
+sub done($) {
+}
+
+sub update($$$$) {
+       my ($ui, $mach, $jid, $stat) = @_;
+       print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
+}
+
+sub err($$) {
+       my ($ui, $msg) = @_;
+       print STDERR "ERROR: $msg\n";
+}
+
+package BEX::bprun::curses;
+
+use Curses;
+
+my $C;
+
+my $nrows;
+my @by_row = ();
+my %by_host = ();
+
+my %host_state;
+my %host_cnt;
+
+my %job_state;
+my %job_cnt;
+
+my %host_last_fail_job;
+my %host_last_fail_stat;
+
+sub new($) {
+       $C = new Curses;
+       start_color;
+       has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
+       cbreak; noecho;
+       $C->intrflush(0);
+       $C->keypad(1);
+       $C->meta(1);
+       $C->clear;
+       init_pair(1, COLOR_YELLOW, COLOR_BLUE);
+       init_pair(2, COLOR_YELLOW, COLOR_RED);
+       init_pair(3, COLOR_YELLOW, COLOR_BLACK);
+       init_pair(4, COLOR_RED, COLOR_BLACK);
+
+       $nrows = $C->getmaxy - 2;
+       if ($BEX::Config::max_parallel_jobs > $nrows) {
+               $BEX::Config::max_parallel_jobs = $nrows;
+       }
+
+       %host_state = %host_cnt = ();
+       %job_state = %job_cnt = ();
+       for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
+               $host_cnt{$s} = 0;
+               $job_cnt{'*'}{$s} = 0;
+       }
+
+       my $ui = bless {};
+       $ui->refresh_status;
+       return $ui;
+}
+
+sub done($)
+{
+       $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+       $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
+       $C->clrtoeol;
+       $C->getch;
+       endwin;
+}
+
+sub err($$) {
+       my ($ui, $msg) = @_;
+       $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
+       $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
+       $C->clrtoeol;
+       $C->refresh;
+}
+
+sub set_host_status($$$) {
+       my ($ui, $mach, $stat) = @_;
+       my $prev_stat = $host_state{$mach};
+       if (defined $prev_stat) {
+               $host_cnt{$prev_stat}--;
+       } else {
+               for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
+       }
+       $host_state{$mach} = $stat;
+       $host_cnt{$stat}++;
+}
+
+sub set_job_status($$$$) {
+       my ($ui, $mach, $jid, $stat) = @_;
+       my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
+       $job_cnt{$mach}{$prev_stat}--;
+       $job_cnt{'*'}{$prev_stat}--;
+       $job_state{$mach}{$jid} = $stat;
+       $job_cnt{$mach}{$stat}++;
+       $job_cnt{'*'}{$stat}++;
+}
+
+sub refresh_status($) {
+       $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+       $C->addnstr(0, 0,
+               sprintf("BEX  Hosts: %dR %dD %dE %dW  Jobs: %dR %dD %dE %dW",
+                       $host_cnt{'running'},
+                       $host_cnt{'done'},
+                       $host_cnt{'failed'},
+                       $host_cnt{'ready'},
+                       $job_cnt{'*'}{'running'},
+                       $job_cnt{'*'}{'done'},
+                       $job_cnt{'*'}{'failed'},
+                       $job_cnt{'*'}{'ready'},
+               ), $C->getmaxx);
+       $C->clrtoeol;
+       $C->refresh;
+}
+
+sub get_slot($) {
+       my ($mach) = @_;
+       my $s;
+       if (defined ($s = $by_host{$mach})) {
+               delete $s->{'Gone'};
+       } else {
+               my ($best, $besti);
+               for my $i (0..$nrows-1) {
+                       my $r = $by_row[$i];
+                       if (!defined $r) {
+                               $besti = $i;
+                               $best = undef;
+                               last;
+                       } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
+                               $besti = $i;
+                               $best = $r;
+                       }
+               }
+               if ($best) {
+                       delete $by_host{$best->{'Host'}};
+               }
+               $s->{'Host'} = $mach;
+               $s->{'Row'} = $besti;
+               $by_host{$mach} = $s;
+               $by_row[$besti] = $s;
+       }
+       return $s;
+}
+
+my $gone_counter = 1;
+sub delete_slot($) {
+       my ($s) = @_;
+       $s->{'Gone'} = $gone_counter++;
+}
+
+sub redraw_slot($) {
+       my ($s) = @_;
+       my $mach = $s->{'Host'};
+       my $stat = $s->{'Status'} // "?";
+       my $jid = $s->{'Job'} // "";
+       my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
+       my $jcnt = $job_cnt{$mach};
+       if ($jcnt->{'running'}) {
+               if ($jcnt->{'failed'}) {
+                       $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
+               } else {
+                       $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
+               }
+       } else {
+               if ($jcnt->{'failed'}) {
+                       $C->bkgdset(COLOR_PAIR(4));
+               } else {
+                       $C->bkgdset(0);
+               }
+       }
+       my $r = $s->{'Row'} + 1;
+       $C->addstr($r, 0, sprintf("%-20.20s", $mach));
+       if ($jcnt->{'failed'}) {
+               $C->bkgdset(COLOR_PAIR(4));
+               $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
+       } else {
+               $C->bkgdset(0);
+               $C->addstr("     ");
+       }
+       $C->bkgdset(0);
+       $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
+       if ($stat eq 'DONE') {
+               if (defined $host_last_fail_stat{$mach}) {
+                       $C->bkgdset(COLOR_PAIR(4));
+                       $C->addstr(sprintf("  %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
+               }
+       } else {
+               my $text = sprintf("  %-8s %s", $stat, $jname);
+               $C->addstr($text);
+       }
+       $C->clrtoeol;
+       $C->refresh;
+}
+
+sub update($$$$) {
+       my ($ui, $mach, $jid, $stat) = @_;
+       my $s = get_slot($mach);
+       given ($stat) {
+               when ('READY') {
+                       # Pseudo-state generated internally
+                       $ui->set_host_status($mach, 'ready');
+                       $ui->set_job_status($mach, $jid, 'ready');
+               }
+               when ('OK') {
+                       $ui->set_job_status($mach, $jid, 'done');
+               }
+               when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
+                       $ui->set_job_status($mach, $jid, 'failed');
+                       $host_last_fail_job{$mach} = $jid;
+                       $host_last_fail_stat{$mach} = $stat;
+               }
+               when ('DONE') {
+                       if ($job_cnt{$mach}{'failed'}) {
+                               $ui->set_host_status($mach, 'failed');
+                       } else {
+                               $ui->set_host_status($mach, 'done');
+                       }
+               }
+               when ('INIT') {
+                       $ui->set_host_status($mach, 'running');
+                       $ui->set_job_status($mach, $jid, 'running') if defined $jid;
+               }
+               when ('LOCKED') {
+                       if (defined $jid) {
+                               $ui->set_job_status($mach, $jid, 'failed');
+                       } else {
+                               for my $j (keys %{$job_state{$mach}}) {
+                                       $ui->set_job_status($mach, $jid, 'failed');
+                               }
+                               $ui->set_host_status($mach, 'failed');
+                               $host_last_fail_job{$mach} = $jid;
+                               $host_last_fail_stat{$mach} = $stat;
+                       }
+               }
+               when (['START', 'PING', 'SEND', 'RUN']) {
+               }
+               default {
+                       $ui->err("Received unknown job status $stat");
+               }
+       }
+       $s->{'Job'} = $jid;
+       $s->{'Status'} = $stat;
+       redraw_slot($s);
+       if ($stat eq 'DONE') { delete_slot($s); }
+       $ui->refresh_status;
+}
diff --git a/lib/bin/queue b/lib/bin/queue
new file mode 100755 (executable)
index 0000000..483e794
--- /dev/null
@@ -0,0 +1,156 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use POSIX;
+use BEX;
+
+my $op_by_job;
+my $op_by_host;
+my $op_rm;
+my $op_move_to;
+
+my $queue_name;
+my $given_job;
+
+GetOptions(
+       "by-job!" => \$op_by_job,
+       "h|by-host!" => \$op_by_host,
+       "rm!" => \$op_rm,
+       "move-to=s" => \$op_move_to,
+       "j|job=s" => \$given_job,
+       "q|queue=s" => \$queue_name,
+) or die <<AMEN ;
+Usage: bq [<options and actions>] [[!]<machine-or-class> ...]
+
+Actions:
+    --by-job           Show jobs sorted by job ID (default)
+-h, --by-host          Show jobs sorted by host
+    --rm               Remove jobs from the queue
+    --move-to=<queue>  Move jobs to a different queue
+
+Options:
+-j, --job=<id>         Act on the specified job (default: on all)
+-q, --queue=<name>     Act on the given queue
+AMEN
+
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+# Select jobs
+my %jobs = ();
+my %machs = ();
+for my $m (@machines) {
+       for my $j ($queue->scan($m)) {
+               if (defined $given_job) {
+                       next if $j ne $given_job;
+               }
+               push @{$jobs{$j}}, $m;
+               push @{$machs{$m}}, $j;
+       }
+}
+
+sub do_ls();
+sub do_rm();
+sub do_move_to();
+
+my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to);
+if ($ops > 1) { die "Multiple actions are not allowed\n"; }
+
+if ($op_rm) { do_rm(); }
+elsif (defined $op_move_to) { do_move_to(); }
+else { do_ls(); }
+exit 0;
+
+sub do_ls()
+{
+       my %stat = ();
+       my %mach_locked = ();
+       for my $m (keys %machs) {
+               $mach_locked{$m} = $queue->is_locked($m, undef);
+               for my $j (@{$machs{$m}}) {
+                       my $st = $queue->read_job_status($m, $j);
+                       if (defined($st->{'Time'}) && defined($st->{'Status'})) {
+                               $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
+                                               POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']';
+                       } else {
+                               $stat{$m}{$j} = '';
+                       }
+                       if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
+                               $stat{$m}{$j} .= ' [LOCKED]';
+                       }
+               }
+       }
+
+       if ($queue->is_locked(undef, undef)) {
+               print "### Queue lock present\n\n";
+       }
+
+       if ($op_by_host) {
+               for my $m (sort keys %machs) {
+                       print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n";
+                       for my $j (@{$machs{$m}}) {
+                               print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n";
+                       }
+               }
+       } else {
+               for my $j (sort keys %jobs) {
+                       print $queue->job_name($j), "\n";
+                       for my $m (sort @{$jobs{$j}}) {
+                               print "\t$m", $stat{$m}{$j}, "\n";
+                       }
+               }
+       }
+}
+
+sub do_rm()
+{
+       my $err = 0;
+       for my $m (sort keys %machs) {
+               for my $j (sort @{$machs{$m}}) {
+                       if (!$queue->lock($m, $j)) {
+                               print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n";
+                               $err = 1;
+                       } else {
+                               $queue->update_job_status($m, $j, 'REMOVED');
+                               $queue->remove($m, $j);
+                               print "Removed $m:", $queue->job_name($j), "\n";
+                       }
+               }
+       }
+       $queue->unlock;
+       exit $err;
+}
+
+sub do_move_to()
+{
+       my $err = 0;
+       my $dest = BEX::Queue->new($op_move_to);
+       $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n";
+       for my $j (sort keys %jobs) {
+               my $job = BEX::Job->new_from_file($queue->job_file($j));
+               for my $m (sort @{$jobs{$j}}) {
+                       if (!$queue->lock($m, $j)) {
+                               print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n";
+                               $err = 1;
+                       } else {
+                               my $enq = $dest->enqueue($m, $job);
+                               if ($enq) {
+                                       $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue');
+                               } else {
+                                       $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue');
+                               }
+                               $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue');
+                               $queue->remove($m, $j);
+                               print "Moved $m:", $dest->job_name($j);
+                               print " (already queued)" if !$enq;
+                               print "\n";
+                       }
+               }
+       }
+       $queue->unlock;
+       exit $err;
+}
diff --git a/lib/bin/run b/lib/bin/run
new file mode 100755 (executable)
index 0000000..e311729
--- /dev/null
@@ -0,0 +1,192 @@
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $given_job;
+my $queue_name;
+my $status_fifo;
+
+GetOptions(
+       "j|job=s" => \$given_job,
+       "q|queue=s" => \$queue_name,
+       "s|status-fifo=s" => \$status_fifo,
+) or die <<AMEN ;
+Usage: brun [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-j, --job=<id>         Run only the specified job
+-q, --queue=<name>     Select job queue
+    --status-fifo=<f>  Send status updates to the given named pipe
+AMEN
+
+my $status_fd;
+if (defined $status_fifo) {
+       open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
+       autoflush $status_fd, 1;
+}
+
+sub update_status($$$$;$) {
+       my ($mach, $job, $status, $log_on_queue, $msg) = @_;
+       if ($status_fd) {
+               print $status_fd "! $mach $job $status\n";
+       }
+       if ($log_on_queue) {
+               $log_on_queue->update_job_status($mach, $job, $status, $msg);
+       }
+}
+
+my %pings;
+
+sub ping_machine($) {
+       my ($mach) = @_;
+       if (!defined $pings{$mach}) {
+               if ($BEX::Config::ping_hosts) {
+                       update_status($mach, '-', 'PING', undef);
+                       my $host = BEX::Config::host_name($mach);
+                       `ping -c1 -n $host >/dev/null 2>/dev/null`;
+                       $pings{$mach} = !$?;
+               } else {
+                       $pings{$mach} = 1;
+               }
+       }
+       if ($pings{$mach}) {
+               return ('OK', undef);
+       } else {
+               return ('NOPING', 'Does not ping');
+       }
+}
+
+sub exit_status($) {
+       my ($s) = @_;
+       if ($s >> 8) {
+               return "with exit code " . ($s >> 8);
+       } else {
+               return "on fatal signal " . ($s & 127);
+       }
+}
+
+sub run_job_prep($$$) {
+       my ($job, $queue, $mach) = @_;
+       my $prep = $job->attr('Prep');
+       defined($prep) && $prep !~ /^\s*$/ or return 'OK';
+
+       my $jid = $job->id;
+       update_status($mach, $jid, 'PREP', $queue);
+       my $lf = $queue->log_file($mach, $jid);
+       $ENV{'HOST'} = BEX::Config::host_name($mach);
+       system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
+       delete $ENV{'HOST'};
+       if ($?) {
+               return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
+       } else {
+               return 'OK';
+       }
+}
+
+sub run_job_body($$$) {
+       my ($job, $queue, $mach) = @_;
+
+       if ($job->attr('body') =~ /^\s*$/s) {
+               # Shortcut if the body is empty
+               return 'OK'
+       }
+
+       my $host = BEX::Config::host_name($mach);
+       my $jid = $job->id;
+
+       my $tmp = $queue->temp_file($mach, $jid);
+       open T, '>', $tmp or die;
+       if (defined $BEX::Config::job_prolog) {
+               open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
+               while (<P>) { print T; }
+               close P;
+       } else {
+               print T "#!/bin/sh\n";
+       }
+       print T "# BEX job ", $jid, "\n";
+       print T $job->attr('body');
+       if (defined $BEX::Config::job_epilog) {
+               open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
+               while (<E>) { print T; }
+               close E;
+       }
+       close T;
+
+       update_status($mach, $jid, 'SEND', undef);
+       my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
+       my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+       !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
+       chomp $rtmp;
+
+       update_status($mach, $jid, 'RUN', $queue);
+       my $lf = $queue->log_file($mach, $jid);
+       system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
+       if ($?) {
+               return ('FAILED', 'Job failed ' . exit_status($?));
+       } else {
+               return 'OK';
+       }
+}
+
+sub run_job($$$) {
+       my ($job, $queue, $mach) = @_;
+       my ($stat, $msg);
+
+       ($stat, $msg) = ping_machine($mach);
+       $stat eq 'OK' or return ($stat, $msg);
+
+       ($stat, $msg) = run_job_prep($job, $queue, $mach);
+       $stat eq 'OK' or return ($stat, $msg);
+
+       return run_job_body($job, $queue, $mach);
+}
+
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
+for my $mach (@machines) {
+       my @q = $queue->scan($mach) or next;
+       if (!$queue->lock($mach, undef)) {
+               print "### Machine $mach is locked by another brun, skipping...\n";
+               update_status($mach, '-', 'LOCKED', undef);
+               update_status($mach, '-', 'DONE', undef);
+               next;
+       }
+       update_status($mach, '-', 'INIT', undef);
+       while (my $jid = shift @q) {
+               if (defined $given_job) {
+                       $jid eq $given_job or next;
+               }
+               my $job = BEX::Job->new_from_file($queue->job_file($jid));
+               update_status($mach, $jid, 'INIT', undef);
+               if (!$queue->lock($mach, $jid)) {
+                       print "### Skipping locked $jid on $mach ###\n";
+                       update_status($mach, $jid, 'LOCKED', undef);
+                       next;
+               }
+               print "### Running ", $job->name, " on $mach ###\n";
+               my ($s, $msg) = run_job($job, $queue, $mach);
+               update_status($mach, $jid, $s, $queue, $msg);
+
+               if ($s eq 'OK') {
+                       print "+++ OK\n";
+                       $queue->remove($mach, $jid);
+               } else {
+                       print "--- $s: $msg\n";
+                       if ($BEX::Config::skip_on_fail) {
+                               print "### Skipping other jobs on the same host ###\n" if @q;
+                               last;
+                       }
+               }
+       }
+} continue {
+       update_status($mach, '-', 'DONE', undef);
+}
+$queue->unlock;
diff --git a/lib/perl/BEX.pm b/lib/perl/BEX.pm
new file mode 100644 (file)
index 0000000..2c257de
--- /dev/null
@@ -0,0 +1,13 @@
+# Batch EXecutor 3.0
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX;
+
+use BEX::Config;
+use BEX::Job;
+use BEX::Queue;
+
+42;
diff --git a/lib/perl/BEX/Config.pm b/lib/perl/BEX/Config.pm
new file mode 100644 (file)
index 0000000..6a2efdd
--- /dev/null
@@ -0,0 +1,94 @@
+# Batch EXecutor 3.0 -- Configuration
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Config;
+
+# This file specifies default values, which can be overridden in BEX.cf
+
+# A hash of all known machines and groups
+# 'name' => { Option => ... }  for a machine; options:
+#      Host => 'name'                  Host name to use in ssh, ping, ...
+# 'name' => ['a','b','c']      for a group containing specified machines/subgroups
+our %machines = (
+);
+
+# Home directory in which everything resides
+our $bex_home = $ENV{"BEX_HOME"} // ".";
+
+# Configuration directory
+our $bex_cf_dir = $bex_home . "/BEX";
+
+# A file whose contents should be prepended before the job. Should start with the "#!" line.
+our $job_prolog = $bex_cf_dir . '/prolog';
+
+# A file whose contents should be appended to the job
+our $job_epilog = $bex_cf_dir . '/epilog';
+
+# Keep history of successfully completed jobs
+our $keep_history = 1;
+
+# Before we try to connect to a host, ping it to check if it's alive
+our $ping_hosts = 1;
+
+# Whenever we want to run a job on a machine, we must obtain a lock.
+# Available locking schemes are:
+#      none - no locking takes place (dangerous!)
+#      job - obtain exclusive access to the job, other jobs on the same
+#            host can run in parallel
+#      host - obtain exclusive access to the host, other hosts can run
+#      queue - obtain exclusive access to the whole queue
+our $locking_scheme = 'host';
+
+# Maximum number of simultaneously running jobs in `bprun'
+our $max_parallel_jobs = 5;
+
+# When a job fails, skip all other jobs on the same host
+# (however, when locking_scheme is set to `job', another instance of `brun'
+# still could run such jobs in parallel)
+our $skip_on_fail = 0;
+
+# How we run ssh (including options)
+our $ssh_command = "ssh";
+
+# Various utility functions
+
+sub parse_machine_list(@);
+
+sub parse_machine_list(@) {
+       my %set = ();
+       for my $m (@_) {
+               if ($m eq '*') {
+                       for my $mm (keys %machines) {
+                               if (ref($machines{$mm}) eq 'HASH') {
+                                       $set{$mm} = 1;
+                               }
+                       }
+                       next;
+               }
+               my $op = 1;
+               if ($m =~ s{^!}{}) { $op = 0; }
+               my $v = $machines{$m};
+               if (!defined $v) {
+                       die "Unknown machine or class: $m\n";
+               } elsif (ref($v) eq 'HASH') {
+                       $set{$m} = $op;
+               } elsif (ref($v) eq 'ARRAY') {
+                       for my $mm (parse_machine_list(@$v)) {
+                               $set{$mm} = $op;
+                       }
+               }
+       }
+       return sort grep { $set{$_} } keys %set;
+}
+
+sub host_name($) {
+       my ($mach) = @_;
+       return $machines{$mach}->{'Host'} // $mach;
+}
+
+require $bex_cf_dir . '/config';
+
+42;
diff --git a/lib/perl/BEX/Job.pm b/lib/perl/BEX/Job.pm
new file mode 100644 (file)
index 0000000..a459cc3
--- /dev/null
@@ -0,0 +1,94 @@
+# Batch EXecutor 3.0 -- Jobs
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Job;
+
+use POSIX ();
+
+our $job_cnt = 0;
+
+sub check_id($) {
+       my ($id) = @_;
+       return $id =~ /^([0-9A-Za-z-]+)$/;
+}
+
+sub new($;$) {
+       my ($class, $id) = @_;
+       my $job = { };
+       bless $job;
+       if (defined $id) {
+               check_id($id) or die "Invalid job ID";
+               $job->{'ID'} = $id;
+       } else {
+               $job_cnt++;
+               $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
+       }
+       $job->{'Subject'} = '';
+       return $job;
+}
+
+sub new_from_file($$;$) {
+       my ($class, $file, $header_only) = @_;
+       my $job = { };
+       open T, '<', $file or die "Cannot open $file: $!";
+       while (<T>) {
+               chomp;
+               /^$/ and last;
+               /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
+               !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
+               $job->{$1} = $2;
+       }
+       if (!$header_only) {
+               my @cmds = <T>;
+               $job->{'body'} = join("", @cmds);
+       }
+       close T;
+       $job->{'Subject'} //= '';
+       $job->{'ID'} or die "Cannot load $file: Missing ID";
+       check_id($job->{'ID'}) or die "Cannot load $file: Invalid ID syntax";
+       return bless $job;
+}
+
+sub id($) {
+       return $_[0]->{'ID'};
+}
+
+sub name($) {
+       my ($job) = @_;
+       my $name = $job->{'ID'};
+       my $subj = $job->{'Subject'} // "";
+       $name .= " ($subj)" if $subj !~ /^\s*$/;
+       return $name;
+}
+
+sub attr($$;$) {
+       my ($job, $attr, $val) = @_;
+       $job->{$attr} = $val if defined $val;
+       return $job->{$attr};
+}
+
+sub dump($) {
+       my ($job) = @_;
+       for my $k (sort keys %$job) {
+               print "$k: ", $job->{$k}, "\n";
+       }
+}
+
+sub save($;$) {
+       my ($job, $fn) = @_;
+       -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
+       $fn //= 'tmp/' . $job->id;
+       open T, '>', $fn or die "Cannot create $fn: $!";
+       for my $k (sort grep { /^[A-Z]/ } keys %$job) {
+               print T "$k: ", $job->{$k}, "\n";
+       }
+       print T "\n";
+       print T $job->{'body'} if defined $job->{'body'};
+       close T;
+       return $fn;
+}
+
+42;
diff --git a/lib/perl/BEX/Queue.pm b/lib/perl/BEX/Queue.pm
new file mode 100644 (file)
index 0000000..6ae9240
--- /dev/null
@@ -0,0 +1,249 @@
+# Batch EXecutor 3.0 -- Queues
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use feature 'switch';
+
+package BEX::Queue;
+
+use IO::File;
+use File::Path;
+use Fcntl qw(:flock);
+use POSIX ();
+
+sub new($;$) {
+       my ($class, $name) = @_;
+       $name //= 'queue';
+       -d $name or die "Queue directory $name does not exist\n";
+       for my $d ("hosts", "jobs") {
+               -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
+       }
+       my $queue = {
+               'Name' => $name,
+               'MetaCache' => {},
+       };
+       return bless $queue;
+}
+
+sub log_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.log';
+}
+
+# Most actions have to be logged by the caller
+sub log($$$$;$) {
+       my ($queue, $mach, $jid, $stat, $msg) = @_;
+       my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
+       my $m = join(" ", $t, $mach, $jid, $stat);
+       $m .= " $msg" if defined $msg;
+
+       my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
+       print $fh "$m\n";
+
+       # Append to the per-job log file
+       if (open L, '>>', $queue->log_file($mach, $jid)) {
+               print L "### $m\n";
+               close L;
+       }
+}
+
+sub host_dir($$) {
+       my ($queue, $machine) = @_;
+       return $queue->{'Name'} . '/hosts/' . $machine;
+}
+
+sub queue_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.job';
+}
+
+sub status_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.stat';
+}
+
+sub temp_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.tmp';
+}
+
+sub job_file($$) {
+       my ($queue, $jid) = @_;
+       return $queue->{'Name'} . '/jobs/' . $jid. '.job';
+}
+
+sub enqueue($$$) {
+       my ($queue, $machine, $job) = @_;
+       my $qf = $queue->queue_file($machine, $job->id);
+       if (-f $qf) {
+               return 0;
+       }
+       my $fn = $queue->job_file($job->id);
+       -f $fn or $job->save($fn);
+       my $dir = $queue->host_dir($machine);
+       -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
+       symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
+       return 1;
+}
+
+sub scan($$) {
+       my ($queue, $machine) = @_;
+       my @list = ();
+       if (opendir D, $queue->host_dir($machine)) {
+               while ($_ = readdir D) {
+                       /^\./ and next;
+                       s{\.job}{} or next;
+                       push @list, $_;
+               }
+               closedir D;
+       }
+       return sort @list;
+}
+
+sub remove($$;$) {
+       my ($queue, $machine, $jid, $force_remove) = @_;
+       if ($BEX::Config::keep_history && !$force_remove) {
+               my $s = $queue->{'Name'} . '/hosts/' . $machine;
+               my $d = $queue->{'Name'} . '/history/' . $machine;
+               File::Path::mkpath($d);
+               for my $suff ('job', 'stat', 'log') {
+                       my $src = "$s/$jid.$suff";
+                       my $dst = "$d/$jid.$suff";
+                       if (-f $src) {
+                               rename $src, $dst or die "Cannot rename $src to $dst: $!";
+                       } else {
+                               # Might be present from the previous incarnation of the same job
+                               unlink $dst;
+                       }
+               }
+       } else {
+               unlink $queue->queue_file($machine, $jid);
+               unlink $queue->status_file($machine, $jid);
+               unlink $queue->log_file($machine, $jid);
+       }
+       unlink $queue->temp_file($machine, $jid);
+}
+
+sub job_metadata($$) {
+       my ($queue, $jid) = @_;
+       my $cache = $queue->{'MetaCache'};
+       if (!defined $cache->{$jid}) {
+               $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
+       }
+       return $cache->{$jid};
+}
+
+sub job_name($$) {
+       my ($queue, $jid) = @_;
+       return $queue->job_metadata($jid)->name;
+}
+
+sub read_job_status($$$) {
+       my ($queue, $machine, $jid) = @_;
+       my %s = ();
+       my $sf = $queue->status_file($machine, $jid);
+       if (open S, '<', $sf) {
+               while (<S>) {
+                       chomp;
+                       /^(\w+):\s*(.*)/ or die "Parse error in $sf";
+                       $s{$1} = $2;
+               }
+               close S;
+       }
+       return \%s;
+}
+
+sub write_job_status($$$$) {
+       my ($queue, $machine, $jid, $stat) = @_;
+       my $sf = $queue->status_file($machine, $jid);
+       open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
+       for my $k (sort keys %$stat) {
+               print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
+       }
+       close S;
+       rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
+}
+
+sub update_job_status($$$$;$) {
+       my ($queue, $machine, $jid, $stat, $msg) = @_;
+       my $s = {
+               'Time' => time,
+               'Status' => $stat,
+               'Message' => $msg,
+       };
+       $queue->write_job_status($machine, $jid, $s);
+       $queue->log($machine, $jid, $stat, $msg);
+}
+
+sub lock_name($$$) {
+       my ($queue, $machine, $jid) = @_;
+       my $lock = $queue->{'Name'};
+       if (defined $jid) {
+               $lock .= "/hosts/$machine/$jid.lock";
+       } elsif (defined $machine) {
+               $lock .= "/hosts/$machine/lock";
+       } else {
+               $lock .= '/lock';
+       }
+}
+
+# Whenever we want to run a job on a machine, we must obtain a lock;
+# at most one lock can be held at a time by a single BEX::Queue object.
+# See the description of locking schemes in BEX::Config.
+sub lock($$$) {
+       my ($queue, $machine, $jid) = @_;
+       my $lock;
+       given ($BEX::Config::locking_scheme) {
+               when ('queue') {
+                       $lock = lock_name($queue, undef, undef);
+               }
+               when ('host') {
+                       defined($machine) or return 1;
+                       $lock = lock_name($queue, $machine, undef);
+               }
+               when ('job') {
+                       defined($machine) && defined($jid) or return 1;
+                       $lock = lock_name($queue, $machine, $jid);
+               }
+               when ('none') { return 1; }
+               default { die "Invalid BEX::Config::locking_scheme"; }
+       }
+       if (defined($queue->{'LockName'})) {
+               return 1 if ($queue->{'LockName'} eq $lock);
+               $queue->unlock;
+       }
+       open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
+       if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
+               close $queue->{'LockHandle'};
+               delete $queue->{'LockHandle'};
+               return 0;
+       }
+       $queue->{'LockName'} = $lock;
+       return 1;
+}
+
+sub unlock($) {
+       my ($queue) = @_;
+       defined $queue->{'LockName'} or return;
+       unlink $queue->{'LockName'};
+       flock $queue->{'LockHandle'}, LOCK_UN;
+       close $queue->{'LockHandle'};
+       delete $queue->{'LockHandle'};
+       delete $queue->{'LockName'};
+}
+
+# Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
+sub is_locked($$$) {
+       my ($queue, $machine, $jid) = @_;
+       given ($BEX::Config::locking_scheme) {
+               # Shortcuts
+               when ('host') { return unless defined $machine; }
+               when ('jid') { return unless defined $jid; }
+               when ('none') { return; }
+       }
+       my $lock = lock_name($queue, $machine, $jid);
+       return -f $lock;
+}
+
+42;
diff --git a/prolog b/prolog
deleted file mode 100644 (file)
index 3350a91..0000000
--- a/prolog
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-# BEX prolog
-set -e