]> mj.ucw.cz Git - bex.git/blob - lib/perl/BEX/Queue.pm
use experimental 'smartmatch' for new Perls
[bex.git] / lib / perl / BEX / Queue.pm
1 # Batch EXecutor -- Queues
2 # (c) 2011-2015 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6 use feature 'switch';
7 use experimental 'smartmatch';
8
9 package BEX::Queue;
10
11 use IO::File;
12 use File::Path;
13 use Fcntl qw(:flock);
14 use POSIX ();
15
16 sub new($;$) {
17         my ($class, $name) = @_;
18         $name //= 'queue';
19         my $path = $BEX::Config::home . '/' . $name;
20         -d $path or die "Queue directory $path does not exist (use 'bex qman init' to create it)\n";
21         -d "$path/hosts" && -d "$path/jobs" or die "Queue directory $path is misconfigured\n";
22         my $queue = {
23                 'Name' => $name,
24                 'Path' => $path,
25                 'MetaCache' => {},
26         };
27         return bless $queue;
28 }
29
30 sub log_file($$) {
31         my ($queue, $machine, $jid) = @_;
32         return $queue->host_dir($machine) . '/' . $jid . '.log';
33 }
34
35 # Most actions have to be logged by the caller
36 sub log($$$$;$) {
37         my ($queue, $mach, $jid, $stat, $msg) = @_;
38         my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
39         my $m = join(" ", $t, $mach, $jid, $stat);
40         $m .= " $msg" if defined $msg;
41
42         my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Path'} . '/log', '>>' or die "Cannot open log: $!";
43         print $fh "$m\n";
44
45         # Append to the per-job log file
46         if (open L, '>>', $queue->log_file($mach, $jid)) {
47                 print L "### $m\n";
48                 close L;
49         }
50 }
51
52 sub host_dir($$) {
53         my ($queue, $machine) = @_;
54         return $queue->{'Path'} . '/hosts/' . $machine;
55 }
56
57 sub queue_file($$) {
58         my ($queue, $machine, $jid) = @_;
59         return $queue->host_dir($machine) . '/' . $jid . '.job';
60 }
61
62 sub status_file($$) {
63         my ($queue, $machine, $jid) = @_;
64         return $queue->host_dir($machine) . '/' . $jid . '.stat';
65 }
66
67 sub temp_file($$) {
68         my ($queue, $machine, $jid) = @_;
69         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
70 }
71
72 sub job_file($$) {
73         my ($queue, $jid) = @_;
74         return $queue->{'Path'} . '/jobs/' . $jid. '.job';
75 }
76
77 sub attachment_dir($$) {
78         my ($queue, $jid) = @_;
79         return $queue->{'Path'} . '/jobs/' . $jid. '.attach';
80 }
81
82 sub save_job($$) {
83         my ($queue, $job) = @_;
84         # If the job already exists, it is shamelessly rewritten by new contents
85         $job->save($queue->job_file($job->id));
86 }
87
88 sub all_job_ids($) {
89         my ($queue) = @_;
90         opendir my $dir, $queue->{'Path'} . '/jobs' or die "Cannot open job directory: $!\n";
91         my @jobs = ();
92         while (readdir $dir) {
93                 s{\.job$}{} or next;
94                 push @jobs, $_;
95         }
96         closedir $dir;
97         return @jobs;
98 }
99
100 sub match_job_id($$) {
101         my ($id, $pattern) = @_;
102         return $id =~ m{\b$pattern};
103 }
104
105 # Resolve a (possibly partial) job ID given by the user
106 sub resolve_job_id($$) {
107         my ($queue, $name) = @_;
108         BEX::Job::check_id($name) or die "Invalid job ID $name\n";
109         if (-f $queue->job_file($name)) {
110                 return $name;
111         }
112
113         my @candidates = grep { match_job_id($_, $name) } $queue->all_job_ids();
114         @candidates or die "No job called $name exists\n";
115         @candidates == 1 or die "Partial job ID $name is not unique\n";
116         return $candidates[0];
117 }
118
119 sub enqueue($$$) {
120         my ($queue, $machine, $job) = @_;
121         # The job must be already saved to the current queue
122         my $qf = $queue->queue_file($machine, $job->id);
123         if (-f $qf) {
124                 return 0;
125         }
126         my $dir = $queue->host_dir($machine);
127         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
128         symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
129         return 1;
130 }
131
132 sub scan($$) {
133         my ($queue, $machine) = @_;
134         my @list = ();
135         if (opendir D, $queue->host_dir($machine)) {
136                 while ($_ = readdir D) {
137                         /^\./ and next;
138                         s{\.job}{} or next;
139                         push @list, $_;
140                 }
141                 closedir D;
142         }
143         return sort @list;
144 }
145
146 sub remove($$;$) {
147         my ($queue, $machine, $jid, $force_remove) = @_;
148         if ($BEX::Config::keep_history && !$force_remove) {
149                 my $s = $queue->{'Path'} . '/hosts/' . $machine;
150                 my $d = $queue->{'Path'} . '/history/' . $machine;
151                 File::Path::mkpath($d);
152                 for my $suff ('job', 'stat', 'log') {
153                         my $src = "$s/$jid.$suff";
154                         my $dst = "$d/$jid.$suff";
155                         if (-f $src) {
156                                 rename $src, $dst or die "Cannot rename $src to $dst: $!";
157                         } else {
158                                 # Might be present from the previous incarnation of the same job
159                                 unlink $dst;
160                         }
161                 }
162         } else {
163                 unlink $queue->queue_file($machine, $jid);
164                 unlink $queue->status_file($machine, $jid);
165                 unlink $queue->log_file($machine, $jid);
166         }
167         unlink $queue->temp_file($machine, $jid);
168 }
169
170 sub job_metadata($$) {
171         my ($queue, $jid) = @_;
172         my $cache = $queue->{'MetaCache'};
173         if (!defined $cache->{$jid}) {
174                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
175         }
176         return $cache->{$jid};
177 }
178
179 sub job_name($$) {
180         my ($queue, $jid) = @_;
181         return $queue->job_metadata($jid)->name;
182 }
183
184 sub read_job_status($$$) {
185         my ($queue, $machine, $jid) = @_;
186         my %s = ();
187         my $sf = $queue->status_file($machine, $jid);
188         if (open S, '<', $sf) {
189                 while (<S>) {
190                         chomp;
191                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
192                         $s{$1} = $2;
193                 }
194                 close S;
195         }
196         return \%s;
197 }
198
199 sub write_job_status($$$$) {
200         my ($queue, $machine, $jid, $stat) = @_;
201         my $sf = $queue->status_file($machine, $jid);
202         open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
203         for my $k (sort keys %$stat) {
204                 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
205         }
206         close S;
207         rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
208 }
209
210 sub update_job_status($$$$;$) {
211         my ($queue, $machine, $jid, $stat, $msg) = @_;
212         my $s = {
213                 'Time' => time,
214                 'Status' => $stat,
215                 'Message' => $msg,
216         };
217         $queue->write_job_status($machine, $jid, $s);
218         $queue->log($machine, $jid, $stat, $msg);
219 }
220
221 sub lock_name($$$) {
222         my ($queue, $machine, $jid) = @_;
223         my $lock = $queue->{'Path'};
224         if (defined $jid) {
225                 $lock .= "/hosts/$machine/$jid.lock";
226         } elsif (defined $machine) {
227                 $lock .= "/hosts/$machine/lock";
228         } else {
229                 $lock .= '/lock';
230         }
231 }
232
233 # Whenever we want to run a job on a machine, we must obtain a lock;
234 # at most one lock can be held at a time by a single BEX::Queue object.
235 # See the description of locking schemes in BEX::Config.
236 sub lock($$$) {
237         my ($queue, $machine, $jid) = @_;
238         my $lock;
239         given ($BEX::Config::locking_scheme) {
240                 when ('queue') {
241                         $lock = lock_name($queue, undef, undef);
242                 }
243                 when ('host') {
244                         defined($machine) or return 1;
245                         $lock = lock_name($queue, $machine, undef);
246                 }
247                 when ('job') {
248                         defined($machine) && defined($jid) or return 1;
249                         $lock = lock_name($queue, $machine, $jid);
250                 }
251                 when ('none') { return 1; }
252                 default { die "Invalid BEX::Config::locking_scheme"; }
253         }
254         if (defined($queue->{'LockName'})) {
255                 return 1 if ($queue->{'LockName'} eq $lock);
256                 $queue->unlock;
257         }
258         open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
259         if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
260                 close $queue->{'LockHandle'};
261                 delete $queue->{'LockHandle'};
262                 return 0;
263         }
264         $queue->{'LockName'} = $lock;
265         return 1;
266 }
267
268 sub unlock($) {
269         my ($queue) = @_;
270         defined $queue->{'LockName'} or return;
271         unlink $queue->{'LockName'};
272         flock $queue->{'LockHandle'}, LOCK_UN;
273         close $queue->{'LockHandle'};
274         delete $queue->{'LockHandle'};
275         delete $queue->{'LockName'};
276 }
277
278 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for `bex ls'
279 sub is_locked($$$) {
280         my ($queue, $machine, $jid) = @_;
281         given ($BEX::Config::locking_scheme) {
282                 # Shortcuts
283                 when ('host') { return unless defined $machine; }
284                 when ('jid') { return unless defined $jid; }
285                 when ('none') { return; }
286         }
287         my $lock = lock_name($queue, $machine, $jid);
288         return -f $lock;
289 }
290
291 42;