From: Martin Mares Date: Wed, 15 Feb 2012 22:57:29 +0000 (+0100) Subject: Renamed all subcommands to bex- X-Git-Tag: v3.0~13 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=664cc9558c223997c82e1227da6414b1e9404648;p=bex.git Renamed all subcommands to bex- This way, they are well visible in `ps'. --- diff --git a/bex b/bex index 860dce9..0fe8495 100755 --- a/bex +++ b/bex @@ -60,7 +60,7 @@ my %aliases = ( ); if (defined $aliases{$sub}) { $sub = $aliases{$sub}; } -my $sub_path = "$bex_lib/bin/$sub"; +my $sub_path = "$bex_lib/bin/bex-$sub"; -x $sub_path or die "Unknown subcommand $sub\n"; $ENV{"BEX_HOME"} = $bex_home; diff --git a/lib/bin/add b/lib/bin/add deleted file mode 100755 index 64945f9..0000000 --- a/lib/bin/add +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Insert to Queue -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use File::stat; -use BEX; - -my $given_body; -my $given_go; -my $given_id; -my $queue_name; -my $requeue_id; -my $given_subject; -my $given_template; - -sub usage() { - print <] [!] ... - -Options: --b, --body= Load job body from the given file --g, --go Do not run editor, go enqueue the job immediately --i, --id= Set job ID of the new job --q, --queue= Insert new jobs to the given queue --r, --requeue= Re-queue an existing job instead of creating a new one --s, --subject= Set subject of the new job --t, --template= Load job template (headers and body) from the given file -AMEN - exit 0; -} - -GetOptions( - "b|body=s" => \$given_body, - "g|go!" => \$given_go, - "i|id=s" => \$given_id, - "q|queue=s" => \$queue_name, - "r|requeue=s" => \$requeue_id, - "s|subject=s" => \$given_subject, - "t|template=s" => \$given_template, - "help" => \&usage, -) or die "Try `bex add --help' for more information.\n"; - -# Prepare machine set -@ARGV or die "No machines specified\n"; -my @machines = BEX::Config::parse_machine_list(@ARGV); -@machines or die "No machines match\n"; - -my $queue = BEX::Queue->new($queue_name); -my $job; -my $tmp_fn; - -if (defined $requeue_id) { - # When requeueing, just fetch the existing job - if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) { - die "Parameters of a requeued job cannot be changed\n"; - } - my $fn = $queue->job_file($requeue_id); - -f $fn or die "Job $requeue_id not known\n"; - $job = BEX::Job->new_from_file($fn); -} else { - # Create job template - if (defined $given_template) { - $job = BEX::Job->new_from_file($given_template); - } else { - $job = BEX::Job->new; - } - $job->attr('ID', $given_id) if defined $given_id; - $job->attr('Subject', $given_subject) if defined $given_subject; - if (defined $given_body) { - open B, '<', $given_body or die "Cannot open $given_body: $!\n"; - local $/; - $job->attr('body', ); - close B; - } - - # Let the user edit the template - if (!$given_go) { - $tmp_fn = $job->save; - my $orig_stat = stat($tmp_fn) or die; - system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n"; - my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n"; - if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) { - unlink $tmp_fn; - die "Cancelled\n"; - } - $job = BEX::Job->new_from_file($tmp_fn); - } -} - -# Put the job to the queue -print "New job ", $job->id, "\n"; -for my $m (@machines) { - if ($queue->enqueue($m, $job)) { - $queue->update_job_status($m, $job->id, 'NEW'); - print "\t$m\n"; - } else { - $queue->log($m, $job->id, 'REQUEUE'); - print "\t$m (already queued)\n"; - } -} - -# Remove the temporary file if there's any -unlink $tmp_fn if defined $tmp_fn; diff --git a/lib/bin/bex-add b/lib/bin/bex-add new file mode 100755 index 0000000..64945f9 --- /dev/null +++ b/lib/bin/bex-add @@ -0,0 +1,106 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use File::stat; +use BEX; + +my $given_body; +my $given_go; +my $given_id; +my $queue_name; +my $requeue_id; +my $given_subject; +my $given_template; + +sub usage() { + print <] [!] ... + +Options: +-b, --body= Load job body from the given file +-g, --go Do not run editor, go enqueue the job immediately +-i, --id= Set job ID of the new job +-q, --queue= Insert new jobs to the given queue +-r, --requeue= Re-queue an existing job instead of creating a new one +-s, --subject= Set subject of the new job +-t, --template= Load job template (headers and body) from the given file +AMEN + exit 0; +} + +GetOptions( + "b|body=s" => \$given_body, + "g|go!" => \$given_go, + "i|id=s" => \$given_id, + "q|queue=s" => \$queue_name, + "r|requeue=s" => \$requeue_id, + "s|subject=s" => \$given_subject, + "t|template=s" => \$given_template, + "help" => \&usage, +) or die "Try `bex add --help' for more information.\n"; + +# Prepare machine set +@ARGV or die "No machines specified\n"; +my @machines = BEX::Config::parse_machine_list(@ARGV); +@machines or die "No machines match\n"; + +my $queue = BEX::Queue->new($queue_name); +my $job; +my $tmp_fn; + +if (defined $requeue_id) { + # When requeueing, just fetch the existing job + if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) { + die "Parameters of a requeued job cannot be changed\n"; + } + my $fn = $queue->job_file($requeue_id); + -f $fn or die "Job $requeue_id not known\n"; + $job = BEX::Job->new_from_file($fn); +} else { + # Create job template + if (defined $given_template) { + $job = BEX::Job->new_from_file($given_template); + } else { + $job = BEX::Job->new; + } + $job->attr('ID', $given_id) if defined $given_id; + $job->attr('Subject', $given_subject) if defined $given_subject; + if (defined $given_body) { + open B, '<', $given_body or die "Cannot open $given_body: $!\n"; + local $/; + $job->attr('body', ); + close B; + } + + # Let the user edit the template + if (!$given_go) { + $tmp_fn = $job->save; + my $orig_stat = stat($tmp_fn) or die; + system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n"; + my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n"; + if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) { + unlink $tmp_fn; + die "Cancelled\n"; + } + $job = BEX::Job->new_from_file($tmp_fn); + } +} + +# Put the job to the queue +print "New job ", $job->id, "\n"; +for my $m (@machines) { + if ($queue->enqueue($m, $job)) { + $queue->update_job_status($m, $job->id, 'NEW'); + print "\t$m\n"; + } else { + $queue->log($m, $job->id, 'REQUEUE'); + print "\t$m (already queued)\n"; + } +} + +# Remove the temporary file if there's any +unlink $tmp_fn if defined $tmp_fn; diff --git a/lib/bin/bex-job b/lib/bin/bex-job new file mode 100755 index 0000000..48f98a8 --- /dev/null +++ b/lib/bin/bex-job @@ -0,0 +1,38 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Operations on a Job +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $edit; +my $queue_name; + +sub usage() { + print <] + +Options: +-e, --edit Run editor on the given job (no locking) +-q, --queue= Act on the given queue +AMEN + exit 0; +} + +GetOptions( + "e|edit!" => \$edit, + "q|queue=s" => \$queue_name, + "help" => \&usage, +) && @ARGV == 1 or die "Try `bex job --help' for more information.\n"; + +my $queue = BEX::Queue->new($queue_name); +my $fn = $queue->job_file($ARGV[0]); +-f $fn or die "No such job " . $ARGV[0] . "\n"; + +if ($edit) { + system "editor", $fn; +} else { + system "cat", $fn; +} diff --git a/lib/bin/bex-ls b/lib/bin/bex-ls new file mode 100755 index 0000000..a575983 --- /dev/null +++ b/lib/bin/bex-ls @@ -0,0 +1,162 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Show Queued Jobs +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use POSIX; +use BEX; + +my $op_by_job; +my $op_by_host; +my $op_rm; +my $op_move_to; + +my $queue_name; +my $given_job; + +sub usage() { + print <] [[!] ...] + +Actions: + --by-job Show jobs sorted by job ID (default) +-h, --by-host Show jobs sorted by host + --rm Remove jobs from the queue + --move-to= Move jobs to a different queue + +Options: +-j, --job= Act on the specified job (default: on all) +-q, --queue= Act on the given queue +AMEN + exit 0; +} + +GetOptions( + "by-job!" => \$op_by_job, + "h|by-host!" => \$op_by_host, + "rm!" => \$op_rm, + "move-to=s" => \$op_move_to, + "j|job=s" => \$given_job, + "q|queue=s" => \$queue_name, + "help" => \&usage, +) or die "Try `bex ls --help' for more information.\n"; + +my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); +my $queue = BEX::Queue->new($queue_name); + +# Select jobs +my %jobs = (); +my %machs = (); +for my $m (@machines) { + for my $j ($queue->scan($m)) { + if (defined $given_job) { + next if $j ne $given_job; + } + push @{$jobs{$j}}, $m; + push @{$machs{$m}}, $j; + } +} + +sub do_ls(); +sub do_rm(); +sub do_move_to(); + +my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); +if ($ops > 1) { die "Multiple actions are not allowed\n"; } + +if ($op_rm) { do_rm(); } +elsif (defined $op_move_to) { do_move_to(); } +else { do_ls(); } +exit 0; + +sub do_ls() +{ + my %stat = (); + my %mach_locked = (); + for my $m (keys %machs) { + $mach_locked{$m} = $queue->is_locked($m, undef); + for my $j (@{$machs{$m}}) { + my $st = $queue->read_job_status($m, $j); + if (defined($st->{'Time'}) && defined($st->{'Status'})) { + $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . + POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; + } else { + $stat{$m}{$j} = ''; + } + if ($mach_locked{$m} || $queue->is_locked($m, $j)) { + $stat{$m}{$j} .= ' [LOCKED]'; + } + } + } + + if ($queue->is_locked(undef, undef)) { + print "### Queue lock present\n\n"; + } + + if ($op_by_host) { + for my $m (sort keys %machs) { + print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; + for my $j (@{$machs{$m}}) { + print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; + } + } + } else { + for my $j (sort keys %jobs) { + print $queue->job_name($j), "\n"; + for my $m (sort @{$jobs{$j}}) { + print "\t$m", $stat{$m}{$j}, "\n"; + } + } + } +} + +sub do_rm() +{ + my $err = 0; + for my $m (sort keys %machs) { + for my $j (sort @{$machs{$m}}) { + if (!$queue->lock($m, $j)) { + print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + $queue->update_job_status($m, $j, 'REMOVED'); + $queue->remove($m, $j); + print "Removed $m:", $queue->job_name($j), "\n"; + } + } + } + $queue->unlock; + exit $err; +} + +sub do_move_to() +{ + my $err = 0; + my $dest = BEX::Queue->new($op_move_to); + $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; + for my $j (sort keys %jobs) { + my $job = BEX::Job->new_from_file($queue->job_file($j)); + for my $m (sort @{$jobs{$j}}) { + if (!$queue->lock($m, $j)) { + print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + my $enq = $dest->enqueue($m, $job); + if ($enq) { + $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue'); + } else { + $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue'); + } + $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue'); + $queue->remove($m, $j); + print "Moved $m:", $dest->job_name($j); + print " (already queued)" if !$enq; + print "\n"; + } + } + } + $queue->unlock; + exit $err; +} diff --git a/lib/bin/bex-mach b/lib/bin/bex-mach new file mode 100755 index 0000000..e63f180 --- /dev/null +++ b/lib/bin/bex-mach @@ -0,0 +1,48 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- List Machines and Groups +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $edit; +my $queue_name; + +sub usage() { + print <] + +Options: +None defined so far. +AMEN + exit 0; +} + +GetOptions( + "help" => \&usage, +) && @ARGV == 0 or die "Try `bex mach --help' for more information.\n"; + +my $machines = \%BEX::Config::machines; + +print "# Hosts:\n"; +for my $h (sort keys %$machines) { + my $m = $machines->{$h}; + ref $m eq 'HASH' or next; + print "$h\n"; +} + +print "\n# Groups:\n"; +for my $h (sort keys %$machines) { + my $m = $machines->{$h}; + ref $m eq 'ARRAY' or next; + print "$h = ", join(" ", + map { + my $x = $machines->{$_}; + !defined($x) ? "$_?" : + ref $x eq 'HASH' ? $_ : + ref $x eq 'ARRAY' ? "\@$_" : + "$_???" + } @$m), "\n"; +} diff --git a/lib/bin/bex-prun b/lib/bin/bex-prun new file mode 100755 index 0000000..4900ef2 --- /dev/null +++ b/lib/bin/bex-prun @@ -0,0 +1,347 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Parallel Execution Using Screen +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use feature 'switch'; +use Getopt::Long; +use POSIX; +use BEX; + +my $queue_name; +my $screen_session = 'BEX'; +my $text_mode; + +sub usage() { + print <] [[!] ...] + +Options: +-q, --queue= Run jobs in the given queue + --session= Job windows should be opened within the given screen + session (default: BEX) + --text Use textual user interface instead of curses +AMEN + exit 0; +} + +GetOptions( + "q|queue=s" => \$queue_name, + "session=s" => \$screen_session, + "text!" => \$text_mode, + "help" => \&usage, +) or die "Try `bex prun --help' for more information.\n"; + +system 'screen', '-S', $screen_session, '-X', 'select', '.'; +!$? or die "Screen session $screen_session not found\n"; + +my $queue = BEX::Queue->new($queue_name); +my $fifo_name = $queue->{'Name'} . '/status-fifo'; +unlink $fifo_name; +mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!"; +open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!"; + +my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new); + +my @machines = (); +for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) { + my @jobs = $queue->scan($mach); + @jobs or next; + push @machines, $mach; + for (@jobs) { $ui->update($mach, $_, 'READY'); } +} + +my %running = (); +my $max = $BEX::Config::max_parallel_jobs; + +while (keys %running || @machines) { + if (@machines && keys %running < $max) { + my $mach = shift @machines; + $ui->update($mach, undef, 'START'); + my @scr = ('screen', '-t', $mach); + push @scr, '-S', $screen_session if defined $screen_session; + push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach; + system @scr; + !$? or $ui->update($mach, undef, 'INTERR'); + $running{$mach} = 'START'; + next; + } + $_ = ; + chomp; + my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/; + if (!defined $stat) { + $ui->err("Received invalid status message <$_>"); + next; + } + if (!defined $running{$mach}) { + $ui->err("Received status message <$_> for a machine which does not run"); + next; + } + $running{$mach} = $stat; + $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat); + if ($stat eq 'DONE') { + delete $running{$mach}; + } +} + +close FIFO; +unlink $fifo_name; +$ui->done; + +package BEX::bprun::text; + +sub new($) { + return bless {}; +} + +sub done($) { +} + +sub update($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n"; +} + +sub err($$) { + my ($ui, $msg) = @_; + print STDERR "ERROR: $msg\n"; +} + +package BEX::bprun::curses; + +use Curses; + +my $C; + +my $nrows; +my @by_row = (); +my %by_host = (); + +my %host_state; +my %host_cnt; + +my %job_state; +my %job_cnt; + +my %host_last_fail_job; +my %host_last_fail_stat; + +sub new($) { + $C = new Curses; + start_color; + has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n"; + cbreak; noecho; + $C->intrflush(0); + $C->keypad(1); + $C->meta(1); + $C->clear; + init_pair(1, COLOR_YELLOW, COLOR_BLUE); + init_pair(2, COLOR_YELLOW, COLOR_RED); + init_pair(3, COLOR_YELLOW, COLOR_BLACK); + init_pair(4, COLOR_RED, COLOR_BLACK); + + $nrows = $C->getmaxy - 2; + if ($BEX::Config::max_parallel_jobs > $nrows) { + $BEX::Config::max_parallel_jobs = $nrows; + } + + %host_state = %host_cnt = (); + %job_state = %job_cnt = (); + for my $s ('unknown', 'ready', 'running', 'done', 'failed') { + $host_cnt{$s} = 0; + $job_cnt{'*'}{$s} = 0; + } + + my $ui = bless {}; + $ui->refresh_status; + return $ui; +} + +sub done($) +{ + $C->bkgdset(COLOR_PAIR(1) | A_BOLD); + $C->addstr($C->getmaxy-1, 0, "Press any key to quit..."); + $C->clrtoeol; + $C->getch; + endwin; +} + +sub err($$) { + my ($ui, $msg) = @_; + $C->bkgdset(COLOR_PAIR(2) | A_BOLD); + $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx); + $C->clrtoeol; + $C->refresh; +} + +sub set_host_status($$$) { + my ($ui, $mach, $stat) = @_; + my $prev_stat = $host_state{$mach}; + if (defined $prev_stat) { + $host_cnt{$prev_stat}--; + } else { + for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; } + } + $host_state{$mach} = $stat; + $host_cnt{$stat}++; +} + +sub set_job_status($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + my $prev_stat = $job_state{$mach}{$jid} // 'unknown'; + $job_cnt{$mach}{$prev_stat}--; + $job_cnt{'*'}{$prev_stat}--; + $job_state{$mach}{$jid} = $stat; + $job_cnt{$mach}{$stat}++; + $job_cnt{'*'}{$stat}++; +} + +sub refresh_status($) { + $C->bkgdset(COLOR_PAIR(1) | A_BOLD); + $C->addnstr(0, 0, + sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW", + $host_cnt{'running'}, + $host_cnt{'done'}, + $host_cnt{'failed'}, + $host_cnt{'ready'}, + $job_cnt{'*'}{'running'}, + $job_cnt{'*'}{'done'}, + $job_cnt{'*'}{'failed'}, + $job_cnt{'*'}{'ready'}, + ), $C->getmaxx); + $C->clrtoeol; + $C->refresh; +} + +sub get_slot($) { + my ($mach) = @_; + my $s; + if (defined ($s = $by_host{$mach})) { + delete $s->{'Gone'}; + } else { + my ($best, $besti); + for my $i (0..$nrows-1) { + my $r = $by_row[$i]; + if (!defined $r) { + $besti = $i; + $best = undef; + last; + } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) { + $besti = $i; + $best = $r; + } + } + if ($best) { + delete $by_host{$best->{'Host'}}; + } + $s->{'Host'} = $mach; + $s->{'Row'} = $besti; + $by_host{$mach} = $s; + $by_row[$besti] = $s; + } + return $s; +} + +my $gone_counter = 1; +sub delete_slot($) { + my ($s) = @_; + $s->{'Gone'} = $gone_counter++; +} + +sub redraw_slot($) { + my ($s) = @_; + my $mach = $s->{'Host'}; + my $stat = $s->{'Status'} // "?"; + my $jid = $s->{'Job'} // ""; + my $jname = ($jid eq "" ? "" : $queue->job_name($jid)); + my $jcnt = $job_cnt{$mach}; + if ($jcnt->{'running'}) { + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4) | A_BOLD); + } else { + $C->bkgdset(COLOR_PAIR(3) | A_BOLD); + } + } else { + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4)); + } else { + $C->bkgdset(0); + } + } + my $r = $s->{'Row'} + 1; + $C->addstr($r, 0, sprintf("%-20.20s", $mach)); + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4)); + $C->addstr(sprintf("%3dE ", $jcnt->{'failed'})); + } else { + $C->bkgdset(0); + $C->addstr(" "); + } + $C->bkgdset(0); + $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'})); + if ($stat eq 'DONE') { + if (defined $host_last_fail_stat{$mach}) { + $C->bkgdset(COLOR_PAIR(4)); + $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach}))); + } + } else { + my $text = sprintf(" %-8s %s", $stat, $jname); + $C->addstr($text); + } + $C->clrtoeol; + $C->refresh; +} + +sub update($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + my $s = get_slot($mach); + given ($stat) { + when ('READY') { + # Pseudo-state generated internally + $ui->set_host_status($mach, 'ready'); + $ui->set_job_status($mach, $jid, 'ready'); + } + when ('OK') { + $ui->set_job_status($mach, $jid, 'done'); + } + when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) { + $ui->set_job_status($mach, $jid, 'failed'); + $host_last_fail_job{$mach} = $jid; + $host_last_fail_stat{$mach} = $stat; + } + when ('DONE') { + if ($job_cnt{$mach}{'failed'}) { + $ui->set_host_status($mach, 'failed'); + } else { + $ui->set_host_status($mach, 'done'); + } + } + when ('INIT') { + $ui->set_host_status($mach, 'running'); + $ui->set_job_status($mach, $jid, 'running') if defined $jid; + } + when ('LOCKED') { + if (defined $jid) { + $ui->set_job_status($mach, $jid, 'failed'); + } else { + for my $j (keys %{$job_state{$mach}}) { + $ui->set_job_status($mach, $jid, 'failed'); + } + $ui->set_host_status($mach, 'failed'); + $host_last_fail_job{$mach} = $jid; + $host_last_fail_stat{$mach} = $stat; + } + } + when (['START', 'PING', 'SEND', 'RUN']) { + } + default { + $ui->err("Received unknown job status $stat"); + } + } + $s->{'Job'} = $jid; + $s->{'Status'} = $stat; + redraw_slot($s); + if ($stat eq 'DONE') { delete_slot($s); } + $ui->refresh_status; +} diff --git a/lib/bin/bex-queue b/lib/bin/bex-queue new file mode 100755 index 0000000..aa90755 --- /dev/null +++ b/lib/bin/bex-queue @@ -0,0 +1,53 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Operations on Queues +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $init; + +sub usage() { + print <] + +Subcommands: +init Create a new queue +ls List all queues + +Options: +None defined so far. +AMEN + exit 0; +} + +GetOptions( + "init!" => \$init, + "help" => \&usage, +) or die "Try `bex queue --help' for more information.\n"; + +my $op = shift @ARGV // 'ls'; + +if ($op eq 'init') { + my $queue_name = shift @ARGV or die "bex queue init requires a queue name\n"; + my $path = $BEX::Config::home . '/' . $queue_name; + -d $path and die "Queue directory $path already exists\n"; + mkdir $path; + mkdir "$path/hosts"; + mkdir "$path/jobs"; + print "Queue $queue_name initialized.\n"; +} elsif ($op eq 'ls' && @ARGV == 0) { + opendir D, $BEX::Config::home or die "Cannot read BEX home directory\n"; + for my $q (sort readdir D) { + next if $q =~ /^\./; + my $d = $BEX::Config::home . '/' . $q; + if (-d $d && -d "$d/hosts" && -d "$d/jobs") { + print "$q\n"; + } + } + closedir D; +} else { + die "Invalid subcommand\n"; +} diff --git a/lib/bin/bex-run b/lib/bin/bex-run new file mode 100755 index 0000000..66b9efc --- /dev/null +++ b/lib/bin/bex-run @@ -0,0 +1,198 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Run Queued Jobs +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +sub usage() { + print <] [[!] ...] + +Options: +-j, --job= Run only the specified job +-q, --queue= Select job queue + --status-fifo= Send status updates to the given named pipe +AMEN + exit 0; +} + +my $given_job; +my $queue_name; +my $status_fifo; + +GetOptions( + "j|job=s" => \$given_job, + "q|queue=s" => \$queue_name, + "s|status-fifo=s" => \$status_fifo, + "help" => \&usage, +) or die "Try `bex run --help' for more information.\n"; + +my $status_fd; +if (defined $status_fifo) { + open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; + autoflush $status_fd, 1; +} + +sub update_status($$$$;$) { + my ($mach, $job, $status, $log_on_queue, $msg) = @_; + if ($status_fd) { + print $status_fd "! $mach $job $status\n"; + } + if ($log_on_queue) { + $log_on_queue->update_job_status($mach, $job, $status, $msg); + } +} + +my %pings; + +sub ping_machine($) { + my ($mach) = @_; + if (!defined $pings{$mach}) { + if ($BEX::Config::ping_hosts) { + update_status($mach, '-', 'PING', undef); + my $host = BEX::Config::host_name($mach); + `ping -c1 -n $host >/dev/null 2>/dev/null`; + $pings{$mach} = !$?; + } else { + $pings{$mach} = 1; + } + } + if ($pings{$mach}) { + return ('OK', undef); + } else { + return ('NOPING', 'Does not ping'); + } +} + +sub exit_status($) { + my ($s) = @_; + if ($s >> 8) { + return "with exit code " . ($s >> 8); + } else { + return "on fatal signal " . ($s & 127); + } +} + +sub run_job_prep($$$) { + my ($job, $queue, $mach) = @_; + my $prep = $job->attr('Prep'); + defined($prep) && $prep !~ /^\s*$/ or return 'OK'; + + my $jid = $job->id; + update_status($mach, $jid, 'PREP', $queue); + my $lf = $queue->log_file($mach, $jid); + $ENV{'HOST'} = BEX::Config::host_name($mach); + system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf"; + delete $ENV{'HOST'}; + if ($?) { + return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?)); + } else { + return 'OK'; + } +} + +sub run_job_body($$$) { + my ($job, $queue, $mach) = @_; + + if ($job->attr('body') =~ /^\s*$/s) { + # Shortcut if the body is empty + return 'OK' + } + + my $host = BEX::Config::host_name($mach); + my $jid = $job->id; + + my $tmp = $queue->temp_file($mach, $jid); + open T, '>', $tmp or die; + if (defined $BEX::Config::job_prolog) { + open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); + while (

) { print T; } + close P; + } else { + print T "#!/bin/sh\n"; + } + print T "# BEX job ", $jid, "\n"; + print T $job->attr('body'); + if (defined $BEX::Config::job_epilog) { + open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); + while () { print T; } + close E; + } + close T; + + update_status($mach, $jid, 'SEND', undef); + my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; + my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`; + !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); + chomp $rtmp; + + update_status($mach, $jid, 'RUN', $queue); + my $lf = $queue->log_file($mach, $jid); + system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf"; + if ($?) { + return ('FAILED', 'Job failed ' . exit_status($?)); + } else { + return 'OK'; + } +} + +sub run_job($$$) { + my ($job, $queue, $mach) = @_; + my ($stat, $msg); + + ($stat, $msg) = ping_machine($mach); + $stat eq 'OK' or return ($stat, $msg); + + ($stat, $msg) = run_job_prep($job, $queue, $mach); + $stat eq 'OK' or return ($stat, $msg); + + return run_job_body($job, $queue, $mach); +} + +my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); +my $queue = BEX::Queue->new($queue_name); + +$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n"; + +for my $mach (@machines) { + my @q = $queue->scan($mach) or next; + if (!$queue->lock($mach, undef)) { + print "### Machine $mach is locked by another brun, skipping...\n"; + update_status($mach, '-', 'LOCKED', undef); + update_status($mach, '-', 'DONE', undef); + next; + } + update_status($mach, '-', 'INIT', undef); + while (my $jid = shift @q) { + if (defined $given_job) { + $jid eq $given_job or next; + } + my $job = BEX::Job->new_from_file($queue->job_file($jid)); + update_status($mach, $jid, 'INIT', undef); + if (!$queue->lock($mach, $jid)) { + print "### Skipping locked $jid on $mach ###\n"; + update_status($mach, $jid, 'LOCKED', undef); + next; + } + print "### Running ", $job->name, " on $mach ###\n"; + my ($s, $msg) = run_job($job, $queue, $mach); + update_status($mach, $jid, $s, $queue, $msg); + + if ($s eq 'OK') { + print "+++ OK\n"; + $queue->remove($mach, $jid); + } else { + print "--- $s: $msg\n"; + if ($BEX::Config::skip_on_fail) { + print "### Skipping other jobs on the same host ###\n" if @q; + last; + } + } + } +} continue { + update_status($mach, '-', 'DONE', undef); +} +$queue->unlock; diff --git a/lib/bin/job b/lib/bin/job deleted file mode 100755 index 48f98a8..0000000 --- a/lib/bin/job +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Operations on a Job -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use BEX; - -my $edit; -my $queue_name; - -sub usage() { - print <] - -Options: --e, --edit Run editor on the given job (no locking) --q, --queue= Act on the given queue -AMEN - exit 0; -} - -GetOptions( - "e|edit!" => \$edit, - "q|queue=s" => \$queue_name, - "help" => \&usage, -) && @ARGV == 1 or die "Try `bex job --help' for more information.\n"; - -my $queue = BEX::Queue->new($queue_name); -my $fn = $queue->job_file($ARGV[0]); --f $fn or die "No such job " . $ARGV[0] . "\n"; - -if ($edit) { - system "editor", $fn; -} else { - system "cat", $fn; -} diff --git a/lib/bin/ls b/lib/bin/ls deleted file mode 100755 index a575983..0000000 --- a/lib/bin/ls +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Show Queued Jobs -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use POSIX; -use BEX; - -my $op_by_job; -my $op_by_host; -my $op_rm; -my $op_move_to; - -my $queue_name; -my $given_job; - -sub usage() { - print <] [[!] ...] - -Actions: - --by-job Show jobs sorted by job ID (default) --h, --by-host Show jobs sorted by host - --rm Remove jobs from the queue - --move-to= Move jobs to a different queue - -Options: --j, --job= Act on the specified job (default: on all) --q, --queue= Act on the given queue -AMEN - exit 0; -} - -GetOptions( - "by-job!" => \$op_by_job, - "h|by-host!" => \$op_by_host, - "rm!" => \$op_rm, - "move-to=s" => \$op_move_to, - "j|job=s" => \$given_job, - "q|queue=s" => \$queue_name, - "help" => \&usage, -) or die "Try `bex ls --help' for more information.\n"; - -my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); -my $queue = BEX::Queue->new($queue_name); - -# Select jobs -my %jobs = (); -my %machs = (); -for my $m (@machines) { - for my $j ($queue->scan($m)) { - if (defined $given_job) { - next if $j ne $given_job; - } - push @{$jobs{$j}}, $m; - push @{$machs{$m}}, $j; - } -} - -sub do_ls(); -sub do_rm(); -sub do_move_to(); - -my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); -if ($ops > 1) { die "Multiple actions are not allowed\n"; } - -if ($op_rm) { do_rm(); } -elsif (defined $op_move_to) { do_move_to(); } -else { do_ls(); } -exit 0; - -sub do_ls() -{ - my %stat = (); - my %mach_locked = (); - for my $m (keys %machs) { - $mach_locked{$m} = $queue->is_locked($m, undef); - for my $j (@{$machs{$m}}) { - my $st = $queue->read_job_status($m, $j); - if (defined($st->{'Time'}) && defined($st->{'Status'})) { - $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . - POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; - } else { - $stat{$m}{$j} = ''; - } - if ($mach_locked{$m} || $queue->is_locked($m, $j)) { - $stat{$m}{$j} .= ' [LOCKED]'; - } - } - } - - if ($queue->is_locked(undef, undef)) { - print "### Queue lock present\n\n"; - } - - if ($op_by_host) { - for my $m (sort keys %machs) { - print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; - for my $j (@{$machs{$m}}) { - print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; - } - } - } else { - for my $j (sort keys %jobs) { - print $queue->job_name($j), "\n"; - for my $m (sort @{$jobs{$j}}) { - print "\t$m", $stat{$m}{$j}, "\n"; - } - } - } -} - -sub do_rm() -{ - my $err = 0; - for my $m (sort keys %machs) { - for my $j (sort @{$machs{$m}}) { - if (!$queue->lock($m, $j)) { - print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; - $err = 1; - } else { - $queue->update_job_status($m, $j, 'REMOVED'); - $queue->remove($m, $j); - print "Removed $m:", $queue->job_name($j), "\n"; - } - } - } - $queue->unlock; - exit $err; -} - -sub do_move_to() -{ - my $err = 0; - my $dest = BEX::Queue->new($op_move_to); - $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; - for my $j (sort keys %jobs) { - my $job = BEX::Job->new_from_file($queue->job_file($j)); - for my $m (sort @{$jobs{$j}}) { - if (!$queue->lock($m, $j)) { - print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; - $err = 1; - } else { - my $enq = $dest->enqueue($m, $job); - if ($enq) { - $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue'); - } else { - $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue'); - } - $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue'); - $queue->remove($m, $j); - print "Moved $m:", $dest->job_name($j); - print " (already queued)" if !$enq; - print "\n"; - } - } - } - $queue->unlock; - exit $err; -} diff --git a/lib/bin/mach b/lib/bin/mach deleted file mode 100755 index e63f180..0000000 --- a/lib/bin/mach +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- List Machines and Groups -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use BEX; - -my $edit; -my $queue_name; - -sub usage() { - print <] - -Options: -None defined so far. -AMEN - exit 0; -} - -GetOptions( - "help" => \&usage, -) && @ARGV == 0 or die "Try `bex mach --help' for more information.\n"; - -my $machines = \%BEX::Config::machines; - -print "# Hosts:\n"; -for my $h (sort keys %$machines) { - my $m = $machines->{$h}; - ref $m eq 'HASH' or next; - print "$h\n"; -} - -print "\n# Groups:\n"; -for my $h (sort keys %$machines) { - my $m = $machines->{$h}; - ref $m eq 'ARRAY' or next; - print "$h = ", join(" ", - map { - my $x = $machines->{$_}; - !defined($x) ? "$_?" : - ref $x eq 'HASH' ? $_ : - ref $x eq 'ARRAY' ? "\@$_" : - "$_???" - } @$m), "\n"; -} diff --git a/lib/bin/prun b/lib/bin/prun deleted file mode 100755 index 4900ef2..0000000 --- a/lib/bin/prun +++ /dev/null @@ -1,347 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Parallel Execution Using Screen -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use feature 'switch'; -use Getopt::Long; -use POSIX; -use BEX; - -my $queue_name; -my $screen_session = 'BEX'; -my $text_mode; - -sub usage() { - print <] [[!] ...] - -Options: --q, --queue= Run jobs in the given queue - --session= Job windows should be opened within the given screen - session (default: BEX) - --text Use textual user interface instead of curses -AMEN - exit 0; -} - -GetOptions( - "q|queue=s" => \$queue_name, - "session=s" => \$screen_session, - "text!" => \$text_mode, - "help" => \&usage, -) or die "Try `bex prun --help' for more information.\n"; - -system 'screen', '-S', $screen_session, '-X', 'select', '.'; -!$? or die "Screen session $screen_session not found\n"; - -my $queue = BEX::Queue->new($queue_name); -my $fifo_name = $queue->{'Name'} . '/status-fifo'; -unlink $fifo_name; -mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!"; -open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!"; - -my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new); - -my @machines = (); -for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) { - my @jobs = $queue->scan($mach); - @jobs or next; - push @machines, $mach; - for (@jobs) { $ui->update($mach, $_, 'READY'); } -} - -my %running = (); -my $max = $BEX::Config::max_parallel_jobs; - -while (keys %running || @machines) { - if (@machines && keys %running < $max) { - my $mach = shift @machines; - $ui->update($mach, undef, 'START'); - my @scr = ('screen', '-t', $mach); - push @scr, '-S', $screen_session if defined $screen_session; - push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach; - system @scr; - !$? or $ui->update($mach, undef, 'INTERR'); - $running{$mach} = 'START'; - next; - } - $_ = ; - chomp; - my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/; - if (!defined $stat) { - $ui->err("Received invalid status message <$_>"); - next; - } - if (!defined $running{$mach}) { - $ui->err("Received status message <$_> for a machine which does not run"); - next; - } - $running{$mach} = $stat; - $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat); - if ($stat eq 'DONE') { - delete $running{$mach}; - } -} - -close FIFO; -unlink $fifo_name; -$ui->done; - -package BEX::bprun::text; - -sub new($) { - return bless {}; -} - -sub done($) { -} - -sub update($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n"; -} - -sub err($$) { - my ($ui, $msg) = @_; - print STDERR "ERROR: $msg\n"; -} - -package BEX::bprun::curses; - -use Curses; - -my $C; - -my $nrows; -my @by_row = (); -my %by_host = (); - -my %host_state; -my %host_cnt; - -my %job_state; -my %job_cnt; - -my %host_last_fail_job; -my %host_last_fail_stat; - -sub new($) { - $C = new Curses; - start_color; - has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n"; - cbreak; noecho; - $C->intrflush(0); - $C->keypad(1); - $C->meta(1); - $C->clear; - init_pair(1, COLOR_YELLOW, COLOR_BLUE); - init_pair(2, COLOR_YELLOW, COLOR_RED); - init_pair(3, COLOR_YELLOW, COLOR_BLACK); - init_pair(4, COLOR_RED, COLOR_BLACK); - - $nrows = $C->getmaxy - 2; - if ($BEX::Config::max_parallel_jobs > $nrows) { - $BEX::Config::max_parallel_jobs = $nrows; - } - - %host_state = %host_cnt = (); - %job_state = %job_cnt = (); - for my $s ('unknown', 'ready', 'running', 'done', 'failed') { - $host_cnt{$s} = 0; - $job_cnt{'*'}{$s} = 0; - } - - my $ui = bless {}; - $ui->refresh_status; - return $ui; -} - -sub done($) -{ - $C->bkgdset(COLOR_PAIR(1) | A_BOLD); - $C->addstr($C->getmaxy-1, 0, "Press any key to quit..."); - $C->clrtoeol; - $C->getch; - endwin; -} - -sub err($$) { - my ($ui, $msg) = @_; - $C->bkgdset(COLOR_PAIR(2) | A_BOLD); - $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx); - $C->clrtoeol; - $C->refresh; -} - -sub set_host_status($$$) { - my ($ui, $mach, $stat) = @_; - my $prev_stat = $host_state{$mach}; - if (defined $prev_stat) { - $host_cnt{$prev_stat}--; - } else { - for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; } - } - $host_state{$mach} = $stat; - $host_cnt{$stat}++; -} - -sub set_job_status($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - my $prev_stat = $job_state{$mach}{$jid} // 'unknown'; - $job_cnt{$mach}{$prev_stat}--; - $job_cnt{'*'}{$prev_stat}--; - $job_state{$mach}{$jid} = $stat; - $job_cnt{$mach}{$stat}++; - $job_cnt{'*'}{$stat}++; -} - -sub refresh_status($) { - $C->bkgdset(COLOR_PAIR(1) | A_BOLD); - $C->addnstr(0, 0, - sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW", - $host_cnt{'running'}, - $host_cnt{'done'}, - $host_cnt{'failed'}, - $host_cnt{'ready'}, - $job_cnt{'*'}{'running'}, - $job_cnt{'*'}{'done'}, - $job_cnt{'*'}{'failed'}, - $job_cnt{'*'}{'ready'}, - ), $C->getmaxx); - $C->clrtoeol; - $C->refresh; -} - -sub get_slot($) { - my ($mach) = @_; - my $s; - if (defined ($s = $by_host{$mach})) { - delete $s->{'Gone'}; - } else { - my ($best, $besti); - for my $i (0..$nrows-1) { - my $r = $by_row[$i]; - if (!defined $r) { - $besti = $i; - $best = undef; - last; - } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) { - $besti = $i; - $best = $r; - } - } - if ($best) { - delete $by_host{$best->{'Host'}}; - } - $s->{'Host'} = $mach; - $s->{'Row'} = $besti; - $by_host{$mach} = $s; - $by_row[$besti] = $s; - } - return $s; -} - -my $gone_counter = 1; -sub delete_slot($) { - my ($s) = @_; - $s->{'Gone'} = $gone_counter++; -} - -sub redraw_slot($) { - my ($s) = @_; - my $mach = $s->{'Host'}; - my $stat = $s->{'Status'} // "?"; - my $jid = $s->{'Job'} // ""; - my $jname = ($jid eq "" ? "" : $queue->job_name($jid)); - my $jcnt = $job_cnt{$mach}; - if ($jcnt->{'running'}) { - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4) | A_BOLD); - } else { - $C->bkgdset(COLOR_PAIR(3) | A_BOLD); - } - } else { - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4)); - } else { - $C->bkgdset(0); - } - } - my $r = $s->{'Row'} + 1; - $C->addstr($r, 0, sprintf("%-20.20s", $mach)); - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4)); - $C->addstr(sprintf("%3dE ", $jcnt->{'failed'})); - } else { - $C->bkgdset(0); - $C->addstr(" "); - } - $C->bkgdset(0); - $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'})); - if ($stat eq 'DONE') { - if (defined $host_last_fail_stat{$mach}) { - $C->bkgdset(COLOR_PAIR(4)); - $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach}))); - } - } else { - my $text = sprintf(" %-8s %s", $stat, $jname); - $C->addstr($text); - } - $C->clrtoeol; - $C->refresh; -} - -sub update($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - my $s = get_slot($mach); - given ($stat) { - when ('READY') { - # Pseudo-state generated internally - $ui->set_host_status($mach, 'ready'); - $ui->set_job_status($mach, $jid, 'ready'); - } - when ('OK') { - $ui->set_job_status($mach, $jid, 'done'); - } - when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) { - $ui->set_job_status($mach, $jid, 'failed'); - $host_last_fail_job{$mach} = $jid; - $host_last_fail_stat{$mach} = $stat; - } - when ('DONE') { - if ($job_cnt{$mach}{'failed'}) { - $ui->set_host_status($mach, 'failed'); - } else { - $ui->set_host_status($mach, 'done'); - } - } - when ('INIT') { - $ui->set_host_status($mach, 'running'); - $ui->set_job_status($mach, $jid, 'running') if defined $jid; - } - when ('LOCKED') { - if (defined $jid) { - $ui->set_job_status($mach, $jid, 'failed'); - } else { - for my $j (keys %{$job_state{$mach}}) { - $ui->set_job_status($mach, $jid, 'failed'); - } - $ui->set_host_status($mach, 'failed'); - $host_last_fail_job{$mach} = $jid; - $host_last_fail_stat{$mach} = $stat; - } - } - when (['START', 'PING', 'SEND', 'RUN']) { - } - default { - $ui->err("Received unknown job status $stat"); - } - } - $s->{'Job'} = $jid; - $s->{'Status'} = $stat; - redraw_slot($s); - if ($stat eq 'DONE') { delete_slot($s); } - $ui->refresh_status; -} diff --git a/lib/bin/queue b/lib/bin/queue deleted file mode 100755 index aa90755..0000000 --- a/lib/bin/queue +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Operations on Queues -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use BEX; - -my $init; - -sub usage() { - print <] - -Subcommands: -init Create a new queue -ls List all queues - -Options: -None defined so far. -AMEN - exit 0; -} - -GetOptions( - "init!" => \$init, - "help" => \&usage, -) or die "Try `bex queue --help' for more information.\n"; - -my $op = shift @ARGV // 'ls'; - -if ($op eq 'init') { - my $queue_name = shift @ARGV or die "bex queue init requires a queue name\n"; - my $path = $BEX::Config::home . '/' . $queue_name; - -d $path and die "Queue directory $path already exists\n"; - mkdir $path; - mkdir "$path/hosts"; - mkdir "$path/jobs"; - print "Queue $queue_name initialized.\n"; -} elsif ($op eq 'ls' && @ARGV == 0) { - opendir D, $BEX::Config::home or die "Cannot read BEX home directory\n"; - for my $q (sort readdir D) { - next if $q =~ /^\./; - my $d = $BEX::Config::home . '/' . $q; - if (-d $d && -d "$d/hosts" && -d "$d/jobs") { - print "$q\n"; - } - } - closedir D; -} else { - die "Invalid subcommand\n"; -} diff --git a/lib/bin/run b/lib/bin/run deleted file mode 100755 index 66b9efc..0000000 --- a/lib/bin/run +++ /dev/null @@ -1,198 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 3.0 -- Run Queued Jobs -# (c) 2011-2012 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use BEX; - -sub usage() { - print <] [[!] ...] - -Options: --j, --job= Run only the specified job --q, --queue= Select job queue - --status-fifo= Send status updates to the given named pipe -AMEN - exit 0; -} - -my $given_job; -my $queue_name; -my $status_fifo; - -GetOptions( - "j|job=s" => \$given_job, - "q|queue=s" => \$queue_name, - "s|status-fifo=s" => \$status_fifo, - "help" => \&usage, -) or die "Try `bex run --help' for more information.\n"; - -my $status_fd; -if (defined $status_fifo) { - open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; - autoflush $status_fd, 1; -} - -sub update_status($$$$;$) { - my ($mach, $job, $status, $log_on_queue, $msg) = @_; - if ($status_fd) { - print $status_fd "! $mach $job $status\n"; - } - if ($log_on_queue) { - $log_on_queue->update_job_status($mach, $job, $status, $msg); - } -} - -my %pings; - -sub ping_machine($) { - my ($mach) = @_; - if (!defined $pings{$mach}) { - if ($BEX::Config::ping_hosts) { - update_status($mach, '-', 'PING', undef); - my $host = BEX::Config::host_name($mach); - `ping -c1 -n $host >/dev/null 2>/dev/null`; - $pings{$mach} = !$?; - } else { - $pings{$mach} = 1; - } - } - if ($pings{$mach}) { - return ('OK', undef); - } else { - return ('NOPING', 'Does not ping'); - } -} - -sub exit_status($) { - my ($s) = @_; - if ($s >> 8) { - return "with exit code " . ($s >> 8); - } else { - return "on fatal signal " . ($s & 127); - } -} - -sub run_job_prep($$$) { - my ($job, $queue, $mach) = @_; - my $prep = $job->attr('Prep'); - defined($prep) && $prep !~ /^\s*$/ or return 'OK'; - - my $jid = $job->id; - update_status($mach, $jid, 'PREP', $queue); - my $lf = $queue->log_file($mach, $jid); - $ENV{'HOST'} = BEX::Config::host_name($mach); - system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf"; - delete $ENV{'HOST'}; - if ($?) { - return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?)); - } else { - return 'OK'; - } -} - -sub run_job_body($$$) { - my ($job, $queue, $mach) = @_; - - if ($job->attr('body') =~ /^\s*$/s) { - # Shortcut if the body is empty - return 'OK' - } - - my $host = BEX::Config::host_name($mach); - my $jid = $job->id; - - my $tmp = $queue->temp_file($mach, $jid); - open T, '>', $tmp or die; - if (defined $BEX::Config::job_prolog) { - open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); - while (

) { print T; } - close P; - } else { - print T "#!/bin/sh\n"; - } - print T "# BEX job ", $jid, "\n"; - print T $job->attr('body'); - if (defined $BEX::Config::job_epilog) { - open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); - while () { print T; } - close E; - } - close T; - - update_status($mach, $jid, 'SEND', undef); - my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; - my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`; - !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); - chomp $rtmp; - - update_status($mach, $jid, 'RUN', $queue); - my $lf = $queue->log_file($mach, $jid); - system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf"; - if ($?) { - return ('FAILED', 'Job failed ' . exit_status($?)); - } else { - return 'OK'; - } -} - -sub run_job($$$) { - my ($job, $queue, $mach) = @_; - my ($stat, $msg); - - ($stat, $msg) = ping_machine($mach); - $stat eq 'OK' or return ($stat, $msg); - - ($stat, $msg) = run_job_prep($job, $queue, $mach); - $stat eq 'OK' or return ($stat, $msg); - - return run_job_body($job, $queue, $mach); -} - -my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); -my $queue = BEX::Queue->new($queue_name); - -$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n"; - -for my $mach (@machines) { - my @q = $queue->scan($mach) or next; - if (!$queue->lock($mach, undef)) { - print "### Machine $mach is locked by another brun, skipping...\n"; - update_status($mach, '-', 'LOCKED', undef); - update_status($mach, '-', 'DONE', undef); - next; - } - update_status($mach, '-', 'INIT', undef); - while (my $jid = shift @q) { - if (defined $given_job) { - $jid eq $given_job or next; - } - my $job = BEX::Job->new_from_file($queue->job_file($jid)); - update_status($mach, $jid, 'INIT', undef); - if (!$queue->lock($mach, $jid)) { - print "### Skipping locked $jid on $mach ###\n"; - update_status($mach, $jid, 'LOCKED', undef); - next; - } - print "### Running ", $job->name, " on $mach ###\n"; - my ($s, $msg) = run_job($job, $queue, $mach); - update_status($mach, $jid, $s, $queue, $msg); - - if ($s eq 'OK') { - print "+++ OK\n"; - $queue->remove($mach, $jid); - } else { - print "--- $s: $msg\n"; - if ($BEX::Config::skip_on_fail) { - print "### Skipping other jobs on the same host ###\n" if @q; - last; - } - } - } -} continue { - update_status($mach, '-', 'DONE', undef); -} -$queue->unlock;