This way, they are well visible in `ps'.
);
if (defined $aliases{$sub}) { $sub = $aliases{$sub}; }
-my $sub_path = "$bex_lib/bin/$sub";
+my $sub_path = "$bex_lib/bin/bex-$sub";
-x $sub_path or die "Unknown subcommand $sub\n";
$ENV{"BEX_HOME"} = $bex_home;
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Insert to Queue
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use File::stat;
-use BEX;
-
-my $given_body;
-my $given_go;
-my $given_id;
-my $queue_name;
-my $requeue_id;
-my $given_subject;
-my $given_template;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex add [<options>] [!]<machine-or-class> ...
-
-Options:
--b, --body=<file> Load job body from the given file
--g, --go Do not run editor, go enqueue the job immediately
--i, --id=<id> Set job ID of the new job
--q, --queue=<name> Insert new jobs to the given queue
--r, --requeue=<id> Re-queue an existing job instead of creating a new one
--s, --subject=<subj> Set subject of the new job
--t, --template=<file> Load job template (headers and body) from the given file
-AMEN
- exit 0;
-}
-
-GetOptions(
- "b|body=s" => \$given_body,
- "g|go!" => \$given_go,
- "i|id=s" => \$given_id,
- "q|queue=s" => \$queue_name,
- "r|requeue=s" => \$requeue_id,
- "s|subject=s" => \$given_subject,
- "t|template=s" => \$given_template,
- "help" => \&usage,
-) or die "Try `bex add --help' for more information.\n";
-
-# Prepare machine set
-@ARGV or die "No machines specified\n";
-my @machines = BEX::Config::parse_machine_list(@ARGV);
-@machines or die "No machines match\n";
-
-my $queue = BEX::Queue->new($queue_name);
-my $job;
-my $tmp_fn;
-
-if (defined $requeue_id) {
- # When requeueing, just fetch the existing job
- if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) {
- die "Parameters of a requeued job cannot be changed\n";
- }
- my $fn = $queue->job_file($requeue_id);
- -f $fn or die "Job $requeue_id not known\n";
- $job = BEX::Job->new_from_file($fn);
-} else {
- # Create job template
- if (defined $given_template) {
- $job = BEX::Job->new_from_file($given_template);
- } else {
- $job = BEX::Job->new;
- }
- $job->attr('ID', $given_id) if defined $given_id;
- $job->attr('Subject', $given_subject) if defined $given_subject;
- if (defined $given_body) {
- open B, '<', $given_body or die "Cannot open $given_body: $!\n";
- local $/;
- $job->attr('body', <B>);
- close B;
- }
-
- # Let the user edit the template
- if (!$given_go) {
- $tmp_fn = $job->save;
- my $orig_stat = stat($tmp_fn) or die;
- system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n";
- my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n";
- if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) {
- unlink $tmp_fn;
- die "Cancelled\n";
- }
- $job = BEX::Job->new_from_file($tmp_fn);
- }
-}
-
-# Put the job to the queue
-print "New job ", $job->id, "\n";
-for my $m (@machines) {
- if ($queue->enqueue($m, $job)) {
- $queue->update_job_status($m, $job->id, 'NEW');
- print "\t$m\n";
- } else {
- $queue->log($m, $job->id, 'REQUEUE');
- print "\t$m (already queued)\n";
- }
-}
-
-# Remove the temporary file if there's any
-unlink $tmp_fn if defined $tmp_fn;
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Insert to Queue
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use File::stat;
+use BEX;
+
+my $given_body;
+my $given_go;
+my $given_id;
+my $queue_name;
+my $requeue_id;
+my $given_subject;
+my $given_template;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex add [<options>] [!]<machine-or-class> ...
+
+Options:
+-b, --body=<file> Load job body from the given file
+-g, --go Do not run editor, go enqueue the job immediately
+-i, --id=<id> Set job ID of the new job
+-q, --queue=<name> Insert new jobs to the given queue
+-r, --requeue=<id> Re-queue an existing job instead of creating a new one
+-s, --subject=<subj> Set subject of the new job
+-t, --template=<file> Load job template (headers and body) from the given file
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "b|body=s" => \$given_body,
+ "g|go!" => \$given_go,
+ "i|id=s" => \$given_id,
+ "q|queue=s" => \$queue_name,
+ "r|requeue=s" => \$requeue_id,
+ "s|subject=s" => \$given_subject,
+ "t|template=s" => \$given_template,
+ "help" => \&usage,
+) or die "Try `bex add --help' for more information.\n";
+
+# Prepare machine set
+@ARGV or die "No machines specified\n";
+my @machines = BEX::Config::parse_machine_list(@ARGV);
+@machines or die "No machines match\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $job;
+my $tmp_fn;
+
+if (defined $requeue_id) {
+ # When requeueing, just fetch the existing job
+ if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) {
+ die "Parameters of a requeued job cannot be changed\n";
+ }
+ my $fn = $queue->job_file($requeue_id);
+ -f $fn or die "Job $requeue_id not known\n";
+ $job = BEX::Job->new_from_file($fn);
+} else {
+ # Create job template
+ if (defined $given_template) {
+ $job = BEX::Job->new_from_file($given_template);
+ } else {
+ $job = BEX::Job->new;
+ }
+ $job->attr('ID', $given_id) if defined $given_id;
+ $job->attr('Subject', $given_subject) if defined $given_subject;
+ if (defined $given_body) {
+ open B, '<', $given_body or die "Cannot open $given_body: $!\n";
+ local $/;
+ $job->attr('body', <B>);
+ close B;
+ }
+
+ # Let the user edit the template
+ if (!$given_go) {
+ $tmp_fn = $job->save;
+ my $orig_stat = stat($tmp_fn) or die;
+ system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n";
+ my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n";
+ if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) {
+ unlink $tmp_fn;
+ die "Cancelled\n";
+ }
+ $job = BEX::Job->new_from_file($tmp_fn);
+ }
+}
+
+# Put the job to the queue
+print "New job ", $job->id, "\n";
+for my $m (@machines) {
+ if ($queue->enqueue($m, $job)) {
+ $queue->update_job_status($m, $job->id, 'NEW');
+ print "\t$m\n";
+ } else {
+ $queue->log($m, $job->id, 'REQUEUE');
+ print "\t$m (already queued)\n";
+ }
+}
+
+# Remove the temporary file if there's any
+unlink $tmp_fn if defined $tmp_fn;
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Operations on a Job
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $edit;
+my $queue_name;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex job [<options>] <job-id>
+
+Options:
+-e, --edit Run editor on the given job (no locking)
+-q, --queue=<name> Act on the given queue
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "e|edit!" => \$edit,
+ "q|queue=s" => \$queue_name,
+ "help" => \&usage,
+) && @ARGV == 1 or die "Try `bex job --help' for more information.\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $fn = $queue->job_file($ARGV[0]);
+-f $fn or die "No such job " . $ARGV[0] . "\n";
+
+if ($edit) {
+ system "editor", $fn;
+} else {
+ system "cat", $fn;
+}
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Show Queued Jobs
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use POSIX;
+use BEX;
+
+my $op_by_job;
+my $op_by_host;
+my $op_rm;
+my $op_move_to;
+
+my $queue_name;
+my $given_job;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex ls [<options and actions>] [[!]<machine-or-class> ...]
+
+Actions:
+ --by-job Show jobs sorted by job ID (default)
+-h, --by-host Show jobs sorted by host
+ --rm Remove jobs from the queue
+ --move-to=<queue> Move jobs to a different queue
+
+Options:
+-j, --job=<id> Act on the specified job (default: on all)
+-q, --queue=<name> Act on the given queue
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "by-job!" => \$op_by_job,
+ "h|by-host!" => \$op_by_host,
+ "rm!" => \$op_rm,
+ "move-to=s" => \$op_move_to,
+ "j|job=s" => \$given_job,
+ "q|queue=s" => \$queue_name,
+ "help" => \&usage,
+) or die "Try `bex ls --help' for more information.\n";
+
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+# Select jobs
+my %jobs = ();
+my %machs = ();
+for my $m (@machines) {
+ for my $j ($queue->scan($m)) {
+ if (defined $given_job) {
+ next if $j ne $given_job;
+ }
+ push @{$jobs{$j}}, $m;
+ push @{$machs{$m}}, $j;
+ }
+}
+
+sub do_ls();
+sub do_rm();
+sub do_move_to();
+
+my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to);
+if ($ops > 1) { die "Multiple actions are not allowed\n"; }
+
+if ($op_rm) { do_rm(); }
+elsif (defined $op_move_to) { do_move_to(); }
+else { do_ls(); }
+exit 0;
+
+sub do_ls()
+{
+ my %stat = ();
+ my %mach_locked = ();
+ for my $m (keys %machs) {
+ $mach_locked{$m} = $queue->is_locked($m, undef);
+ for my $j (@{$machs{$m}}) {
+ my $st = $queue->read_job_status($m, $j);
+ if (defined($st->{'Time'}) && defined($st->{'Status'})) {
+ $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
+ POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']';
+ } else {
+ $stat{$m}{$j} = '';
+ }
+ if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
+ $stat{$m}{$j} .= ' [LOCKED]';
+ }
+ }
+ }
+
+ if ($queue->is_locked(undef, undef)) {
+ print "### Queue lock present\n\n";
+ }
+
+ if ($op_by_host) {
+ for my $m (sort keys %machs) {
+ print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n";
+ for my $j (@{$machs{$m}}) {
+ print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n";
+ }
+ }
+ } else {
+ for my $j (sort keys %jobs) {
+ print $queue->job_name($j), "\n";
+ for my $m (sort @{$jobs{$j}}) {
+ print "\t$m", $stat{$m}{$j}, "\n";
+ }
+ }
+ }
+}
+
+sub do_rm()
+{
+ my $err = 0;
+ for my $m (sort keys %machs) {
+ for my $j (sort @{$machs{$m}}) {
+ if (!$queue->lock($m, $j)) {
+ print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n";
+ $err = 1;
+ } else {
+ $queue->update_job_status($m, $j, 'REMOVED');
+ $queue->remove($m, $j);
+ print "Removed $m:", $queue->job_name($j), "\n";
+ }
+ }
+ }
+ $queue->unlock;
+ exit $err;
+}
+
+sub do_move_to()
+{
+ my $err = 0;
+ my $dest = BEX::Queue->new($op_move_to);
+ $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n";
+ for my $j (sort keys %jobs) {
+ my $job = BEX::Job->new_from_file($queue->job_file($j));
+ for my $m (sort @{$jobs{$j}}) {
+ if (!$queue->lock($m, $j)) {
+ print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n";
+ $err = 1;
+ } else {
+ my $enq = $dest->enqueue($m, $job);
+ if ($enq) {
+ $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue');
+ } else {
+ $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue');
+ }
+ $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue');
+ $queue->remove($m, $j);
+ print "Moved $m:", $dest->job_name($j);
+ print " (already queued)" if !$enq;
+ print "\n";
+ }
+ }
+ }
+ $queue->unlock;
+ exit $err;
+}
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- List Machines and Groups
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $edit;
+my $queue_name;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex mach [<options>]
+
+Options:
+None defined so far.
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "help" => \&usage,
+) && @ARGV == 0 or die "Try `bex mach --help' for more information.\n";
+
+my $machines = \%BEX::Config::machines;
+
+print "# Hosts:\n";
+for my $h (sort keys %$machines) {
+ my $m = $machines->{$h};
+ ref $m eq 'HASH' or next;
+ print "$h\n";
+}
+
+print "\n# Groups:\n";
+for my $h (sort keys %$machines) {
+ my $m = $machines->{$h};
+ ref $m eq 'ARRAY' or next;
+ print "$h = ", join(" ",
+ map {
+ my $x = $machines->{$_};
+ !defined($x) ? "$_?" :
+ ref $x eq 'HASH' ? $_ :
+ ref $x eq 'ARRAY' ? "\@$_" :
+ "$_???"
+ } @$m), "\n";
+}
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Parallel Execution Using Screen
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use feature 'switch';
+use Getopt::Long;
+use POSIX;
+use BEX;
+
+my $queue_name;
+my $screen_session = 'BEX';
+my $text_mode;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex prun [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-q, --queue=<name> Run jobs in the given queue
+ --session=<name> Job windows should be opened within the given screen
+ session (default: BEX)
+ --text Use textual user interface instead of curses
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "q|queue=s" => \$queue_name,
+ "session=s" => \$screen_session,
+ "text!" => \$text_mode,
+ "help" => \&usage,
+) or die "Try `bex prun --help' for more information.\n";
+
+system 'screen', '-S', $screen_session, '-X', 'select', '.';
+!$? or die "Screen session $screen_session not found\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $fifo_name = $queue->{'Name'} . '/status-fifo';
+unlink $fifo_name;
+mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
+open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
+
+my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
+
+my @machines = ();
+for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
+ my @jobs = $queue->scan($mach);
+ @jobs or next;
+ push @machines, $mach;
+ for (@jobs) { $ui->update($mach, $_, 'READY'); }
+}
+
+my %running = ();
+my $max = $BEX::Config::max_parallel_jobs;
+
+while (keys %running || @machines) {
+ if (@machines && keys %running < $max) {
+ my $mach = shift @machines;
+ $ui->update($mach, undef, 'START');
+ my @scr = ('screen', '-t', $mach);
+ push @scr, '-S', $screen_session if defined $screen_session;
+ push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
+ system @scr;
+ !$? or $ui->update($mach, undef, 'INTERR');
+ $running{$mach} = 'START';
+ next;
+ }
+ $_ = <FIFO>;
+ chomp;
+ my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
+ if (!defined $stat) {
+ $ui->err("Received invalid status message <$_>");
+ next;
+ }
+ if (!defined $running{$mach}) {
+ $ui->err("Received status message <$_> for a machine which does not run");
+ next;
+ }
+ $running{$mach} = $stat;
+ $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
+ if ($stat eq 'DONE') {
+ delete $running{$mach};
+ }
+}
+
+close FIFO;
+unlink $fifo_name;
+$ui->done;
+
+package BEX::bprun::text;
+
+sub new($) {
+ return bless {};
+}
+
+sub done($) {
+}
+
+sub update($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
+}
+
+sub err($$) {
+ my ($ui, $msg) = @_;
+ print STDERR "ERROR: $msg\n";
+}
+
+package BEX::bprun::curses;
+
+use Curses;
+
+my $C;
+
+my $nrows;
+my @by_row = ();
+my %by_host = ();
+
+my %host_state;
+my %host_cnt;
+
+my %job_state;
+my %job_cnt;
+
+my %host_last_fail_job;
+my %host_last_fail_stat;
+
+sub new($) {
+ $C = new Curses;
+ start_color;
+ has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
+ cbreak; noecho;
+ $C->intrflush(0);
+ $C->keypad(1);
+ $C->meta(1);
+ $C->clear;
+ init_pair(1, COLOR_YELLOW, COLOR_BLUE);
+ init_pair(2, COLOR_YELLOW, COLOR_RED);
+ init_pair(3, COLOR_YELLOW, COLOR_BLACK);
+ init_pair(4, COLOR_RED, COLOR_BLACK);
+
+ $nrows = $C->getmaxy - 2;
+ if ($BEX::Config::max_parallel_jobs > $nrows) {
+ $BEX::Config::max_parallel_jobs = $nrows;
+ }
+
+ %host_state = %host_cnt = ();
+ %job_state = %job_cnt = ();
+ for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
+ $host_cnt{$s} = 0;
+ $job_cnt{'*'}{$s} = 0;
+ }
+
+ my $ui = bless {};
+ $ui->refresh_status;
+ return $ui;
+}
+
+sub done($)
+{
+ $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+ $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
+ $C->clrtoeol;
+ $C->getch;
+ endwin;
+}
+
+sub err($$) {
+ my ($ui, $msg) = @_;
+ $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
+ $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub set_host_status($$$) {
+ my ($ui, $mach, $stat) = @_;
+ my $prev_stat = $host_state{$mach};
+ if (defined $prev_stat) {
+ $host_cnt{$prev_stat}--;
+ } else {
+ for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
+ }
+ $host_state{$mach} = $stat;
+ $host_cnt{$stat}++;
+}
+
+sub set_job_status($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
+ $job_cnt{$mach}{$prev_stat}--;
+ $job_cnt{'*'}{$prev_stat}--;
+ $job_state{$mach}{$jid} = $stat;
+ $job_cnt{$mach}{$stat}++;
+ $job_cnt{'*'}{$stat}++;
+}
+
+sub refresh_status($) {
+ $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+ $C->addnstr(0, 0,
+ sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
+ $host_cnt{'running'},
+ $host_cnt{'done'},
+ $host_cnt{'failed'},
+ $host_cnt{'ready'},
+ $job_cnt{'*'}{'running'},
+ $job_cnt{'*'}{'done'},
+ $job_cnt{'*'}{'failed'},
+ $job_cnt{'*'}{'ready'},
+ ), $C->getmaxx);
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub get_slot($) {
+ my ($mach) = @_;
+ my $s;
+ if (defined ($s = $by_host{$mach})) {
+ delete $s->{'Gone'};
+ } else {
+ my ($best, $besti);
+ for my $i (0..$nrows-1) {
+ my $r = $by_row[$i];
+ if (!defined $r) {
+ $besti = $i;
+ $best = undef;
+ last;
+ } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
+ $besti = $i;
+ $best = $r;
+ }
+ }
+ if ($best) {
+ delete $by_host{$best->{'Host'}};
+ }
+ $s->{'Host'} = $mach;
+ $s->{'Row'} = $besti;
+ $by_host{$mach} = $s;
+ $by_row[$besti] = $s;
+ }
+ return $s;
+}
+
+my $gone_counter = 1;
+sub delete_slot($) {
+ my ($s) = @_;
+ $s->{'Gone'} = $gone_counter++;
+}
+
+sub redraw_slot($) {
+ my ($s) = @_;
+ my $mach = $s->{'Host'};
+ my $stat = $s->{'Status'} // "?";
+ my $jid = $s->{'Job'} // "";
+ my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
+ my $jcnt = $job_cnt{$mach};
+ if ($jcnt->{'running'}) {
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
+ } else {
+ $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
+ }
+ } else {
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ } else {
+ $C->bkgdset(0);
+ }
+ }
+ my $r = $s->{'Row'} + 1;
+ $C->addstr($r, 0, sprintf("%-20.20s", $mach));
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
+ } else {
+ $C->bkgdset(0);
+ $C->addstr(" ");
+ }
+ $C->bkgdset(0);
+ $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
+ if ($stat eq 'DONE') {
+ if (defined $host_last_fail_stat{$mach}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
+ }
+ } else {
+ my $text = sprintf(" %-8s %s", $stat, $jname);
+ $C->addstr($text);
+ }
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub update($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ my $s = get_slot($mach);
+ given ($stat) {
+ when ('READY') {
+ # Pseudo-state generated internally
+ $ui->set_host_status($mach, 'ready');
+ $ui->set_job_status($mach, $jid, 'ready');
+ }
+ when ('OK') {
+ $ui->set_job_status($mach, $jid, 'done');
+ }
+ when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ $host_last_fail_job{$mach} = $jid;
+ $host_last_fail_stat{$mach} = $stat;
+ }
+ when ('DONE') {
+ if ($job_cnt{$mach}{'failed'}) {
+ $ui->set_host_status($mach, 'failed');
+ } else {
+ $ui->set_host_status($mach, 'done');
+ }
+ }
+ when ('INIT') {
+ $ui->set_host_status($mach, 'running');
+ $ui->set_job_status($mach, $jid, 'running') if defined $jid;
+ }
+ when ('LOCKED') {
+ if (defined $jid) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ } else {
+ for my $j (keys %{$job_state{$mach}}) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ }
+ $ui->set_host_status($mach, 'failed');
+ $host_last_fail_job{$mach} = $jid;
+ $host_last_fail_stat{$mach} = $stat;
+ }
+ }
+ when (['START', 'PING', 'SEND', 'RUN']) {
+ }
+ default {
+ $ui->err("Received unknown job status $stat");
+ }
+ }
+ $s->{'Job'} = $jid;
+ $s->{'Status'} = $stat;
+ redraw_slot($s);
+ if ($stat eq 'DONE') { delete_slot($s); }
+ $ui->refresh_status;
+}
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Operations on Queues
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+my $init;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex queue [<options>] <subcommand>
+
+Subcommands:
+init <queue> Create a new queue
+ls List all queues
+
+Options:
+None defined so far.
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "init!" => \$init,
+ "help" => \&usage,
+) or die "Try `bex queue --help' for more information.\n";
+
+my $op = shift @ARGV // 'ls';
+
+if ($op eq 'init') {
+ my $queue_name = shift @ARGV or die "bex queue init requires a queue name\n";
+ my $path = $BEX::Config::home . '/' . $queue_name;
+ -d $path and die "Queue directory $path already exists\n";
+ mkdir $path;
+ mkdir "$path/hosts";
+ mkdir "$path/jobs";
+ print "Queue $queue_name initialized.\n";
+} elsif ($op eq 'ls' && @ARGV == 0) {
+ opendir D, $BEX::Config::home or die "Cannot read BEX home directory\n";
+ for my $q (sort readdir D) {
+ next if $q =~ /^\./;
+ my $d = $BEX::Config::home . '/' . $q;
+ if (-d $d && -d "$d/hosts" && -d "$d/jobs") {
+ print "$q\n";
+ }
+ }
+ closedir D;
+} else {
+ die "Invalid subcommand\n";
+}
--- /dev/null
+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Run Queued Jobs
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use BEX;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex run [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-j, --job=<id> Run only the specified job
+-q, --queue=<name> Select job queue
+ --status-fifo=<f> Send status updates to the given named pipe
+AMEN
+ exit 0;
+}
+
+my $given_job;
+my $queue_name;
+my $status_fifo;
+
+GetOptions(
+ "j|job=s" => \$given_job,
+ "q|queue=s" => \$queue_name,
+ "s|status-fifo=s" => \$status_fifo,
+ "help" => \&usage,
+) or die "Try `bex run --help' for more information.\n";
+
+my $status_fd;
+if (defined $status_fifo) {
+ open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
+ autoflush $status_fd, 1;
+}
+
+sub update_status($$$$;$) {
+ my ($mach, $job, $status, $log_on_queue, $msg) = @_;
+ if ($status_fd) {
+ print $status_fd "! $mach $job $status\n";
+ }
+ if ($log_on_queue) {
+ $log_on_queue->update_job_status($mach, $job, $status, $msg);
+ }
+}
+
+my %pings;
+
+sub ping_machine($) {
+ my ($mach) = @_;
+ if (!defined $pings{$mach}) {
+ if ($BEX::Config::ping_hosts) {
+ update_status($mach, '-', 'PING', undef);
+ my $host = BEX::Config::host_name($mach);
+ `ping -c1 -n $host >/dev/null 2>/dev/null`;
+ $pings{$mach} = !$?;
+ } else {
+ $pings{$mach} = 1;
+ }
+ }
+ if ($pings{$mach}) {
+ return ('OK', undef);
+ } else {
+ return ('NOPING', 'Does not ping');
+ }
+}
+
+sub exit_status($) {
+ my ($s) = @_;
+ if ($s >> 8) {
+ return "with exit code " . ($s >> 8);
+ } else {
+ return "on fatal signal " . ($s & 127);
+ }
+}
+
+sub run_job_prep($$$) {
+ my ($job, $queue, $mach) = @_;
+ my $prep = $job->attr('Prep');
+ defined($prep) && $prep !~ /^\s*$/ or return 'OK';
+
+ my $jid = $job->id;
+ update_status($mach, $jid, 'PREP', $queue);
+ my $lf = $queue->log_file($mach, $jid);
+ $ENV{'HOST'} = BEX::Config::host_name($mach);
+ system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
+ delete $ENV{'HOST'};
+ if ($?) {
+ return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
+ } else {
+ return 'OK';
+ }
+}
+
+sub run_job_body($$$) {
+ my ($job, $queue, $mach) = @_;
+
+ if ($job->attr('body') =~ /^\s*$/s) {
+ # Shortcut if the body is empty
+ return 'OK'
+ }
+
+ my $host = BEX::Config::host_name($mach);
+ my $jid = $job->id;
+
+ my $tmp = $queue->temp_file($mach, $jid);
+ open T, '>', $tmp or die;
+ if (defined $BEX::Config::job_prolog) {
+ open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
+ while (<P>) { print T; }
+ close P;
+ } else {
+ print T "#!/bin/sh\n";
+ }
+ print T "# BEX job ", $jid, "\n";
+ print T $job->attr('body');
+ if (defined $BEX::Config::job_epilog) {
+ open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
+ while (<E>) { print T; }
+ close E;
+ }
+ close T;
+
+ update_status($mach, $jid, 'SEND', undef);
+ my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
+ my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+ !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
+ chomp $rtmp;
+
+ update_status($mach, $jid, 'RUN', $queue);
+ my $lf = $queue->log_file($mach, $jid);
+ system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
+ if ($?) {
+ return ('FAILED', 'Job failed ' . exit_status($?));
+ } else {
+ return 'OK';
+ }
+}
+
+sub run_job($$$) {
+ my ($job, $queue, $mach) = @_;
+ my ($stat, $msg);
+
+ ($stat, $msg) = ping_machine($mach);
+ $stat eq 'OK' or return ($stat, $msg);
+
+ ($stat, $msg) = run_job_prep($job, $queue, $mach);
+ $stat eq 'OK' or return ($stat, $msg);
+
+ return run_job_body($job, $queue, $mach);
+}
+
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
+for my $mach (@machines) {
+ my @q = $queue->scan($mach) or next;
+ if (!$queue->lock($mach, undef)) {
+ print "### Machine $mach is locked by another brun, skipping...\n";
+ update_status($mach, '-', 'LOCKED', undef);
+ update_status($mach, '-', 'DONE', undef);
+ next;
+ }
+ update_status($mach, '-', 'INIT', undef);
+ while (my $jid = shift @q) {
+ if (defined $given_job) {
+ $jid eq $given_job or next;
+ }
+ my $job = BEX::Job->new_from_file($queue->job_file($jid));
+ update_status($mach, $jid, 'INIT', undef);
+ if (!$queue->lock($mach, $jid)) {
+ print "### Skipping locked $jid on $mach ###\n";
+ update_status($mach, $jid, 'LOCKED', undef);
+ next;
+ }
+ print "### Running ", $job->name, " on $mach ###\n";
+ my ($s, $msg) = run_job($job, $queue, $mach);
+ update_status($mach, $jid, $s, $queue, $msg);
+
+ if ($s eq 'OK') {
+ print "+++ OK\n";
+ $queue->remove($mach, $jid);
+ } else {
+ print "--- $s: $msg\n";
+ if ($BEX::Config::skip_on_fail) {
+ print "### Skipping other jobs on the same host ###\n" if @q;
+ last;
+ }
+ }
+ }
+} continue {
+ update_status($mach, '-', 'DONE', undef);
+}
+$queue->unlock;
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Operations on a Job
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use BEX;
-
-my $edit;
-my $queue_name;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex job [<options>] <job-id>
-
-Options:
--e, --edit Run editor on the given job (no locking)
--q, --queue=<name> Act on the given queue
-AMEN
- exit 0;
-}
-
-GetOptions(
- "e|edit!" => \$edit,
- "q|queue=s" => \$queue_name,
- "help" => \&usage,
-) && @ARGV == 1 or die "Try `bex job --help' for more information.\n";
-
-my $queue = BEX::Queue->new($queue_name);
-my $fn = $queue->job_file($ARGV[0]);
--f $fn or die "No such job " . $ARGV[0] . "\n";
-
-if ($edit) {
- system "editor", $fn;
-} else {
- system "cat", $fn;
-}
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Show Queued Jobs
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use POSIX;
-use BEX;
-
-my $op_by_job;
-my $op_by_host;
-my $op_rm;
-my $op_move_to;
-
-my $queue_name;
-my $given_job;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex ls [<options and actions>] [[!]<machine-or-class> ...]
-
-Actions:
- --by-job Show jobs sorted by job ID (default)
--h, --by-host Show jobs sorted by host
- --rm Remove jobs from the queue
- --move-to=<queue> Move jobs to a different queue
-
-Options:
--j, --job=<id> Act on the specified job (default: on all)
--q, --queue=<name> Act on the given queue
-AMEN
- exit 0;
-}
-
-GetOptions(
- "by-job!" => \$op_by_job,
- "h|by-host!" => \$op_by_host,
- "rm!" => \$op_rm,
- "move-to=s" => \$op_move_to,
- "j|job=s" => \$given_job,
- "q|queue=s" => \$queue_name,
- "help" => \&usage,
-) or die "Try `bex ls --help' for more information.\n";
-
-my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
-my $queue = BEX::Queue->new($queue_name);
-
-# Select jobs
-my %jobs = ();
-my %machs = ();
-for my $m (@machines) {
- for my $j ($queue->scan($m)) {
- if (defined $given_job) {
- next if $j ne $given_job;
- }
- push @{$jobs{$j}}, $m;
- push @{$machs{$m}}, $j;
- }
-}
-
-sub do_ls();
-sub do_rm();
-sub do_move_to();
-
-my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to);
-if ($ops > 1) { die "Multiple actions are not allowed\n"; }
-
-if ($op_rm) { do_rm(); }
-elsif (defined $op_move_to) { do_move_to(); }
-else { do_ls(); }
-exit 0;
-
-sub do_ls()
-{
- my %stat = ();
- my %mach_locked = ();
- for my $m (keys %machs) {
- $mach_locked{$m} = $queue->is_locked($m, undef);
- for my $j (@{$machs{$m}}) {
- my $st = $queue->read_job_status($m, $j);
- if (defined($st->{'Time'}) && defined($st->{'Status'})) {
- $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
- POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']';
- } else {
- $stat{$m}{$j} = '';
- }
- if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
- $stat{$m}{$j} .= ' [LOCKED]';
- }
- }
- }
-
- if ($queue->is_locked(undef, undef)) {
- print "### Queue lock present\n\n";
- }
-
- if ($op_by_host) {
- for my $m (sort keys %machs) {
- print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n";
- for my $j (@{$machs{$m}}) {
- print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n";
- }
- }
- } else {
- for my $j (sort keys %jobs) {
- print $queue->job_name($j), "\n";
- for my $m (sort @{$jobs{$j}}) {
- print "\t$m", $stat{$m}{$j}, "\n";
- }
- }
- }
-}
-
-sub do_rm()
-{
- my $err = 0;
- for my $m (sort keys %machs) {
- for my $j (sort @{$machs{$m}}) {
- if (!$queue->lock($m, $j)) {
- print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n";
- $err = 1;
- } else {
- $queue->update_job_status($m, $j, 'REMOVED');
- $queue->remove($m, $j);
- print "Removed $m:", $queue->job_name($j), "\n";
- }
- }
- }
- $queue->unlock;
- exit $err;
-}
-
-sub do_move_to()
-{
- my $err = 0;
- my $dest = BEX::Queue->new($op_move_to);
- $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n";
- for my $j (sort keys %jobs) {
- my $job = BEX::Job->new_from_file($queue->job_file($j));
- for my $m (sort @{$jobs{$j}}) {
- if (!$queue->lock($m, $j)) {
- print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n";
- $err = 1;
- } else {
- my $enq = $dest->enqueue($m, $job);
- if ($enq) {
- $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue');
- } else {
- $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue');
- }
- $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue');
- $queue->remove($m, $j);
- print "Moved $m:", $dest->job_name($j);
- print " (already queued)" if !$enq;
- print "\n";
- }
- }
- }
- $queue->unlock;
- exit $err;
-}
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- List Machines and Groups
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use BEX;
-
-my $edit;
-my $queue_name;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex mach [<options>]
-
-Options:
-None defined so far.
-AMEN
- exit 0;
-}
-
-GetOptions(
- "help" => \&usage,
-) && @ARGV == 0 or die "Try `bex mach --help' for more information.\n";
-
-my $machines = \%BEX::Config::machines;
-
-print "# Hosts:\n";
-for my $h (sort keys %$machines) {
- my $m = $machines->{$h};
- ref $m eq 'HASH' or next;
- print "$h\n";
-}
-
-print "\n# Groups:\n";
-for my $h (sort keys %$machines) {
- my $m = $machines->{$h};
- ref $m eq 'ARRAY' or next;
- print "$h = ", join(" ",
- map {
- my $x = $machines->{$_};
- !defined($x) ? "$_?" :
- ref $x eq 'HASH' ? $_ :
- ref $x eq 'ARRAY' ? "\@$_" :
- "$_???"
- } @$m), "\n";
-}
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Parallel Execution Using Screen
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use feature 'switch';
-use Getopt::Long;
-use POSIX;
-use BEX;
-
-my $queue_name;
-my $screen_session = 'BEX';
-my $text_mode;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex prun [<options>] [[!]<machine-or-class> ...]
-
-Options:
--q, --queue=<name> Run jobs in the given queue
- --session=<name> Job windows should be opened within the given screen
- session (default: BEX)
- --text Use textual user interface instead of curses
-AMEN
- exit 0;
-}
-
-GetOptions(
- "q|queue=s" => \$queue_name,
- "session=s" => \$screen_session,
- "text!" => \$text_mode,
- "help" => \&usage,
-) or die "Try `bex prun --help' for more information.\n";
-
-system 'screen', '-S', $screen_session, '-X', 'select', '.';
-!$? or die "Screen session $screen_session not found\n";
-
-my $queue = BEX::Queue->new($queue_name);
-my $fifo_name = $queue->{'Name'} . '/status-fifo';
-unlink $fifo_name;
-mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
-open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
-
-my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
-
-my @machines = ();
-for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
- my @jobs = $queue->scan($mach);
- @jobs or next;
- push @machines, $mach;
- for (@jobs) { $ui->update($mach, $_, 'READY'); }
-}
-
-my %running = ();
-my $max = $BEX::Config::max_parallel_jobs;
-
-while (keys %running || @machines) {
- if (@machines && keys %running < $max) {
- my $mach = shift @machines;
- $ui->update($mach, undef, 'START');
- my @scr = ('screen', '-t', $mach);
- push @scr, '-S', $screen_session if defined $screen_session;
- push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
- system @scr;
- !$? or $ui->update($mach, undef, 'INTERR');
- $running{$mach} = 'START';
- next;
- }
- $_ = <FIFO>;
- chomp;
- my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
- if (!defined $stat) {
- $ui->err("Received invalid status message <$_>");
- next;
- }
- if (!defined $running{$mach}) {
- $ui->err("Received status message <$_> for a machine which does not run");
- next;
- }
- $running{$mach} = $stat;
- $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
- if ($stat eq 'DONE') {
- delete $running{$mach};
- }
-}
-
-close FIFO;
-unlink $fifo_name;
-$ui->done;
-
-package BEX::bprun::text;
-
-sub new($) {
- return bless {};
-}
-
-sub done($) {
-}
-
-sub update($$$$) {
- my ($ui, $mach, $jid, $stat) = @_;
- print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
-}
-
-sub err($$) {
- my ($ui, $msg) = @_;
- print STDERR "ERROR: $msg\n";
-}
-
-package BEX::bprun::curses;
-
-use Curses;
-
-my $C;
-
-my $nrows;
-my @by_row = ();
-my %by_host = ();
-
-my %host_state;
-my %host_cnt;
-
-my %job_state;
-my %job_cnt;
-
-my %host_last_fail_job;
-my %host_last_fail_stat;
-
-sub new($) {
- $C = new Curses;
- start_color;
- has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
- cbreak; noecho;
- $C->intrflush(0);
- $C->keypad(1);
- $C->meta(1);
- $C->clear;
- init_pair(1, COLOR_YELLOW, COLOR_BLUE);
- init_pair(2, COLOR_YELLOW, COLOR_RED);
- init_pair(3, COLOR_YELLOW, COLOR_BLACK);
- init_pair(4, COLOR_RED, COLOR_BLACK);
-
- $nrows = $C->getmaxy - 2;
- if ($BEX::Config::max_parallel_jobs > $nrows) {
- $BEX::Config::max_parallel_jobs = $nrows;
- }
-
- %host_state = %host_cnt = ();
- %job_state = %job_cnt = ();
- for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
- $host_cnt{$s} = 0;
- $job_cnt{'*'}{$s} = 0;
- }
-
- my $ui = bless {};
- $ui->refresh_status;
- return $ui;
-}
-
-sub done($)
-{
- $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
- $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
- $C->clrtoeol;
- $C->getch;
- endwin;
-}
-
-sub err($$) {
- my ($ui, $msg) = @_;
- $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
- $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
- $C->clrtoeol;
- $C->refresh;
-}
-
-sub set_host_status($$$) {
- my ($ui, $mach, $stat) = @_;
- my $prev_stat = $host_state{$mach};
- if (defined $prev_stat) {
- $host_cnt{$prev_stat}--;
- } else {
- for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
- }
- $host_state{$mach} = $stat;
- $host_cnt{$stat}++;
-}
-
-sub set_job_status($$$$) {
- my ($ui, $mach, $jid, $stat) = @_;
- my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
- $job_cnt{$mach}{$prev_stat}--;
- $job_cnt{'*'}{$prev_stat}--;
- $job_state{$mach}{$jid} = $stat;
- $job_cnt{$mach}{$stat}++;
- $job_cnt{'*'}{$stat}++;
-}
-
-sub refresh_status($) {
- $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
- $C->addnstr(0, 0,
- sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
- $host_cnt{'running'},
- $host_cnt{'done'},
- $host_cnt{'failed'},
- $host_cnt{'ready'},
- $job_cnt{'*'}{'running'},
- $job_cnt{'*'}{'done'},
- $job_cnt{'*'}{'failed'},
- $job_cnt{'*'}{'ready'},
- ), $C->getmaxx);
- $C->clrtoeol;
- $C->refresh;
-}
-
-sub get_slot($) {
- my ($mach) = @_;
- my $s;
- if (defined ($s = $by_host{$mach})) {
- delete $s->{'Gone'};
- } else {
- my ($best, $besti);
- for my $i (0..$nrows-1) {
- my $r = $by_row[$i];
- if (!defined $r) {
- $besti = $i;
- $best = undef;
- last;
- } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
- $besti = $i;
- $best = $r;
- }
- }
- if ($best) {
- delete $by_host{$best->{'Host'}};
- }
- $s->{'Host'} = $mach;
- $s->{'Row'} = $besti;
- $by_host{$mach} = $s;
- $by_row[$besti] = $s;
- }
- return $s;
-}
-
-my $gone_counter = 1;
-sub delete_slot($) {
- my ($s) = @_;
- $s->{'Gone'} = $gone_counter++;
-}
-
-sub redraw_slot($) {
- my ($s) = @_;
- my $mach = $s->{'Host'};
- my $stat = $s->{'Status'} // "?";
- my $jid = $s->{'Job'} // "";
- my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
- my $jcnt = $job_cnt{$mach};
- if ($jcnt->{'running'}) {
- if ($jcnt->{'failed'}) {
- $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
- } else {
- $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
- }
- } else {
- if ($jcnt->{'failed'}) {
- $C->bkgdset(COLOR_PAIR(4));
- } else {
- $C->bkgdset(0);
- }
- }
- my $r = $s->{'Row'} + 1;
- $C->addstr($r, 0, sprintf("%-20.20s", $mach));
- if ($jcnt->{'failed'}) {
- $C->bkgdset(COLOR_PAIR(4));
- $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
- } else {
- $C->bkgdset(0);
- $C->addstr(" ");
- }
- $C->bkgdset(0);
- $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
- if ($stat eq 'DONE') {
- if (defined $host_last_fail_stat{$mach}) {
- $C->bkgdset(COLOR_PAIR(4));
- $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
- }
- } else {
- my $text = sprintf(" %-8s %s", $stat, $jname);
- $C->addstr($text);
- }
- $C->clrtoeol;
- $C->refresh;
-}
-
-sub update($$$$) {
- my ($ui, $mach, $jid, $stat) = @_;
- my $s = get_slot($mach);
- given ($stat) {
- when ('READY') {
- # Pseudo-state generated internally
- $ui->set_host_status($mach, 'ready');
- $ui->set_job_status($mach, $jid, 'ready');
- }
- when ('OK') {
- $ui->set_job_status($mach, $jid, 'done');
- }
- when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
- $ui->set_job_status($mach, $jid, 'failed');
- $host_last_fail_job{$mach} = $jid;
- $host_last_fail_stat{$mach} = $stat;
- }
- when ('DONE') {
- if ($job_cnt{$mach}{'failed'}) {
- $ui->set_host_status($mach, 'failed');
- } else {
- $ui->set_host_status($mach, 'done');
- }
- }
- when ('INIT') {
- $ui->set_host_status($mach, 'running');
- $ui->set_job_status($mach, $jid, 'running') if defined $jid;
- }
- when ('LOCKED') {
- if (defined $jid) {
- $ui->set_job_status($mach, $jid, 'failed');
- } else {
- for my $j (keys %{$job_state{$mach}}) {
- $ui->set_job_status($mach, $jid, 'failed');
- }
- $ui->set_host_status($mach, 'failed');
- $host_last_fail_job{$mach} = $jid;
- $host_last_fail_stat{$mach} = $stat;
- }
- }
- when (['START', 'PING', 'SEND', 'RUN']) {
- }
- default {
- $ui->err("Received unknown job status $stat");
- }
- }
- $s->{'Job'} = $jid;
- $s->{'Status'} = $stat;
- redraw_slot($s);
- if ($stat eq 'DONE') { delete_slot($s); }
- $ui->refresh_status;
-}
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Operations on Queues
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use BEX;
-
-my $init;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex queue [<options>] <subcommand>
-
-Subcommands:
-init <queue> Create a new queue
-ls List all queues
-
-Options:
-None defined so far.
-AMEN
- exit 0;
-}
-
-GetOptions(
- "init!" => \$init,
- "help" => \&usage,
-) or die "Try `bex queue --help' for more information.\n";
-
-my $op = shift @ARGV // 'ls';
-
-if ($op eq 'init') {
- my $queue_name = shift @ARGV or die "bex queue init requires a queue name\n";
- my $path = $BEX::Config::home . '/' . $queue_name;
- -d $path and die "Queue directory $path already exists\n";
- mkdir $path;
- mkdir "$path/hosts";
- mkdir "$path/jobs";
- print "Queue $queue_name initialized.\n";
-} elsif ($op eq 'ls' && @ARGV == 0) {
- opendir D, $BEX::Config::home or die "Cannot read BEX home directory\n";
- for my $q (sort readdir D) {
- next if $q =~ /^\./;
- my $d = $BEX::Config::home . '/' . $q;
- if (-d $d && -d "$d/hosts" && -d "$d/jobs") {
- print "$q\n";
- }
- }
- closedir D;
-} else {
- die "Invalid subcommand\n";
-}
+++ /dev/null
-#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Run Queued Jobs
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
-
-use strict;
-use warnings;
-use Getopt::Long;
-use BEX;
-
-sub usage() {
- print <<AMEN ;
-Usage: bex run [<options>] [[!]<machine-or-class> ...]
-
-Options:
--j, --job=<id> Run only the specified job
--q, --queue=<name> Select job queue
- --status-fifo=<f> Send status updates to the given named pipe
-AMEN
- exit 0;
-}
-
-my $given_job;
-my $queue_name;
-my $status_fifo;
-
-GetOptions(
- "j|job=s" => \$given_job,
- "q|queue=s" => \$queue_name,
- "s|status-fifo=s" => \$status_fifo,
- "help" => \&usage,
-) or die "Try `bex run --help' for more information.\n";
-
-my $status_fd;
-if (defined $status_fifo) {
- open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
- autoflush $status_fd, 1;
-}
-
-sub update_status($$$$;$) {
- my ($mach, $job, $status, $log_on_queue, $msg) = @_;
- if ($status_fd) {
- print $status_fd "! $mach $job $status\n";
- }
- if ($log_on_queue) {
- $log_on_queue->update_job_status($mach, $job, $status, $msg);
- }
-}
-
-my %pings;
-
-sub ping_machine($) {
- my ($mach) = @_;
- if (!defined $pings{$mach}) {
- if ($BEX::Config::ping_hosts) {
- update_status($mach, '-', 'PING', undef);
- my $host = BEX::Config::host_name($mach);
- `ping -c1 -n $host >/dev/null 2>/dev/null`;
- $pings{$mach} = !$?;
- } else {
- $pings{$mach} = 1;
- }
- }
- if ($pings{$mach}) {
- return ('OK', undef);
- } else {
- return ('NOPING', 'Does not ping');
- }
-}
-
-sub exit_status($) {
- my ($s) = @_;
- if ($s >> 8) {
- return "with exit code " . ($s >> 8);
- } else {
- return "on fatal signal " . ($s & 127);
- }
-}
-
-sub run_job_prep($$$) {
- my ($job, $queue, $mach) = @_;
- my $prep = $job->attr('Prep');
- defined($prep) && $prep !~ /^\s*$/ or return 'OK';
-
- my $jid = $job->id;
- update_status($mach, $jid, 'PREP', $queue);
- my $lf = $queue->log_file($mach, $jid);
- $ENV{'HOST'} = BEX::Config::host_name($mach);
- system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
- delete $ENV{'HOST'};
- if ($?) {
- return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
- } else {
- return 'OK';
- }
-}
-
-sub run_job_body($$$) {
- my ($job, $queue, $mach) = @_;
-
- if ($job->attr('body') =~ /^\s*$/s) {
- # Shortcut if the body is empty
- return 'OK'
- }
-
- my $host = BEX::Config::host_name($mach);
- my $jid = $job->id;
-
- my $tmp = $queue->temp_file($mach, $jid);
- open T, '>', $tmp or die;
- if (defined $BEX::Config::job_prolog) {
- open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
- while (<P>) { print T; }
- close P;
- } else {
- print T "#!/bin/sh\n";
- }
- print T "# BEX job ", $jid, "\n";
- print T $job->attr('body');
- if (defined $BEX::Config::job_epilog) {
- open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
- while (<E>) { print T; }
- close E;
- }
- close T;
-
- update_status($mach, $jid, 'SEND', undef);
- my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
- my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
- !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
- chomp $rtmp;
-
- update_status($mach, $jid, 'RUN', $queue);
- my $lf = $queue->log_file($mach, $jid);
- system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
- if ($?) {
- return ('FAILED', 'Job failed ' . exit_status($?));
- } else {
- return 'OK';
- }
-}
-
-sub run_job($$$) {
- my ($job, $queue, $mach) = @_;
- my ($stat, $msg);
-
- ($stat, $msg) = ping_machine($mach);
- $stat eq 'OK' or return ($stat, $msg);
-
- ($stat, $msg) = run_job_prep($job, $queue, $mach);
- $stat eq 'OK' or return ($stat, $msg);
-
- return run_job_body($job, $queue, $mach);
-}
-
-my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
-my $queue = BEX::Queue->new($queue_name);
-
-$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
-
-for my $mach (@machines) {
- my @q = $queue->scan($mach) or next;
- if (!$queue->lock($mach, undef)) {
- print "### Machine $mach is locked by another brun, skipping...\n";
- update_status($mach, '-', 'LOCKED', undef);
- update_status($mach, '-', 'DONE', undef);
- next;
- }
- update_status($mach, '-', 'INIT', undef);
- while (my $jid = shift @q) {
- if (defined $given_job) {
- $jid eq $given_job or next;
- }
- my $job = BEX::Job->new_from_file($queue->job_file($jid));
- update_status($mach, $jid, 'INIT', undef);
- if (!$queue->lock($mach, $jid)) {
- print "### Skipping locked $jid on $mach ###\n";
- update_status($mach, $jid, 'LOCKED', undef);
- next;
- }
- print "### Running ", $job->name, " on $mach ###\n";
- my ($s, $msg) = run_job($job, $queue, $mach);
- update_status($mach, $jid, $s, $queue, $msg);
-
- if ($s eq 'OK') {
- print "+++ OK\n";
- $queue->remove($mach, $jid);
- } else {
- print "--- $s: $msg\n";
- if ($BEX::Config::skip_on_fail) {
- print "### Skipping other jobs on the same host ###\n" if @q;
- last;
- }
- }
- }
-} continue {
- update_status($mach, '-', 'DONE', undef);
-}
-$queue->unlock;