]> mj.ucw.cz Git - bex.git/blob - lib/perl/BEX/Queue.pm
68d8d792cc400b3f28d93abd1f3a5a4cde44745a
[bex.git] / lib / perl / BEX / Queue.pm
1 # Batch EXecutor -- Queues
2 # (c) 2011-2015 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6 use feature 'switch';
7
8 package BEX::Queue;
9
10 use IO::File;
11 use File::Path;
12 use Fcntl qw(:flock);
13 use POSIX ();
14
15 sub new($;$) {
16         my ($class, $name) = @_;
17         $name //= 'queue';
18         my $path = $BEX::Config::home . '/' . $name;
19         -d $path or die "Queue directory $path does not exist (use 'bex qman init' to create it)\n";
20         -d "$path/hosts" && -d "$path/jobs" or die "Queue directory $path is misconfigured\n";
21         my $queue = {
22                 'Name' => $name,
23                 'Path' => $path,
24                 'MetaCache' => {},
25         };
26         return bless $queue;
27 }
28
29 sub log_file($$) {
30         my ($queue, $machine, $jid) = @_;
31         return $queue->host_dir($machine) . '/' . $jid . '.log';
32 }
33
34 # Most actions have to be logged by the caller
35 sub log($$$$;$) {
36         my ($queue, $mach, $jid, $stat, $msg) = @_;
37         my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
38         my $m = join(" ", $t, $mach, $jid, $stat);
39         $m .= " $msg" if defined $msg;
40
41         my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Path'} . '/log', '>>' or die "Cannot open log: $!";
42         print $fh "$m\n";
43
44         # Append to the per-job log file
45         if (open L, '>>', $queue->log_file($mach, $jid)) {
46                 print L "### $m\n";
47                 close L;
48         }
49 }
50
51 sub host_dir($$) {
52         my ($queue, $machine) = @_;
53         return $queue->{'Path'} . '/hosts/' . $machine;
54 }
55
56 sub queue_file($$) {
57         my ($queue, $machine, $jid) = @_;
58         return $queue->host_dir($machine) . '/' . $jid . '.job';
59 }
60
61 sub status_file($$) {
62         my ($queue, $machine, $jid) = @_;
63         return $queue->host_dir($machine) . '/' . $jid . '.stat';
64 }
65
66 sub temp_file($$) {
67         my ($queue, $machine, $jid) = @_;
68         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
69 }
70
71 sub job_file($$) {
72         my ($queue, $jid) = @_;
73         return $queue->{'Path'} . '/jobs/' . $jid. '.job';
74 }
75
76 sub attachment_dir($$) {
77         my ($queue, $jid) = @_;
78         return $queue->{'Path'} . '/jobs/' . $jid. '.attach';
79 }
80
81 sub save_job($$) {
82         my ($queue, $job) = @_;
83         # If the job already exists, it is shamelessly rewritten by new contents
84         $job->save($queue->job_file($job->id));
85 }
86
87 sub all_job_ids($) {
88         my ($queue) = @_;
89         opendir my $dir, $queue->{'Path'} . '/jobs' or die "Cannot open job directory: $!\n";
90         my @jobs = ();
91         while (readdir $dir) {
92                 s{\.job$}{} or next;
93                 push @jobs, $_;
94         }
95         closedir $dir;
96         return @jobs;
97 }
98
99 sub match_job_id($$) {
100         my ($id, $pattern) = @_;
101         return $id =~ m{\b$pattern};
102 }
103
104 # Resolve a (possibly partial) job ID given by the user
105 sub resolve_job_id($$) {
106         my ($queue, $name) = @_;
107         BEX::Job::check_id($name) or die "Invalid job ID $name\n";
108         if (-f $queue->job_file($name)) {
109                 return $name;
110         }
111
112         my @candidates = grep { match_job_id($_, $name) } $queue->all_job_ids();
113         @candidates or die "No job called $name exists\n";
114         @candidates == 1 or die "Partial job ID $name is not unique\n";
115         return $candidates[0];
116 }
117
118 sub enqueue($$$) {
119         my ($queue, $machine, $job) = @_;
120         # The job must be already saved to the current queue
121         my $qf = $queue->queue_file($machine, $job->id);
122         if (-f $qf) {
123                 return 0;
124         }
125         my $dir = $queue->host_dir($machine);
126         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
127         symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
128         return 1;
129 }
130
131 sub scan($$) {
132         my ($queue, $machine) = @_;
133         my @list = ();
134         if (opendir D, $queue->host_dir($machine)) {
135                 while ($_ = readdir D) {
136                         /^\./ and next;
137                         s{\.job}{} or next;
138                         push @list, $_;
139                 }
140                 closedir D;
141         }
142         return sort @list;
143 }
144
145 sub remove($$;$) {
146         my ($queue, $machine, $jid, $force_remove) = @_;
147         if ($BEX::Config::keep_history && !$force_remove) {
148                 my $s = $queue->{'Path'} . '/hosts/' . $machine;
149                 my $d = $queue->{'Path'} . '/history/' . $machine;
150                 File::Path::mkpath($d);
151                 for my $suff ('job', 'stat', 'log') {
152                         my $src = "$s/$jid.$suff";
153                         my $dst = "$d/$jid.$suff";
154                         if (-f $src) {
155                                 rename $src, $dst or die "Cannot rename $src to $dst: $!";
156                         } else {
157                                 # Might be present from the previous incarnation of the same job
158                                 unlink $dst;
159                         }
160                 }
161         } else {
162                 unlink $queue->queue_file($machine, $jid);
163                 unlink $queue->status_file($machine, $jid);
164                 unlink $queue->log_file($machine, $jid);
165         }
166         unlink $queue->temp_file($machine, $jid);
167 }
168
169 sub job_metadata($$) {
170         my ($queue, $jid) = @_;
171         my $cache = $queue->{'MetaCache'};
172         if (!defined $cache->{$jid}) {
173                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
174         }
175         return $cache->{$jid};
176 }
177
178 sub job_name($$) {
179         my ($queue, $jid) = @_;
180         return $queue->job_metadata($jid)->name;
181 }
182
183 sub read_job_status($$$) {
184         my ($queue, $machine, $jid) = @_;
185         my %s = ();
186         my $sf = $queue->status_file($machine, $jid);
187         if (open S, '<', $sf) {
188                 while (<S>) {
189                         chomp;
190                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
191                         $s{$1} = $2;
192                 }
193                 close S;
194         }
195         return \%s;
196 }
197
198 sub write_job_status($$$$) {
199         my ($queue, $machine, $jid, $stat) = @_;
200         my $sf = $queue->status_file($machine, $jid);
201         open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
202         for my $k (sort keys %$stat) {
203                 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
204         }
205         close S;
206         rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
207 }
208
209 sub update_job_status($$$$;$) {
210         my ($queue, $machine, $jid, $stat, $msg) = @_;
211         my $s = {
212                 'Time' => time,
213                 'Status' => $stat,
214                 'Message' => $msg,
215         };
216         $queue->write_job_status($machine, $jid, $s);
217         $queue->log($machine, $jid, $stat, $msg);
218 }
219
220 sub lock_name($$$) {
221         my ($queue, $machine, $jid) = @_;
222         my $lock = $queue->{'Path'};
223         if (defined $jid) {
224                 $lock .= "/hosts/$machine/$jid.lock";
225         } elsif (defined $machine) {
226                 $lock .= "/hosts/$machine/lock";
227         } else {
228                 $lock .= '/lock';
229         }
230 }
231
232 # Whenever we want to run a job on a machine, we must obtain a lock;
233 # at most one lock can be held at a time by a single BEX::Queue object.
234 # See the description of locking schemes in BEX::Config.
235 sub lock($$$) {
236         my ($queue, $machine, $jid) = @_;
237         my $lock;
238         given ($BEX::Config::locking_scheme) {
239                 when ('queue') {
240                         $lock = lock_name($queue, undef, undef);
241                 }
242                 when ('host') {
243                         defined($machine) or return 1;
244                         $lock = lock_name($queue, $machine, undef);
245                 }
246                 when ('job') {
247                         defined($machine) && defined($jid) or return 1;
248                         $lock = lock_name($queue, $machine, $jid);
249                 }
250                 when ('none') { return 1; }
251                 default { die "Invalid BEX::Config::locking_scheme"; }
252         }
253         if (defined($queue->{'LockName'})) {
254                 return 1 if ($queue->{'LockName'} eq $lock);
255                 $queue->unlock;
256         }
257         open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
258         if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
259                 close $queue->{'LockHandle'};
260                 delete $queue->{'LockHandle'};
261                 return 0;
262         }
263         $queue->{'LockName'} = $lock;
264         return 1;
265 }
266
267 sub unlock($) {
268         my ($queue) = @_;
269         defined $queue->{'LockName'} or return;
270         unlink $queue->{'LockName'};
271         flock $queue->{'LockHandle'}, LOCK_UN;
272         close $queue->{'LockHandle'};
273         delete $queue->{'LockHandle'};
274         delete $queue->{'LockName'};
275 }
276
277 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for `bex ls'
278 sub is_locked($$$) {
279         my ($queue, $machine, $jid) = @_;
280         given ($BEX::Config::locking_scheme) {
281                 # Shortcuts
282                 when ('host') { return unless defined $machine; }
283                 when ('jid') { return unless defined $jid; }
284                 when ('none') { return; }
285         }
286         my $lock = lock_name($queue, $machine, $jid);
287         return -f $lock;
288 }
289
290 42;