]> mj.ucw.cz Git - bex.git/commitdiff
First attempt at parallel execution
authorMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 08:27:20 +0000 (09:27 +0100)
committerMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 08:27:20 +0000 (09:27 +0100)
bprun [new file with mode: 0755]
brun

diff --git a/bprun b/bprun
new file mode 100755 (executable)
index 0000000..3c36ce9
--- /dev/null
+++ b/bprun
@@ -0,0 +1,65 @@
+#!/usr/bin/perl
+# Batch EXecutor 2.0 -- Parallel Execution Using Screen
+# (c) 2011 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+use Getopt::Long;
+use POSIX;
+
+use lib 'lib';
+use BEX;
+
+my $queue_name;
+
+GetOptions(
+       "q|queue=s" => \$queue_name,
+) or die <<AMEN ;
+Usage: bprun [<options>] [[!]<machine-or-class> ...]
+
+Options:
+-q, --queue=<name>     Run jobs in the given queue
+AMEN
+
+$ENV{'STY'} or die "Please run me under Screen\n";
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+my $fifo_name = $queue->{'Name'} . '/status-fifo';
+unlink $fifo_name;
+mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
+open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
+
+my %running = ();
+my $max = 1;   # FIXME
+
+while (%running || @machines) {
+       if (@machines && keys %running < $max) {
+               my $mach = shift @machines;
+               $queue->scan($mach) or next;
+               print "$mach: START\n";
+               system 'screen', './brun', "--status-fifo=$fifo_name", $mach;
+               !$? or print "$mach: Failed to run!\n";
+               $running{$mach} = 'START';
+               next;
+       }
+       $_ = <FIFO>;
+       chomp;
+       my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
+       if (!defined $stat) {
+               print "ERROR: Received invalid status message <$_>\n";
+               next;
+       }
+       if (!defined $running{$mach}) {
+               print "ERROR: Received status message <$_> for a machine which does not run\n";
+               next;
+       }
+       $running{$mach} = $stat . ($jid eq '-' ? "" : ":$jid");
+       print "$mach: ", $running{$mach}, "\n";
+       if ($stat eq 'DONE') {
+               delete $running{$mach};
+       }
+}
+
+close FIFO;
+unlink $fifo_name;
diff --git a/brun b/brun
index 2c364ee2e74c6ff79da117e53f744f1f3ff2a865..01bff140e7368ea823743936c7e0d80fadb37b20 100755 (executable)
--- a/brun
+++ b/brun
@@ -5,40 +5,37 @@
 use strict;
 use warnings;
 use Getopt::Long;
-use IO::Socket::UNIX;
 
 use lib 'lib';
 use BEX;
 
 my $given_job;
 my $queue_name;
-my $status_socket;
+my $status_fifo;
 
 GetOptions(
        "j|job=s" => \$given_job,
        "q|queue=s" => \$queue_name,
-       "s|status-socket=s" => \$status_socket,
+       "s|status-fifo=s" => \$status_fifo,
 ) or die <<AMEN ;
 Usage: brun [<options>] [[!]<machine-or-class> ...]
 
 Options:
 -j, --job=<id>         Run only the specified job
 -q, --queue=<name>     Run jobs in the given queue
-    --status-socket=<s>        Send status updates to the given filesystem socket
+    --status-fifo=<f>  Send status updates to the given named pipe
 AMEN
 
-my $status_sk;
-if (defined $status_socket) {
-       $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die;
+my $status_fd;
+if (defined $status_fifo) {
+       open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
+       autoflush $status_fd, 1;
 }
 
 sub send_status($$$) {
        my ($mach, $job, $status) = @_;
-       if ($status_sk) {
-               print $status_sk "$mach $job $status\n";
-       } else {
-               # FIXME: Debug
-               print ">>> $mach $job $status\n";
+       if ($status_fd) {
+               print $status_fd "! $mach $job $status\n";
        }
 }