]> mj.ucw.cz Git - bex.git/blob - lib/BEX/Queue.pm
Implemented `bq --rm' and `bq --move-to'
[bex.git] / lib / BEX / Queue.pm
1 # Batch EXecutor 2.0 -- Queues
2 # (c) 2011 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6 use feature 'switch';
7
8 package BEX::Queue;
9
10 use IO::File;
11 use File::Path;
12 use Fcntl qw(:flock);
13 use POSIX ();
14
15 sub new($;$) {
16         my ($class, $name) = @_;
17         $name //= 'queue';
18         -d $name or die "Queue directory $name does not exist\n";
19         for my $d ("hosts", "jobs") {
20                 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
21         }
22         my $queue = {
23                 'Name' => $name,
24                 'MetaCache' => {},
25         };
26         return bless $queue;
27 }
28
29 sub log_file($$) {
30         my ($queue, $machine, $jid) = @_;
31         return $queue->host_dir($machine) . '/' . $jid . '.log';
32 }
33
34 # Most actions have to be logged by the caller
35 sub log($$$$;$) {
36         my ($queue, $mach, $jid, $stat, $msg) = @_;
37         my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
38         my $m = join(" ", $t, $mach, $jid, $stat);
39         $m .= " $msg" if defined $msg;
40
41         my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
42         print $fh "$m\n";
43
44         # Append to the per-job log file
45         if (open L, '>>', $queue->log_file($mach, $jid)) {
46                 print L "### $m\n";
47                 close L;
48         }
49 }
50
51 sub host_dir($$) {
52         my ($queue, $machine) = @_;
53         return $queue->{'Name'} . '/hosts/' . $machine;
54 }
55
56 sub queue_file($$) {
57         my ($queue, $machine, $jid) = @_;
58         return $queue->host_dir($machine) . '/' . $jid . '.job';
59 }
60
61 sub status_file($$) {
62         my ($queue, $machine, $jid) = @_;
63         return $queue->host_dir($machine) . '/' . $jid . '.stat';
64 }
65
66 sub temp_file($$) {
67         my ($queue, $machine, $jid) = @_;
68         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
69 }
70
71 sub job_file($$) {
72         my ($queue, $jid) = @_;
73         return $queue->{'Name'} . '/jobs/' . $jid. '.job';
74 }
75
76 sub enqueue($$$) {
77         my ($queue, $machine, $job) = @_;
78         my $qf = $queue->queue_file($machine, $job->id);
79         if (-f $qf) {
80                 $queue->log($machine, $job->id, 'REQUEUE');
81                 return 0;
82         }
83         my $fn = $queue->job_file($job->id);
84         -f $fn or $job->save($fn);
85         my $dir = $queue->host_dir($machine);
86         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
87         symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
88         $queue->log($machine, $job->id, 'QUEUE');
89         return 1;
90 }
91
92 sub scan($$) {
93         my ($queue, $machine) = @_;
94         my @list = ();
95         if (opendir D, $queue->host_dir($machine)) {
96                 while ($_ = readdir D) {
97                         /^\./ and next;
98                         s{\.job}{} or next;
99                         push @list, $_;
100                 }
101                 closedir D;
102         }
103         return sort @list;
104 }
105
106 sub remove($$;$) {
107         my ($queue, $machine, $jid, $force_remove) = @_;
108         if ($BEX::Config::keep_history && !$force_remove) {
109                 my $s = $queue->{'Name'} . '/hosts/' . $machine;
110                 my $d = $queue->{'Name'} . '/history/' . $machine;
111                 File::Path::mkpath($d);
112                 for my $suff ('job', 'stat', 'log') {
113                         my $src = "$s/$jid.$suff";
114                         my $dst = "$d/$jid.$suff";
115                         if (-f $src) {
116                                 rename $src, $dst or die "Cannot rename $src to $dst: $!";
117                         }
118                 }
119         } else {
120                 unlink $queue->queue_file($machine, $jid);
121                 unlink $queue->status_file($machine, $jid);
122                 unlink $queue->log_file($machine, $jid);
123         }
124         unlink $queue->temp_file($machine, $jid);
125 }
126
127 sub job_metadata($$) {
128         my ($queue, $jid) = @_;
129         my $cache = $queue->{'MetaCache'};
130         if (!defined $cache->{$jid}) {
131                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
132         }
133         return $cache->{$jid};
134 }
135
136 sub job_name($$) {
137         my ($queue, $jid) = @_;
138         return $queue->job_metadata($jid)->name;
139 }
140
141 sub read_job_status($$$) {
142         my ($queue, $machine, $jid) = @_;
143         my %s = ();
144         my $sf = $queue->status_file($machine, $jid);
145         if (open S, '<', $sf) {
146                 while (<S>) {
147                         chomp;
148                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
149                         $s{$1} = $2;
150                 }
151                 close S;
152         }
153         return \%s;
154 }
155
156 sub write_job_status($$$$) {
157         my ($queue, $machine, $jid, $stat) = @_;
158         my $sf = $queue->status_file($machine, $jid);
159         open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
160         for my $k (sort keys %$stat) {
161                 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
162         }
163         close S;
164         rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
165 }
166
167 sub lock_name($$$) {
168         my ($queue, $machine, $jid) = @_;
169         my $lock = $queue->{'Name'};
170         if (defined $jid) {
171                 $lock .= "/hosts/$machine/$jid.lock";
172         } elsif (defined $machine) {
173                 $lock .= "/hosts/$machine/lock";
174         } else {
175                 $lock .= '/lock';
176         }
177 }
178
179 # Whenever we want to run a job on a machine, we must obtain a lock;
180 # at most one lock can be held at a time by a single BEX::Queue object.
181 # See the description of locking schemes in BEX::Config.
182 sub lock($$$) {
183         my ($queue, $machine, $jid) = @_;
184         my $lock;
185         given ($BEX::Config::locking_scheme) {
186                 when ('queue') {
187                         $lock = lock_name($queue, undef, undef);
188                 }
189                 when ('host') {
190                         defined($machine) or return 1;
191                         $lock = lock_name($queue, $machine, undef);
192                 }
193                 when ('job') {
194                         defined($machine) && defined($jid) or return 1;
195                         $lock = lock_name($queue, $machine, $jid);
196                 }
197                 when ('none') { return 1; }
198                 default { die "Invalid BEX::Config::locking_scheme"; }
199         }
200         if (defined($queue->{'LockName'})) {
201                 return 1 if ($queue->{'LockName'} eq $lock);
202                 $queue->unlock;
203         }
204         open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
205         if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
206                 close $queue->{'LockHandle'};
207                 delete $queue->{'LockHandle'};
208                 return 0;
209         }
210         $queue->{'LockName'} = $lock;
211         return 1;
212 }
213
214 sub unlock($) {
215         my ($queue) = @_;
216         defined $queue->{'LockName'} or return;
217         unlink $queue->{'LockName'};
218         flock $queue->{'LockHandle'}, LOCK_UN;
219         close $queue->{'LockHandle'};
220         delete $queue->{'LockHandle'};
221         delete $queue->{'LockName'};
222 }
223
224 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
225 sub is_locked($$$) {
226         my ($queue, $machine, $jid) = @_;
227         given ($BEX::Config::locking_scheme) {
228                 # Shortcuts
229                 when ('host') { return unless defined $machine; }
230                 when ('jid') { return unless defined $jid; }
231                 when ('none') { return; }
232         }
233         my $lock = lock_name($queue, $machine, $jid);
234         return -f $lock;
235 }
236
237 42;