2 # Batch EXecutor -- Run Queued Jobs
3 # (c) 2011-2013 Martin Mares <mj@ucw.cz>
12 Usage: bex run [<options>] [[!]<machine-or-group> ...]
15 -j, --job=<id> Run only the specified job
16 -q, --queue=<name> Select job queue
17 --status-fifo=<f> Send status updates to the given named pipe
# Tail of the option table: each entry maps an option spec to the variable
# that receives its value.  NOTE(review): the call these arguments belong to
# is not visible here (presumably GetOptions) -- confirm.
27 "j|job=s" => \$given_job,
28 "q|queue=s" => \$queue_name,
# NOTE(review): this spec also accepts a short "-s" form, which the usage
# text does not list -- confirm that is intended.
29 "s|status-fifo=s" => \$status_fifo,
31 ) or die "Try `bex run --help' for more information.\n";
33 # We do not want SIGPIPE on writes to the status FIFO
34 $SIG{'PIPE'} = 'IGNORE';
# When a status FIFO was requested, open it for appending and enable
# autoflush on the handle; failure to open it is fatal.
37 if (defined $status_fifo) {
38 open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
39 autoflush $status_fd, 1;
# update_status($mach, $job, $status, $log_on_queue [, $msg])
#
# Report a status change of job $job on machine $mach:
#   - print a line of the form "! <machine> <job> <status>" to $status_fd;
#   - record the status (and the optional message $msg) by calling
#     update_job_status on $log_on_queue.
#
# NOTE(review): several callers pass undef as $log_on_queue, and $status_fd
# is opened only when a status FIFO was requested, so both actions are
# presumably guarded by conditions on lines not visible here -- confirm.
42 sub update_status($$$$;$) {
43 my ($mach, $job, $status, $log_on_queue, $msg) = @_;
45 print $status_fd "! $mach $job $status\n";
48 $log_on_queue->update_job_status($mach, $job, $status, $msg);
# NOTE(review): the enclosing sub header is not visible here; presumably this
# is the body of ping_machine, which is called elsewhere in this file --
# confirm.
#
# The probe below is attempted only for machines that have no defined entry
# in %pings, and only when $BEX::Config::ping_hosts is true.
56 if (!defined $pings{$mach}) {
57 if ($BEX::Config::ping_hosts) {
# Announce the ping on the status channel; there is no job ('-') and nothing
# is logged on a queue (undef).
58 update_status($mach, '-', 'PING', undef);
59 my $host = BEX::Config::host_name($mach, 0);
# Send a single echo request (-c1) with numeric output (-n) and discard
# everything ping prints; only the exit status left in $? can matter.
# NOTE(review): $host is interpolated unquoted into a shell command line --
# confirm that host names cannot contain shell metacharacters.
60 `ping -c1 -n $host >/dev/null 2>/dev/null`;
# Status pair (code, human-readable message) for a machine that does not ping.
69 return ('NOPING', 'Does not ping');
# NOTE(review): fragment without a visible sub header or conditions;
# presumably part of exit_status, whose result callers append to their
# failure messages, with $s holding a wait status such as $? -- confirm.
#
# Exit code: the wait status shifted right by eight bits.
76 return "with exit code " . ($s >> 8);
# Signal number: the low seven bits of the wait status.
78 return "on fatal signal " . ($s & 127);
# run_job_prep($job, $queue, $mach)
#
# Run the preparatory command stored in the job's 'Prep' attribute through
# a local bash.  Returns 'OK' when the attribute is undefined or contains
# only whitespace; the failure path returns ('PREPFAIL', <message>).
#
# NOTE(review): the definition of $jid and the condition guarding the
# PREPFAIL return are on lines not visible here.
82 sub run_job_prep($$$) {
83 my ($job, $queue, $mach) = @_;
84 my $prep = $job->attr('Prep');
# Nothing to prepare: report success right away.
85 defined($prep) && $prep !~ /^\s*$/ or return 'OK';
88 update_status($mach, $jid, 'PREP', $queue);
89 my $lf = $queue->log_file($mach, $jid);
# Export the target host name to the preparatory command as $HOST.
90 $ENV{'HOST'} = BEX::Config::host_name($mach, 0);
# Run $prep in a subshell with stderr merged into stdout and append a copy of
# the output to the log file.  With pipefail, a failure of $prep is reflected
# in $? even though tee is the last command of the pipeline.
# NOTE(review): $lf is interpolated unquoted into the shell command -- confirm
# that log file paths never contain spaces or shell metacharacters.
91 system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
94 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
# make_job_body($job, $queue, $mach)
#
# Write the script for $job to a temporary file obtained from the queue.
# The visible statements emit, in this order: the contents of the configured
# prolog file, a "#!/bin/sh" line, a comment naming the job, the job's 'body'
# attribute, and the contents of the configured epilog file.
# Returns ('INTERR', <message>) when the prolog or epilog cannot be opened.
#
# NOTE(review): the "#!/bin/sh" line is presumably written only when no
# prolog is configured; the branch structure, the definition of $jid, the
# closing of the handles and the success return value are on lines not
# visible here -- confirm.
100 sub make_job_body($$$) {
101 my ($job, $queue, $mach) = @_;
103 my $tmp = $queue->temp_file($mach, $jid);
# NOTE(review): on failure this dies without naming the file or the reason.
104 open T, '>', $tmp or die;
105 if (defined $BEX::Config::job_prolog) {
# NOTE(review): two-argument open with a bareword handle -- the open mode is
# derived from the path string itself.
106 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
# Copy the prolog into the job script line by line.
107 while (<P>) { print T; }
110 print T "#!/bin/sh\n";
112 print T "# BEX job ", $jid, "\n";
113 print T $job->attr('body');
114 if (defined $BEX::Config::job_epilog) {
# NOTE(review): same two-argument open pattern as for the prolog.
115 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
# Copy the epilog into the job script line by line.
116 while (<E>) { print T; }
# run_simple_job($job, $queue, $mach, $body)
#
# Run a job that consists of a single script file $body:
#   1. feed $body to a remote shell command that stores it in a file created
#      by mktemp, makes it executable and prints the file name;
#   2. run that file on the remote host, appending a copy of its output to
#      the job's log file.
# Returns ('NOXFER', 'Transfer failed') when step 1 fails or prints nothing;
# the failure path of step 2 returns ('FAILED', <message>).
#
# NOTE(review): the definition of $jid, any post-processing of $rtmp, the
# condition guarding the FAILED return and the success return value are on
# lines not visible here.
123 sub run_simple_job($$$$) {
124 my ($job, $queue, $mach, $body) = @_;
126 my $host = BEX::Config::host_name($mach, 1);
129 update_status($mach, $jid, 'SEND', undef);
# Single-quoted on purpose: $t and $(...) must be expanded by the remote
# shell, not by Perl.
131 my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
# NOTE(review): $body and $host are interpolated unquoted into a local shell
# command line.
132 my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'`;
133 !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
136 update_status($mach, $jid, 'RUN', $queue);
137 my $lf = $queue->log_file($mach, $jid);
# Remote side: run the script, remember its exit status in e, delete the
# script and exit with the remembered status.  Local side: merge stderr into
# stdout and tee it to the log; pipefail keeps a failure of the ssh command
# visible in $?.
# NOTE(review): "-t" is passed to the configured ssh command (for OpenSSH it
# requests a terminal); $rtmp and $lf are interpolated unquoted.
138 system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
140 return ('FAILED', 'Job failed ' . exit_status($?));
# run_complex_job($job, $queue, $mach, $body)
#
# Run a job that has attachments:
#   1. create a temporary directory on the remote host, store the script
#      $body there as "job", make it executable and create an "attach"
#      subdirectory;
#   2. copy the job's attachment directory into "attach" using the
#      configured rsync command;
#   3. run "job" from inside "attach", then remove the whole temporary
#      directory, appending a copy of the output to the job's log file.
# Returns ('NOXFER', 'Transfer failed') when step 1 fails or prints nothing,
# ('NOXFER', 'Attachment transfer failed') when step 2 fails; the failure
# path of step 3 returns ('FAILED', <message>).
#
# NOTE(review): the definition of $jid, any post-processing of $rtmp, the
# condition guarding the FAILED return and the success return value are on
# lines not visible here.
146 sub run_complex_job($$$$) {
147 my ($job, $queue, $mach, $body) = @_;
149 my $host = BEX::Config::host_name($mach, 1);
152 update_status($mach, $jid, 'SEND', undef);
# Single-quoted on purpose: $t and $(...) must be expanded by the remote
# shell, not by Perl.
154 my $cmd = 't=$(mktemp -d -t bex-XXXXXXXX) && cd $t && cat >job && chmod u+x job && mkdir attach && echo $t';
# Map an undefined backtick result to the empty string so that the check
# below needs no separate defined() test.
# NOTE(review): $body and $host are interpolated unquoted into a local shell
# command line.
155 my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'` // "";
156 !$? && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
159 # Send attachments. We created an extra level of directory hierarchy for attachments
160 # to avoid rsync relaxing permissions on the temporary directory.
161 my $adir = $queue->attachment_dir($jid);
# The trailing slash on the source presumably makes rsync copy the contents
# of $adir rather than the directory itself -- depends on the configured
# rsync command.
162 `$BEX::Config::rsync_command $adir/ $host:$rtmp/attach/`;
163 !$? or return ('NOXFER', 'Attachment transfer failed');
165 update_status($mach, $jid, 'RUN', $queue);
166 my $lf = $queue->log_file($mach, $jid);
# Remote side: run the job from the attachment directory, remember its exit
# status in e, remove the temporary directory and exit with the remembered
# status.  Local side: merge stderr into stdout and tee it to the log;
# pipefail keeps a failure of the ssh command visible in $?.
# NOTE(review): $rtmp and $lf are interpolated unquoted.
167 system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host 'cd $rtmp/attach && $rtmp/job ; e=\$? ; rm -rf $rtmp ; exit \$e' 2>&1 | tee -a $lf";
169 return ('FAILED', 'Job failed ' . exit_status($?));
# NOTE(review): the enclosing sub header is not visible here; presumably this
# is the body of run_job, which the main loop calls with the same three
# arguments -- confirm.
#
# Each stage returns a (status, message) pair; any status other than 'OK'
# is handed straight back to the caller.
176 my ($job, $queue, $mach) = @_;
# Stage 1: check that the machine is reachable.
179 ($stat, $msg) = ping_machine($mach);
180 $stat eq 'OK' or return ($stat, $msg);
# Stage 2: run the job's preparatory command, if any.
182 ($stat, $msg) = run_job_prep($job, $queue, $mach);
183 $stat eq 'OK' or return ($stat, $msg);
# A body consisting only of whitespace (or nothing at all) counts as empty.
185 if ($job->attr('body') =~ /^\s*$/s) {
186 # Shortcut if the body is empty
# Stage 3: assemble the job script.
# NOTE(review): $body, used below, is not assigned on any visible line.
190 ($stat, $msg) = make_job_body($job, $queue, $mach);
191 $stat eq 'OK' or return ($stat, $msg);
# Stage 4: jobs with an attachment directory take the directory-based
# transfer path; all others are sent as a single file.
194 if (-d $queue->attachment_dir($job->id)) {
195 return run_complex_job($job, $queue, $mach, $body);
197 return run_simple_job($job, $queue, $mach, $body);
# Main program: run the queued jobs of every selected machine.
# NOTE(review): several control-flow lines of this loop (closing braces and
# presumably next/else statements) are not visible here.
#
# Machines come from the command line; '*' is used when none were given.
201 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
202 my $queue = BEX::Queue->new($queue_name);
# Let the queue resolve the job id given with -j/--job.
203 $given_job = $queue->resolve_job_id($given_job) if defined $given_job;
# Take the queue lock with neither a machine nor a job specified.
205 $queue->lock(undef, undef) or die "The queue is locked by another bex run, cannot continue.\n";
207 for my $mach (@machines) {
# A list assignment in scalar context yields the number of elements, so
# machines for which scan returns nothing are skipped.
208 my @q = $queue->scan($mach) or next;
# Take the per-machine lock; when that fails, report LOCKED followed by DONE.
209 if (!$queue->lock($mach, undef)) {
210 print "### Machine $mach is locked by another bex run, skipping...\n";
211 update_status($mach, '-', 'LOCKED', undef);
212 update_status($mach, '-', 'DONE', undef);
215 update_status($mach, '-', 'INIT', undef);
# NOTE(review): the loop ends at the first false value shifted from @q, so it
# relies on job ids never being '' or '0'.
216 while (my $jid = shift @q) {
# With a job id given, every other job is skipped.
217 if (defined $given_job) {
218 $jid eq $given_job or next;
220 my $job = BEX::Job->new_from_file($queue->job_file($jid));
221 update_status($mach, $jid, 'INIT', undef);
# Take the per-job lock; a job that cannot be locked is reported as LOCKED.
222 if (!$queue->lock($mach, $jid)) {
223 print "### Skipping locked $jid on $mach ###\n";
224 update_status($mach, $jid, 'LOCKED', undef);
227 print "### Running ", $job->name, " on $mach ###\n";
228 my ($s, $msg) = run_job($job, $queue, $mach);
# The final status is the only one in this loop that is also logged on the
# queue, together with the message.
229 update_status($mach, $jid, $s, $queue, $msg);
# NOTE(review): presumably executed only when the job succeeded; the
# condition is not visible here.
233 $queue->remove($mach, $jid);
235 print "--- $s: $msg\n";
# With skip_on_fail set, the message below is printed when jobs remain for
# this machine; presumably the rest of the machine's queue is then abandoned.
236 if ($BEX::Config::skip_on_fail) {
237 print "### Skipping other jobs on the same host ###\n" if @q;
243 update_status($mach, '-', 'DONE', undef);