#!/usr/bin/perl # Batch EXecutor 2.0 -- Run Queued Jobs # (c) 2011 Martin Mares use strict; use warnings; use Getopt::Long; use lib 'lib'; use BEX; my $given_job; my $queue_name; my $status_fifo; GetOptions( "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, "s|status-fifo=s" => \$status_fifo, ) or die <] [[!] ...] Options: -j, --job= Run only the specified job -q, --queue= Select job queue --status-fifo= Send status updates to the given named pipe AMEN my $status_fd; if (defined $status_fifo) { open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; autoflush $status_fd, 1; } sub update_status($$$$;$) { my ($mach, $job, $status, $log_on_queue, $msg) = @_; if ($status_fd) { print $status_fd "! $mach $job $status\n"; } if ($log_on_queue) { $log_on_queue->log($mach, $job, $status, $msg); } } sub ping_machine($) { my ($mach) = @_; update_status($mach, '-', 'PING', undef); `ping -c1 -n $mach >/dev/null 2>/dev/null`; return !$?; } sub run_job($$$) { my ($job, $queue, $mach) = @_; my $jid = $job->{'ID'}; my $tmp = $queue->temp_file($mach, $jid); open T, '>', $tmp or die; if (defined $BEX::Config::job_prolog) { open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); while (

) { print T; } close P; } else { print T "#!/bin/sh\n"; } print T "# BEX job ", $jid, "\n"; print T $job->{'body'}; if (defined $BEX::Config::job_epilog) { open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); while () { print T; } close E; } close T; update_status($mach, $jid, 'SEND', undef); my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; my $rtmp = `ssh <$tmp $mach '$cmd'`; !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); chomp $rtmp; update_status($mach, $jid, 'RUN', $queue); my $lf = $queue->log_file($mach, $jid); system 'bash', '-o', 'pipefail', '-c', "ssh -t $mach '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf"; if ($?) { return ('FAILED', 'Job failed'); } else { return ('OK', undef); } } my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); for my $mach (@machines) { my @q = $queue->scan($mach) or next; update_status($mach, '-', 'INIT', undef); my $ping; for my $jid (@q) { if (defined $given_job) { $jid eq $given_job or next; } my $job = BEX::Job->new_from_file($queue->job_file($jid)); my $stat = { 'Time' => time, }; print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n"; $ping //= ping_machine($mach); my ($s, $msg); if (!$ping) { ($s, $msg) = ('NOPING', 'Does not ping'); } else { ($s, $msg) = run_job($job, $queue, $mach); } update_status($mach, $jid, $s, $queue, $msg); if ($s eq 'OK') { print "+++ OK\n"; $queue->remove($mach, $jid); } else { print "--- $s: $msg\n"; $stat->{'Status'} = $s; $stat->{'Message'} = $msg; $queue->write_job_status($mach, $jid, $stat); } } update_status($mach, '-', 'DONE', undef); }