From f83026ed9386990465bf80156054c9d94529df49 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sun, 30 Oct 2011 22:18:07 +0100 Subject: [PATCH] brun: Status socket --- brun | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/brun b/brun index 48afd8c..2c364ee 100755 --- a/brun +++ b/brun @@ -5,33 +5,58 @@ use strict; use warnings; use Getopt::Long; +use IO::Socket::UNIX; use lib 'lib'; use BEX; +my $given_job; my $queue_name; +my $status_socket; GetOptions( + "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, + "s|status-socket=s" => \$status_socket, ) or die <] [[!] ...] Options: +-j, --job= Run only the specified job -q, --queue= Run jobs in the given queue + --status-socket= Send status updates to the given filesystem socket AMEN +my $status_sk; +if (defined $status_socket) { + $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die; +} + +sub send_status($$$) { + my ($mach, $job, $status) = @_; + if ($status_sk) { + print $status_sk "$mach $job $status\n"; + } else { + # FIXME: Debug + print ">>> $mach $job $status\n"; + } +} + sub ping_machine($) { my ($mach) = @_; + send_status($mach, '-', 'PING'); `ping -c1 -n $mach >/dev/null 2>/dev/null`; return !$?; } sub run_job($$$) { my ($job, $queue, $mach) = @_; + my $jid = $job->{'ID'}; + # FIXME: rsyncing, rsync-only jobs # FIXME: Locking - my $tmp = $queue->temp_file($mach, $job->{'ID'}); + my $tmp = $queue->temp_file($mach, $jid); open T, '>', $tmp or die; if (defined $BEX::Config::job_prolog) { open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!"; @@ -40,7 +65,7 @@ sub run_job($$$) { } else { print T "#!/bin/sh\n"; } - print T "# BEX job ", $job->{'ID'}, "\n"; + print T "# BEX job ", $jid, "\n"; print T $job->{'body'}; if (defined $BEX::Config::job_epilog) { open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!"; @@ -49,11 +74,13 @@ sub run_job($$$) { } close T; + send_status($mach, $jid, 'SEND'); my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; my $rtmp = `ssh <$tmp $mach '$cmd'`; !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed"; chomp $rtmp; + send_status($mach, $jid, 'RUN'); system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e"; if ($?) { return 'Failed'; @@ -64,10 +91,15 @@ sub run_job($$$) { my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); + for my $mach (@machines) { my @q = $queue->scan($mach) or next; + send_status($mach, '-', 'INIT'); my $ping; for my $jid (@q) { + if (defined $given_job) { + $jid eq $given_job or next; + } my $job = BEX::Job->new_from_file($queue->job_file($jid)); my $stat = { 'Time' => time, @@ -85,10 +117,13 @@ for my $mach (@machines) { if ($s eq 'OK') { print "+++ OK\n"; $queue->remove($mach, $jid); + send_status($mach, $jid, 'OK'); } else { print "--- $s\n"; $stat->{'Status'} = $s; $queue->write_job_status($mach, $jid, $stat); + send_status($mach, $jid, 'ERR'); } } + send_status($mach, '-', 'DONE'); } -- 2.39.2