]> mj.ucw.cz Git - bex.git/commitdiff
Implemented job attachments
authorMartin Mares <mj@ucw.cz>
Fri, 14 Jun 2013 15:05:19 +0000 (17:05 +0200)
committerMartin Mares <mj@ucw.cz>
Fri, 14 Jun 2013 15:05:19 +0000 (17:05 +0200)
NOTES
lib/bin/bex-add
lib/bin/bex-queue
lib/bin/bex-run
lib/perl/BEX/Config.pm
lib/perl/BEX/Queue.pm

diff --git a/NOTES b/NOTES
index 28adf1971c959a42100a60e9bea75eeb25644a10..499ad43bf6aabf0579be64baf59d7910ab99cb7b 100644 (file)
--- a/NOTES
+++ b/NOTES
@@ -9,8 +9,10 @@
        /<job-id>.log           (Optional) transcript of output produced by the job (including
                                previous failed attempts)
 
-<queue>/jobs/<job-id>.job      All jobs issued on this queue, including those which
+<queue>/jobs/                  All jobs issued on this queue, including those which
                                are no longer queued for any machine
+       /<job-id>.job           Description of the job (see below)
+       /<job-id>.attach/       A directory containing attachments (if any)
 
 <queue>/history/<hostname>/    Successfully completed jobs (their .job, .stat and .log files)
                                are moved here if the keep_history config switch is set.
index 56f58fabfaad70a8c389eb0a5d665c315f89c74d..4f2c3b39138822594dac031e807c298db4637814 100755 (executable)
@@ -1,11 +1,12 @@
 #!/usr/bin/perl
 # Batch EXecutor 3.0 -- Insert to Queue
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+# (c) 2011-2013 Martin Mares <mj@ucw.cz>
 
 use strict;
 use warnings;
 use Getopt::Long;
 use File::stat;
+use File::Spec;
 use BEX;
 
 my $given_body;
@@ -16,18 +17,21 @@ my $queue_name;
 my $requeue_id;
 my $given_subject;
 my $given_template;
+my @attach = ();
 
 sub usage() {
        print <<AMEN ;
 Usage: bex add [<options>] [!]<machine-or-class> ...
 
 Options:
+-a, --attach=<path>    Attach a file or directory to the job
 -b, --body=<file>      Load job body from the given file
 -e, --execute=<command>        Set job body to the given command
 -g, --go               Do not run editor, go enqueue the job immediately
 -i, --id=<id>          Set job ID of the new job
 -q, --queue=<name>     Insert new jobs to the given queue
 -r, --requeue=<id>     Re-queue an existing job instead of creating a new one
+                       (and possibly add/replace its attachments)
 -s, --subject=<subj>   Set subject of the new job
 -t, --template=<file>  Load job template (headers and body) from the given file
 AMEN
@@ -35,6 +39,7 @@ AMEN
 }
 
 GetOptions(
+       "a|attach=s" => \@attach,
        "b|body=s" => \$given_body,
        "e|execute=s" => \$given_execute,
        "g|go!" => \$given_go,
@@ -51,6 +56,11 @@ GetOptions(
 my @machines = BEX::Config::parse_machine_list(@ARGV);
 @machines or die "No machines match\n";
 
+# Verify that all attachments exist
+for my $a (@attach) {
+       -f $a || -d $a or die "Attachment $a does not exist\n";
+}
+
 my $queue = BEX::Queue->new($queue_name);
 my $job;
 my $tmp_fn;
@@ -98,6 +108,32 @@ if (defined $requeue_id) {
 # Put the job to the queue
 print "New job ", $job->id, "\n";
 $queue->save_job($job);
+
+# Create attachments
+if (@attach) {
+       my $adir = $queue->attachment_dir($job->id);
+       -d $adir || mkdir $adir or die "Cannot create $adir: $!\n";
+       for my $asrc (@attach) {
+               $asrc =~ s{/+$}{};
+               my ($vol, $dir, $base) = File::Spec->splitpath($asrc);
+               my $adest = File::Spec->catfile($adir, $base);
+               my $msg = "Attached";
+               if (-e $adest) {
+                       system "/bin/rm", "-rf", $adest;
+                       $? and die "Cannot delete old attachment $adest\n";
+                       $msg = "Updated attachment";
+               }
+               system "/bin/cp", "-a", $asrc, $adest;
+               $? and die "Cannot copy attachment $asrc to $adest\n";
+               if (-d $asrc) {
+                       print "$msg $base/...\n";
+               } else {
+                       print "$msg $base\n";
+               }
+       }
+}
+
+# Enqueue the job on all machines
 for my $m (@machines) {
        if ($queue->enqueue($m, $job)) {
                $queue->update_job_status($m, $job->id, 'NEW');
index a109f70c1053fb1053e7264a8338a8b688d85049..18e75564e42c45537276a8a7c492bf9ac8de727d 100755 (executable)
@@ -117,11 +117,14 @@ sub do_ls()
                for my $j (@{$machs{$m}}) {
                        my $st = get_status($m, $j);
                        my $s = get_stat($m, $j);
+                       my $adir = $queue->attachment_dir($j);
+                       my $has_adir = -d $adir;
                        $cnt_by_job{$j}{$s}++;
                        $cnt_by_mach{$m}{$s}++;
                        $why{$m}{$j} = "";
                        if ($filenames) {
                                $why{$m}{$j} .= "\t\t== Job file: " . $queue->queue_file($m, $j) . "\n";
+                               $why{$m}{$j} .= "\t\t== Attachments: $adir\n" if $has_adir;
                        }
                        if (defined($st->{'Time'}) && defined($st->{'Status'})) {
                                $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
@@ -139,6 +142,7 @@ sub do_ls()
                        if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
                                $stat{$m}{$j} .= ' [LOCKED]';
                        }
+                       $stat{$m}{$j} .= ' +ATT' if $has_adir;
                }
        }
 
index 5ace2240e3a5f7ef8745f739d917f9192571dbdb..e54005359ebaf85267a96e7d685b5623c8306ea9 100755 (executable)
@@ -97,17 +97,9 @@ sub run_job_prep($$$) {
        }
 }
 
-sub run_job_body($$$) {
+sub make_job_body($$$) {
        my ($job, $queue, $mach) = @_;
-
-       if ($job->attr('body') =~ /^\s*$/s) {
-               # Shortcut if the body is empty
-               return 'OK'
-       }
-
-       my $host = BEX::Config::host_name($mach);
        my $jid = $job->id;
-
        my $tmp = $queue->temp_file($mach, $jid);
        open T, '>', $tmp or die;
        if (defined $BEX::Config::job_prolog) {
@@ -125,10 +117,19 @@ sub run_job_body($$$) {
                close E;
        }
        close T;
+       return ('OK', $tmp);
+}
+
+sub run_simple_job($$$$) {
+       my ($job, $queue, $mach, $body) = @_;
+
+       my $host = BEX::Config::host_name($mach);
+       my $jid = $job->id;
 
        update_status($mach, $jid, 'SEND', undef);
+
        my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
-       my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+       my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'`;
        !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
        chomp $rtmp;
 
@@ -142,6 +143,35 @@ sub run_job_body($$$) {
        }
 }
 
+sub run_complex_job($$$$) {
+       my ($job, $queue, $mach, $body) = @_;
+
+       my $host = BEX::Config::host_name($mach);
+       my $jid = $job->id;
+
+       update_status($mach, $jid, 'SEND', undef);
+
+       my $cmd = 't=$(mktemp -d -t bex-XXXXXXXX) && cd $t && cat >job && chmod u+x job && mkdir attach && echo $t';
+       my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'` // "";
+       !$? && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
+       chomp $rtmp;
+
+       # Send attachments. We created an extra level of directory hierarchy for attachments
+       # to avoid rsync relaxing permissions on the temporary directory.
+       my $adir = $queue->attachment_dir($jid);
+       `$BEX::Config::rsync_command $adir/ $host:$rtmp/attach/`;
+       !$? or return ('NOXREF', 'Attachment transfer failed');
+
+       update_status($mach, $jid, 'RUN', $queue);
+       my $lf = $queue->log_file($mach, $jid);
+       system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host 'cd $rtmp/attach && $rtmp/job ; e=\$? ; rm -rf $rtmp ; exit \$e' 2>&1 | tee -a $lf";
+       if ($?) {
+               return ('FAILED', 'Job failed ' . exit_status($?));
+       } else {
+               return 'OK';
+       }
+}
+
 sub run_job($$$) {
        my ($job, $queue, $mach) = @_;
        my ($stat, $msg);
@@ -152,7 +182,20 @@ sub run_job($$$) {
        ($stat, $msg) = run_job_prep($job, $queue, $mach);
        $stat eq 'OK' or return ($stat, $msg);
 
-       return run_job_body($job, $queue, $mach);
+       if ($job->attr('body') =~ /^\s*$/s) {
+               # Shortcut if the body is empty
+               return 'OK'
+       }
+
+       ($stat, $msg) = make_job_body($job, $queue, $mach);
+       $stat eq 'OK' or return ($stat, $msg);
+       my $body = $msg;
+
+       if (-d $queue->attachment_dir($job->id)) {
+               return run_complex_job($job, $queue, $mach, $body);
+       } else {
+               return run_simple_job($job, $queue, $mach, $body);
+       }
 }
 
 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
index 8a36d45690317c9fd464c8c94d958eb4661c8c0b..c5a422df1c658cbe858318a90f339b83d0e23ad3 100644 (file)
@@ -56,6 +56,9 @@ our $skip_on_fail = 0;
 # How we run ssh (including options)
 our $ssh_command = "ssh";
 
+# How we run rsync to upload attachments (including options)
+our $rsync_command = "rsync -a";
+
 # Various utility functions
 
 sub parse_machine_list(@);
index 298c25d68373fedf89e8cc5c7bbdb1beb93ccd02..d3c42bef88a39d970945ff267a3ebeffc7f733ca 100644 (file)
@@ -73,6 +73,11 @@ sub job_file($$) {
        return $queue->{'Path'} . '/jobs/' . $jid. '.job';
 }
 
+sub attachment_dir($$) {
+       my ($queue, $jid) = @_;
+       return $queue->{'Path'} . '/jobs/' . $jid. '.attach';
+}
+
 sub save_job($$) {
        my ($queue, $job) = @_;
        # If the job already exists, it is shamelessly rewritten by new contents