From 8d34ddafee25097b3dec59e569200ff2dfa1f4ac Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 31 Oct 2011 09:27:20 +0100 Subject: [PATCH] First attempt at parallel execution --- bprun | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ brun | 21 +++++++++---------- 2 files changed, 74 insertions(+), 12 deletions(-) create mode 100755 bprun diff --git a/bprun b/bprun new file mode 100755 index 0000000..3c36ce9 --- /dev/null +++ b/bprun @@ -0,0 +1,65 @@ +#!/usr/bin/perl +# Batch EXecutor 2.0 -- Parallel Execution Using Screen +# (c) 2011 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use POSIX; + +use lib 'lib'; +use BEX; + +my $queue_name; + +GetOptions( + "q|queue=s" => \$queue_name, +) or die <] [[!] ...] + +Options: +-q, --queue= Run jobs in the given queue +AMEN + +$ENV{'STY'} or die "Please run me under Screen\n"; +my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); +my $queue = BEX::Queue->new($queue_name); + +my $fifo_name = $queue->{'Name'} . '/status-fifo'; +unlink $fifo_name; +mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!"; +open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!"; + +my %running = (); +my $max = 1; # FIXME + +while (%running || @machines) { + if (@machines && keys %running < $max) { + my $mach = shift @machines; + $queue->scan($mach) or next; + print "$mach: START\n"; + system 'screen', './brun', "--status-fifo=$fifo_name", $mach; + !$? or print "$mach: Failed to run!\n"; + $running{$mach} = 'START'; + next; + } + $_ = ; + chomp; + my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/; + if (!defined $stat) { + print "ERROR: Received invalid status message <$_>\n"; + next; + } + if (!defined $running{$mach}) { + print "ERROR: Received status message <$_> for a machine which does not run\n"; + next; + } + $running{$mach} = $stat . ($jid eq '-' ? "" : ":$jid"); + print "$mach: ", $running{$mach}, "\n"; + if ($stat eq 'DONE') { + delete $running{$mach}; + } +} + +close FIFO; +unlink $fifo_name; diff --git a/brun b/brun index 2c364ee..01bff14 100755 --- a/brun +++ b/brun @@ -5,40 +5,37 @@ use strict; use warnings; use Getopt::Long; -use IO::Socket::UNIX; use lib 'lib'; use BEX; my $given_job; my $queue_name; -my $status_socket; +my $status_fifo; GetOptions( "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, - "s|status-socket=s" => \$status_socket, + "s|status-fifo=s" => \$status_fifo, ) or die <] [[!] ...] Options: -j, --job= Run only the specified job -q, --queue= Run jobs in the given queue - --status-socket= Send status updates to the given filesystem socket + --status-fifo= Send status updates to the given named pipe AMEN -my $status_sk; -if (defined $status_socket) { - $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die; +my $status_fd; +if (defined $status_fifo) { + open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; + autoflush $status_fd, 1; } sub send_status($$$) { my ($mach, $job, $status) = @_; - if ($status_sk) { - print $status_sk "$mach $job $status\n"; - } else { - # FIXME: Debug - print ">>> $mach $job $status\n"; + if ($status_fd) { + print $status_fd "! $mach $job $status\n"; } } -- 2.39.2