1 # Batch EXecutor -- Queues
2 # (c) 2011-2015 Martin Mares <mj@ucw.cz>
7 use experimental 'smartmatch';
17 my ($class, $name) = @_;
19 my $path = $BEX::Config::home . '/' . $name;
20 -d $path or die "Queue directory $path does not exist (use 'bex qman init' to create it)\n";
21 -d "$path/hosts" && -d "$path/jobs" or die "Queue directory $path is misconfigured\n";
31 my ($queue, $machine, $jid) = @_;
32 return $queue->host_dir($machine) . '/' . $jid . '.log';
35 # Most actions have to be logged by the caller
37 my ($queue, $mach, $jid, $stat, $msg) = @_;
38 my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
39 my $m = join(" ", $t, $mach, $jid, $stat);
40 $m .= " $msg" if defined $msg;
42 my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Path'} . '/log', '>>' or die "Cannot open log: $!";
45 # Append to the per-job log file
46 if (open L, '>>', $queue->log_file($mach, $jid)) {
53 my ($queue, $machine) = @_;
54 return $queue->{'Path'} . '/hosts/' . $machine;
58 my ($queue, $machine, $jid) = @_;
59 return $queue->host_dir($machine) . '/' . $jid . '.job';
63 my ($queue, $machine, $jid) = @_;
64 return $queue->host_dir($machine) . '/' . $jid . '.stat';
68 my ($queue, $machine, $jid) = @_;
69 return $queue->host_dir($machine) . '/' . $jid . '.tmp';
73 my ($queue, $jid) = @_;
74 return $queue->{'Path'} . '/jobs/' . $jid. '.job';
77 sub attachment_dir($$) {
78 my ($queue, $jid) = @_;
79 return $queue->{'Path'} . '/jobs/' . $jid. '.attach';
83 my ($queue, $job) = @_;
84 # If the job already exists, it is shamelessly rewritten by new contents
85 $job->save($queue->job_file($job->id));
90 opendir my $dir, $queue->{'Path'} . '/jobs' or die "Cannot open job directory: $!\n";
92 while (readdir $dir) {
100 sub match_job_id($$) {
101 my ($id, $pattern) = @_;
102 return $id =~ m{\b$pattern};
105 # Resolve a (possibly partial) job ID given by the user
106 sub resolve_job_id($$) {
107 my ($queue, $name) = @_;
108 BEX::Job::check_id($name) or die "Invalid job ID $name\n";
109 if (-f $queue->job_file($name)) {
113 my @candidates = grep { match_job_id($_, $name) } $queue->all_job_ids();
114 @candidates or die "No job called $name exists\n";
115 @candidates == 1 or die "Partial job ID $name is not unique\n";
116 return $candidates[0];
120 my ($queue, $machine, $job) = @_;
121 # The job must be already saved to the current queue
122 my $qf = $queue->queue_file($machine, $job->id);
126 my $dir = $queue->host_dir($machine);
127 -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
128 symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
133 my ($queue, $machine) = @_;
135 if (opendir D, $queue->host_dir($machine)) {
136 while ($_ = readdir D) {
147 my ($queue, $machine, $jid, $force_remove) = @_;
148 if ($BEX::Config::keep_history && !$force_remove) {
149 my $s = $queue->{'Path'} . '/hosts/' . $machine;
150 my $d = $queue->{'Path'} . '/history/' . $machine;
151 File::Path::mkpath($d);
152 for my $suff ('job', 'stat', 'log') {
153 my $src = "$s/$jid.$suff";
154 my $dst = "$d/$jid.$suff";
156 rename $src, $dst or die "Cannot rename $src to $dst: $!";
158 # Might be present from the previous incarnation of the same job
163 unlink $queue->queue_file($machine, $jid);
164 unlink $queue->status_file($machine, $jid);
165 unlink $queue->log_file($machine, $jid);
167 unlink $queue->temp_file($machine, $jid);
170 sub job_metadata($$) {
171 my ($queue, $jid) = @_;
172 my $cache = $queue->{'MetaCache'};
173 if (!defined $cache->{$jid}) {
174 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
176 return $cache->{$jid};
180 my ($queue, $jid) = @_;
181 return $queue->job_metadata($jid)->name;
184 sub read_job_status($$$) {
185 my ($queue, $machine, $jid) = @_;
187 my $sf = $queue->status_file($machine, $jid);
188 if (open S, '<', $sf) {
191 /^(\w+):\s*(.*)/ or die "Parse error in $sf";
199 sub write_job_status($$$$) {
200 my ($queue, $machine, $jid, $stat) = @_;
201 my $sf = $queue->status_file($machine, $jid);
202 open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
203 for my $k (sort keys %$stat) {
204 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
207 rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
210 sub update_job_status($$$$;$) {
211 my ($queue, $machine, $jid, $stat, $msg) = @_;
217 $queue->write_job_status($machine, $jid, $s);
218 $queue->log($machine, $jid, $stat, $msg);
222 my ($queue, $machine, $jid) = @_;
223 my $lock = $queue->{'Path'};
225 $lock .= "/hosts/$machine/$jid.lock";
226 } elsif (defined $machine) {
227 $lock .= "/hosts/$machine/lock";
233 # Whenever we want to run a job on a machine, we must obtain a lock;
234 # at most one lock can be held at a time by a single BEX::Queue object.
235 # See the description of locking schemes in BEX::Config.
237 my ($queue, $machine, $jid) = @_;
239 given ($BEX::Config::locking_scheme) {
241 $lock = lock_name($queue, undef, undef);
244 defined($machine) or return 1;
245 $lock = lock_name($queue, $machine, undef);
248 defined($machine) && defined($jid) or return 1;
249 $lock = lock_name($queue, $machine, $jid);
251 when ('none') { return 1; }
252 default { die "Invalid BEX::Config::locking_scheme"; }
254 if (defined($queue->{'LockName'})) {
255 return 1 if ($queue->{'LockName'} eq $lock);
258 open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
259 if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
260 close $queue->{'LockHandle'};
261 delete $queue->{'LockHandle'};
264 $queue->{'LockName'} = $lock;
270 defined $queue->{'LockName'} or return;
271 unlink $queue->{'LockName'};
272 flock $queue->{'LockHandle'}, LOCK_UN;
273 close $queue->{'LockHandle'};
274 delete $queue->{'LockHandle'};
275 delete $queue->{'LockName'};
278 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for `bex ls'
280 my ($queue, $machine, $jid) = @_;
281 given ($BEX::Config::locking_scheme) {
283 when ('host') { return unless defined $machine; }
284 when ('jid') { return unless defined $jid; }
285 when ('none') { return; }
287 my $lock = lock_name($queue, $machine, $jid);