]> mj.ucw.cz Git - bex.git/blob - lib/BEX/Queue.pm
Jobs should be sorted by their IDs when scanning queue
[bex.git] / lib / BEX / Queue.pm
1 # Batch EXecutor 2.0 -- Queues
2 # (c) 2011 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6 use feature 'switch';
7
8 package BEX::Queue;
9
10 use IO::File;
11 use File::Path;
12 use Fcntl qw(:flock);
13 use POSIX ();
14
15 sub new($;$) {
16         my ($class, $name) = @_;
17         $name //= 'queue';
18         -d $name or die "Queue directory $name does not exist\n";
19         for my $d ("hosts", "jobs") {
20                 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
21         }
22         my $queue = {
23                 'Name' => $name,
24                 'MetaCache' => {},
25         };
26         return bless $queue;
27 }
28
29 sub log_file($$) {
30         my ($queue, $machine, $jid) = @_;
31         return $queue->host_dir($machine) . '/' . $jid . '.log';
32 }
33
34 # Most actions have to be logged by the caller
35 sub log($$$$;$) {
36         my ($queue, $mach, $jid, $stat, $msg) = @_;
37         my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
38         my $m = join(" ", $t, $mach, $jid, $stat);
39         $m .= " $msg" if defined $msg;
40
41         my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
42         print $fh "$m\n";
43
44         # Append to the per-job log file
45         if (open L, '>>', $queue->log_file($mach, $jid)) {
46                 print L "### $m\n";
47                 close L;
48         }
49 }
50
51 sub host_dir($$) {
52         my ($queue, $machine) = @_;
53         return $queue->{'Name'} . '/hosts/' . $machine;
54 }
55
56 sub queue_file($$) {
57         my ($queue, $machine, $jid) = @_;
58         return $queue->host_dir($machine) . '/' . $jid . '.job';
59 }
60
61 sub status_file($$) {
62         my ($queue, $machine, $jid) = @_;
63         return $queue->host_dir($machine) . '/' . $jid . '.stat';
64 }
65
66 sub temp_file($$) {
67         my ($queue, $machine, $jid) = @_;
68         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
69 }
70
71 sub job_file($$) {
72         my ($queue, $jid) = @_;
73         return $queue->{'Name'} . '/jobs/' . $jid. '.job';
74 }
75
76 sub enqueue($$$) {
77         my ($queue, $machine, $job) = @_;
78         my $qf = $queue->queue_file($machine, $job->id);
79         if (-f $qf) {
80                 $queue->log($machine, $job->id, 'REQUEUE');
81                 return 0;
82         }
83         my $fn = $queue->job_file($job->id);
84         -f $fn or $job->save($fn);
85         my $dir = $queue->host_dir($machine);
86         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
87         symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
88         $queue->log($machine, $job->id, 'QUEUE');
89         return 1;
90 }
91
92 sub scan($$) {
93         my ($queue, $machine) = @_;
94         my @list = ();
95         if (opendir D, $queue->host_dir($machine)) {
96                 while ($_ = readdir D) {
97                         /^\./ and next;
98                         s{\.job}{} or next;
99                         push @list, $_;
100                 }
101                 closedir D;
102         }
103         return sort @list;
104 }
105
106 sub remove($$) {
107         my ($queue, $machine, $jid) = @_;
108         if ($BEX::Config::keep_history) {
109                 my $s = $queue->{'Name'} . '/hosts/' . $machine;
110                 my $d = $queue->{'Name'} . '/history/' . $machine;
111                 File::Path::mkpath($d);
112                 for my $suff ('job', 'stat', 'log') {
113                         my $src = "$s/$jid.$suff";
114                         my $dst = "$d/$jid.$suff";
115                         if (-f $src) {
116                                 rename $src, $dst or die "Cannot rename $src to $dst: $!";
117                         }
118                 }
119         } else {
120                 unlink $queue->queue_file($machine, $jid);
121                 unlink $queue->status_file($machine, $jid);
122                 unlink $queue->log_file($machine, $jid);
123         }
124         unlink $queue->temp_file($machine, $jid);
125 }
126
127 sub job_metadata($$) {
128         my ($queue, $jid) = @_;
129         my $cache = $queue->{'MetaCache'};
130         if (!defined $cache->{$jid}) {
131                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
132         }
133         return $cache->{$jid};
134 }
135
136 sub read_job_status($$$) {
137         my ($queue, $machine, $jid) = @_;
138         my %s = ();
139         my $sf = $queue->status_file($machine, $jid);
140         if (open S, '<', $sf) {
141                 while (<S>) {
142                         chomp;
143                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
144                         $s{$1} = $2;
145                 }
146                 close S;
147         }
148         return \%s;
149 }
150
151 sub write_job_status($$$$) {
152         my ($queue, $machine, $jid, $stat) = @_;
153         my $sf = $queue->status_file($machine, $jid);
154         open S, '>', $sf or die "Cannot create $sf: $!";
155         for my $k (sort keys %$stat) {
156                 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
157         }
158         close S;
159 }
160
161 sub lock_name($$$) {
162         my ($queue, $machine, $jid) = @_;
163         my $lock = $queue->{'Name'};
164         if (defined $jid) {
165                 $lock .= "/hosts/$machine/$jid.lock";
166         } elsif (defined $machine) {
167                 $lock .= "/hosts/$machine/lock";
168         } else {
169                 $lock .= '/lock';
170         }
171 }
172
173 # Whenever we want to run a job on a machine, we must obtain a lock;
174 # at most one lock can be held at a time by a single BEX::Queue object.
175 # See the description of locking schemes in BEX::Config.
176 sub lock($$$) {
177         my ($queue, $machine, $jid) = @_;
178         my $lock;
179         given ($BEX::Config::locking_scheme) {
180                 when ('queue') {
181                         $lock = lock_name($queue, undef, undef);
182                 }
183                 when ('host') {
184                         defined($machine) or return 1;
185                         $lock = lock_name($queue, $machine, undef);
186                 }
187                 when ('job') {
188                         defined($machine) && defined($jid) or return 1;
189                         $lock = lock_name($queue, $machine, $jid);
190                 }
191                 when ('none') { return 1; }
192                 default { die "Invalid BEX::Config::locking_scheme"; }
193         }
194         if (defined($queue->{'LockName'})) {
195                 return 1 if ($queue->{'LockName'} eq $lock);
196                 $queue->unlock;
197         }
198         open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
199         if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
200                 close $queue->{'LockHandle'};
201                 delete $queue->{'LockHandle'};
202                 return 0;
203         }
204         $queue->{'LockName'} = $lock;
205         return 1;
206 }
207
208 sub unlock($) {
209         my ($queue) = @_;
210         defined $queue->{'LockName'} or return;
211         unlink $queue->{'LockName'};
212         flock $queue->{'LockHandle'}, LOCK_UN;
213         close $queue->{'LockHandle'};
214         delete $queue->{'LockHandle'};
215         delete $queue->{'LockName'};
216 }
217
218 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
219 sub is_locked($$$) {
220         my ($queue, $machine, $jid) = @_;
221         given ($BEX::Config::locking_scheme) {
222                 # Shortcuts
223                 when ('host') { return unless defined $machine; }
224                 when ('jid') { return unless defined $jid; }
225                 when ('none') { return; }
226         }
227         my $lock = lock_name($queue, $machine, $jid);
228         return -f $lock;
229 }
230
231 42;