]> mj.ucw.cz Git - bex.git/blob - brun
Host names are configurable
[bex.git] / brun
1 #!/usr/bin/perl
2 # Batch EXecutor 2.0 -- Run Queued Jobs
3 # (c) 2011 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use Getopt::Long;
8
9 use lib 'lib';
10 use BEX;
11
12 my $given_job;
13 my $queue_name;
14 my $status_fifo;
15
16 GetOptions(
17         "j|job=s" => \$given_job,
18         "q|queue=s" => \$queue_name,
19         "s|status-fifo=s" => \$status_fifo,
20 ) or die <<AMEN ;
21 Usage: brun [<options>] [[!]<machine-or-class> ...]
22
23 Options:
24 -j, --job=<id>          Run only the specified job
25 -q, --queue=<name>      Select job queue
26     --status-fifo=<f>   Send status updates to the given named pipe
27 AMEN
28
29 my $status_fd;
30 if (defined $status_fifo) {
31         open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
32         autoflush $status_fd, 1;
33 }
34
35 sub update_status($$$$;$) {
36         my ($mach, $job, $status, $log_on_queue, $msg) = @_;
37         if ($status_fd) {
38                 print $status_fd "! $mach $job $status\n";
39         }
40         if ($log_on_queue) {
41                 $log_on_queue->log($mach, $job, $status, $msg);
42         }
43 }
44
45 sub ping_machine($) {
46         my ($mach) = @_;
47         update_status($mach, '-', 'PING', undef);
48         my $host = BEX::Config::host_name($mach);
49         `ping -c1 -n $host >/dev/null 2>/dev/null`;
50         return !$?;
51 }
52
53 sub run_job($$$) {
54         my ($job, $queue, $mach) = @_;
55         my $jid = $job->{'ID'};
56         my $host = BEX::Config::host_name($mach);
57
58         my $tmp = $queue->temp_file($mach, $jid);
59         open T, '>', $tmp or die;
60         if (defined $BEX::Config::job_prolog) {
61                 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
62                 while (<P>) { print T; }
63                 close P;
64         } else {
65                 print T "#!/bin/sh\n";
66         }
67         print T "# BEX job ", $jid, "\n";
68         print T $job->{'body'};
69         if (defined $BEX::Config::job_epilog) {
70                 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
71                 while (<E>) { print T; }
72                 close E;
73         }
74         close T;
75
76         update_status($mach, $jid, 'SEND', undef);
77         my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
78         my $rtmp = `ssh <$tmp $host '$cmd'`;
79         !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
80         chomp $rtmp;
81
82         update_status($mach, $jid, 'RUN', $queue);
83         my $lf = $queue->log_file($mach, $jid);
84         system 'bash', '-o', 'pipefail', '-c', "ssh -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
85         if ($?) {
86                 return ('FAILED', 'Job failed');
87         } else {
88                 return ('OK', undef);
89         }
90 }
91
92 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
93 my $queue = BEX::Queue->new($queue_name);
94
95 for my $mach (@machines) {
96         my @q = $queue->scan($mach) or next;
97         update_status($mach, '-', 'INIT', undef);
98         my $ping;
99         for my $jid (@q) {
100                 if (defined $given_job) {
101                         $jid eq $given_job or next;
102                 }
103                 my $job = BEX::Job->new_from_file($queue->job_file($jid));
104                 my $stat = {
105                         'Time' => time,
106                 };
107                 print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
108                 $ping //= ping_machine($mach);
109                 my ($s, $msg);
110                 if (!$ping) {
111                         ($s, $msg) = ('NOPING', 'Does not ping');
112                 } else {
113                         ($s, $msg) = run_job($job, $queue, $mach);
114                 }
115                 update_status($mach, $jid, $s, $queue, $msg);
116
117                 if ($s eq 'OK') {
118                         print "+++ OK\n";
119                         $queue->remove($mach, $jid);
120                 } else {
121                         print "--- $s: $msg\n";
122                         $stat->{'Status'} = $s;
123                         $stat->{'Message'} = $msg;
124                         $queue->write_job_status($mach, $jid, $stat);
125                 }
126         }
127         update_status($mach, '-', 'DONE', undef);
128 }