#!/usr/bin/perl
# Batch EXecutor 3.0 -- Run Queued Jobs
# (c) 2011-2012 Martin Mares

use strict;
use warnings;
use Getopt::Long;
use IO::Handle;
use BEX;

# Print command-line help and exit successfully.
# NOTE(review): this help text was reconstructed from a copy of the file whose
# <...> sequences had been stripped; compare the wording with upstream.
sub usage() {
	print <<AMEN;
Usage: bex run [<options>] [[!]<machine-or-class> ...]

Options:
-j, --job=<id>          Run only the specified job
-q, --queue=<name>      Select job queue
    --status-fifo=<f>   Send status updates to the given named pipe
AMEN
	exit 0;
}

my $given_job;
my $queue_name;
my $status_fifo;
GetOptions(
	"j|job=s" => \$given_job,
	"q|queue=s" => \$queue_name,
	"s|status-fifo=s" => \$status_fifo,
	"help" => \&usage,
) or die "Try `bex run --help' for more information.\n";

# We do not want SIGPIPE on writes to the status FIFO
$SIG{'PIPE'} = 'IGNORE';

my $status_fd;
if (defined $status_fifo) {
	open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
	# Status lines must reach the reader immediately, not when the buffer fills.
	$status_fd->autoflush(1);
}

# Report a status change for job $job on machine $mach. Callers pass '-' as
# $job for machine-wide events (PING, INIT, LOCKED, DONE). The line
# "! $mach $job $status" is written to the status FIFO if one was given. If
# $log_on_queue is a queue object, the status and the optional $msg are also
# recorded there via update_job_status().
sub update_status($$$$;$) {
	my ($mach, $job, $status, $log_on_queue, $msg) = @_;
	if ($status_fd) {
		# Best effort: failed writes (e.g. the reader went away) are ignored;
		# SIGPIPE is disabled above for exactly this reason.
		print $status_fd "! $mach $job $status\n";
	}
	if ($log_on_queue) {
		$log_on_queue->update_job_status($mach, $job, $status, $msg);
	}
}

# Cache of reachability results: machine name => true if it answered.
my %pings;

# Check whether machine $mach is reachable. Hosts are pinged only when
# $BEX::Config::ping_hosts is set, and each machine at most once per run.
# Returns ('OK', undef) or ('NOPING', $message).
sub ping_machine($) {
	my ($mach) = @_;
	if (!defined $pings{$mach}) {
		if ($BEX::Config::ping_hosts) {
			update_status($mach, '-', 'PING', undef);
			my $host = BEX::Config::host_name($mach);
			system "ping -c1 -n $host >/dev/null 2>/dev/null";
			$pings{$mach} = !$?;
		} else {
			$pings{$mach} = 1;
		}
	}
	if ($pings{$mach}) {
		return ('OK', undef);
	} else {
		return ('NOPING', 'Does not ping');
	}
}

# Turn a wait status (the value of $? after system()) into a phrase that
# reads naturally after "... failed ".
sub exit_status($) {
	my ($s) = @_;
	if ($s == -1) {
		# system() could not start the command at all; $! holds the reason.
		# Without this check, -1 >> 8 would be reported as a huge exit code.
		return "to start: $!";
	} elsif ($s >> 8) {
		return "with exit code " . ($s >> 8);
	} else {
		return "on fatal signal " . ($s & 127);
	}
}

# Run the job's optional 'Prep' command locally through bash, with HOST set to
# the target host name. Output is shown and appended to the job's log file.
# Returns 'OK' (also when there is no Prep command) or
# ('PREPFAIL', $message).
sub run_job_prep($$$) {
	my ($job, $queue, $mach) = @_;
	my $prep = $job->attr('Prep');
	defined($prep) && $prep !~ /^\s*$/ or return 'OK';
	my $jid = $job->id;
	update_status($mach, $jid, 'PREP', $queue);
	my $lf = $queue->log_file($mach, $jid);
	{
		# local() restores a pre-existing HOST afterwards (or removes it if it
		# was unset), instead of unconditionally deleting the user's variable.
		local $ENV{'HOST'} = BEX::Config::host_name($mach);
		# pipefail: the status of the prep command must not be masked by tee.
		system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
	}
	if ($?) {
		return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
	} else {
		return 'OK';
	}
}

# Append the contents of file $src to the output handle $out.
# Returns true on success, or false (with $! set) if $src cannot be opened.
sub append_file_to($$) {
	my ($src, $out) = @_;
	open my $in, '<', $src or return 0;
	while (my $line = <$in>) {
		print $out $line;
	}
	close $in;
	return 1;
}

# Assemble the job script (prolog or a plain shebang, job body, epilog), copy
# it to a temporary file on the remote host, and run it there over the ssh
# command. Output is shown and appended to the job's log file.
# Returns 'OK' or (status, message), where status is one of INTERR, NOXFER
# or FAILED.
sub run_job_body($$$) {
	my ($job, $queue, $mach) = @_;
	if ($job->attr('body') =~ /^\s*$/s) {
		# Shortcut if the body is empty
		return 'OK';
	}
	my $host = BEX::Config::host_name($mach);
	my $jid = $job->id;

	# A lexical handle is closed automatically on every early return below.
	my $tmp = $queue->temp_file($mach, $jid);
	open my $out, '>', $tmp or die "Cannot create $tmp: $!\n";
	if (defined $BEX::Config::job_prolog) {
		append_file_to($BEX::Config::job_prolog, $out)
			or return ('INTERR', "Cannot open prolog: $!");
	} else {
		print $out "#!/bin/sh\n";
	}
	print $out "# BEX job ", $jid, "\n";
	print $out $job->attr('body');
	if (defined $BEX::Config::job_epilog) {
		append_file_to($BEX::Config::job_epilog, $out)
			or return ('INTERR', "Cannot open epilog: $!");
	}
	# Buffered write errors (e.g. disk full) only show up at close time.
	close $out or return ('INTERR', "Cannot write $tmp: $!");

	# The remote shell stores stdin in a fresh mktemp file, makes it
	# executable and prints its name.
	update_status($mach, $jid, 'SEND', undef);
	my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
	my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
	!$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
	chomp $rtmp;

	# Run the remote script, delete it, and propagate its exit code.
	# pipefail keeps tee from masking a failure.
	update_status($mach, $jid, 'RUN', $queue);
	my $lf = $queue->log_file($mach, $jid);
	system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
	if ($?) {
		return ('FAILED', 'Job failed ' . exit_status($?));
	} else {
		return 'OK';
	}
}

# Run one job on one machine: reachability check, then the Prep command,
# then the job body. Stops at the first step that is not 'OK' and returns
# its (status, message).
sub run_job($$$) {
	my ($job, $queue, $mach) = @_;
	my ($stat, $msg);

	($stat, $msg) = ping_machine($mach);
	$stat eq 'OK' or return ($stat, $msg);

	($stat, $msg) = run_job_prep($job, $queue, $mach);
	$stat eq 'OK' or return ($stat, $msg);

	return run_job_body($job, $queue, $mach);
}

my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
my $queue = BEX::Queue->new($queue_name);
$queue->lock(undef, undef) or die "The queue is locked by another bex run, cannot continue.\n";

for my $mach (@machines) {
	my @q = $queue->scan($mach) or next;
	if (!$queue->lock($mach, undef)) {
		print "### Machine $mach is locked by another bex run, skipping...\n";
		update_status($mach, '-', 'LOCKED', undef);
		# DONE is reported exactly once, by the continue block below.
		next;
	}
	update_status($mach, '-', 'INIT', undef);
	# defined(): a job id that is false as a string (e.g. "0") must not end the loop.
	while (defined(my $jid = shift @q)) {
		if (defined $given_job) {
			$jid eq $given_job or next;
		}
		my $job = BEX::Job->new_from_file($queue->job_file($jid));
		update_status($mach, $jid, 'INIT', undef);
		if (!$queue->lock($mach, $jid)) {
			print "### Skipping locked $jid on $mach ###\n";
			update_status($mach, $jid, 'LOCKED', undef);
			next;
		}
		print "### Running ", $job->name, " on $mach ###\n";
		my ($s, $msg) = run_job($job, $queue, $mach);
		update_status($mach, $jid, $s, $queue, $msg);
		if ($s eq 'OK') {
			print "+++ OK\n";
			$queue->remove($mach, $jid);
		} else {
			print "--- $s: $msg\n";
			if ($BEX::Config::skip_on_fail) {
				print "### Skipping other jobs on the same host ###\n" if @q;
				last;
			}
		}
	}
} continue {
	update_status($mach, '-', 'DONE', undef);
}

$queue->unlock;