]> mj.ucw.cz Git - bex.git/blob - lib/BEX.pm
Change file naming conventions
[bex.git] / lib / BEX.pm
1 # Batch EXecutor 2.0
2 # (c) 2011 Martin Mares <mj@ucw.cz>
3
4 use strict;
5 use warnings;
6
7 use BEX::Config;
8
9 package BEX;
10
11 use IO::File;
12
13 use POSIX;
14
15 my $log_file;
16
17 sub log($) {
18         $log_file //= new IO::File '>>log' or die "Cannot open log: $!";
19         print $log_file POSIX::strftime("%Y-%m-%d %H:%M:%S ", localtime), $_[0], "\n";
20 }
21
22 package BEX::Job;
23
24 use POSIX;
25
26 our $job_cnt = 0;
27
28 sub new($;$) {
29         my ($class, $id) = @_;
30         my $job = { };
31         bless $job;
32         if (defined $id) {
33                 $job->{'ID'} = $id;
34         } else {
35                 $job_cnt++;
36                 $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
37         }
38         $job->{'Subject'} = '(no subject)';
39         return $job;
40 }
41
42 sub new_from_file($$;$) {
43         my ($class, $file, $header_only) = @_;
44         my $job = { };
45         open T, '<', $file or die "Cannot open $file: $!";
46         while (<T>) {
47                 chomp;
48                 /^$/ and last;
49                 /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
50                 !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
51                 $job->{$1} = $2;
52         }
53         if (!$header_only) {
54                 my @cmds = <T>;
55                 $job->{'body'} = join("", @cmds);
56         }
57         close T;
58         $job->{'Subject'} //= '?';
59         $job->{'ID'} or die "Cannot load $file: Missing ID";
60         return bless $job;
61 }
62
63 sub attr($$;$) {
64         my ($job, $attr, $val) = @_;
65         $job->{$attr} = $val if defined $val;
66         return $job->{$attr};
67 }
68
69 sub dump($) {
70         my ($job) = @_;
71         for my $k (sort keys %$job) {
72                 print "$k: ", $job->{$k}, "\n";
73         }
74 }
75
76 sub save($;$) {
77         my ($job, $fn) = @_;
78         -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
79         $fn //= 'tmp/' . $job->{'ID'};
80         open T, '>', $fn or die "Cannot create $fn: $!";
81         for my $k (sort grep { /^[A-Z]/ } keys %$job) {
82                 print T "$k: ", $job->{$k}, "\n";
83         }
84         print T "\n";
85         print T $job->{'body'} if defined $job->{'body'};
86         close T;
87         return $fn;
88 }
89
90 package BEX::Queue;
91
92 sub new($;$) {
93         my ($class, $name) = @_;
94         $name //= 'queue';
95         -d $name or die "Queue directory $name does not exist\n";
96         for my $d ("hosts", "jobs") {
97                 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
98         }
99         my $queue = {
100                 'Name' => $name,
101                 'MetaCache' => {},
102         };
103         return bless $queue;
104 }
105
106 sub host_dir($$) {
107         my ($queue, $machine) = @_;
108         return $queue->{'Name'} . '/hosts/' . $machine;
109 }
110
111 sub queue_file($$) {
112         my ($queue, $machine, $jid) = @_;
113         return $queue->host_dir($machine) . '/' . $jid . '.job';
114 }
115
116 sub status_file($$) {
117         my ($queue, $machine, $jid) = @_;
118         return $queue->host_dir($machine) . '/' . $jid . '.stat';
119 }
120
121 sub temp_file($$) {
122         my ($queue, $machine, $jid) = @_;
123         return $queue->host_dir($machine) . '/' . $jid . '.tmp';
124 }
125
126 sub job_file($$) {
127         my ($queue, $jid) = @_;
128         return $queue->{'Name'} . '/jobs/' . $jid. '.job';
129 }
130
131 sub enqueue($$$) {
132         my ($queue, $machine, $job) = @_;
133         my $qf = $queue->queue_file($machine, $job->{'ID'});
134         if (-f $qf) { return 0; }
135         my $fn = $queue->job_file($job->{'ID'});
136         -f $fn or $job->save($fn);
137         my $dir = $queue->host_dir($machine);
138         -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
139         symlink '../../jobs/' . $job->{'ID'} . '.job', $qf or die "Cannot create $qf: $!";
140         return 1;
141 }
142
143 sub scan($$) {
144         my ($queue, $machine) = @_;
145         my @list = ();
146         if (opendir D, $queue->host_dir($machine)) {
147                 while ($_ = readdir D) {
148                         /^\./ and next;
149                         s{\.job}{} or next;
150                         push @list, $_;
151                 }
152                 closedir D;
153         }
154         return @list;
155 }
156
157 sub remove($$) {
158         my ($queue, $machine, $jid) = @_;
159         unlink $queue->queue_file($machine, $jid);
160         unlink $queue->status_file($machine, $jid);
161         unlink $queue->temp_file($machine, $jid);
162 }
163
164 sub job_metadata($$) {
165         my ($queue, $jid) = @_;
166         my $cache = $queue->{'MetaCache'};
167         if (!defined $cache->{$jid}) {
168                 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
169         }
170         return $cache->{$jid};
171 }
172
173 sub read_job_status($$$) {
174         my ($queue, $machine, $jid) = @_;
175         my %s = ();
176         my $sf = $queue->status_file($machine, $jid);
177         if (open S, '<', $sf) {
178                 while (<S>) {
179                         chomp;
180                         /^(\w+):\s*(.*)/ or die "Parse error in $sf";
181                         $s{$1} = $2;
182                 }
183                 close S;
184         }
185         return \%s;
186 }
187
188 sub write_job_status($$$$) {
189         my ($queue, $machine, $jid, $stat) = @_;
190         my $sf = $queue->status_file($machine, $jid);
191         open S, '>', $sf or die "Cannot create $sf: $!";
192         for my $k (sort keys %$stat) {
193                 print S "$k: ", $stat->{$k}, "\n";
194         }
195         close S;
196 }
197
198 42;