use strict;
use warnings;
use Getopt::Long;
+use IO::Socket::UNIX;
use lib 'lib';
use BEX;
+my $given_job;
my $queue_name;
+my $status_socket;
GetOptions(
+ "j|job=s" => \$given_job,
"q|queue=s" => \$queue_name,
+ "s|status-socket=s" => \$status_socket,
) or die <<AMEN ;
Usage: brun [<options>] [[!]<machine-or-class> ...]
Options:
+-j, --job=<id> Run only the specified job
-q, --queue=<name> Run jobs in the given queue
+ --status-socket=<s> Send status updates to the given filesystem socket
AMEN
+my $status_sk;
+if (defined $status_socket) {
+ $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die;
+}
+
+sub send_status($$$) {
+ my ($mach, $job, $status) = @_;
+ if ($status_sk) {
+ print $status_sk "$mach $job $status\n";
+ } else {
+ # FIXME: Debug
+ print ">>> $mach $job $status\n";
+ }
+}
+
sub ping_machine($) {
my ($mach) = @_;
+ send_status($mach, '-', 'PING');
`ping -c1 -n $mach >/dev/null 2>/dev/null`;
return !$?;
}
sub run_job($$$) {
my ($job, $queue, $mach) = @_;
+ my $jid = $job->{'ID'};
+
# FIXME: rsyncing, rsync-only jobs
# FIXME: Locking
- my $tmp = $queue->temp_file($mach, $job->{'ID'});
+ my $tmp = $queue->temp_file($mach, $jid);
open T, '>', $tmp or die;
if (defined $BEX::Config::job_prolog) {
open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
} else {
print T "#!/bin/sh\n";
}
- print T "# BEX job ", $job->{'ID'}, "\n";
+ print T "# BEX job ", $jid, "\n";
print T $job->{'body'};
if (defined $BEX::Config::job_epilog) {
open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
}
close T;
+ send_status($mach, $jid, 'SEND');
my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
my $rtmp = `ssh <$tmp $mach '$cmd'`;
!$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
chomp $rtmp;
+ send_status($mach, $jid, 'RUN');
system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
if ($?) {
return 'Failed';
my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
my $queue = BEX::Queue->new($queue_name);
+
for my $mach (@machines) {
my @q = $queue->scan($mach) or next;
+ send_status($mach, '-', 'INIT');
my $ping;
for my $jid (@q) {
+ if (defined $given_job) {
+ $jid eq $given_job or next;
+ }
my $job = BEX::Job->new_from_file($queue->job_file($jid));
my $stat = {
'Time' => time,
if ($s eq 'OK') {
print "+++ OK\n";
$queue->remove($mach, $jid);
+ send_status($mach, $jid, 'OK');
} else {
print "--- $s\n";
$stat->{'Status'} = $s;
$queue->write_job_status($mach, $jid, $stat);
+ send_status($mach, $jid, 'ERR');
}
}
+ send_status($mach, '-', 'DONE');
}