2 # Batch EXecutor 2.0 -- Run Queued Jobs
3 # (c) 2011 Martin Mares <mj@ucw.cz>
17 "j|job=s" => \$given_job,
18 "q|queue=s" => \$queue_name,
19 "s|status-fifo=s" => \$status_fifo,
21 Usage: brun [<options>] [[!]<machine-or-class> ...]
24 -j, --job=<id> Run only the specified job
25 -q, --queue=<name> Run jobs in the given queue
26 --status-fifo=<f> Send status updates to the given named pipe
30 if (defined $status_fifo) {
31 open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
32 autoflush $status_fd, 1;
35 sub send_status($$$) {
36 my ($mach, $job, $status) = @_;
38 print $status_fd "! $mach $job $status\n";
44 send_status($mach, '-', 'PING');
45 `ping -c1 -n $mach >/dev/null 2>/dev/null`;
50 my ($job, $queue, $mach) = @_;
51 my $jid = $job->{'ID'};
53 # FIXME: rsyncing, rsync-only jobs
56 my $tmp = $queue->temp_file($mach, $jid);
57 open T, '>', $tmp or die;
58 if (defined $BEX::Config::job_prolog) {
59 open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
60 while (<P>) { print T; }
63 print T "#!/bin/sh\n";
65 print T "# BEX job ", $jid, "\n";
66 print T $job->{'body'};
67 if (defined $BEX::Config::job_epilog) {
68 open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
69 while (<E>) { print T; }
74 send_status($mach, $jid, 'SEND');
75 my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
76 my $rtmp = `ssh <$tmp $mach '$cmd'`;
77 !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
80 send_status($mach, $jid, 'RUN');
81 system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
89 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
90 my $queue = BEX::Queue->new($queue_name);
92 for my $mach (@machines) {
93 my @q = $queue->scan($mach) or next;
94 send_status($mach, '-', 'INIT');
97 if (defined $given_job) {
98 $jid eq $given_job or next;
100 my $job = BEX::Job->new_from_file($queue->job_file($jid));
104 print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
105 $ping //= ping_machine($mach);
110 $s = run_job($job, $queue, $mach);
113 BEX::log("$mach $jid $s");
116 $queue->remove($mach, $jid);
117 send_status($mach, $jid, 'OK');
120 $stat->{'Status'} = $s;
121 $queue->write_job_status($mach, $jid, $stat);
122 send_status($mach, $jid, 'ERR');
125 send_status($mach, '-', 'DONE');