]> mj.ucw.cz Git - bex.git/blob - lib/bin/bex-run
25241201094e2cbea1256b72fa1950e32ed75446
[bex.git] / lib / bin / bex-run
1 #!/usr/bin/perl
2 # Batch EXecutor -- Run Queued Jobs
3 # (c) 2011-2013 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use Getopt::Long;
8 use BEX;
9
10 sub usage() {
11         print <<AMEN ;
12 Usage: bex run [<options>] [[!]<machine-or-class> ...]
13
14 Options:
15 -j, --job=<id>          Run only the specified job
16 -q, --queue=<name>      Select job queue
17     --status-fifo=<f>   Send status updates to the given named pipe
18 AMEN
19         exit 0;
20 }
21
22 my $given_job;
23 my $queue_name;
24 my $status_fifo;
25
26 GetOptions(
27         "j|job=s" => \$given_job,
28         "q|queue=s" => \$queue_name,
29         "s|status-fifo=s" => \$status_fifo,
30         "help" => \&usage,
31 ) or die "Try `bex run --help' for more information.\n";
32
33 # We do not want SIGPIPE on writes to the status FIFO
34 $SIG{'PIPE'} = 'IGNORE';
35
36 my $status_fd;
37 if (defined $status_fifo) {
38         open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
39         autoflush $status_fd, 1;
40 }
41
42 sub update_status($$$$;$) {
43         my ($mach, $job, $status, $log_on_queue, $msg) = @_;
44         if ($status_fd) {
45                 print $status_fd "! $mach $job $status\n";
46         }
47         if ($log_on_queue) {
48                 $log_on_queue->update_job_status($mach, $job, $status, $msg);
49         }
50 }
51
52 my %pings;
53
54 sub ping_machine($) {
55         my ($mach) = @_;
56         if (!defined $pings{$mach}) {
57                 if ($BEX::Config::ping_hosts) {
58                         update_status($mach, '-', 'PING', undef);
59                         my $host = BEX::Config::host_name($mach, 0);
60                         `ping -c1 -n $host >/dev/null 2>/dev/null`;
61                         $pings{$mach} = !$?;
62                 } else {
63                         $pings{$mach} = 1;
64                 }
65         }
66         if ($pings{$mach}) {
67                 return ('OK', undef);
68         } else {
69                 return ('NOPING', 'Does not ping');
70         }
71 }
72
73 sub exit_status($) {
74         my ($s) = @_;
75         if ($s >> 8) {
76                 return "with exit code " . ($s >> 8);
77         } else {
78                 return "on fatal signal " . ($s & 127);
79         }
80 }
81
82 sub run_job_prep($$$) {
83         my ($job, $queue, $mach) = @_;
84         my $prep = $job->attr('Prep');
85         defined($prep) && $prep !~ /^\s*$/ or return 'OK';
86
87         my $jid = $job->id;
88         update_status($mach, $jid, 'PREP', $queue);
89         my $lf = $queue->log_file($mach, $jid);
90         $ENV{'HOST'} = BEX::Config::host_name($mach, 0);
91         system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
92         delete $ENV{'HOST'};
93         if ($?) {
94                 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
95         } else {
96                 return 'OK';
97         }
98 }
99
100 sub make_job_body($$$) {
101         my ($job, $queue, $mach) = @_;
102         my $jid = $job->id;
103         my $tmp = $queue->temp_file($mach, $jid);
104         open T, '>', $tmp or die;
105         if (defined $BEX::Config::job_prolog) {
106                 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
107                 while (<P>) { print T; }
108                 close P;
109         } else {
110                 print T "#!/bin/sh\n";
111         }
112         print T "# BEX job ", $jid, "\n";
113         print T $job->attr('body');
114         if (defined $BEX::Config::job_epilog) {
115                 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
116                 while (<E>) { print T; }
117                 close E;
118         }
119         close T;
120         return ('OK', $tmp);
121 }
122
123 sub run_simple_job($$$$) {
124         my ($job, $queue, $mach, $body) = @_;
125
126         my $host = BEX::Config::host_name($mach, 1);
127         my $jid = $job->id;
128
129         update_status($mach, $jid, 'SEND', undef);
130
131         my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
132         my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'`;
133         !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
134         chomp $rtmp;
135
136         update_status($mach, $jid, 'RUN', $queue);
137         my $lf = $queue->log_file($mach, $jid);
138         system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
139         if ($?) {
140                 return ('FAILED', 'Job failed ' . exit_status($?));
141         } else {
142                 return 'OK';
143         }
144 }
145
146 sub run_complex_job($$$$) {
147         my ($job, $queue, $mach, $body) = @_;
148
149         my $host = BEX::Config::host_name($mach, 1);
150         my $jid = $job->id;
151
152         update_status($mach, $jid, 'SEND', undef);
153
154         my $cmd = 't=$(mktemp -d -t bex-XXXXXXXX) && cd $t && cat >job && chmod u+x job && mkdir attach && echo $t';
155         my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'` // "";
156         !$? && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
157         chomp $rtmp;
158
159         # Send attachments. We created an extra level of directory hierarchy for attachments
160         # to avoid rsync relaxing permissions on the temporary directory.
161         my $adir = $queue->attachment_dir($jid);
162         `$BEX::Config::rsync_command $adir/ $host:$rtmp/attach/`;
163         !$? or return ('NOXFER', 'Attachment transfer failed');
164
165         update_status($mach, $jid, 'RUN', $queue);
166         my $lf = $queue->log_file($mach, $jid);
167         system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host 'cd $rtmp/attach && $rtmp/job ; e=\$? ; rm -rf $rtmp ; exit \$e' 2>&1 | tee -a $lf";
168         if ($?) {
169                 return ('FAILED', 'Job failed ' . exit_status($?));
170         } else {
171                 return 'OK';
172         }
173 }
174
175 sub run_job($$$) {
176         my ($job, $queue, $mach) = @_;
177         my ($stat, $msg);
178
179         ($stat, $msg) = ping_machine($mach);
180         $stat eq 'OK' or return ($stat, $msg);
181
182         ($stat, $msg) = run_job_prep($job, $queue, $mach);
183         $stat eq 'OK' or return ($stat, $msg);
184
185         if ($job->attr('body') =~ /^\s*$/s) {
186                 # Shortcut if the body is empty
187                 return 'OK'
188         }
189
190         ($stat, $msg) = make_job_body($job, $queue, $mach);
191         $stat eq 'OK' or return ($stat, $msg);
192         my $body = $msg;
193
194         if (-d $queue->attachment_dir($job->id)) {
195                 return run_complex_job($job, $queue, $mach, $body);
196         } else {
197                 return run_simple_job($job, $queue, $mach, $body);
198         }
199 }
200
201 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
202 my $queue = BEX::Queue->new($queue_name);
203 $given_job = $queue->resolve_job_id($given_job) if defined $given_job;
204
205 $queue->lock(undef, undef) or die "The queue is locked by another bex run, cannot continue.\n";
206
207 for my $mach (@machines) {
208         my @q = $queue->scan($mach) or next;
209         if (!$queue->lock($mach, undef)) {
210                 print "### Machine $mach is locked by another bex run, skipping...\n";
211                 update_status($mach, '-', 'LOCKED', undef);
212                 update_status($mach, '-', 'DONE', undef);
213                 next;
214         }
215         update_status($mach, '-', 'INIT', undef);
216         while (my $jid = shift @q) {
217                 if (defined $given_job) {
218                         $jid eq $given_job or next;
219                 }
220                 my $job = BEX::Job->new_from_file($queue->job_file($jid));
221                 update_status($mach, $jid, 'INIT', undef);
222                 if (!$queue->lock($mach, $jid)) {
223                         print "### Skipping locked $jid on $mach ###\n";
224                         update_status($mach, $jid, 'LOCKED', undef);
225                         next;
226                 }
227                 print "### Running ", $job->name, " on $mach ###\n";
228                 my ($s, $msg) = run_job($job, $queue, $mach);
229                 update_status($mach, $jid, $s, $queue, $msg);
230
231                 if ($s eq 'OK') {
232                         print "+++ OK\n";
233                         $queue->remove($mach, $jid);
234                 } else {
235                         print "--- $s: $msg\n";
236                         if ($BEX::Config::skip_on_fail) {
237                                 print "### Skipping other jobs on the same host ###\n" if @q;
238                                 last;
239                         }
240                 }
241         }
242 } continue {
243         update_status($mach, '-', 'DONE', undef);
244 }
245 $queue->unlock;