]> mj.ucw.cz Git - bex.git/blob - lib/perl/BEX/Queue.pm
The big move -- introduced subcommands
[bex.git] / lib / perl / BEX / Queue.pm
1 # Batch EXecutor 3.0 -- Queues
2 # (c) 2011-2012 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6 use feature 'switch';
7
8 package BEX::Queue;
9
10 use IO::File;
11 use File::Path;
12 use Fcntl qw(:flock);
13 use POSIX ();
14
15 sub new($;$) {
16         my ($class, $name) = @_;
17         $name //= 'queue';
18         -d $name or die "Queue directory $name does not exist\n";
19         for my $d ("hosts", "jobs") {
20                 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
21         }
22         my $queue = {
23                 'Name' => $name,
24                 'MetaCache' => {},
25         };
26         return bless $queue;
27 }
28
29 sub log_file($$) {
30         my ($queue, $machine, $jid) = @_;
31         return $queue->host_dir($machine) . '/' . $jid . '.log';
32 }
33
34 # Most actions have to be logged by the caller
35 sub log($$$$;$) {
36         my ($queue, $mach, $jid, $stat, $msg) = @_;
37         my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
38         my $m = join(" ", $t, $mach, $jid, $stat);
39         $m .= " $msg" if defined $msg;
40
41         my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
42         print $fh "$m\n";
43
44         # Append to the per-job log file
45         if (open L, '>>', $queue->log_file($mach, $jid)) {
46                 print L "### $m\n";
47                 close L;
48         }
49 }
50
51 sub host_dir($$) {
52         my ($queue, $machine) = @_;
53         return $queue->{'Name'} . '/hosts/' . $machine;
54 }
55
56 sub queue_file($$) {
57         my ($queue, $machine, $jid) = @_;
58         return $queue->host_dir($machine) . '/' . $jid . '.job';
59 }
60
61 sub status_file($$) {
62         my ($queue, $machine, $jid) = @_;
63         return $queue->host_dir($machine) . '/' . $jid . '.stat';
64 }
65
66 sub temp_file($$) {
67         my ($queue, $machine, $jid) = @_;
68         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
69 }
70
71 sub job_file($$) {
72         my ($queue, $jid) = @_;
73         return $queue->{'Name'} . '/jobs/' . $jid. '.job';
74 }
75
76 sub enqueue($$$) {
77         my ($queue, $machine, $job) = @_;
78         my $qf = $queue->queue_file($machine, $job->id);
79         if (-f $qf) {
80                 return 0;
81         }
82         my $fn = $queue->job_file($job->id);
83         -f $fn or $job->save($fn);
84         my $dir = $queue->host_dir($machine);
85         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
86         symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
87         return 1;
88 }
89
90 sub scan($$) {
91         my ($queue, $machine) = @_;
92         my @list = ();
93         if (opendir D, $queue->host_dir($machine)) {
94                 while ($_ = readdir D) {
95                         /^\./ and next;
96                         s{\.job}{} or next;
97                         push @list, $_;
98                 }
99                 closedir D;
100         }
101         return sort @list;
102 }
103
104 sub remove($$;$) {
105         my ($queue, $machine, $jid, $force_remove) = @_;
106         if ($BEX::Config::keep_history && !$force_remove) {
107                 my $s = $queue->{'Name'} . '/hosts/' . $machine;
108                 my $d = $queue->{'Name'} . '/history/' . $machine;
109                 File::Path::mkpath($d);
110                 for my $suff ('job', 'stat', 'log') {
111                         my $src = "$s/$jid.$suff";
112                         my $dst = "$d/$jid.$suff";
113                         if (-f $src) {
114                                 rename $src, $dst or die "Cannot rename $src to $dst: $!";
115                         } else {
116                                 # Might be present from the previous incarnation of the same job
117                                 unlink $dst;
118                         }
119                 }
120         } else {
121                 unlink $queue->queue_file($machine, $jid);
122                 unlink $queue->status_file($machine, $jid);
123                 unlink $queue->log_file($machine, $jid);
124         }
125         unlink $queue->temp_file($machine, $jid);
126 }
127
128 sub job_metadata($$) {
129         my ($queue, $jid) = @_;
130         my $cache = $queue->{'MetaCache'};
131         if (!defined $cache->{$jid}) {
132                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
133         }
134         return $cache->{$jid};
135 }
136
137 sub job_name($$) {
138         my ($queue, $jid) = @_;
139         return $queue->job_metadata($jid)->name;
140 }
141
142 sub read_job_status($$$) {
143         my ($queue, $machine, $jid) = @_;
144         my %s = ();
145         my $sf = $queue->status_file($machine, $jid);
146         if (open S, '<', $sf) {
147                 while (<S>) {
148                         chomp;
149                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
150                         $s{$1} = $2;
151                 }
152                 close S;
153         }
154         return \%s;
155 }
156
157 sub write_job_status($$$$) {
158         my ($queue, $machine, $jid, $stat) = @_;
159         my $sf = $queue->status_file($machine, $jid);
160         open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
161         for my $k (sort keys %$stat) {
162                 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
163         }
164         close S;
165         rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
166 }
167
168 sub update_job_status($$$$;$) {
169         my ($queue, $machine, $jid, $stat, $msg) = @_;
170         my $s = {
171                 'Time' => time,
172                 'Status' => $stat,
173                 'Message' => $msg,
174         };
175         $queue->write_job_status($machine, $jid, $s);
176         $queue->log($machine, $jid, $stat, $msg);
177 }
178
179 sub lock_name($$$) {
180         my ($queue, $machine, $jid) = @_;
181         my $lock = $queue->{'Name'};
182         if (defined $jid) {
183                 $lock .= "/hosts/$machine/$jid.lock";
184         } elsif (defined $machine) {
185                 $lock .= "/hosts/$machine/lock";
186         } else {
187                 $lock .= '/lock';
188         }
189 }
190
191 # Whenever we want to run a job on a machine, we must obtain a lock;
192 # at most one lock can be held at a time by a single BEX::Queue object.
193 # See the description of locking schemes in BEX::Config.
194 sub lock($$$) {
195         my ($queue, $machine, $jid) = @_;
196         my $lock;
197         given ($BEX::Config::locking_scheme) {
198                 when ('queue') {
199                         $lock = lock_name($queue, undef, undef);
200                 }
201                 when ('host') {
202                         defined($machine) or return 1;
203                         $lock = lock_name($queue, $machine, undef);
204                 }
205                 when ('job') {
206                         defined($machine) && defined($jid) or return 1;
207                         $lock = lock_name($queue, $machine, $jid);
208                 }
209                 when ('none') { return 1; }
210                 default { die "Invalid BEX::Config::locking_scheme"; }
211         }
212         if (defined($queue->{'LockName'})) {
213                 return 1 if ($queue->{'LockName'} eq $lock);
214                 $queue->unlock;
215         }
216         open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
217         if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
218                 close $queue->{'LockHandle'};
219                 delete $queue->{'LockHandle'};
220                 return 0;
221         }
222         $queue->{'LockName'} = $lock;
223         return 1;
224 }
225
226 sub unlock($) {
227         my ($queue) = @_;
228         defined $queue->{'LockName'} or return;
229         unlink $queue->{'LockName'};
230         flock $queue->{'LockHandle'}, LOCK_UN;
231         close $queue->{'LockHandle'};
232         delete $queue->{'LockHandle'};
233         delete $queue->{'LockName'};
234 }
235
236 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
237 sub is_locked($$$) {
238         my ($queue, $machine, $jid) = @_;
239         given ($BEX::Config::locking_scheme) {
240                 # Shortcuts
241                 when ('host') { return unless defined $machine; }
242                 when ('jid') { return unless defined $jid; }
243                 when ('none') { return; }
244         }
245         my $lock = lock_name($queue, $machine, $jid);
246         return -f $lock;
247 }
248
249 42;