hydra-eval-jobset: Use nix-eval-jobs
instead of hydra-eval-jobs
incrementally ingest eval results nix-eval-jobs streams output, unlike hydra-eval-jobs. Now that we've migrated, we can use this to: 1. Use less RAM by avoiding buffering a whole eval's worth of metadata into a Perl string and an array of JSON objects. 2. Make evals latency a bit lower by allowing the queue runner to start ingesting builds faster. Also use the newly-restored constituents support in `nix-eval-jobs` Note, we pass --workers and --max-memory-size to n-e-j Lost in the h-e-j -> n-e-j migration, causing evaluation to always be single threaded and limited to 4GiB RAM. Follow the config settings like h-e-j used to do (via C++ code). `nix-eval-jobs` should check `hydraJobs` and then `checks` with flakes (cherry picked from commit 6d4ccff43c41adaf6e4b2b9bced7243bc2f6e97b) (cherry picked from commit b0e9b4b2f99f9d8f5c4e780e89f955c394b5ced4) (cherry picked from commit cdfc5c81e8037d3e4818a3e459d0804b2c157ea9) (cherry picked from commit 4b107e6ff36bd89958fba36e0fe0340903e7cd13) Co-Authored-By: Maximilian Bosch <maximilian@mbosch.me>
This commit is contained in:
committed by
John Ericson
parent
0c9726af59
commit
d84ff32ce6
@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
|
||||
use Hydra::Model::DB;
|
||||
use Hydra::Plugin;
|
||||
use Hydra::Schema;
|
||||
use IPC::Run;
|
||||
use JSON::MaybeXS;
|
||||
use Net::Statsd;
|
||||
use Nix::Store;
|
||||
@ -357,22 +358,32 @@ sub evalJobs {
|
||||
my @cmd;
|
||||
|
||||
if (defined $flakeRef) {
|
||||
@cmd = ("hydra-eval-jobs",
|
||||
"--flake", $flakeRef,
|
||||
"--gc-roots-dir", getGCRootsDir,
|
||||
"--max-jobs", 1);
|
||||
my $nix_expr =
|
||||
"let " .
|
||||
"flake = builtins.getFlake (toString \"$flakeRef\"); " .
|
||||
"in " .
|
||||
"flake.hydraJobs " .
|
||||
"or flake.checks " .
|
||||
"or (throw \"flake '$flakeRef' does not provide any Hydra jobs or checks\")";
|
||||
|
||||
@cmd = ("nix-eval-jobs", "--expr", $nix_expr);
|
||||
} else {
|
||||
my $nixExprInput = $inputInfo->{$nixExprInputName}->[0]
|
||||
or die "cannot find the input containing the job expression\n";
|
||||
|
||||
@cmd = ("hydra-eval-jobs",
|
||||
@cmd = ("nix-eval-jobs",
|
||||
"<" . $nixExprInputName . "/" . $nixExprPath . ">",
|
||||
"--gc-roots-dir", getGCRootsDir,
|
||||
"--max-jobs", 1,
|
||||
inputsToArgs($inputInfo));
|
||||
}
|
||||
|
||||
push @cmd, "--no-allow-import-from-derivation" if $config->{allow_import_from_derivation} // "true" ne "true";
|
||||
push @cmd, ("--gc-roots-dir", getGCRootsDir);
|
||||
push @cmd, ("--max-jobs", 1);
|
||||
push @cmd, "--meta";
|
||||
push @cmd, "--constituents";
|
||||
push @cmd, "--force-recurse";
|
||||
push @cmd, ("--option", "allow-import-from-derivation", "false") if $config->{allow_import_from_derivation} // "true" ne "true";
|
||||
push @cmd, ("--workers", $config->{evaluator_workers} // 1);
|
||||
push @cmd, ("--max-memory-size", $config->{evaluator_max_memory_size} // 4096);
|
||||
|
||||
if (defined $ENV{'HYDRA_DEBUG'}) {
|
||||
sub escape {
|
||||
@ -384,14 +395,33 @@ sub evalJobs {
|
||||
print STDERR "evaluator: @escaped\n";
|
||||
}
|
||||
|
||||
(my $res, my $jobsJSON, my $stderr) = captureStdoutStderr(21600, @cmd);
|
||||
die "hydra-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8))
|
||||
. ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n")
|
||||
if $res;
|
||||
my $evalProc = IPC::Run::start \@cmd,
|
||||
'>', IPC::Run::new_chunker, \my $out,
|
||||
'2>', \my $err;
|
||||
|
||||
print STDERR "$stderr";
|
||||
return sub {
|
||||
while (1) {
|
||||
$evalProc->pump;
|
||||
if (!defined $out && !defined $err) {
|
||||
$evalProc->finish;
|
||||
if ($?) {
|
||||
die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
return decode_json($jobsJSON);
|
||||
if (defined $err) {
|
||||
print STDERR "$err";
|
||||
undef $err;
|
||||
}
|
||||
|
||||
if (defined $out && $out ne '') {
|
||||
my $job = decode_json($out);
|
||||
undef $out;
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -420,7 +450,7 @@ sub checkBuild {
|
||||
my $firstOutputName = $outputNames[0];
|
||||
my $firstOutputPath = $buildInfo->{outputs}->{$firstOutputName};
|
||||
|
||||
my $jobName = $buildInfo->{jobName} or die;
|
||||
my $jobName = $buildInfo->{attr} or die;
|
||||
my $drvPath = $buildInfo->{drvPath} or die;
|
||||
|
||||
my $build;
|
||||
@ -474,9 +504,30 @@ sub checkBuild {
|
||||
|
||||
my $time = time();
|
||||
|
||||
sub null {
|
||||
my ($s) = @_;
|
||||
return $s eq "" ? undef : $s;
|
||||
sub getMeta {
|
||||
my ($s, $def) = @_;
|
||||
return ($s || "") eq "" ? $def : $s;
|
||||
}
|
||||
|
||||
sub getMetaStrings {
|
||||
my ($v, $k, $acc) = @_;
|
||||
my $t = ref $v;
|
||||
|
||||
if ($t eq 'HASH') {
|
||||
push @$acc, $v->{$k} if exists $v->{$k};
|
||||
} elsif ($t eq 'ARRAY') {
|
||||
getMetaStrings($_, $k, $acc) foreach @$v;
|
||||
} elsif (defined $v) {
|
||||
push @$acc, $v;
|
||||
}
|
||||
}
|
||||
|
||||
sub getMetaConcatStrings {
|
||||
my ($v, $k) = @_;
|
||||
|
||||
my @strings;
|
||||
getMetaStrings($v, $k, \@strings);
|
||||
return join(", ", @strings) || undef;
|
||||
}
|
||||
|
||||
# Add the build to the database.
|
||||
@ -484,19 +535,19 @@ sub checkBuild {
|
||||
{ timestamp => $time
|
||||
, jobset_id => $jobset->id
|
||||
, job => $jobName
|
||||
, description => null($buildInfo->{description})
|
||||
, license => null($buildInfo->{license})
|
||||
, homepage => null($buildInfo->{homepage})
|
||||
, maintainers => null($buildInfo->{maintainers})
|
||||
, maxsilent => $buildInfo->{maxSilent}
|
||||
, timeout => $buildInfo->{timeout}
|
||||
, nixname => $buildInfo->{nixName}
|
||||
, description => getMeta($buildInfo->{meta}->{description}, undef)
|
||||
, license => getMetaConcatStrings($buildInfo->{meta}->{license}, "shortName")
|
||||
, homepage => getMeta($buildInfo->{meta}->{homepage}, undef)
|
||||
, maintainers => getMetaConcatStrings($buildInfo->{meta}->{maintainers}, "email")
|
||||
, maxsilent => getMeta($buildInfo->{meta}->{maxSilent}, 7200)
|
||||
, timeout => getMeta($buildInfo->{meta}->{timeout}, 36000)
|
||||
, nixname => $buildInfo->{name}
|
||||
, drvpath => $drvPath
|
||||
, system => $buildInfo->{system}
|
||||
, priority => $buildInfo->{schedulingPriority}
|
||||
, priority => getMeta($buildInfo->{meta}->{schedulingPriority}, 100)
|
||||
, finished => 0
|
||||
, iscurrent => 1
|
||||
, ischannel => $buildInfo->{isChannel}
|
||||
, ischannel => getMeta($buildInfo->{meta}->{isChannel}, 0)
|
||||
});
|
||||
|
||||
$build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} })
|
||||
@ -665,7 +716,7 @@ sub checkJobsetWrapped {
|
||||
return;
|
||||
}
|
||||
|
||||
# Hash the arguments to hydra-eval-jobs and check the
|
||||
# Hash the arguments to nix-eval-jobs and check the
|
||||
# JobsetInputHashes to see if the previous evaluation had the same
|
||||
# inputs. If so, bail out.
|
||||
my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo));
|
||||
@ -687,19 +738,12 @@ sub checkJobsetWrapped {
|
||||
|
||||
# Evaluate the job expression.
|
||||
my $evalStart = clock_gettime(CLOCK_MONOTONIC);
|
||||
my $jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
|
||||
my $evalStop = clock_gettime(CLOCK_MONOTONIC);
|
||||
|
||||
if ($jobsetsJobset) {
|
||||
my @keys = keys %$jobs;
|
||||
die "The .jobsets jobset must only have a single job named 'jobsets'"
|
||||
unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
|
||||
}
|
||||
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
|
||||
my $evalStop;
|
||||
my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
|
||||
|
||||
if ($dryRun) {
|
||||
foreach my $name (keys %{$jobs}) {
|
||||
my $job = $jobs->{$name};
|
||||
while (defined(my $job = $jobsIter->())) {
|
||||
my $name = $job->{attr};
|
||||
if (defined $job->{drvPath}) {
|
||||
print STDERR "good job $name: $job->{drvPath}\n";
|
||||
} else {
|
||||
@ -709,36 +753,20 @@ sub checkJobsetWrapped {
|
||||
return;
|
||||
}
|
||||
|
||||
die "Jobset contains a job with an empty name. Make sure the jobset evaluates to an attrset of jobs.\n"
|
||||
if defined $jobs->{""};
|
||||
|
||||
$jobs->{$_}->{jobName} = $_ for keys %{$jobs};
|
||||
|
||||
my $jobOutPathMap = {};
|
||||
my $jobsetChanged = 0;
|
||||
my $dbStart = clock_gettime(CLOCK_MONOTONIC);
|
||||
|
||||
|
||||
# Store the error messages for jobs that failed to evaluate.
|
||||
my $evaluationErrorTime = time;
|
||||
my $evaluationErrorMsg = "";
|
||||
foreach my $job (values %{$jobs}) {
|
||||
next unless defined $job->{error};
|
||||
$evaluationErrorMsg .=
|
||||
($job->{jobName} ne "" ? "in job ‘$job->{jobName}’" : "at top-level") .
|
||||
":\n" . $job->{error} . "\n\n";
|
||||
}
|
||||
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
|
||||
|
||||
my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
|
||||
{ errormsg => $evaluationErrorMsg
|
||||
, errortime => $evaluationErrorTime
|
||||
}
|
||||
);
|
||||
|
||||
my $jobOutPathMap = {};
|
||||
my $jobsetChanged = 0;
|
||||
my %buildMap;
|
||||
$db->txn_do(sub {
|
||||
|
||||
$db->txn_do(sub {
|
||||
my $prevEval = getPrevJobsetEval($db, $jobset, 1);
|
||||
|
||||
# Clear the "current" flag on all builds. Since we're in a
|
||||
@ -751,7 +779,7 @@ sub checkJobsetWrapped {
|
||||
, evaluationerror => $evaluationErrorRecord
|
||||
, timestamp => time
|
||||
, checkouttime => abs(int($checkoutStop - $checkoutStart))
|
||||
, evaltime => abs(int($evalStop - $evalStart))
|
||||
, evaltime => 0
|
||||
, hasnewbuilds => 0
|
||||
, nrbuilds => 0
|
||||
, flake => $flakeRef
|
||||
@ -759,11 +787,24 @@ sub checkJobsetWrapped {
|
||||
, nixexprpath => $jobset->nixexprpath
|
||||
});
|
||||
|
||||
# Schedule each successfully evaluated job.
|
||||
foreach my $job (permute(values %{$jobs})) {
|
||||
next if defined $job->{error};
|
||||
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
|
||||
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
|
||||
my @jobsWithConstituents;
|
||||
|
||||
while (defined(my $job = $jobsIter->())) {
|
||||
if ($jobsetsJobset) {
|
||||
die "The .jobsets jobset must only have a single job named 'jobsets'"
|
||||
unless $job->{attr} eq "jobsets";
|
||||
}
|
||||
|
||||
$evaluationErrorMsg .=
|
||||
($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") .
|
||||
":\n" . $job->{error} . "\n\n" if defined $job->{error};
|
||||
|
||||
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
|
||||
unless defined $job->{error};
|
||||
|
||||
if (defined $job->{constituents}) {
|
||||
push @jobsWithConstituents, $job;
|
||||
}
|
||||
}
|
||||
|
||||
# Have any builds been added or removed since last time?
|
||||
@ -801,21 +842,20 @@ sub checkJobsetWrapped {
|
||||
$drvPathToId{$x->{drvPath}} = $x;
|
||||
}
|
||||
|
||||
foreach my $job (values %{$jobs}) {
|
||||
next unless $job->{constituents};
|
||||
|
||||
foreach my $job (values @jobsWithConstituents) {
|
||||
next unless defined $job->{constituents};
|
||||
if (defined $job->{error}) {
|
||||
die "aggregate job ‘$job->{jobName}’ failed with the error: $job->{error}\n";
|
||||
die "aggregate job ‘$job->{attr}’ failed with the error: $job->{error}\n";
|
||||
}
|
||||
|
||||
my $x = $drvPathToId{$job->{drvPath}} or
|
||||
die "aggregate job ‘$job->{jobName}’ has no corresponding build record.\n";
|
||||
die "aggregate job ‘$job->{attr}’ has no corresponding build record.\n";
|
||||
foreach my $drvPath (@{$job->{constituents}}) {
|
||||
my $constituent = $drvPathToId{$drvPath};
|
||||
if (defined $constituent) {
|
||||
$db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
|
||||
} else {
|
||||
warn "aggregate job ‘$job->{jobName}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
|
||||
warn "aggregate job ‘$job->{attr}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -857,11 +897,15 @@ sub checkJobsetWrapped {
|
||||
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
|
||||
|
||||
$jobset->update({ lastcheckedtime => time, forceeval => undef });
|
||||
|
||||
$evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
|
||||
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
|
||||
|
||||
$evalStop = clock_gettime(CLOCK_MONOTONIC);
|
||||
$ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
|
||||
});
|
||||
|
||||
my $dbStop = clock_gettime(CLOCK_MONOTONIC);
|
||||
|
||||
Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
|
||||
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
|
||||
Net::Statsd::increment("hydra.evaluator.evals");
|
||||
Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
|
||||
}
|
||||
|
Reference in New Issue
Block a user