#!/usr/bin/perl # Batch EXecutor 2.0 -- Run Queued Jobs # (c) 2011 Martin Mares use strict; use warnings; use Getopt::Long; use IO::Socket::UNIX; use lib 'lib'; use BEX; my $given_job; my $queue_name; my $status_socket; GetOptions( "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, "s|status-socket=s" => \$status_socket, ) or die <] [[!] ...] Options: -j, --job= Run only the specified job -q, --queue= Run jobs in the given queue --status-socket= Send status updates to the given filesystem socket AMEN my $status_sk; if (defined $status_socket) { $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die; } sub send_status($$$) { my ($mach, $job, $status) = @_; if ($status_sk) { print $status_sk "$mach $job $status\n"; } else { # FIXME: Debug print ">>> $mach $job $status\n"; } } sub ping_machine($) { my ($mach) = @_; send_status($mach, '-', 'PING'); `ping -c1 -n $mach >/dev/null 2>/dev/null`; return !$?; } sub run_job($$$) { my ($job, $queue, $mach) = @_; my $jid = $job->{'ID'}; # FIXME: rsyncing, rsync-only jobs # FIXME: Locking my $tmp = $queue->temp_file($mach, $jid); open T, '>', $tmp or die; if (defined $BEX::Config::job_prolog) { open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!"; while (

) { print T; } close P; } else { print T "#!/bin/sh\n"; } print T "# BEX job ", $jid, "\n"; print T $job->{'body'}; if (defined $BEX::Config::job_epilog) { open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!"; while () { print T; } close E; } close T; send_status($mach, $jid, 'SEND'); my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; my $rtmp = `ssh <$tmp $mach '$cmd'`; !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed"; chomp $rtmp; send_status($mach, $jid, 'RUN'); system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e"; if ($?) { return 'Failed'; } else { return 'OK'; } } my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); for my $mach (@machines) { my @q = $queue->scan($mach) or next; send_status($mach, '-', 'INIT'); my $ping; for my $jid (@q) { if (defined $given_job) { $jid eq $given_job or next; } my $job = BEX::Job->new_from_file($queue->job_file($jid)); my $stat = { 'Time' => time, }; print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n"; $ping //= ping_machine($mach); my $s; if (!$ping) { $s = 'No ping'; } else { $s = run_job($job, $queue, $mach); } BEX::log("$mach $jid $s"); if ($s eq 'OK') { print "+++ OK\n"; $queue->remove($mach, $jid); send_status($mach, $jid, 'OK'); } else { print "--- $s\n"; $stat->{'Status'} = $s; $queue->write_job_status($mach, $jid, $stat); send_status($mach, $jid, 'ERR'); } } send_status($mach, '-', 'DONE'); }