#!/usr/bin/perl
# Batch EXecutor 2.0 -- Parallel Execution Using Screen
# (c) 2011 Martin Mares
#
# For every selected machine which has jobs in the queue, a "./brun" process
# is started in a new window of an existing screen session. Progress reports
# are read from a status FIFO as lines of the form
#
#     ! <machine> <job-id or "-"> <status>
#
# and handed over to a user interface object (plain text or curses).

use strict;
use warnings;
use feature 'switch';    # Kept for compatibility; the code below no longer uses given/when
use Getopt::Long;
use POSIX;

use lib 'lib';
use BEX;

my $queue_name;
my $screen_session = 'BEX';
my $text_mode;

# NOTE(review): the placeholder names in the usage text (<options>, <name>,
# <machine-or-class>) and the program name were reconstructed -- confirm the
# exact wording.
GetOptions(
    "q|queue=s" => \$queue_name,
    "session=s" => \$screen_session,
    "text!" => \$text_mode,
) or die <<AMEN ;
Usage: bprun [<options>] [[!]<machine-or-class> ...]

Options:
-q, --queue=<name>      Run jobs in the given queue
--session=<name>        Job windows should be opened within the given screen session (default: BEX)
--text                  Use textual user interface instead of curses
AMEN

# Check that the screen session exists before doing anything else
system 'screen', '-S', $screen_session, '-X', 'select', '.';
!$? or die "Screen session $screen_session not found\n";

my $queue = BEX::Queue->new($queue_name);

# (Re)create the FIFO used for receiving status messages. It is opened
# read-write, so this process holds a write end of its own as well.
my $fifo_name = $queue->{'Name'} . '/status-fifo';
unlink $fifo_name;
mkfifo($fifo_name, 0700) or die "Cannot create $fifo_name: $!";
open(my $fifo, '+<', $fifo_name) or die "Cannot open $fifo_name: $!";

# The UI must be constructed before $max is read below, because the curses UI
# may lower $BEX::Config::max_parallel_jobs to fit the screen.
my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);

# Collect machines which have at least one job queued and announce the jobs
my @machines = ();
for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
    my @jobs = $queue->scan($mach);
    @jobs or next;
    push @machines, $mach;
    for my $job (@jobs) {
        $ui->update($mach, $job, 'READY');
    }
}

my %running = ();    # machine => last status received
my $max = $BEX::Config::max_parallel_jobs;

while (keys %running || @machines) {
    # Start another machine if we are below the parallelism limit
    if (@machines && scalar(keys %running) < $max) {
        my $mach = shift @machines;
        $ui->update($mach, undef, 'START');
        my @scr = ('screen', '-t', $mach);
        push @scr, '-S', $screen_session if defined $screen_session;
        push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
        system @scr;
        if ($?) {
            # The window could not be opened, so no status messages will ever
            # arrive for this machine. It must not be counted as running,
            # otherwise the loop would wait on the FIFO forever.
            $ui->update($mach, undef, 'INTERR');
        } else {
            $running{$mach} = 'START';
        }
        next;
    }

    # Otherwise wait for the next status message
    my $line = <$fifo>;
    if (!defined $line) {
        $ui->err("Cannot read from $fifo_name: $!");
        last;
    }
    chomp $line;

    my ($mach, $jid, $stat) = $line =~ /^! (\S+) (\S+) (\S+)$/;
    if (!defined $stat) {
        $ui->err("Received invalid status message <$line>");
        next;
    }
    if (!defined $running{$mach}) {
        $ui->err("Received status message <$line> for a machine which does not run");
        next;
    }

    $running{$mach} = $stat;
    # A job ID of "-" means that the message is not related to a particular job
    $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
    if ($stat eq 'DONE') {
        delete $running{$mach};
    }
}

close $fifo;
unlink $fifo_name;
$ui->done;

### Textual user interface: prints one line per status update ###

package BEX::bprun::text;

# Constructor, the object carries no state.
sub new {
    my ($class) = @_;
    return bless {}, $class;
}

# Called once when all machines have finished; nothing to clean up.
sub done {
    return;
}

# Print "<machine>[:<job>] <status>" to standard output.
sub update {
    my ($ui, $mach, $jid, $stat) = @_;
    print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
}

# Report an error message on standard error.
sub err {
    my ($ui, $msg) = @_;
    print STDERR "ERROR: $msg\n";
}

### Curses user interface: a status line and one row per machine ###

package BEX::bprun::curses;

use Curses;

# This file-scoped state is shared by all subs of the package. The statements
# here are reached only after the main program above has finished, so nothing
# may rely on an initializer at this place -- everything is set up in new()
# or works correctly when starting from undef.
my $C;                      # The Curses window
my $nrows;                  # Number of rows available for machines
my @by_row;                 # Row number => slot
my %by_host;                # Machine => slot
my %host_state;             # Machine => host state
my %host_cnt;               # Host state => number of machines
my %job_state;              # Machine => job ID => job state
my %job_cnt;                # Machine (or '*' for total) => job state => number of jobs
my %host_last_fail_job;     # Machine => job ID stored with the last failure
my %host_last_fail_stat;    # Machine => status of the last failure
my $gone_counter;           # Sequence number of the most recently released slot

# All states counted in %host_cnt and %job_cnt
sub _states {
    return ('unknown', 'ready', 'running', 'done', 'failed');
}

# Constructor: initializes curses, color pairs and the counters, and draws
# the status line. Dies if the terminal lacks the required colors.
sub new {
    my ($class) = @_;

    $C = Curses->new;
    start_color;
    unless (has_colors && COLORS >= 8 && COLOR_PAIRS >= 8) {
        # Leave curses mode first, so that the message is readable
        endwin;
        die "Your terminal is too dumb for me\n";
    }
    cbreak;
    noecho;
    $C->intrflush(0);
    $C->keypad(1);
    $C->meta(1);
    $C->clear;
    init_pair(1, COLOR_YELLOW, COLOR_BLUE);
    init_pair(2, COLOR_YELLOW, COLOR_RED);
    init_pair(3, COLOR_YELLOW, COLOR_BLACK);
    init_pair(4, COLOR_RED, COLOR_BLACK);

    # The first and the last line of the screen are used for the status line
    # and for messages, the rest is available for machines. Never run more
    # machines than there are rows.
    $nrows = $C->getmaxy - 2;
    if ($BEX::Config::max_parallel_jobs > $nrows) {
        $BEX::Config::max_parallel_jobs = $nrows;
    }

    %host_state = %host_cnt = ();
    %job_state = %job_cnt = ();
    for my $s (_states()) {
        $host_cnt{$s} = 0;
        $job_cnt{'*'}{$s} = 0;
    }

    my $ui = bless {}, $class;
    $ui->refresh_status;
    return $ui;
}

# Wait for a key press and leave curses mode.
sub done {
    $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
    $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
    $C->clrtoeol;
    $C->getch;
    endwin;
}

# Show an error message on the last line of the screen.
sub err {
    my ($ui, $msg) = @_;
    $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
    $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
    $C->clrtoeol;
    $C->refresh;
}

# Move a machine to host state $stat and keep %host_cnt consistent. When the
# machine is seen for the first time, its per-machine job counters are zeroed.
sub set_host_status {
    my ($ui, $mach, $stat) = @_;
    my $prev_stat = $host_state{$mach};
    if (defined $prev_stat) {
        $host_cnt{$prev_stat}--;
    } else {
        for my $s (_states()) {
            $job_cnt{$mach}{$s} = 0;
        }
    }
    $host_state{$mach} = $stat;
    $host_cnt{$stat}++;
}

# Move a job to state $stat and keep both the per-machine and the total ('*')
# counters consistent. A job not seen before counts as 'unknown'.
sub set_job_status {
    my ($ui, $mach, $jid, $stat) = @_;
    # Statuses without a job ID (e.g., INTERR reported by the main loop) are
    # tracked under the empty key. This is what the implicit stringification
    # of undef would do anyway, but it avoids "uninitialized" warnings, which
    # would be printed over the curses screen.
    $jid //= '';
    my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
    $job_cnt{$mach}{$prev_stat}--;
    $job_cnt{'*'}{$prev_stat}--;
    $job_state{$mach}{$jid} = $stat;
    $job_cnt{$mach}{$stat}++;
    $job_cnt{'*'}{$stat}++;
}

# Redraw the status line at the top of the screen with host and job totals.
sub refresh_status {
    $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
    $C->addnstr(0, 0, sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
        $host_cnt{'running'},
        $host_cnt{'done'},
        $host_cnt{'failed'},
        $host_cnt{'ready'},
        $job_cnt{'*'}{'running'},
        $job_cnt{'*'}{'done'},
        $job_cnt{'*'}{'failed'},
        $job_cnt{'*'}{'ready'},
        ), $C->getmaxx);
    $C->clrtoeol;
    $C->refresh;
}

# Find the slot (screen row record) of the given machine or allocate one.
# An unused row is preferred; otherwise the released ('Gone') slot with the
# lowest sequence number, i.e., the one released longest ago, is recycled.
# If neither exists, a transient slot without a 'Row' is returned: such
# a machine is not displayed for now and the search is repeated upon the
# next update.
sub get_slot {
    my ($mach) = @_;

    my $s = $by_host{$mach};
    if (defined $s) {
        # The machine is active again, so its slot must not be recycled
        delete $s->{'Gone'};
        return $s;
    }

    my ($victim, $row);
    for my $i (0..$nrows-1) {
        my $r = $by_row[$i];
        if (!defined $r) {
            $row = $i;
            $victim = undef;
            last;
        } elsif ($r->{'Gone'} && (!$victim || $victim->{'Gone'} > $r->{'Gone'})) {
            $row = $i;
            $victim = $r;
        }
    }

    if (!defined $row) {
        # All rows are occupied by machines which are still of interest
        return { 'Host' => $mach };
    }

    if ($victim) {
        delete $by_host{$victim->{'Host'}};
    }
    $s = { 'Host' => $mach, 'Row' => $row };
    $by_host{$mach} = $s;
    $by_row[$row] = $s;
    return $s;
}

# Mark a slot as released, so that get_slot() can recycle it. The sequence
# numbers start at 1 and grow, so 'Gone' is always true once set. The
# pre-increment works even though $gone_counter starts as undef.
sub delete_slot {
    my ($s) = @_;
    $s->{'Gone'} = ++$gone_counter;
}

# Redraw the screen row of a slot: machine name, job counters and either the
# current status with job name, or (when the machine is done) its last failure.
# Slots without a row are silently skipped.
sub redraw_slot {
    my ($s) = @_;
    defined $s->{'Row'} or return;

    my $mach = $s->{'Host'};
    my $stat = $s->{'Status'} // "?";
    my $jid = $s->{'Job'} // "";
    my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
    my $jcnt = $job_cnt{$mach};

    # Machine name: bold while running, red if anything has failed
    if ($jcnt->{'running'}) {
        if ($jcnt->{'failed'}) {
            $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
        } else {
            $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
        }
    } else {
        if ($jcnt->{'failed'}) {
            $C->bkgdset(COLOR_PAIR(4));
        } else {
            $C->bkgdset(0);
        }
    }
    my $r = $s->{'Row'} + 1;    # Row 0 of the screen is the status line
    $C->addstr($r, 0, sprintf("%-20.20s", $mach));

    # Number of failed jobs, or blanks of the same width to keep columns aligned
    if ($jcnt->{'failed'}) {
        $C->bkgdset(COLOR_PAIR(4));
        $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
    } else {
        $C->bkgdset(0);
        $C->addstr(' ' x 5);
    }

    $C->bkgdset(0);
    $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));

    if ($stat eq 'DONE') {
        if (defined $host_last_fail_stat{$mach}) {
            # The failure need not be related to a particular job
            my $fail_job = $host_last_fail_job{$mach};
            my $fail_name = (defined $fail_job ? $queue->job_name($fail_job) : "");
            $C->bkgdset(COLOR_PAIR(4));
            $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $fail_name));
        }
    } else {
        my $text = sprintf(" %-8s %s", $stat, $jname);
        $C->addstr($text);
    }

    $C->clrtoeol;
    $C->refresh;
}

# Process a status update: adjust host and job states according to $stat,
# redraw the row of the machine and the status line. $jid is undef for
# updates which are not related to a particular job.
sub update {
    my ($ui, $mach, $jid, $stat) = @_;

    my $s = get_slot($mach);

    if ($stat eq 'READY') {
        # Pseudo-state generated internally
        $ui->set_host_status($mach, 'ready');
        $ui->set_job_status($mach, $jid, 'ready');
    } elsif ($stat eq 'OK') {
        $ui->set_job_status($mach, $jid, 'done');
    } elsif (grep { $stat eq $_ } 'FAILED', 'INTERR', 'NOPING', 'PREPFAIL') {
        $ui->set_job_status($mach, $jid, 'failed');
        $host_last_fail_job{$mach} = $jid;
        $host_last_fail_stat{$mach} = $stat;
    } elsif ($stat eq 'DONE') {
        if ($job_cnt{$mach}{'failed'}) {
            $ui->set_host_status($mach, 'failed');
        } else {
            $ui->set_host_status($mach, 'done');
        }
    } elsif ($stat eq 'INIT') {
        $ui->set_host_status($mach, 'running');
        $ui->set_job_status($mach, $jid, 'running') if defined $jid;
    } elsif ($stat eq 'LOCKED') {
        if (defined $jid) {
            $ui->set_job_status($mach, $jid, 'failed');
        } else {
            # LOCKED without a job ID: fail every known job of the machine
            for my $j (keys %{$job_state{$mach}}) {
                $ui->set_job_status($mach, $j, 'failed');
            }
            $ui->set_host_status($mach, 'failed');
            $host_last_fail_job{$mach} = $jid;
            $host_last_fail_stat{$mach} = $stat;
        }
    } elsif (grep { $stat eq $_ } 'START', 'PING', 'SEND', 'RUN') {
        # Only displayed, no change of state
    } else {
        $ui->err("Received unknown job status $stat");
    }

    $s->{'Job'} = $jid;
    $s->{'Status'} = $stat;
    redraw_slot($s);
    if ($stat eq 'DONE') {
        delete_slot($s);
    }

    $ui->refresh_status;
}