+#!/usr/bin/perl
+# Batch EXecutor 3.0 -- Parallel Execution Using Screen
+# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use feature 'switch';
+use Getopt::Long;
+use POSIX;
+use BEX;
+
+my $queue_name;
+my $screen_session = 'BEX';
+my $text_mode;
+
+sub usage() {
+ print <<AMEN ;
+Usage: bex prun [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-q, --queue=<name> Run jobs in the given queue
+ --session=<name> Job windows should be opened within the given screen
+ session (default: BEX)
+ --text Use textual user interface instead of curses
+AMEN
+ exit 0;
+}
+
+GetOptions(
+ "q|queue=s" => \$queue_name,
+ "session=s" => \$screen_session,
+ "text!" => \$text_mode,
+ "help" => \&usage,
+) or die "Try `bex prun --help' for more information.\n";
+
+system 'screen', '-S', $screen_session, '-X', 'select', '.';
+!$? or die "Screen session $screen_session not found\n";
+
+my $queue = BEX::Queue->new($queue_name);
+my $fifo_name = $queue->{'Name'} . '/status-fifo';
+unlink $fifo_name;
+mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
+open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
+
+my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
+
+my @machines = ();
+for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
+ my @jobs = $queue->scan($mach);
+ @jobs or next;
+ push @machines, $mach;
+ for (@jobs) { $ui->update($mach, $_, 'READY'); }
+}
+
+my %running = ();
+my $max = $BEX::Config::max_parallel_jobs;
+
+while (keys %running || @machines) {
+ if (@machines && keys %running < $max) {
+ my $mach = shift @machines;
+ $ui->update($mach, undef, 'START');
+ my @scr = ('screen', '-t', $mach);
+ push @scr, '-S', $screen_session if defined $screen_session;
+ push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
+ system @scr;
+ !$? or $ui->update($mach, undef, 'INTERR');
+ $running{$mach} = 'START';
+ next;
+ }
+ $_ = <FIFO>;
+ chomp;
+ my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
+ if (!defined $stat) {
+ $ui->err("Received invalid status message <$_>");
+ next;
+ }
+ if (!defined $running{$mach}) {
+ $ui->err("Received status message <$_> for a machine which does not run");
+ next;
+ }
+ $running{$mach} = $stat;
+ $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
+ if ($stat eq 'DONE') {
+ delete $running{$mach};
+ }
+}
+
+close FIFO;
+unlink $fifo_name;
+$ui->done;
+
+package BEX::bprun::text;
+
+sub new($) {
+ return bless {};
+}
+
+sub done($) {
+}
+
+sub update($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
+}
+
+sub err($$) {
+ my ($ui, $msg) = @_;
+ print STDERR "ERROR: $msg\n";
+}
+
+package BEX::bprun::curses;
+
+use Curses;
+
+my $C;
+
+my $nrows;
+my @by_row = ();
+my %by_host = ();
+
+my %host_state;
+my %host_cnt;
+
+my %job_state;
+my %job_cnt;
+
+my %host_last_fail_job;
+my %host_last_fail_stat;
+
+sub new($) {
+ $C = new Curses;
+ start_color;
+ has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
+ cbreak; noecho;
+ $C->intrflush(0);
+ $C->keypad(1);
+ $C->meta(1);
+ $C->clear;
+ init_pair(1, COLOR_YELLOW, COLOR_BLUE);
+ init_pair(2, COLOR_YELLOW, COLOR_RED);
+ init_pair(3, COLOR_YELLOW, COLOR_BLACK);
+ init_pair(4, COLOR_RED, COLOR_BLACK);
+
+ $nrows = $C->getmaxy - 2;
+ if ($BEX::Config::max_parallel_jobs > $nrows) {
+ $BEX::Config::max_parallel_jobs = $nrows;
+ }
+
+ %host_state = %host_cnt = ();
+ %job_state = %job_cnt = ();
+ for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
+ $host_cnt{$s} = 0;
+ $job_cnt{'*'}{$s} = 0;
+ }
+
+ my $ui = bless {};
+ $ui->refresh_status;
+ return $ui;
+}
+
+sub done($)
+{
+ $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+ $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
+ $C->clrtoeol;
+ $C->getch;
+ endwin;
+}
+
+sub err($$) {
+ my ($ui, $msg) = @_;
+ $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
+ $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub set_host_status($$$) {
+ my ($ui, $mach, $stat) = @_;
+ my $prev_stat = $host_state{$mach};
+ if (defined $prev_stat) {
+ $host_cnt{$prev_stat}--;
+ } else {
+ for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
+ }
+ $host_state{$mach} = $stat;
+ $host_cnt{$stat}++;
+}
+
+sub set_job_status($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
+ $job_cnt{$mach}{$prev_stat}--;
+ $job_cnt{'*'}{$prev_stat}--;
+ $job_state{$mach}{$jid} = $stat;
+ $job_cnt{$mach}{$stat}++;
+ $job_cnt{'*'}{$stat}++;
+}
+
+sub refresh_status($) {
+ $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
+ $C->addnstr(0, 0,
+ sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
+ $host_cnt{'running'},
+ $host_cnt{'done'},
+ $host_cnt{'failed'},
+ $host_cnt{'ready'},
+ $job_cnt{'*'}{'running'},
+ $job_cnt{'*'}{'done'},
+ $job_cnt{'*'}{'failed'},
+ $job_cnt{'*'}{'ready'},
+ ), $C->getmaxx);
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub get_slot($) {
+ my ($mach) = @_;
+ my $s;
+ if (defined ($s = $by_host{$mach})) {
+ delete $s->{'Gone'};
+ } else {
+ my ($best, $besti);
+ for my $i (0..$nrows-1) {
+ my $r = $by_row[$i];
+ if (!defined $r) {
+ $besti = $i;
+ $best = undef;
+ last;
+ } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
+ $besti = $i;
+ $best = $r;
+ }
+ }
+ if ($best) {
+ delete $by_host{$best->{'Host'}};
+ }
+ $s->{'Host'} = $mach;
+ $s->{'Row'} = $besti;
+ $by_host{$mach} = $s;
+ $by_row[$besti] = $s;
+ }
+ return $s;
+}
+
+my $gone_counter = 1;
+sub delete_slot($) {
+ my ($s) = @_;
+ $s->{'Gone'} = $gone_counter++;
+}
+
+sub redraw_slot($) {
+ my ($s) = @_;
+ my $mach = $s->{'Host'};
+ my $stat = $s->{'Status'} // "?";
+ my $jid = $s->{'Job'} // "";
+ my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
+ my $jcnt = $job_cnt{$mach};
+ if ($jcnt->{'running'}) {
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
+ } else {
+ $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
+ }
+ } else {
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ } else {
+ $C->bkgdset(0);
+ }
+ }
+ my $r = $s->{'Row'} + 1;
+ $C->addstr($r, 0, sprintf("%-20.20s", $mach));
+ if ($jcnt->{'failed'}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
+ } else {
+ $C->bkgdset(0);
+ $C->addstr(" ");
+ }
+ $C->bkgdset(0);
+ $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
+ if ($stat eq 'DONE') {
+ if (defined $host_last_fail_stat{$mach}) {
+ $C->bkgdset(COLOR_PAIR(4));
+ $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
+ }
+ } else {
+ my $text = sprintf(" %-8s %s", $stat, $jname);
+ $C->addstr($text);
+ }
+ $C->clrtoeol;
+ $C->refresh;
+}
+
+sub update($$$$) {
+ my ($ui, $mach, $jid, $stat) = @_;
+ my $s = get_slot($mach);
+ given ($stat) {
+ when ('READY') {
+ # Pseudo-state generated internally
+ $ui->set_host_status($mach, 'ready');
+ $ui->set_job_status($mach, $jid, 'ready');
+ }
+ when ('OK') {
+ $ui->set_job_status($mach, $jid, 'done');
+ }
+ when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ $host_last_fail_job{$mach} = $jid;
+ $host_last_fail_stat{$mach} = $stat;
+ }
+ when ('DONE') {
+ if ($job_cnt{$mach}{'failed'}) {
+ $ui->set_host_status($mach, 'failed');
+ } else {
+ $ui->set_host_status($mach, 'done');
+ }
+ }
+ when ('INIT') {
+ $ui->set_host_status($mach, 'running');
+ $ui->set_job_status($mach, $jid, 'running') if defined $jid;
+ }
+ when ('LOCKED') {
+ if (defined $jid) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ } else {
+ for my $j (keys %{$job_state{$mach}}) {
+ $ui->set_job_status($mach, $jid, 'failed');
+ }
+ $ui->set_host_status($mach, 'failed');
+ $host_last_fail_job{$mach} = $jid;
+ $host_last_fail_stat{$mach} = $stat;
+ }
+ }
+ when (['START', 'PING', 'SEND', 'RUN']) {
+ }
+ default {
+ $ui->err("Received unknown job status $stat");
+ }
+ }
+ $s->{'Job'} = $jid;
+ $s->{'Status'} = $stat;
+ redraw_slot($s);
+ if ($stat eq 'DONE') { delete_slot($s); }
+ $ui->refresh_status;
+}