]> mj.ucw.cz Git - bex.git/blob - brun
Implemented `bq --rm' and `bq --move-to'
[bex.git] / brun
1 #!/usr/bin/perl
2 # Batch EXecutor 2.0 -- Run Queued Jobs
3 # (c) 2011 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use Getopt::Long;
8
9 use lib 'lib';
10 use BEX;
11
12 my $given_job;
13 my $queue_name;
14 my $status_fifo;
15
16 GetOptions(
17         "j|job=s" => \$given_job,
18         "q|queue=s" => \$queue_name,
19         "s|status-fifo=s" => \$status_fifo,
20 ) or die <<AMEN ;
21 Usage: brun [<options>] [[!]<machine-or-class> ...]
22
23 Options:
24 -j, --job=<id>          Run only the specified job
25 -q, --queue=<name>      Select job queue
26     --status-fifo=<f>   Send status updates to the given named pipe
27 AMEN
28
29 my $status_fd;
30 if (defined $status_fifo) {
31         open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
32         autoflush $status_fd, 1;
33 }
34
35 sub update_status($$$$;$) {
36         my ($mach, $job, $status, $log_on_queue, $msg) = @_;
37         if ($status_fd) {
38                 print $status_fd "! $mach $job $status\n";
39         }
40         if ($log_on_queue) {
41                 $log_on_queue->log($mach, $job, $status, $msg);
42         }
43 }
44
45 my %pings;
46
47 sub ping_machine($) {
48         my ($mach) = @_;
49         if (!defined $pings{$mach}) {
50                 if ($BEX::Config::ping_hosts) {
51                         update_status($mach, '-', 'PING', undef);
52                         my $host = BEX::Config::host_name($mach);
53                         `ping -c1 -n $host >/dev/null 2>/dev/null`;
54                         $pings{$mach} = !$?;
55                 } else {
56                         $pings{$mach} = 1;
57                 }
58         }
59         if ($pings{$mach}) {
60                 return ('OK', undef);
61         } else {
62                 return ('NOPING', 'Does not ping');
63         }
64 }
65
66 sub exit_status($) {
67         my ($s) = @_;
68         if ($s >> 8) {
69                 return "with exit code " . ($s >> 8);
70         } else {
71                 return "on fatal signal " . ($s & 127);
72         }
73 }
74
75 sub run_job_prep($$$) {
76         my ($job, $queue, $mach) = @_;
77         my $prep = $job->attr('Prep');
78         defined($prep) && $prep !~ /^\s*$/ or return 'OK';
79
80         my $jid = $job->id;
81         update_status($mach, $jid, 'PREP', $queue);
82         my $lf = $queue->log_file($mach, $jid);
83         $ENV{'HOST'} = BEX::Config::host_name($mach);
84         system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
85         delete $ENV{'HOST'};
86         if ($?) {
87                 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
88         } else {
89                 return 'OK';
90         }
91 }
92
93 sub run_job_body($$$) {
94         my ($job, $queue, $mach) = @_;
95
96         if ($job->attr('body') =~ /^\s*$/s) {
97                 # Shortcut if the body is empty
98                 return 'OK'
99         }
100
101         my $host = BEX::Config::host_name($mach);
102         my $jid = $job->id;
103
104         my $tmp = $queue->temp_file($mach, $jid);
105         open T, '>', $tmp or die;
106         if (defined $BEX::Config::job_prolog) {
107                 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
108                 while (<P>) { print T; }
109                 close P;
110         } else {
111                 print T "#!/bin/sh\n";
112         }
113         print T "# BEX job ", $jid, "\n";
114         print T $job->attr('body');
115         if (defined $BEX::Config::job_epilog) {
116                 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
117                 while (<E>) { print T; }
118                 close E;
119         }
120         close T;
121
122         update_status($mach, $jid, 'SEND', undef);
123         my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
124         my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
125         !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
126         chomp $rtmp;
127
128         update_status($mach, $jid, 'RUN', $queue);
129         my $lf = $queue->log_file($mach, $jid);
130         system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
131         if ($?) {
132                 return ('FAILED', 'Job failed ' . exit_status($?));
133         } else {
134                 return 'OK';
135         }
136 }
137
138 sub run_job($$$) {
139         my ($job, $queue, $mach) = @_;
140         my ($stat, $msg);
141
142         ($stat, $msg) = ping_machine($mach);
143         $stat eq 'OK' or return ($stat, $msg);
144
145         ($stat, $msg) = run_job_prep($job, $queue, $mach);
146         $stat eq 'OK' or return ($stat, $msg);
147
148         return run_job_body($job, $queue, $mach);
149 }
150
151 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
152 my $queue = BEX::Queue->new($queue_name);
153
154 $queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
155
156 for my $mach (@machines) {
157         my @q = $queue->scan($mach) or next;
158         if (!$queue->lock($mach, undef)) {
159                 print "### Machine $mach is locked by another brun, skipping...\n";
160                 update_status($mach, '-', 'LOCKED', undef);
161                 update_status($mach, '-', 'DONE', undef);
162                 next;
163         }
164         update_status($mach, '-', 'INIT', undef);
165         while (my $jid = shift @q) {
166                 if (defined $given_job) {
167                         $jid eq $given_job or next;
168                 }
169                 my $job = BEX::Job->new_from_file($queue->job_file($jid));
170                 if (!$queue->lock($mach, $jid)) {
171                         print "### Skipping locked $jid on $mach ###\n";
172                         update_status($mach, $jid, 'LOCKED', undef);
173                         next;
174                 }
175                 my $stat = {
176                         'Time' => time,
177                 };
178                 print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
179                 my ($s, $msg) = run_job($job, $queue, $mach);
180
181                 $stat->{'Status'} = $s;
182                 $stat->{'Message'} = $msg;
183                 $queue->write_job_status($mach, $jid, $stat);
184
185                 # Called after writing the status file, so that the front-end watching
186                 # our status FIFO can see the new status file.
187                 update_status($mach, $jid, $s, $queue, $msg);
188
189                 if ($s eq 'OK') {
190                         print "+++ OK\n";
191                         $queue->remove($mach, $jid);
192                 } else {
193                         print "--- $s: $msg\n";
194                         if ($BEX::Config::skip_on_fail) {
195                                 print "### Skipping other jobs on the same host ###\n" if @q;
196                                 last;
197                         }
198                 }
199         }
200         update_status($mach, '-', 'DONE', undef);
201 }
202 $queue->unlock;