#! /usr/bin/env perl

# Jobset evaluator.
#
# Invoked without arguments, this script loops forever: it picks a jobset
# that is due for evaluation and re-executes itself with that jobset's
# project and jobset name as arguments.  Invoked with exactly two arguments
# (project name, jobset name), it evaluates that single jobset and exits
# with status 0 on success or 1 on failure.
#
# Setting HYDRA_DRY_RUN in the environment makes evaluation print the
# resulting jobs instead of recording them in the database.

use strict;
use warnings;
use utf8;
use Hydra::Schema;
use Hydra::Plugin;
use Hydra::Helper::Nix;
use Hydra::Helper::AddBuilds;
use Hydra::Helper::Email;
use Hydra::Model::DB;
use Digest::SHA qw(sha256_hex);
use Config::General;
use Data::Dump qw(dump);
use Try::Tiny;
use Net::Statsd;
use Time::HiRes qw(clock_gettime CLOCK_REALTIME);
use JSON;
use File::Slurp;

STDOUT->autoflush();
STDERR->autoflush(1);
binmode STDERR, ":encoding(utf8)";

my $db = Hydra::Model::DB->new();
my $config = getHydraConfig();

my $plugins = [Hydra::Plugin->instantiate(db => $db, config => $config)];

# True when the HYDRA_DRY_RUN environment variable is set (to any value).
my $dryRun = defined $ENV{'HYDRA_DRY_RUN'};


# Fetch every alternative of every input of $jobset and append the results
# to the array stored under the input's name in the hash ref $inputInfo.
# An input for which nothing is returned does not get an entry.
sub fetchInputs {
    my ($project, $jobset, $inputInfo) = @_;
    foreach my $input ($jobset->jobsetinputs->all) {
        foreach my $alt ($input->jobsetinputalts->all) {
            my @fetched = fetchInput(
                $plugins, $db, $project, $jobset, $input->name,
                $input->type, $alt->value, $input->emailresponsible);
            push @{ $inputInfo->{$input->name} }, $_ foreach @fetched;
        }
    }
}


# Store $errorMsg as the jobset's evaluation error (clearing any fetch
# error) and send a notification if the message is defined and differs from
# the previously stored one, or if HYDRA_MAIL_TEST is set.
sub setJobsetError {
    my ($jobset, $errorMsg) = @_;
    my $prevError = $jobset->errormsg;

    # Recording the error is best-effort: a failure here must not abort the
    # caller, but it should at least be visible in the log.
    eval {
        txn_do($db, sub {
            $jobset->update({ errormsg => $errorMsg, errortime => time, fetcherrormsg => undef });
        });
    };
    warn "error storing jobset error message: $@" if $@;

    if ((defined $errorMsg && $errorMsg ne ($prevError // "")) || $ENV{'HYDRA_MAIL_TEST'}) {
        sendJobsetErrorNotification($jobset, $errorMsg);
    }
}


# Mail $errorMsg to the owner of the jobset's project.  Nothing is sent
# when the message is undefined or empty, or when the owner's
# 'emailonerror' setting is 0.  Failures to send are reported with a
# warning and otherwise ignored.
sub sendJobsetErrorNotification {
    my ($jobset, $errorMsg) = @_;

    # Can be undefined when we get here only because HYDRA_MAIL_TEST is set.
    return unless defined $errorMsg;

    chomp $errorMsg;

    return if $jobset->project->owner->emailonerror == 0;
    return if $errorMsg eq "";

    my $projectName = $jobset->project->name;
    my $jobsetName = $jobset->name;

    my $body = "Hi,\n"
        . "\n"
        . "This is to let you know that evaluation of the Hydra jobset ‘$projectName:$jobsetName’\n"
        . "resulted in the following error:\n"
        . "\n"
        . "$errorMsg"
        . "\n"
        . "Regards,\n\nThe Hydra build daemon.\n";

    try {
        sendEmail(
            $config,
            $jobset->project->owner->emailaddress,
            "Hydra $projectName:$jobsetName evaluation error",
            $body,
            [ 'X-Hydra-Project' => $projectName
            , 'X-Hydra-Jobset'  => $jobsetName
            ]);
    } catch {
        warn "error sending email: $_\n";
    };
}


# Return the arguments in random order (Fisher-Yates shuffle).
sub permute {
    my @list = @_;
    for (my $n = scalar @list - 1; $n > 0; $n--) {
        my $k = int(rand($n + 1)); # 0 <= $k <= $n
        @list[$n, $k] = @list[$k, $n];
    }
    return @list;
}


# Evaluate $jobset: fetch its inputs, skip the evaluation if the inputs are
# unchanged since the previous evaluation, otherwise evaluate the job
# expression and record the resulting builds and evaluation in the
# database.  Dies on errors other than input fetch failures; the caller
# (checkJobset) is responsible for recording those.
sub checkJobsetWrapped {
    my ($jobset) = @_;
    my $project = $jobset->project;

    # Is this the special ".jobsets" jobset of a declarative project?
    my $jobsetsJobset = length($project->declfile) && $jobset->name eq ".jobsets";
    my $inputInfo = {};

    if ($jobsetsJobset) {
        my @declInputs = fetchInput($plugins, $db, $project, $jobset, "decl", $project->decltype, $project->declvalue, 0);
        my $declInput = $declInputs[0] or die "cannot find the input containing the declarative project specification\n";
        die "multiple alternatives for the input containing the declarative project specificaiton are not supported\n"
            if scalar @declInputs != 1;

        my $declFile = $declInput->{storePath} . "/" . $project->declfile;
        my $declText = read_file($declFile)
            or die "Couldn't read declarative specification file $declFile: $!\n";

        my $declSpec;
        eval {
            $declSpec = decode_json($declText);
        };
        die "Declarative specification file $declFile not valid JSON: $@\n" if $@;

        updateDeclarativeJobset($db, $project, ".jobsets", $declSpec);
        $jobset->discard_changes;
        $inputInfo->{"declInput"} = [ $declInput ];
    }

    # Expressions whose path ends in ".scm" are evaluated as Guile,
    # everything else as Nix.
    my $exprType = $jobset->nixexprpath =~ /\.scm$/ ? "guile" : "nix";

    # Fetch all values for all inputs.
    my $checkoutStart = clock_gettime(CLOCK_REALTIME);
    eval {
        fetchInputs($project, $jobset, $inputInfo);
    };
    my $fetchError = $@;

    Net::Statsd::increment("hydra.evaluator.checkouts");
    my $checkoutStop = clock_gettime(CLOCK_REALTIME);
    Net::Statsd::timing("hydra.evaluator.checkout_time", int(($checkoutStop - $checkoutStart) * 1000));

    if ($fetchError) {
        Net::Statsd::increment("hydra.evaluator.failed_checkouts");
        print STDERR $fetchError;
        txn_do($db, sub {
            $jobset->update({ lastcheckedtime => time, fetcherrormsg => $fetchError }) if !$dryRun;
        });
        return;
    }

    # Hash the arguments to hydra-eval-jobs and check the
    # JobsetInputHashes to see if the previous evaluation had the same
    # inputs.  If so, bail out.
    my @args = ($jobset->nixexprinput, $jobset->nixexprpath, inputsToArgs($inputInfo, $exprType));
    my $argsHash = sha256_hex("@args");
    my $prevEval = getPrevJobsetEval($db, $jobset, 0);
    if (defined $prevEval && $prevEval->hash eq $argsHash && !$dryRun) {
        print STDERR "  jobset is unchanged, skipping\n";
        Net::Statsd::increment("hydra.evaluator.unchanged_checkouts");
        txn_do($db, sub {
            $jobset->update({ lastcheckedtime => time, fetcherrormsg => undef });
        });
        return;
    }

    # Evaluate the job expression.
    my $evalStart = clock_gettime(CLOCK_REALTIME);
    my ($jobs, $nixExprInput) = evalJobs($inputInfo, $exprType, $jobset->nixexprinput, $jobset->nixexprpath);
    my $evalStop = clock_gettime(CLOCK_REALTIME);

    if ($jobsetsJobset) {
        my @keys = keys %$jobs;
        die "The .jobsets jobset must only have a single job named 'jobsets'"
            unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
    }
    Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));

    # In dry-run mode, just report what was evaluated and stop.
    if ($dryRun) {
        foreach my $name (keys %{$jobs}) {
            my $job = $jobs->{$name};
            if (defined $job->{drvPath}) {
                print STDERR "good job $name: $job->{drvPath}\n";
            } else {
                print STDERR "failed job $name: $job->{error}\n";
            }
        }
        return;
    }

    # Let every job record know its own name.
    $jobs->{$_}->{jobName} = $_ for keys %{$jobs};

    my $jobOutPathMap = {};
    my $jobsetChanged = 0;
    my $dbStart = clock_gettime(CLOCK_REALTIME);

    txn_do($db, sub {

        my $prevEval = getPrevJobsetEval($db, $jobset, 1);

        # Clear the "current" flag on all builds.  Since we're in a
        # transaction this will only become visible after the new
        # current builds have been added.
        $jobset->builds->search({iscurrent => 1})->update({iscurrent => 0});

        # Schedule each successfully evaluated job.
        my %buildMap;
        foreach my $job (permute(values %{$jobs})) {
            next if defined $job->{error};
            #print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
            checkBuild($db, $jobset, $inputInfo, $nixExprInput, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
        }

        # Have any builds been added or removed since last time?
        $jobsetChanged =
            (scalar(grep { $_->{new} } values(%buildMap)) > 0)
            || (defined $prevEval && $prevEval->jobsetevalmembers->count != scalar(keys %buildMap));

        my $ev = $jobset->jobsetevals->create(
            { hash         => $argsHash
            , timestamp    => time
            , checkouttime => abs(int($checkoutStop - $checkoutStart))
            , evaltime     => abs(int($evalStop - $evalStart))
            , hasnewbuilds => $jobsetChanged ? 1 : 0
            , nrbuilds     => $jobsetChanged ? scalar(keys %buildMap) : undef
            });

        if ($jobsetChanged) {
            # Create JobsetEvalMembers mappings.
            foreach my $id (keys %buildMap) {
                $ev->jobsetevalmembers->create({ build => $id, isnew => $buildMap{$id}->{new} });
            }

            # Create AggregateConstituents mappings.  Since there can
            # be jobs that alias each other, if there are multiple
            # builds for the same derivation, pick the one with the
            # shortest name (ties are broken by string order).
            my %drvPathToId;
            foreach my $x (values %buildMap) {
                my $y = $drvPathToId{$x->{drvPath}};
                if (defined $y) {
                    next if length $x->{jobName} > length $y->{jobName};
                    next if length $x->{jobName} == length $y->{jobName} && $x->{jobName} ge $y->{jobName};
                }
                $drvPathToId{$x->{drvPath}} = $x;
            }

            foreach my $job (values %{$jobs}) {
                next unless $job->{constituents};
                my $x = $drvPathToId{$job->{drvPath}}
                    or die "no build found for aggregate job ‘$job->{jobName}’";
                foreach my $drvPath (split / /, $job->{constituents}) {
                    my $constituent = $drvPathToId{$drvPath};
                    if (defined $constituent) {
                        $db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
                    } else {
                        warn "aggregate job ‘$job->{jobName}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
                    }
                }
            }

            # Record the inputs used by this evaluation; altnr is the
            # position of the alternative within its input.
            foreach my $name (keys %{$inputInfo}) {
                my $alts = $inputInfo->{$name};
                foreach my $n (0 .. $#{$alts}) {
                    my $input = $alts->[$n];
                    $ev->jobsetevalinputs->create(
                        { name       => $name
                        , altnr      => $n
                        , type       => $input->{type}
                        , uri        => $input->{uri}
                        , revision   => $input->{revision}
                        , value      => $input->{value}
                        , dependency => $input->{id}
                        , path       => $input->{storePath} || "" # !!! temporary hack
                        , sha256hash => $input->{sha256hash}
                        });
                }
            }

            print STDERR "  created new eval ", $ev->id, "\n";
            $ev->builds->update({iscurrent => 1});
        } else {
            print STDERR "  created cached eval ", $ev->id, "\n";
            $prevEval->builds->update({iscurrent => 1}) if defined $prevEval;
        }

        # If this is a one-shot jobset, disable it now.
        $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;

        $jobset->update({ lastcheckedtime => time });
    });

    my $dbStop = clock_gettime(CLOCK_REALTIME);

    Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
    Net::Statsd::increment("hydra.evaluator.evals");
    Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;

    # Store the error messages for jobs that failed to evaluate.
    my $msg = "";
    foreach my $job (values %{$jobs}) {
        next unless defined $job->{error};
        $msg .=
            ($job->{jobName} ne "" ? "in job ‘$job->{jobName}’" : "at top-level")
            . ":\n" . $job->{error} . "\n\n";
    }
    setJobsetError($jobset, $msg);
}


# Evaluate $jobset via checkJobsetWrapped, recording any error it dies
# with, and clear the jobset's trigger time afterwards unless it was
# triggered again in the meantime.  Returns 1 if the evaluation died,
# 0 otherwise.
sub checkJobset {
    my ($jobset) = @_;

    print STDERR "considering jobset ", $jobset->project->name, ":", $jobset->name,
        $jobset->lastcheckedtime
        ? " (last checked " . (time() - $jobset->lastcheckedtime) . "s ago)\n"
        : " (never checked)\n";

    # Remember the trigger time we started with so a later trigger can be
    # detected below.
    my $triggerTime = $jobset->triggertime;

    my $startTime = clock_gettime(CLOCK_REALTIME);

    eval {
        checkJobsetWrapped($jobset);
    };
    my $checkError = $@;

    my $stopTime = clock_gettime(CLOCK_REALTIME);
    Net::Statsd::timing("hydra.evaluator.total_time", int(($stopTime - $startTime) * 1000));

    my $failed = 0;
    if ($checkError) {
        print STDERR $checkError;
        txn_do($db, sub {
            $jobset->update({lastcheckedtime => time});
            setJobsetError($jobset, $checkError);
        }) if !$dryRun;
        $failed = 1;
    }

    if (defined $triggerTime) {
        txn_do($db, sub {
            # Only clear the trigger time if the jobset hasn't been
            # triggered in the meantime.  In that case, we need to
            # evaluate again.
            my $new = $jobset->get_from_storage();
            $jobset->update({ triggertime => undef })
                if defined $new
                && defined $new->triggertime
                && $new->triggertime == $triggerTime;
        }) if !$dryRun;
    }

    return $failed;
}


# Pick one jobset that is due for evaluation and evaluate it in a child
# process (this same script, invoked with the project and jobset name).
# Returns false if no jobset is due or the child exited unsuccessfully,
# true if the child exited with status 0.
sub checkSomeJobset {
    # If any jobset has been triggered by a push, check it.
    my ($jobset) = $db->resultset('Jobsets')->search(
        { 'triggertime' => { '!=', undef } },
        { join => 'project', order_by => [ 'triggertime' ], rows => 1 });

    # Otherwise, check the jobset that hasn't been checked for the
    # longest time (but don't check more often than the jobset's
    # minimal check interval).
    ($jobset) = $db->resultset('Jobsets')->search(
        { 'project.enabled' => 1
        , 'me.enabled'      => { '!=' => 0 }
        , 'checkinterval'   => { '!=', 0 }
        , -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ]
        },
        { join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
        unless defined $jobset;

    return 0 unless defined $jobset;

    return system($0, $jobset->project->name, $jobset->name) == 0;
}


# Single-jobset mode: evaluate the jobset named on the command line and
# exit with checkJobset's result (0 = success, 1 = failure).
if (scalar @ARGV == 2) {
    my $projectName = $ARGV[0];
    my $jobsetName = $ARGV[1];
    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or die "$0: specified jobset does not exist\n";
    exit checkJobset($jobset);
}


# Daemon mode: keep evaluating jobsets forever, logging (but surviving) any
# error raised while selecting or launching an evaluation.
while (1) {
    eval {
        if (checkSomeJobset()) {
            # Just so we don't go completely crazy if lastcheckedtime
            # isn't updated properly.
            sleep 1;
        } else {
            # print STDERR "sleeping...\n";
            sleep 30;
        }
    };
    if ($@) { print STDERR "$@"; }
}