use strict;
use warnings;
use Getopt::Long;
-use IO::Socket::UNIX;
use lib 'lib';
use BEX;
my $given_job;
my $queue_name;
-my $status_socket;
+my $status_fifo;
GetOptions(
"j|job=s" => \$given_job,
"q|queue=s" => \$queue_name,
- "s|status-socket=s" => \$status_socket,
+ "s|status-fifo=s" => \$status_fifo,
) or die <<AMEN ;
Usage: brun [<options>] [[!]<machine-or-class> ...]
Options:
-j, --job=<id> Run only the specified job
--q, --queue=<name> Run jobs in the given queue
- --status-socket=<s> Send status updates to the given filesystem socket
+-q, --queue=<name> Select job queue
+ --status-fifo=<f> Send status updates to the given named pipe
AMEN
-my $status_sk;
-if (defined $status_socket) {
- $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die;
+# Handle used for status updates; stays undef unless --status-fifo was given.
+my $status_fd;
+if (defined $status_fifo) {
+ open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
+ # Load IO::Handle explicitly: nothing else in this file pulls it in now
+ # that IO::Socket::UNIX is gone, and older perls do not autoload it.
+ require IO::Handle;
+ # Unbuffered, so each status line reaches the reader as soon as it is printed.
+ $status_fd->autoflush(1);
}
-sub send_status($$$) {
- my ($mach, $job, $status) = @_;
- if ($status_sk) {
- print $status_sk "$mach $job $status\n";
- } else {
- # FIXME: Debug
- print ">>> $mach $job $status\n";
+# Report a status change for job $job on machine $mach.
+#   $status       - status keyword written to the status FIFO and the log
+#   $log_on_queue - object whose log() method records the event; pass undef
+#                   to skip logging
+#   $msg          - optional message, handed to log() only
+# A "! <mach> <job> <status>" line goes to the status FIFO when one is open.
+sub update_status($$$$;$) {
+ my ($mach, $job, $status, $log_on_queue, $msg) = @_;
+ print {$status_fd} "! $mach $job $status\n" if $status_fd;
+ if ($log_on_queue) {
+ $log_on_queue->log($mach, $job, $status, $msg);
}
}
+# Announce a PING status for the machine, then run a single ping against it.
+# Returns true when the ping command exited with status 0, false otherwise.
sub ping_machine($) {
- my ($mach) = @_;
- send_status($mach, '-', 'PING');
- `ping -c1 -n $mach >/dev/null 2>/dev/null`;
- return !$?;
+ my $host = shift;
+ update_status($host, '-', 'PING', undef);
+ # All output is discarded; only the exit status in $? is of interest.
+ `ping -c1 -n $host >/dev/null 2>/dev/null`;
+ my $reachable = !$?;
+ return $reachable;
}
my $tmp = $queue->temp_file($mach, $jid);
open T, '>', $tmp or die;
if (defined $BEX::Config::job_prolog) {
- open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
+ open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
while (<P>) { print T; }
close P;
} else {
print T "# BEX job ", $jid, "\n";
print T $job->{'body'};
if (defined $BEX::Config::job_epilog) {
- open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
+ open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
while (<E>) { print T; }
close E;
}
close T;
- send_status($mach, $jid, 'SEND');
+ update_status($mach, $jid, 'SEND', undef);
my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
my $rtmp = `ssh <$tmp $mach '$cmd'`;
- !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
+ !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
chomp $rtmp;
- send_status($mach, $jid, 'RUN');
+ update_status($mach, $jid, 'RUN', $queue);
system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
if ($?) {
- return 'Failed';
+ return ('FAILED', 'Job failed');
} else {
- return 'OK';
+ return ('OK', undef);
}
}
+# Walk every selected machine: announce INIT, try each queued job, then
+# announce DONE. Machines whose queue scan returns nothing are skipped.
for my $mach (@machines) {
my @q = $queue->scan($mach) or next;
- send_status($mach, '-', 'INIT');
+ update_status($mach, '-', 'INIT', undef);
+ # Ping result is cached here, so a machine is probed at most once and
+ # only from inside the job loop.
my $ping;
for my $jid (@q) {
if (defined $given_job) {
};
print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
$ping //= ping_machine($mach);
- my $s;
+ # The parentheses matter: "my $s, $msg" would declare only $s and leave
+ # $msg undeclared, which is a compile error under "use strict".
+ my ($s, $msg);
if (!$ping) {
- $s = 'No ping';
+ ($s, $msg) = ('NOPING', 'Does not ping');
} else {
- $s = run_job($job, $queue, $mach);
+ ($s, $msg) = run_job($job, $queue, $mach);
}
+ # Single reporting point: status FIFO plus the queue's log.
+ update_status($mach, $jid, $s, $queue, $msg);
- BEX::log("$mach $jid $s");
if ($s eq 'OK') {
print "+++ OK\n";
$queue->remove($mach, $jid);
- send_status($mach, $jid, 'OK');
} else {
- print "--- $s\n";
+ print "--- $s: $msg\n";
$stat->{'Status'} = $s;
+ $stat->{'Message'} = $msg;
$queue->write_job_status($mach, $jid, $stat);
- send_status($mach, $jid, 'ERR');
}
}
- send_status($mach, '-', 'DONE');
+ update_status($mach, '-', 'DONE', undef);
}