various: add read-only mode support
[girocco.git] / taskd / taskd.pl
blob50be39df6e6ba45237edc4ef68f02fa2517410c3
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is the Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, a new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 BEGIN {
37 eval 'require Pod::Text::Termcap; 1;' and
38 @Pod::Usage::ISA = (qw( Pod::Text::Termcap ));
39 defined($ENV{PERLDOC}) && $ENV{PERLDOC} ne "" or
40 $ENV{PERLDOC} = "-oterm -oman";
42 use Socket;
43 use Errno;
44 use Fcntl;
45 use POSIX qw(:sys_wait_h :fcntl_h);
46 use File::Basename;
47 use File::Spec ();
48 use Cwd qw(realpath);
50 use lib "__BASEDIR__";
51 use Girocco::Config;
52 use Girocco::Notify;
53 use Girocco::Project;
54 use Girocco::User;
55 use Girocco::Util qw(noFatalsToBrowser get_git human_duration ref_indicator);
56 BEGIN {noFatalsToBrowser}
57 use Girocco::ExecUtil;
59 use constant SOCKFDENV => "GIROCCO_TASKD_SOCKET_FD";
61 # Throttle Classes Defaults
62 # Note that any same-named classes in @Girocco::Config::throttle_classes
63 # will override (completely replacing the entire hash) these ones.
my @throttle_defaults = (
	{
		'name'     => 'ref-change',
		'maxproc'  => 0,
		'maxjobs'  => 1,
		'interval' => 2,
	},
	{
		'name'     => 'clone',
		'maxproc'  => 0,
		'maxjobs'  => 2,
		'interval' => 5,
	},
	{
		'name'     => 'snapshot',
		#maxproc => max(5, cpucount + maxjobs), # this is the default
		#maxjobs => max(1, int(cpucount / 4)) , # this is the default
		'interval' => 5,
	},
);
85 # Options
86 my $quiet;
87 my $progress;
88 my $syslog;
89 my $stderr;
90 my $inetd;
91 my $idle_timeout;
92 my $daemon;
93 my $max_lifetime;
94 my $abbrev = 8;
95 my $showff = 1;
96 my $same_pid;
97 my $statusintv = 60;
98 my $idleintv = 3600;
99 my $maxspawn = 8;
101 $| = 1;
103 my $progname = basename($0);
104 my $children = 0;
105 my $idlestart = time;
106 my $idlestatus = 0;
# Return the online CPU count as reported by Girocco::Util::online_cpus.
# The value is cached in the package variable $online_cpus_result once a
# true result has been obtained.
sub cpucount {
	use Girocco::Util "online_cpus";
	our $online_cpus_result;
	$online_cpus_result ||= online_cpus;
	return $online_cpus_result;
}
# Print a log message (arguments joined with spaces) to STDOUT.
# When STDOUT is tied, the message is emitted twice with $OStream::only
# steering each copy: the timestamped form for stderr only, then the bare
# form for syslog only.
sub logmsg {
	my $stamp = "[" . scalar(localtime) . "] $progname $$: ";
	my $text = "@_";
	if (!tied *STDOUT) {
		print "$stamp$text\n";
	} else {
		$OStream::only = 2; # STDERR only
		print "$stamp$text\n";
		$OStream::only = 1; # syslog only
		print "$text\n";
		$OStream::only = 0; # back to default
	}
}
# Print a status message (arguments joined with spaces) to STDERR, but only
# when $progress is enabled.  A tied STDERR gets the same two-copy treatment
# as logmsg: timestamped for stderr only, bare for syslog only.
sub statmsg {
	$progress or return;
	my $stamp = "[" . scalar(localtime) . "] $progname $$: ";
	my $text = "@_";
	if (!tied *STDERR) {
		print STDERR "$stamp$text\n";
	} else {
		$OStream::only = 2; # STDERR only
		print STDERR "$stamp$text\n";
		$OStream::only = 1; # syslog only
		print STDERR "$text\n";
		$OStream::only = 0; # back to default
	}
}
# Test whether a numeric file descriptor is currently open.
# $_[0] => file descriptor number
# Returns undef for an undefined or negative descriptor, otherwise a boolean
# obtained by attempting to dup the descriptor (the duplicate is closed
# again right away).
sub isfdopen {
	my ($fdnum) = @_;
	defined($fdnum) && $fdnum >= 0 or return undef;
	my $probe = POSIX::dup($fdnum);
	my $isopen = defined($probe);
	POSIX::close($probe) if $isopen;
	return $isopen;
}
# Clear all descriptor flags (including close-on-exec) on a file handle.
# $_[0] => file handle
# Dies if fcntl fails.
sub setnoncloexec {
	my ($fh) = @_;
	fcntl($fh, F_SETFD, 0)
		or die "fcntl failed: $!";
}
# Set the descriptor flags of a file handle to exactly FD_CLOEXEC.
# $_[0] => file handle
# Dies if fcntl fails.
sub setcloexec {
	my ($fh) = @_;
	fcntl($fh, F_SETFD, FD_CLOEXEC)
		or die "fcntl failed: $!";
}
# Switch a file handle to non-blocking mode, leaving its other status
# flags as they were.
# $_[0] => file handle
# Dies if either fcntl call fails.
sub setnonblock {
	my ($fh) = @_;
	my $mode = fcntl($fh, F_GETFL, 0);
	die "fcntl failed: $!" unless defined($mode);
	fcntl($fh, F_SETFL, $mode | O_NONBLOCK) or die "fcntl failed: $!";
}
# Switch a file handle back to blocking mode, leaving its other status
# flags as they were.
# $_[0] => file handle
# Dies if either fcntl call fails.
sub setblock {
	my ($fh) = @_;
	my $mode = fcntl($fh, F_GETFL, 0);
	die "fcntl failed: $!" unless defined($mode);
	fcntl($fh, F_SETFL, $mode & ~O_NONBLOCK) or die "fcntl failed: $!";
}
174 package Throttle;
177 ## Throttle protocol
179 ## 1) Process needing throttle services acquire a control file descriptor
180 ## a) Either as a result of a fork + exec (the write end of a pipe)
181 ## b) Or by connecting to the taskd socket (not yet implemented)
183 ## 2) The process requesting throttle services will be referred to
184 ## as the supplicant or just "supp" for short.
186 ## 3) The supp first completes any needed setup which may include
187 ## gathering data it needs to perform the action -- if that fails
188 ## then there's no need for any throttling.
190 ## 4) The supp writes a throttle request to the control descriptor in
191 ## this format:
192 ## throttle <pid> <class>\n
193 ## for example if the supp's pid was 1234 and it was requesting throttle
194 ## control as a member of the mail class it would write this message:
195 ## throttle 1234 mail\n
196 ## Note that if the control descriptor happens to be a pipe rather than a
197 ## socket, the message should be preceded by another "\n" just to be safe.
198 ## If the control descriptor is a socket, not a pipe, the message may be
199 ## preceded by a "\n" but that's not recommended.
201 ## 5) For supplicants with a control descriptor that is a pipe
202 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
203 ## If the control descriptor is a socket (getsockname succeeds) then
204 ## protocol (5b) should be used.
206 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
207 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
208 ## to write a "keepalive\n" message to the control descriptor. If that
209 ## fails, the controller has gone away and it may make its own decision
210 ## whether or not to proceed at that point. If, on the other hand, it
211 ## receives a SIGTERM, the process limit for its class has been reached
212 ## and it should abort without performing its action. If it receives
213 ## SIGUSR1, it may proceed without writing anything more to the control
214 ## descriptor, and MAY even close the control descriptor. Finally, a
215 ## SIGUSR2 indicates rejection of the throttle request for some other reason
216 ## such as unrecognized class name or invalid pid in which case the supp may
217 ## make its own decision how to proceed.
219 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
220 ## more than 512 bytes and if a '\n' does not appear within that number of
221 ## bytes the read should be considered failed. Otherwise the read should
222 ## be retried until either a full line has been read or the socket is
223 ## closed from the other end. If the lone read is "proceed\n" then it may
224 ## proceed without reading or writing anything more to the control
225 ## descriptor, but MUST keep the control descriptor open and not call
226 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
227 ## should be retried) constitutes failure. If a full line starting with at
228 ## least one alpha character was read but it was not "proceed" then it
229 ## should abort without performing its action. For any other failure it
230 ## may make its own decision whether or not to proceed as the controller has
231 ## gone away.
233 ## 6) The supp now performs its throttled action.
235 ## 7) The supp now closes its control descriptor (if it hasn't already in the
236 ## case of (5a)) and exits -- in the case of a socket, the other end receives
237 ## notification that the socket has been closed (read EOF). In the case of
238 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
239 ## of the other end of the pipe, so it will not reach EOF by the supp's
240 ## exit in that case).
243 # keys are class names, values are hash refs with these fields:
244 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
245 # many may be queued waiting plus how many may be
246 # concurrently active) with 0 meaning no limit.
247 # 'maxjobs' => integer; how many supplicants may proceed simultaneously a value
248 # of 0 is unlimited but the number of concurrent
249 # supplicants will always be limited to no more than
250 # the 'maxproc' value (if > 0) no matter what the
251 # 'maxjobs' value is.
252 # 'total' -> integer; the total number of pids belonging to this class that
253 # can currently be found in %pid.
254 # 'active' -> integer; the number of currently active supplicants which should
255 # be the same as (the number of elements of %pid with a
256 # matching class name) - (number of my class in @queue).
257 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
258 # or SIGUSR1 signals to members of this class.
259 # 'lastqueue' -> time; last time a supplicant was successfully queued.
260 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
261 # 'lastthrottle' => time; last time a supplicant was throttled
262 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
263 my %classes = ();
265 # keys are pid numbers, values are array refs with these elements:
266 # [0] => name of class (key to classes hash)
267 # [1] => supplicant state (0 => queued, non-zero => time it started running)
268 # [2] => descriptive text (e.g. project name)
269 my %pid = ();
271 # minimum number of seconds between any two proceed responses no matter what
272 # class. this takes priority in that it can effectively increase the
273 # class's 'interval' value by delaying proceed notifications if the minimum
274 # interval has not yet elapsed.
275 my $interval = 1;
277 # fifo of pids awaiting notification as soon as the next $interval elapses
278 # provided interval and maxjobs requirements are satisfied
279 # for the class of the pid that will next be triggered.
280 my @queue = ();
282 # time of most recent successful call to AddSupplicant
283 my $lastqueue = 0;
285 # time of most recent proceed notification
286 my $lastproceed = 0;
288 # time of most recent throttle
289 my $lastthrottle = 0;
291 # time of most recent removal
292 my $lastdied = 0;
294 # lifetime count of how many have been queued
295 my $totalqueue = 0;
297 # lifetime count of how many have been allowed to proceed
298 my $totalproceed = 0;
300 # lifetime count of how many have been throttled
301 my $totalthrottle = 0;
303 # lifetime count of how many have died
304 # It should always be true that $totalqueue - $totaldied == scalar(keys(%pid))
305 my $totaldied = 0;
# Returns the names of all currently registered throttle classes
# (the keys of %classes) in no particular order.
sub GetClassList {
	my @names = keys(%classes);
	return @names;
}
# Returns the numerically larger of its two arguments
# (the first one when they compare equal).
sub _max {
	my ($first, $second) = @_;
	return $first >= $second ? $first : $second;
}
# Pick an integer setting, clamped from below.
# [0] => minimum allowed result
# [1] => candidate value; used only when it is an optionally signed
#        string of decimal digits
# [2] => code ref called to produce the value when [1] is unusable
# Returns max([0], chosen value)
sub _getnum {
	my ($min, $val, $default) = @_;
	my $isint = defined($val) && $val =~ /^[+-]?\d+$/;
	my $ans;
	if ($isint) {
		$ans = 0 + $val;
	} else {
		$ans = &$default;
	}
	return _max($min, $ans);
}
# Look up (and optionally create) a throttle class.
# [0] => name of class to find; must start with a letter and contain only
#        letters, digits and "._+-"; it is lowercased before use
# [1] => if true, create the class when it doesn't exist; if it is a hash
#        ref it supplies initial 'maxproc', 'maxjobs' and 'interval'.
#        Otherwise maxjobs defaults to max(1, int(cpu cores / 4)), maxproc
#        defaults to max(5, cpu cores + maxjobs) and interval defaults to 1.
# Returns a hash ref holding a copy of the class's fields on success and
# nothing if the name is invalid or the class is missing and not created.
sub GetClassInfo {
	my ($classname, $init) = @_;
	return unless defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
	my $key = lc($classname);
	if (my $known = $classes{$key}) {
		# hand back a copy so callers cannot alter the live entry
		return { %$known };
	}
	return unless $init;
	my $seed = ref($init) eq 'HASH' ? $init : {};
	my %fresh = ();
	# maxjobs must be settled first; the maxproc default depends on it
	$fresh{'maxjobs'} = _getnum(0, $seed->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
	$fresh{'maxproc'} = _getnum(0, $seed->{'maxproc'}, sub{_max(5, ::cpucount() + $fresh{'maxjobs'})});
	$fresh{'interval'} = _getnum(0, $seed->{'interval'}, sub{1});
	$fresh{$_} = 0 foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
	$classes{$key} = \%fresh;
	return { %fresh };
}
# [0] => pid to look up
# Returns () if the pid is not registered in %pid, otherwise the list
# ($classname, $timestarted, $description) where $timestarted is 0 while
# the pid is still queued and a time() value once it has started running.
sub GetPidInfo {
	my ($wanted) = @_;
	exists($pid{$wanted}) or return ();
	my $entry = $pid{$wanted};
	return @$entry;
}
# Returns the pids that are currently running (non-zero start time),
# ordered by start time from oldest to newest.  May return an empty list.
sub GetRunningPids {
	my @running = grep({ $pid{$_}->[1] } keys(%pid));
	return sort({ $pid{$a}->[1] <=> $pid{$b}->[1] } @running);
}
# Returns a hash ref describing the current overall state
# 'interval' => global minimum interval between proceeds
# 'active' => how many pids are currently running (registered pids
#             minus those still waiting in the queue)
# 'queue' => how many pids are currently queued
# 'lastqueue' => time (epoch seconds) of last queue
# 'lastproceed' => time (epoch seconds) of last proceed
# 'lastthrottle' => time (epoch seconds) of last throttle
# 'lastdied' => time (epoch seconds) of last removal
# 'totalqueue' => lifetime total number of processes queued
# 'totalproceed' => lifetime total number of processes proceeded
# 'totalthrottle' => lifetime total number of processes throttled
# 'totaldied' => lifetime total number of removed processes
sub GetInfo {
	my $queued = scalar(@queue);
	my %state = (
		interval => $interval,
		active => scalar(keys(%pid)) - $queued,
		queue => $queued,
		lastqueue => $lastqueue,
		lastproceed => $lastproceed,
		lastthrottle => $lastthrottle,
		lastdied => $lastdied,
		totalqueue => $totalqueue,
		totalproceed => $totalproceed,
		totalthrottle => $totalthrottle,
		totaldied => $totaldied,
	);
	return \%state;
}
# Get or set the global minimum interval between proceeds.
# With no argument the current value is returned unchanged.
# With one argument consisting only of decimal digits the interval is set
# to it; any other argument is ignored.
# Always returns the value that was in effect before the call.
sub Interval {
	my ($new) = @_;
	my $previous = $interval;
	if (defined($new) && $new =~ /^\d+$/) {
		$interval = 0 + $new;
	}
	return $previous;
}
414 sub RemoveSupplicant;
# Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
# Returns minimum interval until next proceed is possible
# Returns undef if there's nothing waiting to proceed or
# the 'maxjobs' limits have been reached for all queued items (in which
# case it won't be possible to proceed until one of them exits, hence undef)
# This is called automatically by AddSupplicant and RemoveSupplicant
#
# The queue is rescanned from the start after every change made to it.
# Only the first queued pid of each class is considered per scan.
sub ServiceQueue {
	SCAN: while (1) {
		return undef unless @queue; # if there's nothing queued, nothing to do
		my $now = time;
		# seconds still owed to the global minimum interval
		my $min = _max(0, $interval - ($now - $lastproceed));
		my $classmin = undef;
		my $classchecked = 0;
		my %seenclass = ();
		my $classcount = scalar(keys(%classes));
		for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
			my $pid = $queue[$i];
			my $procinfo = $pid{$pid};
			if (!$procinfo) {
				# Stale queue entry with no %pid record.  RemoveSupplicant
				# returns early for unknown pids without touching @queue,
				# so drop the entry here or the rescan would never end.
				splice(@queue, $i, 1);
				next SCAN;
			}
			my $classinfo = $classes{$$procinfo[0]};
			if (!$classinfo) {
				RemoveSupplicant($pid, 1);
				next SCAN;
			}
			next if $seenclass{$$procinfo[0]};
			$seenclass{$$procinfo[0]} = 1;
			++$classchecked;
			# class already at its concurrency limit (0 means unlimited)
			next if $classinfo->{'maxjobs'} && $classinfo->{'active'} >= $classinfo->{'maxjobs'};
			my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
			if (!$cmin && !$min) {
				# both intervals have elapsed: let this pid proceed
				$now = time;
				$$procinfo[1] = $now;
				splice(@queue, $i, 1);
				++$totalproceed;
				$lastproceed = $now;
				$classinfo->{'lastproceed'} = $now;
				++$classinfo->{'active'};
				kill("USR1", $pid) or RemoveSupplicant($pid, 1);
				next SCAN;
			}
			# remember the shortest per-class wait seen so far
			$classmin = $cmin unless defined($classmin) && $classmin < $cmin;
		}
		return defined($classmin) ? _max($min, $classmin) : undef;
	}
}
# $1 => pid to add (must not already be in %pid)
# $2 => class name (must exist)
# $3 => descriptive text stored with the pid (defaults to '')
# $4 => if true, do not call ServiceQueue after queueing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Returns 1 if throttled and cannot be added
sub AddSupplicant {
	my ($pid, $classname, $text, $noservice) = @_;
	$pid && $pid =~ /^[1-9][0-9]*$/ or return -1;
	$pid += 0;
	return -1 unless kill(0, $pid); # not a signalable process
	my $classinfo = $classes{$classname} or return -1;
	return -1 if $pid{$pid};
	defined($text) or $text = '';
	my $now = time;
	my $cap = $classinfo->{'maxproc'};
	if ($cap && $classinfo->{'total'} >= $cap) {
		# class is full: record the throttle and refuse
		++$totalthrottle;
		$lastthrottle = $now;
		$classinfo->{'lastthrottle'} = $now;
		return 1;
	}
	++$totalqueue;
	$lastqueue = $now;
	$pid{$pid} = [$classname, 0, $text];
	++$classinfo->{'total'};
	$classinfo->{'lastqueue'} = $now;
	push(@queue, $pid);
	ServiceQueue() unless $noservice;
	return 0;
}
# $1 => pid to remove (died, killed, exited normally, doesn't matter)
# $2 => if true, do not call ServiceQueue afterwards (ServiceQueue passes
#       this so that it is not re-entered from within itself)
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
	my ($pid, $noservice) = @_;
	return -1 unless defined($pid) && $pid =~ /^\d+$/;
	$pid += 0;
	my $pidinfo = $pid{$pid};
	$pidinfo or return -1;
	my $now = time;
	$lastdied = $now;
	++$totaldied;
	delete $pid{$pid};
	# a supplicant that never started (start time 0) is still in the fifo
	@queue = grep({ $_ != $pid } @queue) unless $$pidinfo[1];
	my $classinfo = $classes{$$pidinfo[0]};
	unless ($classinfo) {
		# the class is gone so there are no counters to adjust; the
		# queue is still serviced, but only when the caller allows it
		ServiceQueue() unless $noservice;
		return -1;
	}
	--$classinfo->{'active'} if $$pidinfo[1];
	--$classinfo->{'total'};
	$classinfo->{'lastdied'} = $now;
	ServiceQueue() unless $noservice;
	return 0;
}
527 # Instance Methods
529 package main;
532 ## ---------
533 ## Functions
534 ## ---------
537 my @reapedpids = ();
538 my %signame = (
539 # http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
540 1 => 'SIGHUP',
541 2 => 'SIGINT',
542 3 => 'SIGQUIT',
543 6 => 'SIGABRT',
544 9 => 'SIGKILL',
545 14 => 'SIGALRM',
546 15 => 'SIGTERM',
# SIGCHLD handler: collect every child that has exited without blocking.
# For each one the child count is decremented ($idlestart is reset when it
# reaches zero), a "reaped" line is logged and the pid is appended to
# @reapedpids.  $! is localized so the interrupted code's errno survives.
sub REAPER {
	local $!;
	my $waitedpid;
	while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
		my $status = $? & 0xffff;
		$idlestart = time unless --$children;
		my $codemsg = '';
		my $signum = $status & 0x7f;
		if (!($status & 0xff)) {
			# low byte clear: normal exit, report only non-zero codes
			$codemsg = " with exit code ".($status >> 8) if $status;
		} elsif ($signum) {
			$codemsg = " with signal ".($signame{$signum} || $signum);
		}
		logmsg "reaped $waitedpid$codemsg";
		push(@reapedpids, $waitedpid);
	}
	$SIG{CHLD} = \&REAPER; # loathe sysV
}
# Install REAPER as the handler for SIGCHLD.
sub set_sigchld_reaper() {
	$SIG{'CHLD'} = \&REAPER; # Apollo 440
}
573 my ($piperead, $pipewrite);
# Fork a child to handle the current client connection.
# $_[0] => code ref the child runs; its return value becomes the child's
#          exit status
# The parent logs the outcome and returns nothing.  The child closes the
# Server handle (unless it is fd 0) and the read end of the control pipe,
# reopens STDIN read/write onto the Client handle, runs the code ref and
# exits.
sub spawn {
	my ($coderef) = @_;

	my $pid = fork;
	unless (defined($pid)) {
		logmsg "cannot fork: $!";
		return;
	}
	if ($pid) {
		# I'm the parent
		# NOTE(review): !++$children is only true when the count was -1
		# before the increment -- confirm that is what is intended
		$idlestart = time if !++$children;
		$idlestatus = 0;
		logmsg "begat $pid";
		return;
	}

	# I'm the child
	close(Server) unless fileno(Server) == 0;
	close($piperead);
	$SIG{'CHLD'} = sub {};

	open STDIN, "+<&Client" or die "can't dup client to stdin";
	close(Client);
	exit $coderef->();
}
# Ask the controller (via $pipewrite) for permission to run as a member
# of a throttle class, then wait for its answer to arrive as a signal.
# $_[0] => throttle class name
# $_[1] => descriptive text sent along with the request
# While waiting, a "keepalive" message is written after every wakeup
# (a 30 second alarm guarantees periodic wakeups).
# returns:
#   < 0: error (unknown class, SIGUSR2 received or SIGPIPE on the control pipe)
#   = 0: proceed (SIGUSR1 received)
#   > 0: throttled (SIGTERM received)
sub request_throttle {
	use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
	my ($classname, $text) = @_;

	Throttle::GetClassInfo($classname)
		or return -1; # no such throttle class

	my ($throttled, $proceed, $error, $controldead) = (0, 0, 0, 0);
	my $nosigs = POSIX::SigSet->new;
	my $allsigs = POSIX::SigSet->new;
	$nosigs->emptyset();
	$allsigs->fillset();
	$SIG{'TERM'} = sub {$throttled = 1};
	$SIG{'USR1'} = sub {$proceed = 1};
	$SIG{'USR2'} = sub {$error = 1};
	$SIG{'PIPE'} = sub {$controldead = 1};
	$SIG{'ALRM'} = sub {};

	# After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
	print $pipewrite "\nthrottle $$ $classname $text\n";
	my $prevmask = POSIX::SigSet->new;
	# signals stay blocked except while suspended, so the flags cannot
	# change between the loop test and the sigsuspend call
	sigprocmask(SIG_SETMASK, $allsigs, $prevmask);
	until ($controldead || $throttled || $proceed || $error) {
		alarm(30);
		sigsuspend($nosigs);
		alarm(0);
		sigprocmask(SIG_SETMASK, $nosigs, $prevmask);
		print $pipewrite "\nkeepalive $$\n";
		sigprocmask(SIG_SETMASK, $allsigs, $prevmask);
	}
	sigprocmask(SIG_SETMASK, $nosigs, $prevmask);
	$SIG{$_} = "DEFAULT" foreach qw(TERM USR1 USR2 ALRM PIPE);

	return 1 if $throttled;
	return 0 if $proceed;
	return -1;
}
# Start the clone of a project that has been marked for cloning.
# $_[0] => project name
# Dies if the project does not exist, cannot be loaded, is not marked for
# cloning, is already being cloned, or the throttle request reports
# "throttled".  Otherwise stdin is pointed at the null device, stdout and
# stderr at the project's .clonelog, and taskd/clone.sh is exec'd, so on
# success this never returns.
sub clone {
	my ($name) = @_;
	Girocco::Project::does_exist($name, 1) or die "no such project: $name";
	my $proj;
	eval {$proj = Girocco::Project->load($name)};
	unless ($proj) {
		if (Girocco::Project::does_exist($name, 1)) {
			# If the .clone_in_progress file exists, but the .clonelog does not
			# and neither does the .clone_failed, be helpful and touch the
			# .clone_failed file so that the mirror can be restarted
			my $projdir = $Girocco::Config::reporoot."/$name.git";
			if (-d "$projdir" && -f "$projdir/.clone_in_progress" && ! -f "$projdir/.clonelog" && ! -f "$projdir/.clone_failed") {
				open my $touch, '>', "$projdir/.clone_failed" and close($touch);
			}
		}
		die "failed to load project $name";
	}
	$proj->{clone_in_progress} or die "project $name is not marked for cloning";
	$proj->{clone_logged} and die "project $name is already being cloned";
	# a negative (error) answer from the throttle does not stop the clone
	request_throttle("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
	statmsg "cloning $name";

	# stdin <- null device
	my $nullfd = POSIX::open(File::Spec->devnull, O_RDWR);
	defined($nullfd) && $nullfd >= 0 or die "cannot open /dev/null: $!";
	POSIX::dup2($nullfd, 0) or
		die "cannot dup2 STDIN_FILENO: $!";
	POSIX::close($nullfd);

	# keep a copy of the original stderr for reporting later failures
	my $savederr;
	open $savederr, '>&2' or
		die "cannot dup STDERR_FILENO: $!";

	# stdout and stderr -> .clonelog
	my $logfd = POSIX::open("$Girocco::Config::reporoot/$name.git/.clonelog", O_WRONLY|O_TRUNC|O_CREAT, 0664);
	defined($logfd) && $logfd >= 0 or die "cannot open clonelog for writing: $!";
	POSIX::dup2($logfd, 1) or
		die "cannot dup2 STDOUT_FILENO: $!";
	POSIX::dup2($logfd, 2) or do {
		POSIX::dup2(fileno($savederr), 2);
		die "cannot dup2 STDERR_FILENO: $!";
	};
	POSIX::close($logfd);
	exec("$Girocco::Config::basedir/taskd/clone.sh", "$name.git") or do {
		POSIX::dup2(fileno($savederr), 2);
		die "exec failed: $!";
	};
}
# Wrapper around ref_indicator: passes the arguments through when $showff
# is enabled and the first argument is defined, otherwise returns the
# plain ' -> ' indicator.
sub _ref_indicator {
	return ref_indicator(@_) if $showff && defined($_[0]);
	return ' -> ';
}
# Status callback for a single ref update.
# $_[0] => hash ref with 'type', 'username', 'name' and 'path'
# $_[1..3] => old revision, new revision, ref name
# $_[4] => true if mail.sh ran, $_[5] => true if git ran
# $_[6] => indicator string; recomputed with _ref_indicator unless it is
#          one of ' -> ', '..' or '...'
# Emits a status message with both revisions cut to $abbrev characters,
# then sleeps 2 seconds if mail.sh ran, else 1 second if git ran.
sub _statproc {
	my ($info, $old, $new, $ref, $mail_sh_ran, $git_ran, $ind) = @_;
	my $usable = defined($ind) && ($ind eq ' -> ' || $ind eq '..' || $ind eq '...');
	($ind, $git_ran) = _ref_indicator($$info{path}, $old, $new) unless $usable;
	my $oldabbr = substr($old, 0, $abbrev);
	my $newabbr = substr($new, 0, $abbrev);
	statmsg "ref-$$info{type} $$info{username} $$info{name} ($ref: ${oldabbr}${ind}${newabbr})";
	if ($mail_sh_ran) {
		sleep 2;
	} elsif ($git_ran) {
		sleep 1;
	}
}
# Handle a single "ref-change" request.
# $_[0] => "<username> <project> <oldrev> <newrev> <ref>"
# Returns 0 without doing anything when a field is missing or malformed,
# when old and new revisions are equal, or when the ref is single-level
# and $Girocco::Config::notify_single_level is off.  Dies when the project
# or user cannot be found or loaded, or when the request is throttled.
# Notifications are sent only if the project has notify configured.
sub ref_change {
	my ($arg) = @_;
	my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
	return 0 unless $username && $name && $oldrev && $newrev && $ref;
	# validate both revisions, keeping the captured value
	foreach my $rev ($oldrev, $newrev) {
		$rev =~ /^([0-9a-f]{40,})$/ or return 0;
		$rev = $1;
	}
	$ref =~ m{^(refs/.+)$} or return 0;
	$ref = $1;
	return 0 if $newrev eq $oldrev;
	return 0 unless $Girocco::Config::notify_single_level || $ref =~ m(^refs/[^/]+/[^/]);

	Girocco::Project::does_exist(\$name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name)
		or die "failed to load project $name";
	my $has_notify = $proj->has_notify;
	my $type = $has_notify ? "notify" : "change";

	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username)
			or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	request_throttle("ref-change", $name) <= 0 or die "ref-change $name aborted (throttled)";
	my $info = {
		"username" => $username,
		"name" => $name,
		"type" => $type,
		"path" => $proj->{path},
	};
	my $statproc = sub { _statproc($info, @_); };
	open STDIN, '<', File::Spec->devnull;
	if ($has_notify) {
		Girocco::Notify::ref_changes($proj, $user, $statproc, [$oldrev, $newrev, $ref]);
	}
	return 0;
}
# Handle a batched "ref-changes" request.
# $_[0] => "<username> <project>"
# The individual changes are then read from STDIN, one
# "<oldrev> <newrev> <ref>" per line, until a line whose first field is
# "done" or EOF.  Blank, short or malformed lines are skipped, as are
# single-level refs when $Girocco::Config::notify_single_level is off.
# Returns 0; dies when the project or user cannot be found or loaded, or
# when the request is throttled.
sub ref_changes {
	my ($arg) = @_;
	my ($username, $name) = split(/\s+/, $arg);
	$username && $name or return 0;

	Girocco::Project::does_exist(\$name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name);
	$proj or die "failed to load project $name";
	my $has_notify = $proj->has_notify;
	my $type = $has_notify ? "notify" : "change";

	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username);
		$user or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	my @changes = ();
	my %oldheads = ();
	my %deletedheads = ();
	while (my $change = <STDIN>) {
		my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
		# a blank line yields no fields at all; skip it rather than
		# testing undefined values below
		defined($oldrev) or next;
		$oldrev ne "done" or last;
		$oldrev =~ /^([0-9a-f]{40,})$/ or next; $oldrev = $1;
		defined($newrev) && $newrev =~ /^([0-9a-f]{40,})$/ or next; $newrev = $1;
		defined($ref) && $ref =~ m{^(refs/.+)$} or next; $ref = $1;
		$Girocco::Config::notify_single_level || $ref =~ m(^refs/[^/]+/[^/]) or next;
		if ($ref =~ m{^refs/heads/.}) {
			if ($oldrev =~ /^0{40,}$/) {
				# all-zero old revision: no previous value to record
				delete $oldheads{$ref};
				$deletedheads{$ref} = 1;
			} elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
				$oldheads{$ref} = $oldrev;
			}
		}
		$newrev ne $oldrev or next;
		push(@changes, [$oldrev, $newrev, $ref]);
	}
	return 0 unless @changes;
	open STDIN, '<', File::Spec->devnull;
	request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
	my $info = { "username" => $username, "name" => $name, "type" => $type, "path" => $proj->{path} };
	my $statproc = sub { _statproc($info, @_); };
	if ($has_notify) {
		Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
	} else {
		# no notifications configured: just report each change
		&$statproc(@$_) foreach @changes;
	}
	return 0;
}
# Act as the throttle proxy for an external supplicant.
# $_[0] => "<pid> <class> <text>"
# Returns 0 right away when the pid is not a positive all-digit number,
# the process does not exist, the class is unknown or the text is empty.
# Otherwise a throttle request is registered with the controller under
# this process's own pid and the outcome is relayed by writing
# "proceed\n" or "throttled\n" to STDIN (presumably the client socket
# duplicated onto STDIN by spawn -- confirm), after which the process
# exits with status 0.
sub throttle {
	my ($arg) = @_;
	my ($pid, $classname, $text) = split(/\s+/, $arg);
	# the whole field must be digits, as in process_pipe_msg
	defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
	$pid += 0;
	$pid > 0 or return 0; # invalid pid
	kill(0, $pid) || $!{EPERM} or return 0; # no such process
	Throttle::GetClassInfo($classname) or return 0; # no such throttle class
	defined($text) && $text ne '' or return 0; # no text no service

	my $throttled = 0;
	my $proceed = 0;
	my $error = 0;
	my $controldead = 0;
	my $suppdead = 0;
	# the signal handlers write to this pipe so that a pending select
	# wakes up as soon as a signal arrives
	my ($waker, $wakew);
	pipe($waker, $wakew) or die "pipe failed: $!";
	select((select($wakew),$|=1)[0]);
	setnonblock($wakew);
	$SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
	$SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
	$SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
	$SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
	select((select(STDIN),$|=1)[0]);

	logmsg "throttle $pid $classname $text request";
	# After writing we can expect a SIGTERM or SIGUSR1
	print $pipewrite "\nthrottle $$ $classname $text\n";

	# NOTE: the only way to detect the socket close is to read all the
	# data until EOF is reached -- recv can be used to peek.
	my $v = '';
	vec($v, fileno(STDIN), 1) = 1;
	vec($v, fileno($waker), 1) = 1;
	setnonblock(\*STDIN);
	setnonblock($waker);
	until ($controldead || $throttled || $proceed || $error || $suppdead) {
		my ($r, $e);
		select($r=$v, undef, $e=$v, 30);
		my ($bytes, $discard);
		do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
		do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
		# anything other than "would block" means the peer has gone away
		$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
		print $pipewrite "\nkeepalive $$\n";
	}
	setblock(\*STDIN);

	if ($throttled && !$suppdead) {
		print STDIN "throttled\n";
		logmsg "throttle $pid $classname $text throttled";
	} elsif ($proceed && !$suppdead) {
		print STDIN "proceed\n";
		logmsg "throttle $pid $classname $text proceed";
		$SIG{'TERM'} = 'DEFAULT';
		# Stay alive until the child dies which we detect by EOF on STDIN
		setnonblock(\*STDIN);
		until ($controldead || $suppdead) {
			my ($r, $e);
			select($r=$v, undef, $e=$v, 30);
			my ($bytes, $discard);
			do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
			do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
			$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
			print $pipewrite "\nkeepalive $$\n";
		}
		setblock(\*STDIN);
	} else {
		my $prefix = '';
		$prefix = "control" if $controldead && !$suppdead;
		logmsg "throttle $pid $classname $text ${prefix}died";
	}
	exit 0;
}
# Process one message received over the control pipe.
# $_[0] => message text, either "throttle <pid> <class> <text>" or
#          "keepalive ..."
# A throttle message registers the pid with the Throttle package.  SIGUSR2
# is sent to the pid when the class or text is missing, the class is
# unknown or AddSupplicant reports an error; SIGTERM is sent when it
# reports that the pid was throttled.  Unknown messages are reported on
# STDERR.
# Always returns 0.
sub process_pipe_msg {
	my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
	if ($act eq "keepalive") {
		# nothing to do although we could verify pid is valid and
		# still in %Throttle::pids and send a SIGUSR2 if not, but
		# really keepalive should just be ignored.
		return 0;
	}
	if ($act ne "throttle") {
		print STDERR "discarding unknown pipe message \"$_[0]\"\n";
		return 0;
	}
	$pid =~ /^(\d+)$/ or return 0;
	$pid = 0 + $1;
	$pid > 0 or return 0; # invalid pid
	kill(0, $pid) or return 0; # invalid pid
	my $wellformed = defined($cls) && $cls ne "" && defined($text) && $text ne "";
	unless ($wellformed && Throttle::GetClassInfo($cls)) {
		kill('USR2', $pid);
		return 0;
	}
	# the AddSupplicant call could send SIGUSR1 before it returns
	my $result = Throttle::AddSupplicant($pid, $cls, $text);
	if ($result < 0) {
		kill('USR2', $pid);
	} elsif ($result > 0) {
		kill('TERM', $pid);
	}
	# otherwise $pid was added to class $cls and will receive SIGUSR1
	# when it's time for it to proceed
	return 0;
}
896 ## -------
897 ## OStream
898 ## -------
901 package OStream;
903 # Set to 1 for only syslog output (if enabled by mode)
904 # Set to 2 for only stderr output (if enabled by mode)
905 our $only = 0; # This is a hack
907 use Carp 'croak';
908 use Sys::Syslog qw(:DEFAULT :macros);
# Write an entire string to a numeric file descriptor.
# $_[0] => file descriptor number
# $_[1] => data to write
# POSIX::write is retried after EINTR and after short writes until all of
# the data has gone out.  Croaks if a write fails for any other reason or
# reports that zero bytes were written.
sub writeall {
	my ($fd, $data) = @_;
	my $offset = 0;
	my $remaining = length($data);
	while ($remaining) {
		my $bytes = POSIX::write(
			$fd,
			substr($data, $offset, $remaining),
			$remaining);
		next if !defined($bytes) && $!{EINTR};
		croak "POSIX::write failed: $!" unless defined $bytes;
		# POSIX::write reports a zero-byte write as the true string
		# "0 but true", so compare numerically instead of testing truth
		croak "POSIX::write wrote 0 bytes" unless $bytes > 0;
		$remaining -= $bytes;
		$offset += $bytes;
	}
}
# Emit one line to the destinations enabled for this stream.
# $_[0] => OStream object ('stderr' and 'syslog' flags are consulted)
# $_[1] => the line, normally ending in "\n"
# The line goes to stderr verbatim unless $only is 1, and to syslog
# (without its trailing newline, and only if anything is left) unless
# $only is 2.
sub dumpline {
	use POSIX qw(STDERR_FILENO);
	my ($self, $line) = @_;
	defined($only) or $only = 0;
	if ($self->{'stderr'} && $only != 1) {
		writeall(STDERR_FILENO, $line);
	}
	chop($line) if substr($line, -1, 1) eq "\n";
	length($line) or return;
	if ($self->{'syslog'} && $only != 2) {
		syslog(LOG_NOTICE, "%s", $line);
	}
}
# Tie constructor.
# $_[0] => class name (defaults to 'OStream')
# $_[1] => mode: <= 0 stderr only, 1 syslog only, > 1 both
# $_[2] => identity passed to openlog
# $_[3] => syslog facility (defaults to LOG_USER)
# Returns the new object; croaks if openlog fails.
sub TIEHANDLE {
	my ($class, $mode, $syslogname, $syslogfacility) = @_;
	$class ||= 'OStream';
	$syslogfacility = LOG_USER unless defined($syslogfacility);
	my $use_syslog = $mode > 0;
	my $use_stderr = $mode <= 0 || $mode > 1;
	my $self = {
		'syslog' => $use_syslog,
		'stderr' => $use_stderr,
		'lastline' => '',
	};
	if ($use_syslog) {
		# Some Sys::Syslog have a stupid default setlogsock order
		eval {Sys::Syslog::setlogsock("native"); 1;} or
		eval {Sys::Syslog::setlogsock("unix");};
		openlog($syslogname, "ndelay,pid", $syslogfacility)
			or croak "Sys::Syslog::openlog failed: $!";
	}
	# bless only after openlog has succeeded
	return bless $self, $class;
}
# Tied-handle operations that have nothing to do for this stream; each
# returns a fixed value.
sub BINMODE { 1 }	# always reports success
sub FILENO { undef }	# no file descriptor is exposed
sub EOF { 0 }		# never at end of file
sub CLOSE { 1 }		# always reports success
# printf on the tied handle: format the arguments with sprintf and hand
# the resulting string to PRINT, returning whatever PRINT returns.
sub PRINTF {
	my ($self, $template, @args) = @_;
	my $text = sprintf($template, @args);
	return $self->PRINT($text);
}
# print on the tied handle.
#
# The arguments are appended to any partial line left over from an earlier
# call.  Every complete line (text up to and including a newline) is passed
# to dumpline in order, and whatever trails the last newline is kept in
# 'lastline' until a later call completes it.  Always returns 1.
sub PRINT {
	my $self = shift;
	my $data = join('', $self->{'lastline'}, @_);
	# Each complete line is removed from the front of $data as it is
	# dispatched, so the search for the next newline must always start
	# again at offset 0.  Advancing the search position as well would
	# skip over the lines that follow the first one.
	while ((my $idx = index($data, "\n")) >= 0) {
		my $line = substr($data, 0, $idx + 1, '');
		$self->dumpline($line);
	}
	$self->{'lastline'} = $data;
	return 1;
}
# Destructor: emit any buffered partial line that never received its
# terminating newline, then close the syslog connection.
sub DESTROY {
	my ($self) = @_;
	if (length($self->{'lastline'})) {
		$self->dumpline($self->{'lastline'});
	}
	closelog;
}
# syswrite on the tied handle.
#
#   $scalar - the data (undef is treated as the empty string)
#   $length - maximum number of characters to write (default: all)
#   $offset - where in $scalar to start; a negative value counts back
#             from the end of the string (default: 0)
#
# The selected portion is passed to PRINT.  Returns the number of
# characters actually taken from $scalar.  Croaks on a negative length or
# an offset that lies outside the string.
sub WRITE {
	my $self = shift;
	my ($scalar, $length, $offset) = @_;
	$scalar = '' if !defined($scalar);
	my $avail = length($scalar);
	$length = $avail if !defined($length);
	croak "OStream::WRITE invalid length $length"
		if $length < 0;
	$offset = 0 if !defined($offset);
	$offset += $avail if $offset < 0;
	# the offset is bounded by the size of the string, not by the
	# requested length: writing 3 characters starting at offset 6 of
	# an 11 character string is perfectly valid
	croak "OStream::WRITE invalid write offset"
		if $offset < 0 || $offset > $avail;
	my $max = $avail - $offset;
	$length = $max if $length > $max;
	$self->PRINT(substr($scalar, $offset, $length));
	return $length;
}
1008 ## ----
1009 ## main
1010 ## ----
1013 package main;
# Arrange for jobd.pl to be asked to restart a few seconds from now.
#
# A double-forked helper process is created.  After waiting (5 seconds
# less one second for every failed setpgid attempt) the helper writes
# "restart\n" into jobd's lock file, unless the original process has sent
# it a non-zero value through an internal pipe in the meantime.
#
#   $newpg - when true the helper first tries to put itself into a
#            process group of its own
#
# returns pid of process that will schedule jobd.pl restart on success
# returns 0 if fork or other system call failed with error in $!
# returns undef if jobd.pl does not currently appear to be running (no lockfile)
sub schedule_jobd_restart {
	use POSIX qw(_exit setpgid dup2 :fcntl_h);
	my $newpg = shift;
	my $devnull = File::Spec->devnull;
	my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
	-f $jdlf or return undef;
	my $oldsigchld = $SIG{'CHLD'};
	$oldsigchld = sub {} unless defined($oldsigchld);
	# make a handle unbuffered without changing the selected handle
	my $unbuffer = sub {
		my $prev = select($_[0]);
		$| = 1;
		select($prev);
	};
	# veto pipe: original process -> helper
	# pid pipe:  intermediate child -> original process
	my ($vetoin, $vetoout, $pidin, $pidout);
	pipe($vetoin, $vetoout) or return 0;
	$unbuffer->($vetoout);
	unless (pipe($pidin, $pidout)) {
		local $!; # the closes must not disturb the pipe error
		close $vetoout;
		close $vetoin;
		return 0;
	}
	$unbuffer->($pidout);
	$SIG{'CHLD'} = sub {};
	# up to three fork attempts with a one second pause between them
	my $child;
	my $retries = 3;
	until (defined($child) || !$retries--) {
		$child = fork;
		sleep 1 unless defined($child) || !$retries;
	}
	unless (defined($child)) {
		local $!; # the closes must not disturb the fork error
		close $pidout;
		close $pidin;
		close $vetoout;
		close $vetoin;
		$SIG{'CHLD'} = $oldsigchld;
		return 0;
	}
	if ($child) {
		# original process: the child reports either the helper's
		# pid or ":errno" when its own fork failed
		close $pidout;
		my $report = <$pidin>;
		close $pidin;
		chomp $report if defined($report);
		unless (defined($report) && $report =~ /^:?\d+$/) {
			# something's wrong with the child -- kill it
			kill(9, $child) && waitpid($child, 0);
			my $oldsigpipe = $SIG{'PIPE'};
			# make sure the grandchild, if any,
			# doesn't run the success proc
			$SIG{'PIPE'} = sub {};
			print $vetoout 1;
			close $vetoout;
			close $vetoin;
			$SIG{'PIPE'} = defined($oldsigpipe) ?
				$oldsigpipe : 'DEFAULT';
			$! = 255;
			$SIG{'CHLD'} = $oldsigchld;
			return 0;
		}
		if ($report =~ /^:(\d+)$/) {
			# fork failed in child, there is no grandchild
			my $ec = $1;
			waitpid($child, 0);
			close $vetoout;
			close $vetoin;
			$! = $ec;
			$SIG{'CHLD'} = $oldsigchld;
			return 0;
		}
		# reap the child and set $child to grandchild's pid
		waitpid($child, 0);
		$child = $report;
	} else {
		# intermediate child: perform the second fork
		close $pidin;
		my $child2;
		my $retries2 = 3;
		until (defined($child2) || !$retries2--) {
			$child2 = fork;
			sleep 1 unless defined($child2) || !$retries2;
		}
		unless (defined($child2)) {
			my $ec = 0 + $!;
			$ec = 255 unless $ec;
			print $pidout ":$ec";
			close $pidout;
			_exit 127;
		}
		if ($child2) {
			# pass new child pid up to parent and exit
			print $pidout $child2;
			close $pidout;
			_exit 0;
		}
		# this is the grandchild
		close $pidout;
	}
	unless ($child) {
		# grandchild that actually initiates the jobd.pl restart
		close $vetoout;
		my $wait = 5;
		# point stdin, stdout and stderr at the null device
		my $ufd = POSIX::open($devnull, O_RDWR);
		if (defined($ufd)) {
			foreach my $stdfd (0, 1, 2) {
				dup2($ufd, $stdfd) unless $ufd == $stdfd;
			}
			POSIX::close($ufd) if $ufd > 2;
		}
		chdir "/";
		if ($newpg) {
			# at most four attempts; every failure costs one
			# second which is taken off the remaining wait
			foreach my $try (1 .. 4) {
				last if defined(setpgid(0, 0));
				--$wait;
				sleep 1;
			}
		}
		sleep $wait;
		# end-of-file (nothing written) or a 0 means go ahead
		my $veto = <$vetoin>;
		close $vetoin;
		chomp $veto if defined($veto);
		if (!defined($veto) || $veto eq 0) {
			open JDLF, '+<', $jdlf or _exit(1);
			select((select(JDLF),$|=1)[0]);
			print JDLF "restart\n";
			truncate JDLF, tell(JDLF);
			close JDLF;
		}
		_exit(0);
	}
	close $vetoout;
	close $vetoin;
	$SIG{'CHLD'} = $oldsigchld;
	return $child;
}
# Cancel a jobd restart previously set up by schedule_jobd_restart.
#
#   $restarter - pid returned by schedule_jobd_restart
#
# Returns nothing when $restarter is undefined or 0, -1 when that process
# cannot be signalled, and otherwise the pid after it has been sent
# SIGKILL.  Dies if the kill itself fails.
sub cancel_jobd_restart {
	my ($restarter) = @_;
	return if !defined($restarter) || $restarter == 0;
	kill(0, $restarter) or return -1;
	unless (kill(9, $restarter)) {
		die "failed to kill jobd restarter process (pid $restarter): $!\n";
	}
	# we must not waitpid because $restarter was doubly forked and will
	# NOT send us a SIGCHLD when it terminates
	return $restarter;
}
# Command line processing.
#
# The absolute path of this script is captured before the chdir below
# because a relative $0 can no longer be resolved afterwards; every
# pod2usage call must therefore be given -input => $realpath0.
# NOTE(review): $reexec presumably snapshots the invocation for the
# graceful restart performed later -- confirm against Girocco::ExecUtil.
my $reexec = Girocco::ExecUtil->new;
my $realpath0 = realpath($0);
chdir "/";
close(DATA) if fileno(DATA);
my $sfac;
Getopt::Long::Configure('bundling');
my ($stiv, $idiv);
my $parse_res = GetOptions(
	'help|?|h' => sub {
		pod2usage(-verbose => 2, -exitval => 0, -input => $realpath0)},
	'quiet|q' => \$quiet,
	'no-quiet' => sub {$quiet = 0},
	'progress|P' => \$progress,
	'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
	'idle-timeout|t=i' => \$idle_timeout,
	'daemon' => sub {$daemon = 1; $syslog = 1; $quiet = 1;},
	'max-lifetime=i' => \$max_lifetime,
	'syslog|s:s' => \$sfac,
	'no-syslog' => sub {$syslog = 0; $sfac = undef;},
	'stderr' => \$stderr,
	'abbrev=i' => \$abbrev,
	'show-fast-forward-info' => \$showff,
	'no-show-fast-forward-info' => sub {$showff = 0},
	'same-pid' => \$same_pid,
	'no-same-pid' => sub {$same_pid = 0},
	'status-interval=i' => \$stiv,
	'idle-status-interval=i' => \$idiv,
) || pod2usage(-exitval => 2, -input => $realpath0);
# settings derived from the options
$same_pid = !$daemon unless defined($same_pid);
$syslog = 1 if defined($sfac);
$progress = 1 unless $quiet;
$abbrev = 128 unless $abbrev > 0;
# -input is required here too (see above) or the usage text cannot be
# located; -exitval => 2 keeps the exit status pod2usage would have
# used by default
pod2usage(-msg => "--inetd and --daemon are incompatible",
	-exitval => 2, -input => $realpath0) if ($inetd && $daemon);
if (defined($idle_timeout)) {
	die "--idle-timeout must be a whole number\n" unless $idle_timeout =~ /^\d+$/;
	die "--idle-timeout may not be used without --inetd\n" unless $inetd;
}
if (defined($max_lifetime)) {
	die "--max-lifetime must be a whole number\n" unless $max_lifetime =~ /^\d+$/;
	$max_lifetime += 0;
}
defined($max_lifetime) or $max_lifetime = 604800; # 1 week
# both status intervals are given in minutes but kept in seconds
if (defined($stiv)) {
	die "--status-interval must be a whole number\n" unless $stiv =~ /^\d+$/;
	$statusintv = $stiv * 60;
}
if (defined($idiv)) {
	die "--idle-status-interval must be a whole number\n" unless $idiv =~ /^\d+$/;
	$idleintv = $idiv * 60;
}
# Standard stream setup.
#
# Outside of inetd mode STDIN is not needed and is attached to the null
# device (in inetd mode file descriptor 0 is the listen socket).  In inetd
# mode STDOUT is pointed at STDERR.
open STDIN, '<', File::Spec->devnull or die "could not redirect STDIN to /dev/null\n" unless $inetd;
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
	use Sys::Syslog qw();
	# OStream mode: 1 sends output to syslog only, 2 to stderr as well
	my $mode = 1;
	++$mode if $stderr;
	$sfac = "user" unless defined($sfac) && $sfac ne "";
	my $ofac = $sfac;
	$sfac = uc($sfac);
	$sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
	my $facility;
	# LOG_* names exported by Sys::Syslog that are not facilities
	my %badfac = map({("LOG_$_" => 1)}
		(qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
	# The name comes from the command line and is about to be used in
	# a string eval, so insist that it is a plain constant name first.
	$sfac =~ /^LOG_[A-Z0-9_]+\z/ or die "invalid syslog facility: $ofac\n";
	eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac\n";
	# a facility value has no bits set outside of 0xf8 and a facility
	# code (the value shifted right by 3) of at most 23
	die "invalid syslog facility: $ofac\n"
		if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
	tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
	open STDOUT, '>', File::Spec->devnull;
} elsif ($inetd) {
	# make non-error output follow the (possibly tied) STDERR
	*STDOUT = *STDERR;
}
# Path of the listen socket ($NAME) and the inode number recorded for it
# ($INO); both are filled in once the listen socket has been established.
my ($NAME, $INO);

set_sigchld_reaper;
my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
my $restart_active = 1;
# An already open listen socket may be handed over in the environment as
# "FD" or "FD:INODE" (this is what a graceful restart does).  The value is
# removed from the environment right away and then validated; whenever it
# is rejected $resumefd ends up undefined.
my $resumefd = delete $ENV{(SOCKFDENV)};
if (defined($resumefd)) {{
	if ($resumefd !~ /^(\d+)(?::(-?\d+))?$/) {
		warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- bad format\n";
		$resumefd = undef;
		last;
	}
	my $wantino;
	($resumefd, $wantino) = ($1, $2);
	$resumefd += 0;
	if (!isfdopen($resumefd)) {
		warn "ignoring invalid ".SOCKFDENV." environment value -- fd \"$resumefd\" not open\n";
		$resumefd = undef;
		last;
	}
	# the remaining checks are only made when not running from inetd
	last if $inetd;
	if (!defined($wantino)) {
		warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- missing inode\n";
		POSIX::close($resumefd);
		$resumefd = undef;
		last;
	}
	$wantino += 0;
	my $sockfile = $Girocco::Config::chroot.'/etc/taskd.socket';
	my $fileino = (stat($sockfile))[1];
	if (!defined($fileino) || !-S _) {
		warn "ignoring ".SOCKFDENV." environment value; socket file does not exist: $sockfile\n";
		POSIX::close($resumefd);
		$resumefd = undef;
		last;
	}
	# look at the address the descriptor is bound to through a
	# temporary duplicate so the descriptor itself stays open
	open Test, "<&$resumefd" or die "open: $!";
	my $addr = getsockname(Test);
	my $boundpath;
	if ($addr && sockaddr_family($addr) == AF_UNIX) {
		$boundpath = unpack_sockaddr_un($addr);
	}
	close Test;
	# the inherited socket must be the one the socket file refers to
	my $matches = defined($wantino) && defined($boundpath) &&
		$wantino == $fileino &&
		realpath($sockfile) eq realpath($boundpath);
	unless ($matches) {
		warn "ignoring ".SOCKFDENV." environment value; does not match socket file: $sockfile\n";
		POSIX::close($resumefd);
		$resumefd = undef;
	}
	$INO = $wantino;
}}
# Establish the Server listen socket.
#
# Either adopt an existing descriptor (fd 0 under inetd, or the descriptor
# inherited across a restart) after checking that it is an unconnected
# AF_UNIX stream socket, or create, bind and listen on a new socket at the
# standard location.
if ($inetd || defined($resumefd)) {
	my $fdopen = defined($resumefd) ? $resumefd : 0;
	open Server, "<&=$fdopen" or die "open: $!";
	setcloexec(\*Server) if $fdopen > $^F;
	my $sockname = getsockname(Server);
	$sockname or die "getsockname: $!";
	# a listen socket has no peer; anything but ENOTCONN is an error
	getpeername(Server) and
		die "socket already connected! must be 'wait' socket\n";
	$!{ENOTCONN} or die "getpeername: $!";
	my $st = getsockopt(Server, SOL_SOCKET, SO_TYPE);
	$st or die "getsockopt(SOL_SOCKET, SO_TYPE): $!";
	my $socktype = unpack('i', $st);
	defined($socktype) && $socktype == SOCK_STREAM
		or die "stream socket required\n";
	sockaddr_family($sockname) == AF_UNIX
		or die "AF_UNIX socket required\n";
	$NAME = unpack_sockaddr_un($sockname);
	my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
	if (realpath($NAME) ne realpath($expected)) {
		$restart_active = 0;
		warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled\n";
	}
	# make sure user and group can both read and write the socket
	my $mode = (stat($NAME))[2];
	$mode or die "stat: $!";
	$mode &= 07777;
	unless (($mode & 0660) == 0660) {
		chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
	}
} else {
	$NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
	my $uaddr = sockaddr_un($NAME);

	socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
	if (-e $NAME && !-S _) {
		die "already exists but not a socket: $NAME\n";
	}
	if (-e _) {
		# Do not unlink another instance's active listen socket!
		socket(my $probe, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
		if (connect($probe, $uaddr) || $!{EPROTOTYPE}) {
			die "Live socket '$NAME' exists. Please make sure no other instance of taskd is running.\n";
		}
		close($probe);
		unlink($NAME);
	}
	bind(Server, $uaddr) or die "bind failed: $!";
	listen(Server, SOMAXCONN) or die "listen failed: $!";
	chmod(0666, $NAME) or die "chmod failed: $!";
	$INO = (stat($NAME))[1] or die "stat failed: $!";
}
# Pass every named throttle class to Throttle::GetClassInfo, the
# configured classes first and then the built-in defaults; entries
# without a name are skipped.
for my $throttle (@Girocco::Config::throttle_classes, @throttle_defaults) {
	my $classname = $throttle->{"name"};
	next unless $classname;
	Throttle::GetClassInfo($classname, $throttle);
}
# Return the numerically smaller of the two arguments (the first one
# when they compare equal).
sub _min {
	my ($first, $second) = @_;
	if ($first <= $second) {
		return $first;
	}
	return $second;
}
# Control pipe: the read end is non-blocking and watched by the main
# loop; the write end is made unbuffered.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
	my $prevfh = select($pipewrite);
	$| = 1;
	select($prevfh);
}
my $pipebuff = '';
# select bit sets: control pipe only, and control pipe plus listen socket
my $fdset_pipe = '';
vec($fdset_pipe, fileno($piperead), 1) = 1;
my $fdset_both = $fdset_pipe;
vec($fdset_both, fileno(Server), 1) = 1;
# accept throttling and periodic wakeup/status bookkeeping
my $penalty = 0;
my $t = time;
my $penaltytime = $t;
my $nextwakeup = $t + 60;
my $nextstatus = $statusintv ? $t + $statusintv : undef;
# remove any restart request left over from before this start up
if ($restart_active && !unlink($restart_file) && !$!{ENOENT}) {
	$restart_active = 0;
	statmsg "restart file disabled could not unlink \"$restart_file\": $!";
}
if ($daemon) {
	daemon(1, 1) or die "failed to daemonize: $!\n";
}
my $starttime = time;
# time at which a graceful restart becomes due (0 means never)
my $endtime = $max_lifetime ? $starttime + $max_lifetime : 0;
statmsg "listening on $NAME";
# Main service loop.
#
# Each pass waits (in the inner do/while) until the control pipe or the
# listen socket needs attention.  While waiting it also emits periodic
# status messages, services the throttle queue, removes reaped children
# from the throttle lists and performs a graceful restart or idle exit
# when one is due and nothing is running.  It then processes any control
# pipe messages and accepts at most one new connection, which is handed to
# a spawned child.
while (1) {
	my ($rout, $eout, $nfound);
	do {
		my $wait;
		my $now = time;
		# one unit of penalty is forgiven per elapsed second
		my $adjustpenalty = sub {
			if ($penaltytime < $now) {
				my $credit = $now - $penaltytime;
				$penalty = $penalty > $credit ? $penalty - $credit : 0;
				$penaltytime = $now;
			}
		};
		if (defined($nextstatus) && $now >= $nextstatus) {
			# repeated idle reports are suppressed until
			# $idleintv seconds have passed since the last one
			unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
				my $statmsg = "STATUS: $children active";
				my @running = ();
				if ($children) {
					my @stats = ();
					my $cnt = 0;
					foreach my $cls (sort(Throttle::GetClassList())) {
						my $inf = Throttle::GetClassInfo($cls);
						if ($inf->{'total'}) {
							$cnt += $inf->{'total'};
							push(@stats, substr(lc($cls),0,1)."=".
								$inf->{'total'}.'/'.$inf->{'active'});
						}
					}
					# children not accounted for by any class
					push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
					$statmsg .= " (".join(" ",@stats).")" if @stats;
					foreach (Throttle::GetRunningPids()) {
						my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
						next unless $ts;
						push(@running, "[${cls}::$desc] ".human_duration($now-$ts));
					}
				}
				my $idlesecs;
				$statmsg .= ", idle " . human_duration($idlesecs)
					if !$children && ($idlesecs = $now - $idlestart) >= 2;
				statmsg $statmsg;
				statmsg "STATUS: currently running: ".join(", ", @running)
					if @running;
				$idlestatus = $now if !$children;
			}
			$nextstatus += $statusintv while $nextstatus <= $now;
		}
		# wake up at least once a minute
		$nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
		$wait = _min($wait, (Throttle::ServiceQueue()||60));
		$adjustpenalty->(); # this prevents ignoring accept when we shouldn't
		my $fdset;
		if ($penalty <= $maxspawn) {
			$fdset = $fdset_both;
		} else {
			# too many recent connections: watch only the
			# control pipe until enough penalty has expired
			$fdset = $fdset_pipe;
			$wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
		}
		$nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
		logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
		my $reaped;
		Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
		$now = time;
		$adjustpenalty->(); # this prevents banking credits for elapsed time
		if (!$children && !$nfound && $restart_active && (($endtime && $now >= $endtime) || -e $restart_file)) {
			statmsg "RESTART: restart requested; max lifetime ($max_lifetime) exceeded" if $endtime && $now >= $endtime;
			$SIG{CHLD} = sub {};
			my $restarter = schedule_jobd_restart($inetd);
			if (defined($restarter) && !$restarter) {
				statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
				sleep 2; # *cough*
				# the retry must be made the same way as the
				# first attempt, including the process group
				# request that goes with inetd mode
				$restarter = schedule_jobd_restart($inetd);
				if (!defined($restarter)) {
					statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
				} elsif (defined($restarter) && !$restarter) {
					statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
					$restarter = undef;
				}
			}
			if ($inetd) {
				statmsg "RESTART: restart requested; now exiting for inetd restart";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				sleep 2; # *cough*
				exit 0;
			} else {
				statmsg "RESTART: restart requested; now restarting";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				# the listen socket must survive the exec and
				# is announced to the new instance as fd:inode
				setnoncloexec(\*Server);
				$reexec->setenv(SOCKFDENV, fileno(Server).":$INO");
				$reexec->reexec($same_pid);
				# only reached if the re-exec did not happen
				setcloexec(\*Server) if fileno(Server) > $^F;
				statmsg "RESTART: continuing after failed restart: $!";
				chdir "/";
				cancel_jobd_restart($restarter) if $restarter;
				statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
				set_sigchld_reaper;
			}
		}
		if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
			statmsg "idle timeout (@{[human_duration($idle_timeout)]}) exceeded now exiting";
			exit 0;
		}
	} while $nfound < 1;
	my $reout = $rout | $eout;
	# Drain the control pipe: "last" leaves the outer bare block once
	# nothing more can be read, "redo" goes around again
	if (vec($reout, fileno($piperead), 1)) {{
		my $nloff = -1;
		{
			my $bytes;
			do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
				while (!defined($bytes) && $!{EINTR});
			last if !defined($bytes) && $!{EAGAIN};
			die "sysread failed: $!" unless defined $bytes;
			# since we always keep a copy of $pipewrite open EOF is fatal
			die "sysread returned EOF on pipe read" unless $bytes;
			$nloff = index($pipebuff, "\n", 0);
			if ($nloff < 0 && length($pipebuff) >= 512) {
				$pipebuff = '';
				print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
			}
			redo unless $nloff >= 0;
		}
		last unless $nloff >= 0;
		# hand each complete (non-empty) line to the message handler
		do {
			my $msg = substr($pipebuff, 0, $nloff);
			substr($pipebuff, 0, $nloff + 1) = '';
			$nloff = index($pipebuff, "\n", 0);
			process_pipe_msg($msg) if length($msg);
		} while $nloff >= 0;
		redo;
	}}
	next unless vec($reout, fileno(Server), 1);
	unless (accept(Client, Server)) {
		logmsg "accept failed: $!" unless $!{EINTR};
		next;
	}
	logmsg "connection on $NAME";
	++$penalty;
	spawn sub {
		my $inp = <STDIN>;
		$inp = <STDIN> if defined($inp) && $inp eq "\n";
		chomp $inp if defined($inp);
		# ignore empty and "nop" connects
		defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
		my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
		# a line that does not even start with a command word leaves
		# $cmd undefined; reject it here rather than comparing and
		# interpolating an undefined value below
		unless (defined($cmd)) {
			statmsg "ignoring malformed command";
			exit 3;
		}
		defined($arg) or $arg = '';
		if ($cmd eq 'ref-changes') {
			ref_changes($arg);
		} elsif ($cmd eq 'clone') {
			clone($arg);
		} elsif ($cmd eq 'ref-change') {
			statmsg "processing obsolete ref-change message (please switch to ref-changes)";
			ref_change($arg);
		} elsif ($cmd eq 'throttle') {
			throttle($arg);
		} else {
			statmsg "ignoring unknown command: $cmd";
			exit 3;
		}
	};
	close Client;
}
1531 ## -------------
1532 ## Documentation
1533 ## -------------
1536 __END__
1538 =head1 NAME
1540 taskd.pl - Perform Girocco service tasks
1542 =head1 SYNOPSIS
1544 taskd.pl [options]
1546 Options:
1547 -h | --help detailed instructions
1548 -q | --quiet run quietly
1549 --no-quiet do not run quietly
1550 -P | --progress show occasional status updates
1551 -i | --inetd run as inetd unix stream wait service
1552 implies --quiet --syslog
1553 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1554 requires --inetd
1555 --daemon become a background daemon
1556 implies --quiet --syslog
1557 --max-lifetime=SECONDS how long before graceful restart
1558 default is 1 week, 0 disables
1559 -s | --syslog[=facility] send messages to syslog instead of
1560 stderr but see --stderr
1561 enabled by --inetd
1562 --no-syslog do not send messages to syslog
1563 --stderr always send messages to stderr too
1564 --abbrev=n abbreviate hashes to n (default is 8)
1565 --show-fast-forward-info show fast-forward info (default is on)
1566 --no-show-fast-forward-info disable showing fast-forward info
1567 --same-pid keep same pid during graceful restart
1568 --no-same-pid do not keep same pid on graceful restart
1569 --status-interval=MINUTES status update interval (default 1)
1570 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1572 =head1 DESCRIPTION
1574 taskd.pl is Girocco's service request servant; it listens for service requests
1575 such as new clone requests and ref update notifications and spawns a task to
1576 perform the requested action.
1578 =head1 OPTIONS
1580 =over 8
1582 =item B<--help>
1584 Print the full description of taskd.pl's options.
1586 =item B<--quiet>
1588 Suppress non-error messages, e.g. for use when running this task as an inetd
1589 service. Enabled by default by --inetd.
1591 =item B<--no-quiet>
1593 Enable non-error messages. When running in --inetd mode these messages are
1594 sent to STDERR instead of STDOUT.
1596 =item B<--progress>
1598 Show information about the current status of the task operation occasionally.
1599 This is automatically enabled if --quiet is not given.
1601 =item B<--inetd>
1603 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1604 stream socket ready to have accept called on it. To be useful, the unix socket
1605 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1606 will be issued if the socket is not in the expected location. Socket file
1607 permissions will be adjusted if necessary; if they cannot be adjusted, taskd.pl
1608 will die. The --inetd option also enables the --quiet and --syslog options but
1609 --no-quiet and --no-syslog may be used to alter that.
1611 The correct specification for the inetd socket is a "unix" protocol "stream"
1612 socket in "wait" mode with user and group writable permissions (0660). An
1613 attempt will be made to alter the socket's file mode if needed, and if that
1614 cannot be accomplished, taskd.pl will die.
1616 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1617 in wait mode and will die if the passed in socket is already connected.
1619 Note that while *BSD's inetd happily supports unix sockets (and so does
1620 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1621 However, systemd does seem to.
1623 =item B<--idle-timeout=SECONDS>
1625 Only permitted when running in --inetd mode. After SECONDS of inactivity
1626 (i.e. all outstanding tasks have completed and no new requests have come in)
1627 exit normally. The default is no timeout at all (a SECONDS value of 0).
1628 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1630 =item B<--daemon>
1632 Fork and become a background daemon. Implies B<--syslog> and B<--quiet> (which
1633 can be altered by subsequent B<--no-syslog> and/or B<--no-quiet> options).
1634 Also implies B<--no-same-pid>, but since graceful restarts work by re-exec'ing
1635 taskd.pl with all of its original arguments, using B<--same-pid> won't really
1636 be effective with B<--daemon> since although it will cause the graceful restart
1637 exec to happen from the same pid, when the B<--daemon> option is subsequently
1638 processed it will end up in a new pid anyway.
1640 =item B<--max-lifetime=SECONDS>
1642 After taskd has been running for SECONDS of realtime, it will behave as though
1643 a graceful restart has been requested. A graceful restart takes place the
1644 next time taskd becomes idle (which may require up to 60 seconds to notice).
1645 If jobd is running when a graceful restart occurs, then jobd will also receive
1646 a graceful restart request at that time. The default value is 1 week (604800),
1647 set to 0 to disable.
1649 =item B<--syslog[=facility]>
1651 Normally error output is sent to STDERR. With this option it's sent to
1652 syslog instead. Note that when running in --inetd mode non-error output is
1653 also affected by this option as it's sent to STDERR in that case. If
1654 not specified, the default for facility is LOG_USER. Facility names are
1655 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1656 with the LOG_NOTICE priority.
1658 =item B<--no-syslog>
1660 Send error message output to STDERR but not syslog.
1662 =item B<--stderr>
1664 Always send error message output to STDERR. If --syslog is in effect then
1665 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1666 messages as well.
1668 =item B<--abbrev=n>
1670 Abbreviate displayed hash values to only the first n hexadecimal characters.
1671 The default is 8 characters. Set to 0 for no abbreviation at all.
1673 =item B<--show-fast-forward-info>
1675 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1676 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1677 This requires running an extra git command for each ref update that is not a
1678 creation or deletion in order to determine whether or not it's a fast forward.
1680 =item B<--no-show-fast-forward-info>
1682 Disable showing of fast-forward information for ref-change/ref-notify update
1683 messages. Instead just show a ' -> ' indicator.
1685 =item B<--same-pid>
1687 When performing a graceful restart, perform the graceful restart exec from
1688 the same pid rather than switching to a new one. This is implied when
1689 I<--daemon> is I<NOT> used.
1691 =item B<--no-same-pid>
1693 When performing a graceful restart, perform the graceful restart exec after
1694 switching to a new pid. This is implied when I<--daemon> I<IS> used.
1696 =item B<--status-interval=MINUTES>
1698 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1699 status updates are shown at each MINUTES interval. Setting the interval to 0
1700 disables them entirely even with --progress.
1702 =item B<--idle-status-interval=IDLEMINUTES>
1704 Two consecutive "idle" status updates with no intervening activity will not be
1705 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1706 Setting the interval to 0 prevents any consecutive idle updates (with no
1707 activity between them) from appearing at all.
1709 =back
1711 =cut