From f0db6a85bb186258d023ff64f10cc7a7440256f1 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 31 Oct 2011 10:25:48 +0100 Subject: [PATCH] BEX module split --- benq | 3 +- brun | 42 ++++++----- lib/BEX.pm | 191 +---------------------------------------------- lib/BEX/Job.pm | 80 ++++++++++++++++++++ lib/BEX/Queue.pm | 131 ++++++++++++++++++++++++++++++++ 5 files changed, 237 insertions(+), 210 deletions(-) create mode 100644 lib/BEX/Job.pm create mode 100644 lib/BEX/Queue.pm diff --git a/benq b/benq index a26660f..c401720 100755 --- a/benq +++ b/benq @@ -39,12 +39,11 @@ if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size # Re-load the job and put it to the queue $job = BEX::Job->new_from_file($fn); -print "New job ", $job->{'ID'}, "\n"; +print "New job ", $job->id, "\n"; my $queue = BEX::Queue->new($queue_name); for my $m (@machines) { if ($queue->enqueue($m, $job)) { print "\t$m\n"; - BEX::log("$m " . $job->{'ID'} . " queued"); } else { print "\t$m (already queued)\n"; } diff --git a/brun b/brun index 01bff14..760bf9c 100755 --- a/brun +++ b/brun @@ -22,7 +22,7 @@ Usage: brun [] [[!] ...] Options: -j, --job= Run only the specified job --q, --queue= Run jobs in the given queue +-q, --queue= Select job queue --status-fifo= Send status updates to the given named pipe AMEN @@ -32,16 +32,19 @@ if (defined $status_fifo) { autoflush $status_fd, 1; } -sub send_status($$$) { - my ($mach, $job, $status) = @_; +sub update_status($$$$;$) { + my ($mach, $job, $status, $log_on_queue, $msg) = @_; if ($status_fd) { print $status_fd "! $mach $job $status\n"; } + if ($log_on_queue) { + $log_on_queue->log($mach, $job, $status, $msg); + } } sub ping_machine($) { my ($mach) = @_; - send_status($mach, '-', 'PING'); + update_status($mach, '-', 'PING', undef); `ping -c1 -n $mach >/dev/null 2>/dev/null`; return !$?; } @@ -56,7 +59,7 @@ sub run_job($$$) { my $tmp = $queue->temp_file($mach, $jid); open T, '>', $tmp or die; if (defined $BEX::Config::job_prolog) { - open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!"; + open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); while (

) { print T; } close P; } else { @@ -65,24 +68,24 @@ sub run_job($$$) { print T "# BEX job ", $jid, "\n"; print T $job->{'body'}; if (defined $BEX::Config::job_epilog) { - open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!"; + open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); while () { print T; } close E; } close T; - send_status($mach, $jid, 'SEND'); + update_status($mach, $jid, 'SEND', undef); my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; my $rtmp = `ssh <$tmp $mach '$cmd'`; - !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed"; + !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); chomp $rtmp; - send_status($mach, $jid, 'RUN'); + update_status($mach, $jid, 'RUN', $queue); system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e"; if ($?) { - return 'Failed'; + return ('FAILED', 'Job failed'); } else { - return 'OK'; + return ('OK', undef); } } @@ -91,7 +94,7 @@ my $queue = BEX::Queue->new($queue_name); for my $mach (@machines) { my @q = $queue->scan($mach) or next; - send_status($mach, '-', 'INIT'); + update_status($mach, '-', 'INIT', undef); my $ping; for my $jid (@q) { if (defined $given_job) { @@ -103,24 +106,23 @@ for my $mach (@machines) { }; print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n"; $ping //= ping_machine($mach); - my $s; + my $s, $msg; if (!$ping) { - $s = 'No ping'; + ($s, $msg) = ('NOPING', 'Does not ping'); } else { - $s = run_job($job, $queue, $mach); + ($s, $msg) = run_job($job, $queue, $mach); } + update_status($mach, $jid, $s, $queue, $msg); - BEX::log("$mach $jid $s"); if ($s eq 'OK') { print "+++ OK\n"; $queue->remove($mach, $jid); - send_status($mach, $jid, 'OK'); } else { - print "--- $s\n"; + print "--- $s: $msg\n"; $stat->{'Status'} = $s; + $stat->{'Message'} = $msg; $queue->write_job_status($mach, $jid, $stat); - send_status($mach, $jid, 'ERR'); } } - send_status($mach, '-', 'DONE'); + update_status($mach, '-', 'DONE', undef); } diff --git a/lib/BEX.pm b/lib/BEX.pm index 7d7f99c..579eff2 100644 --- a/lib/BEX.pm +++ b/lib/BEX.pm @@ -4,195 +4,10 @@ use strict; use warnings; -use BEX::Config; - package BEX; -use IO::File; - -use POSIX; - -my $log_file; - -sub log($) { - $log_file //= new IO::File '>>log' or die "Cannot open log: $!"; - print $log_file POSIX::strftime("%Y-%m-%d %H:%M:%S ", localtime), $_[0], "\n"; -} - -package BEX::Job; - -use POSIX; - -our $job_cnt = 0; - -sub new($;$) { - my ($class, $id) = @_; - my $job = { }; - bless $job; - if (defined $id) { - $job->{'ID'} = $id; - } else { - $job_cnt++; - $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime); - } - $job->{'Subject'} = '(no subject)'; - return $job; -} - -sub new_from_file($$;$) { - my ($class, $file, $header_only) = @_; - my $job = { }; - open T, '<', $file or die "Cannot open $file: $!"; - while () { - chomp; - /^$/ and last; - /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error"; - !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined"; - $job->{$1} = $2; - } - if (!$header_only) { - my @cmds = ; - $job->{'body'} = join("", @cmds); - } - close T; - $job->{'Subject'} //= '?'; - $job->{'ID'} or die "Cannot load $file: Missing ID"; - return bless $job; -} - -sub attr($$;$) { - my ($job, $attr, $val) = @_; - $job->{$attr} = $val if defined $val; - return $job->{$attr}; -} - -sub dump($) { - my ($job) = @_; - for my $k (sort keys %$job) { - print "$k: ", $job->{$k}, "\n"; - } -} - -sub save($;$) { - my ($job, $fn) = @_; - -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!"; - $fn //= 'tmp/' . $job->{'ID'}; - open T, '>', $fn or die "Cannot create $fn: $!"; - for my $k (sort grep { /^[A-Z]/ } keys %$job) { - print T "$k: ", $job->{$k}, "\n"; - } - print T "\n"; - print T $job->{'body'} if defined $job->{'body'}; - close T; - return $fn; -} - -package BEX::Queue; - -sub new($;$) { - my ($class, $name) = @_; - $name //= 'queue'; - -d $name or die "Queue directory $name does not exist\n"; - for my $d ("hosts", "jobs") { - -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!"; - } - my $queue = { - 'Name' => $name, - 'MetaCache' => {}, - }; - return bless $queue; -} - -sub host_dir($$) { - my ($queue, $machine) = @_; - return $queue->{'Name'} . '/hosts/' . $machine; -} - -sub queue_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.job'; -} - -sub status_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.stat'; -} - -sub temp_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.tmp'; -} - -sub job_file($$) { - my ($queue, $jid) = @_; - return $queue->{'Name'} . '/jobs/' . $jid. '.job'; -} - -sub enqueue($$$) { - my ($queue, $machine, $job) = @_; - my $qf = $queue->queue_file($machine, $job->{'ID'}); - if (-f $qf) { return 0; } - my $fn = $queue->job_file($job->{'ID'}); - -f $fn or $job->save($fn); - my $dir = $queue->host_dir($machine); - -d $dir or mkdir $dir or die "Cannot create directory $dir: $!"; - symlink '../../jobs/' . $job->{'ID'} . '.job', $qf or die "Cannot create $qf: $!"; - return 1; -} - -sub scan($$) { - my ($queue, $machine) = @_; - my @list = (); - if (opendir D, $queue->host_dir($machine)) { - while ($_ = readdir D) { - /^\./ and next; - s{\.job}{} or next; - push @list, $_; - } - closedir D; - } - return @list; -} - -sub remove($$) { - my ($queue, $machine, $jid) = @_; - unlink $queue->queue_file($machine, $jid); - unlink $queue->status_file($machine, $jid); - unlink $queue->temp_file($machine, $jid); -} - -sub job_metadata($$) { - my ($queue, $jid) = @_; - my $cache = $queue->{'MetaCache'}; - if (!defined $cache->{$jid}) { - $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1); - } - return $cache->{$jid}; -} - -sub read_job_status($$$) { - my ($queue, $machine, $jid) = @_; - my %s = (); - my $sf = $queue->status_file($machine, $jid); - if (open S, '<', $sf) { - while () { - chomp; - /^(\w+):\s*(.*)/ or die "Parse error in $sf"; - $s{$1} = $2; - } - close S; - } - return \%s; -} - -sub write_job_status($$$$) { - my ($queue, $machine, $jid, $stat) = @_; - my $sf = $queue->status_file($machine, $jid); - open S, '>', $sf or die "Cannot create $sf: $!"; - for my $k (sort keys %$stat) { - print S "$k: ", $stat->{$k}, "\n"; - } - close S; -} +use BEX::Config; +use BEX::Job; +use BEX::Queue; 42; diff --git a/lib/BEX/Job.pm b/lib/BEX/Job.pm new file mode 100644 index 0000000..16e55f5 --- /dev/null +++ b/lib/BEX/Job.pm @@ -0,0 +1,80 @@ +# Batch EXecutor 2.0 -- Jobs +# (c) 2011 Martin Mares + +use strict; +use warnings; + +package BEX::Job; + +use POSIX (); + +our $job_cnt = 0; + +sub new($;$) { + my ($class, $id) = @_; + my $job = { }; + bless $job; + if (defined $id) { + $job->{'ID'} = $id; + } else { + $job_cnt++; + $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime); + } + $job->{'Subject'} = '(no subject)'; + return $job; +} + +sub new_from_file($$;$) { + my ($class, $file, $header_only) = @_; + my $job = { }; + open T, '<', $file or die "Cannot open $file: $!"; + while () { + chomp; + /^$/ and last; + /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error"; + !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined"; + $job->{$1} = $2; + } + if (!$header_only) { + my @cmds = ; + $job->{'body'} = join("", @cmds); + } + close T; + $job->{'Subject'} //= '?'; + $job->{'ID'} or die "Cannot load $file: Missing ID"; + $job->{'ID'} !~ /\.[a-z]+$/ or die "Cannot load $file: Invalid ID syntax"; + return bless $job; +} + +sub id($) { + return $_->{'ID'}; +} + +sub attr($$;$) { + my ($job, $attr, $val) = @_; + $job->{$attr} = $val if defined $val; + return $job->{$attr}; +} + +sub dump($) { + my ($job) = @_; + for my $k (sort keys %$job) { + print "$k: ", $job->{$k}, "\n"; + } +} + +sub save($;$) { + my ($job, $fn) = @_; + -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!"; + $fn //= 'tmp/' . $job->id; + open T, '>', $fn or die "Cannot create $fn: $!"; + for my $k (sort grep { /^[A-Z]/ } keys %$job) { + print T "$k: ", $job->{$k}, "\n"; + } + print T "\n"; + print T $job->{'body'} if defined $job->{'body'}; + close T; + return $fn; +} + +42; diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm new file mode 100644 index 0000000..429c7a4 --- /dev/null +++ b/lib/BEX/Queue.pm @@ -0,0 +1,131 @@ +# Batch EXecutor 2.0 -- Queues +# (c) 2011 Martin Mares + +use strict; +use warnings; + +package BEX::Queue; + +use IO::File; +use POSIX (); + +sub new($;$) { + my ($class, $name) = @_; + $name //= 'queue'; + -d $name or die "Queue directory $name does not exist\n"; + for my $d ("hosts", "jobs") { + -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!"; + } + my $queue = { + 'Name' => $name, + 'MetaCache' => {}, + }; + return bless $queue; +} + +# Most actions have to be logged by the caller +sub log($$$$;$) { + my ($queue, $mach, $jid, $stat, $msg) = @_; + my $fh = $queue->{'LogFH'} //= new IO::File '>>', $queue->{'Name'} . '/log' or die "Cannot open log: $!"; + my $m = join(" ", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), $mach, $jid, $stat); + $m .= " $msg" if defined $msg; + print $fh "$m\n"; +} + +sub host_dir($$) { + my ($queue, $machine) = @_; + return $queue->{'Name'} . '/hosts/' . $machine; +} + +sub queue_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.job'; +} + +sub status_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.stat'; +} + +sub temp_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.tmp'; +} + +sub job_file($$) { + my ($queue, $jid) = @_; + return $queue->{'Name'} . '/jobs/' . $jid. '.job'; +} + +sub enqueue($$$) { + my ($queue, $machine, $job) = @_; + my $qf = $queue->queue_file($machine, $job->id); + if (-f $qf) { + $queue->log($machine, $job->id, 'REQUEUE'); + return 0; + } + my $fn = $queue->job_file($job->id); + -f $fn or $job->save($fn); + my $dir = $queue->host_dir($machine); + -d $dir or mkdir $dir or die "Cannot create directory $dir: $!"; + symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!"; + $queue->log($machine, $job->id, 'QUEUE'); + return 1; +} + +sub scan($$) { + my ($queue, $machine) = @_; + my @list = (); + if (opendir D, $queue->host_dir($machine)) { + while ($_ = readdir D) { + /^\./ and next; + s{\.job}{} or next; + push @list, $_; + } + closedir D; + } + return @list; +} + +sub remove($$) { + my ($queue, $machine, $jid) = @_; + unlink $queue->queue_file($machine, $jid); + unlink $queue->status_file($machine, $jid); + unlink $queue->temp_file($machine, $jid); +} + +sub job_metadata($$) { + my ($queue, $jid) = @_; + my $cache = $queue->{'MetaCache'}; + if (!defined $cache->{$jid}) { + $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1); + } + return $cache->{$jid}; +} + +sub read_job_status($$$) { + my ($queue, $machine, $jid) = @_; + my %s = (); + my $sf = $queue->status_file($machine, $jid); + if (open S, '<', $sf) { + while () { + chomp; + /^(\w+):\s*(.*)/ or die "Parse error in $sf"; + $s{$1} = $2; + } + close S; + } + return \%s; +} + +sub write_job_status($$$$) { + my ($queue, $machine, $jid, $stat) = @_; + my $sf = $queue->status_file($machine, $jid); + open S, '>', $sf or die "Cannot create $sf: $!"; + for my $k (sort keys %$stat) { + print S "$k: ", $stat->{$k}, "\n"; + } + close S; +} + +42; -- 2.39.2