Merge branch 'NixOS:master' into add-gitea-pulls

This commit is contained in:
Faye Chun 2025-03-01 22:04:13 -05:00 committed by GitHub
commit 99e3ad325c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 538 additions and 976 deletions

54
flake.lock generated
View File

@ -1,30 +1,10 @@
{ {
"nodes": { "nodes": {
"libgit2": {
"flake": false,
"locked": {
"lastModified": 1715853528,
"narHash": "sha256-J2rCxTecyLbbDdsyBWn9w7r3pbKRMkI9E7RvRgAqBdY=",
"owner": "libgit2",
"repo": "libgit2",
"rev": "36f7e21ad757a3dacc58cf7944329da6bc1d6e96",
"type": "github"
},
"original": {
"owner": "libgit2",
"ref": "v1.8.1",
"repo": "libgit2",
"type": "github"
}
},
"nix": { "nix": {
"inputs": { "inputs": {
"flake-compat": [], "flake-compat": [],
"flake-parts": [], "flake-parts": [],
"git-hooks-nix": [], "git-hooks-nix": [],
"libgit2": [
"libgit2"
],
"nixpkgs": [ "nixpkgs": [
"nixpkgs" "nixpkgs"
], ],
@ -32,40 +12,56 @@
"nixpkgs-regression": [] "nixpkgs-regression": []
}, },
"locked": { "locked": {
"lastModified": 1726787955, "lastModified": 1739899400,
"narHash": "sha256-XFznzb8L4SdUm9u+w3DPpMWJhffuv+/6+aiVl00slns=", "narHash": "sha256-q/RgA4bB7zWai4oPySq9mch7qH14IEeom2P64SXdqHs=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nix", "repo": "nix",
"rev": "a7fdef6858dd45b9d7bda7c92324c63faee7f509", "rev": "e310c19a1aeb1ce1ed4d41d5ab2d02db596e0918",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "2.24-maintenance", "ref": "2.26-maintenance",
"repo": "nix", "repo": "nix",
"type": "github" "type": "github"
} }
}, },
"nix-eval-jobs": {
"flake": false,
"locked": {
"lastModified": 1739500569,
"narHash": "sha256-3wIReAqdTALv39gkWXLMZQvHyBOc3yPkWT2ZsItxedY=",
"owner": "nix-community",
"repo": "nix-eval-jobs",
"rev": "4b392b284877d203ae262e16af269f702df036bc",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-eval-jobs",
"type": "github"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1726688310, "lastModified": 1739461644,
"narHash": "sha256-Xc9lEtentPCEtxc/F1e6jIZsd4MPDYv4Kugl9WtXlz0=", "narHash": "sha256-1o1qR0KYozYGRrnqytSpAhVBYLNBHX+Lv6I39zGRzKM=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "dbebdd67a6006bb145d98c8debf9140ac7e651d0", "rev": "97a719c9f0a07923c957cf51b20b329f9fb9d43f",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-24.05-small", "ref": "nixos-24.11-small",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }
}, },
"root": { "root": {
"inputs": { "inputs": {
"libgit2": "libgit2",
"nix": "nix", "nix": "nix",
"nix-eval-jobs": "nix-eval-jobs",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
} }
} }

View File

@ -1,21 +1,27 @@
{ {
description = "A Nix-based continuous build system"; description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11-small";
inputs.libgit2 = { url = "github:libgit2/libgit2/v1.8.1"; flake = false; }; inputs.nix = {
inputs.nix.url = "github:NixOS/nix/2.24-maintenance"; url = "github:NixOS/nix/2.26-maintenance";
inputs.nix.inputs.nixpkgs.follows = "nixpkgs"; inputs.nixpkgs.follows = "nixpkgs";
inputs.nix.inputs.libgit2.follows = "libgit2";
# hide nix dev tooling from our lock file # hide nix dev tooling from our lock file
inputs.nix.inputs.flake-parts.follows = ""; inputs.flake-parts.follows = "";
inputs.nix.inputs.git-hooks-nix.follows = ""; inputs.git-hooks-nix.follows = "";
inputs.nix.inputs.nixpkgs-regression.follows = ""; inputs.nixpkgs-regression.follows = "";
inputs.nix.inputs.nixpkgs-23-11.follows = ""; inputs.nixpkgs-23-11.follows = "";
inputs.nix.inputs.flake-compat.follows = ""; inputs.flake-compat.follows = "";
};
outputs = { self, nixpkgs, nix, ... }: inputs.nix-eval-jobs = {
url = "github:nix-community/nix-eval-jobs";
# We want to control the deps precisely
flake = false;
};
outputs = { self, nixpkgs, nix, nix-eval-jobs, ... }:
let let
systems = [ "x86_64-linux" "aarch64-linux" ]; systems = [ "x86_64-linux" "aarch64-linux" ];
forEachSystem = nixpkgs.lib.genAttrs systems; forEachSystem = nixpkgs.lib.genAttrs systems;
@ -24,6 +30,7 @@
# A Nixpkgs overlay that provides a 'hydra' package. # A Nixpkgs overlay that provides a 'hydra' package.
overlays.default = final: prev: { overlays.default = final: prev: {
nix-eval-jobs = final.callPackage nix-eval-jobs {};
hydra = final.callPackage ./package.nix { hydra = final.callPackage ./package.nix {
inherit (nixpkgs.lib) fileset; inherit (nixpkgs.lib) fileset;
rawSrc = self; rawSrc = self;
@ -67,10 +74,19 @@
}); });
packages = forEachSystem (system: { packages = forEachSystem (system: {
nix-eval-jobs = nixpkgs.legacyPackages.${system}.callPackage nix-eval-jobs {
nix = nix.packages.${system}.nix;
};
hydra = nixpkgs.legacyPackages.${system}.callPackage ./package.nix { hydra = nixpkgs.legacyPackages.${system}.callPackage ./package.nix {
inherit (nixpkgs.lib) fileset; inherit (nixpkgs.lib) fileset;
inherit (self.packages.${system}) nix-eval-jobs;
rawSrc = self; rawSrc = self;
nix = nix.packages.${system}.nix; inherit (nix.packages.${system})
nix-util
nix-store
nix-main
nix-cli
;
nix-perl-bindings = nix.hydraJobs.perlBindings.${system}; nix-perl-bindings = nix.hydraJobs.perlBindings.${system};
}; };
default = self.packages.${system}.hydra; default = self.packages.${system}.hydra;

View File

@ -8,22 +8,22 @@ project('hydra', 'cpp',
], ],
) )
nix_util_dep = dependency('nix-util', required: true)
nix_store_dep = dependency('nix-store', required: true) nix_store_dep = dependency('nix-store', required: true)
nix_main_dep = dependency('nix-main', required: true) nix_main_dep = dependency('nix-main', required: true)
nix_expr_dep = dependency('nix-expr', required: true)
nix_flake_dep = dependency('nix-flake', required: true)
nix_cmd_dep = dependency('nix-cmd', required: true)
# Nix need extra flags not provided in its pkg-config files. # Nix need extra flags not provided in its pkg-config files.
nix_dep = declare_dependency( nix_dep = declare_dependency(
dependencies: [ dependencies: [
nix_util_dep,
nix_store_dep, nix_store_dep,
nix_main_dep, nix_main_dep,
nix_expr_dep,
nix_flake_dep,
nix_cmd_dep,
], ],
compile_args: ['-include', 'nix/config.h'], compile_args: [
'-include', 'nix/config-util.hh',
'-include', 'nix/config-store.hh',
'-include', 'nix/config-main.hh',
],
) )
pqxx_dep = dependency('libpqxx', required: true) pqxx_dep = dependency('libpqxx', required: true)

View File

@ -8,7 +8,10 @@
, perlPackages , perlPackages
, nix , nix-util
, nix-store
, nix-main
, nix-cli
, nix-perl-bindings , nix-perl-bindings
, git , git
@ -50,6 +53,7 @@
, xz , xz
, gnutar , gnutar
, gnused , gnused
, nix-eval-jobs
, rpm , rpm
, dpkg , dpkg
@ -161,7 +165,7 @@ stdenv.mkDerivation (finalAttrs: {
nukeReferences nukeReferences
pkg-config pkg-config
mdbook mdbook
nix nix-cli
perlDeps perlDeps
perl perl
unzip unzip
@ -171,7 +175,9 @@ stdenv.mkDerivation (finalAttrs: {
libpqxx libpqxx
openssl openssl
libxslt libxslt
nix nix-util
nix-store
nix-main
perlDeps perlDeps
perl perl
boost boost
@ -190,6 +196,7 @@ stdenv.mkDerivation (finalAttrs: {
openldap openldap
postgresql_13 postgresql_13
pixz pixz
nix-eval-jobs
]; ];
checkInputs = [ checkInputs = [
@ -197,13 +204,14 @@ stdenv.mkDerivation (finalAttrs: {
glibcLocales glibcLocales
libressl.nc libressl.nc
python3 python3
nix-cli
]; ];
hydraPath = lib.makeBinPath ( hydraPath = lib.makeBinPath (
[ [
subversion subversion
openssh openssh
nix nix-cli
coreutils coreutils
findutils findutils
pixz pixz
@ -218,6 +226,7 @@ stdenv.mkDerivation (finalAttrs: {
darcs darcs
gnused gnused
breezy breezy
nix-eval-jobs
] ++ lib.optionals stdenv.isLinux [ rpm dpkg cdrkit ] ] ++ lib.optionals stdenv.isLinux [ rpm dpkg cdrkit ]
); );
@ -232,7 +241,7 @@ stdenv.mkDerivation (finalAttrs: {
shellHook = '' shellHook = ''
pushd $(git rev-parse --show-toplevel) >/dev/null pushd $(git rev-parse --show-toplevel) >/dev/null
PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-eval-jobs:$(pwd)/src/hydra-queue-runner:$PATH PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-queue-runner:$PATH
PERL5LIB=$(pwd)/src/lib:$PERL5LIB PERL5LIB=$(pwd)/src/lib:$PERL5LIB
export HYDRA_HOME="$(pwd)/src/" export HYDRA_HOME="$(pwd)/src/"
mkdir -p .hydra-data mkdir -p .hydra-data
@ -263,12 +272,13 @@ stdenv.mkDerivation (finalAttrs: {
--prefix PATH ':' $out/bin:$hydraPath \ --prefix PATH ':' $out/bin:$hydraPath \
--set HYDRA_RELEASE ${version} \ --set HYDRA_RELEASE ${version} \
--set HYDRA_HOME $out/libexec/hydra \ --set HYDRA_HOME $out/libexec/hydra \
--set NIX_RELEASE ${nix.name or "unknown"} --set NIX_RELEASE ${nix-cli.name or "unknown"} \
--set NIX_EVAL_JOBS_RELEASE ${nix-eval-jobs.name or "unknown"}
done done
''; '';
dontStrip = true; dontStrip = true;
meta.description = "Build of Hydra on ${stdenv.system}"; meta.description = "Build of Hydra on ${stdenv.system}";
passthru = { inherit perlDeps nix; }; passthru = { inherit perlDeps; };
}) })

View File

@ -1,587 +0,0 @@
#include <iostream>
#include <thread>
#include <optional>
#include <unordered_map>
#include "shared.hh"
#include "store-api.hh"
#include "eval.hh"
#include "eval-gc.hh"
#include "eval-inline.hh"
#include "eval-settings.hh"
#include "signals.hh"
#include "terminal.hh"
#include "util.hh"
#include "get-drvs.hh"
#include "globals.hh"
#include "common-eval-args.hh"
#include "flake/flakeref.hh"
#include "flake/flake.hh"
#include "attr-path.hh"
#include "derivations.hh"
#include "local-fs-store.hh"
#include "hydra-config.hh"
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <nlohmann/json.hpp>
void check_pid_status_nonblocking(pid_t check_pid)
{
// Only check 'initialized' and known PID's
if (check_pid <= 0) { return; }
int wstatus = 0;
pid_t pid = waitpid(check_pid, &wstatus, WNOHANG);
// -1 = failure, WNOHANG: 0 = no change
if (pid <= 0) { return; }
std::cerr << "child process (" << pid << ") ";
if (WIFEXITED(wstatus)) {
std::cerr << "exited with status=" << WEXITSTATUS(wstatus) << std::endl;
} else if (WIFSIGNALED(wstatus)) {
std::cerr << "killed by signal=" << WTERMSIG(wstatus) << std::endl;
} else if (WIFSTOPPED(wstatus)) {
std::cerr << "stopped by signal=" << WSTOPSIG(wstatus) << std::endl;
} else if (WIFCONTINUED(wstatus)) {
std::cerr << "continued" << std::endl;
}
}
using namespace nix;
static Path gcRootsDir;
static size_t maxMemorySize;
struct MyArgs : MixEvalArgs, MixCommonArgs, RootArgs
{
Path releaseExpr;
bool flake = false;
bool dryRun = false;
MyArgs() : MixCommonArgs("hydra-eval-jobs")
{
addFlag({
.longName = "gc-roots-dir",
.description = "garbage collector roots directory",
.labels = {"path"},
.handler = {&gcRootsDir}
});
addFlag({
.longName = "dry-run",
.description = "don't create store derivations",
.handler = {&dryRun, true}
});
addFlag({
.longName = "flake",
.description = "build a flake",
.handler = {&flake, true}
});
expectArg("expr", &releaseExpr);
}
};
static MyArgs myArgs;
static std::string queryMetaStrings(EvalState & state, PackageInfo & drv, const std::string & name, const std::string & subAttribute)
{
Strings res;
std::function<void(Value & v)> rec;
rec = [&](Value & v) {
state.forceValue(v, noPos);
if (v.type() == nString)
res.emplace_back(v.string_view());
else if (v.isList())
for (unsigned int n = 0; n < v.listSize(); ++n)
rec(*v.listElems()[n]);
else if (v.type() == nAttrs) {
auto a = v.attrs()->find(state.symbols.create(subAttribute));
if (a != v.attrs()->end())
res.push_back(std::string(state.forceString(*a->value, a->pos, "while evaluating meta attributes")));
}
};
Value * v = drv.queryMeta(name);
if (v) rec(*v);
return concatStringsSep(", ", res);
}
static void worker(
EvalState & state,
Bindings & autoArgs,
AutoCloseFD & to,
AutoCloseFD & from)
{
Value vTop;
if (myArgs.flake) {
using namespace flake;
auto [flakeRef, fragment, outputSpec] = parseFlakeRefWithFragmentAndExtendedOutputsSpec(fetchSettings, myArgs.releaseExpr, absPath("."));
auto vFlake = state.allocValue();
auto lockedFlake = lockFlake(
flakeSettings,
state,
flakeRef,
LockFlags {
.updateLockFile = false,
.useRegistries = false,
.allowUnlocked = false,
});
callFlake(state, lockedFlake, *vFlake);
auto vOutputs = vFlake->attrs()->get(state.symbols.create("outputs"))->value;
state.forceValue(*vOutputs, noPos);
auto aHydraJobs = vOutputs->attrs()->get(state.symbols.create("hydraJobs"));
if (!aHydraJobs)
aHydraJobs = vOutputs->attrs()->get(state.symbols.create("checks"));
if (!aHydraJobs)
throw Error("flake '%s' does not provide any Hydra jobs or checks", flakeRef);
vTop = *aHydraJobs->value;
} else {
state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
}
auto vRoot = state.allocValue();
state.autoCallFunction(autoArgs, vTop, *vRoot);
while (true) {
/* Wait for the master to send us a job name. */
writeLine(to.get(), "next");
auto s = readLine(from.get());
if (s == "exit") break;
if (!hasPrefix(s, "do ")) abort();
std::string attrPath(s, 3);
debug("worker process %d at '%s'", getpid(), attrPath);
/* Evaluate it and send info back to the master. */
nlohmann::json reply;
try {
auto vTmp = findAlongAttrPath(state, attrPath, autoArgs, *vRoot).first;
auto v = state.allocValue();
state.autoCallFunction(autoArgs, *vTmp, *v);
if (auto drv = getDerivation(state, *v, false)) {
// CA derivations do not have static output paths, so we
// have to defensively not query output paths in case we
// encounter one.
PackageInfo::Outputs outputs = drv->queryOutputs(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown")
state.error<EvalError>("derivation must have a 'system' attribute").debugThrow();
auto drvPath = state.store->printStorePath(drv->requireDrvPath());
nlohmann::json job;
job["nixName"] = drv->queryName();
job["system"] =drv->querySystem();
job["drvPath"] = drvPath;
job["description"] = drv->queryMetaString("description");
job["license"] = queryMetaStrings(state, *drv, "license", "shortName");
job["homepage"] = drv->queryMetaString("homepage");
job["maintainers"] = queryMetaStrings(state, *drv, "maintainers", "email");
job["schedulingPriority"] = drv->queryMetaInt("schedulingPriority", 100);
job["timeout"] = drv->queryMetaInt("timeout", 36000);
job["maxSilent"] = drv->queryMetaInt("maxSilent", 7200);
job["isChannel"] = drv->queryMetaBool("isHydraChannel", false);
/* If this is an aggregate, then get its constituents. */
auto a = v->attrs()->get(state.symbols.create("_hydraAggregate"));
if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
auto a = v->attrs()->get(state.symbols.create("constituents"));
if (!a)
state.error<EvalError>("derivation must have a constituents attribute").debugThrow();
NixStringContext context;
state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
for (auto & c : context)
std::visit(overloaded {
[&](const NixStringContextElem::Built & b) {
job["constituents"].push_back(b.drvPath->to_string(*state.store));
},
[&](const NixStringContextElem::Opaque & o) {
},
[&](const NixStringContextElem::DrvDeep & d) {
},
}, c.raw);
state.forceList(*a->value, a->pos, "while evaluating the `constituents` attribute");
for (unsigned int n = 0; n < a->value->listSize(); ++n) {
auto v = a->value->listElems()[n];
state.forceValue(*v, noPos);
if (v->type() == nString)
job["namedConstituents"].push_back(v->string_view());
}
}
/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = state.store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}
nlohmann::json out;
for (auto & [outputName, optOutputPath] : outputs) {
if (optOutputPath) {
out[outputName] = state.store->printStorePath(*optOutputPath);
} else {
// See the `queryOutputs` call above; we should
// not encounter missing output paths otherwise.
assert(experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
out[outputName] = nullptr;
}
}
job["outputs"] = std::move(out);
reply["job"] = std::move(job);
}
else if (v->type() == nAttrs) {
auto attrs = nlohmann::json::array();
StringSet ss;
for (auto & i : v->attrs()->lexicographicOrder(state.symbols)) {
std::string name(state.symbols[i->name]);
if (name.find(' ') != std::string::npos) {
printError("skipping job with illegal name '%s'", name);
continue;
}
attrs.push_back(name);
}
reply["attrs"] = std::move(attrs);
}
else if (v->type() == nNull)
;
else state.error<TypeError>("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
} catch (EvalError & e) {
auto msg = e.msg();
// Transmits the error we got from the previous evaluation
// in the JSON output.
reply["error"] = filterANSIEscapes(msg, true);
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
printError(msg);
}
writeLine(to.get(), reply.dump());
/* If our RSS exceeds the maximum, exit. The master will
start a new process. */
struct rusage r;
getrusage(RUSAGE_SELF, &r);
if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
}
writeLine(to.get(), "restart");
}
int main(int argc, char * * argv)
{
/* Prevent undeclared dependencies in the evaluation via
$NIX_PATH. */
unsetenv("NIX_PATH");
return handleExceptions(argv[0], [&]() {
auto config = std::make_unique<HydraConfig>();
auto nrWorkers = config->getIntOption("evaluator_workers", 1);
maxMemorySize = config->getIntOption("evaluator_max_memory_size", 4096);
initNix();
initGC();
myArgs.parseCmdline(argvToStrings(argc, argv));
auto pureEval = config->getBoolOption("evaluator_pure_eval", myArgs.flake);
/* FIXME: The build hook in conjunction with import-from-derivation is causing "unexpected EOF" during eval */
settings.builders = "";
/* Prevent access to paths outside of the Nix search path and
to the environment. */
evalSettings.restrictEval = true;
/* When building a flake, use pure evaluation (no access to
'getEnv', 'currentSystem' etc. */
evalSettings.pureEval = pureEval;
if (myArgs.dryRun) settings.readOnlyMode = true;
if (myArgs.releaseExpr == "") throw UsageError("no expression specified");
if (gcRootsDir == "") printMsg(lvlError, "warning: `--gc-roots-dir' not specified");
struct State
{
std::set<std::string> todo{""};
std::set<std::string> active;
nlohmann::json jobs;
std::exception_ptr exc;
};
std::condition_variable wakeup;
Sync<State> state_;
/* Start a handler thread per worker process. */
auto handler = [&]()
{
pid_t pid = -1;
try {
AutoCloseFD from, to;
while (true) {
/* Start a new worker process if necessary. */
if (pid == -1) {
Pipe toPipe, fromPipe;
toPipe.create();
fromPipe.create();
pid = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
]()
{
try {
auto evalStore = myArgs.evalStoreUrl
? openStore(*myArgs.evalStoreUrl)
: openStore();
EvalState state(myArgs.lookupPath,
evalStore, fetchSettings, evalSettings);
Bindings & autoArgs = *myArgs.getAutoArgs(state);
worker(state, autoArgs, *to, *from);
} catch (Error & e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide);
debug("created worker process %d", pid);
}
/* Check whether the existing worker process is still there. */
auto s = readLine(from.get());
if (s == "restart") {
pid = -1;
continue;
} else if (s != "next") {
auto json = nlohmann::json::parse(s);
throw Error("worker error: %s", (std::string) json["error"]);
}
/* Wait for a job name to become available. */
std::string attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || state->exc) {
writeLine(to.get(), "exit");
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
writeLine(to.get(), "do " + attrPath);
/* Wait for the response. */
auto response = nlohmann::json::parse(readLine(from.get()));
/* Handle the response. */
StringSet newAttrs;
if (response.find("job") != response.end()) {
auto state(state_.lock());
state->jobs[attrPath] = response["job"];
}
if (response.find("attrs") != response.end()) {
for (auto & i : response["attrs"]) {
std::string path = i;
if (path.find(".") != std::string::npos){
path = "\"" + path + "\"";
}
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) path;
newAttrs.insert(s);
}
}
if (response.find("error") != response.end()) {
auto state(state_.lock());
state->jobs[attrPath]["error"] = response["error"];
}
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto & s : newAttrs)
state->todo.insert(s);
wakeup.notify_all();
}
}
} catch (...) {
check_pid_status_nonblocking(pid);
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
};
std::vector<std::thread> threads;
for (size_t i = 0; i < nrWorkers; i++)
threads.emplace_back(std::thread(handler));
for (auto & thread : threads)
thread.join();
auto state(state_.lock());
if (state->exc)
std::rethrow_exception(state->exc);
/* For aggregate jobs that have named consistuents
(i.e. constituents that are a job name rather than a
derivation), look up the referenced job and add it to the
dependencies of the aggregate derivation. */
auto store = openStore();
for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
auto jobName = i.key();
auto & job = i.value();
auto named = job.find("namedConstituents");
if (named == job.end()) continue;
std::unordered_map<std::string, std::string> brokenJobs;
auto getNonBrokenJobOrRecordError = [&brokenJobs, &jobName, &state](
const std::string & childJobName) -> std::optional<nlohmann::json> {
auto childJob = state->jobs.find(childJobName);
if (childJob == state->jobs.end()) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
return std::nullopt;
}
if (childJob->find("error") != childJob->end()) {
std::string error = (*childJob)["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return std::nullopt;
}
return *childJob;
};
if (myArgs.dryRun) {
for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
std::string drvPath2 = (*job2)["drvPath"];
job["constituents"].push_back(drvPath2);
}
} else {
auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
auto drv = store->readDerivation(drvPath);
for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
auto drvPath2 = store->parseStorePath((std::string) (*job2)["drvPath"]);
auto drv2 = store->readDerivation(drvPath2);
job["constituents"].push_back(store->printStorePath(drvPath2));
drv.inputDrvs.map[drvPath2].value = {drv2.outputs.begin()->first};
}
if (brokenJobs.empty()) {
std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension));
drvName.resize(drvName.size() - drvExtension.size());
auto hashModulo = hashDerivationModulo(*store, drv, true);
if (hashModulo.kind != DrvHash::Kind::Regular) continue;
auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end()) continue;
auto outPath = store->makeOutputPath("out", h->second, drvName);
drv.env["out"] = store->printStorePath(outPath);
drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));
debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);
job["drvPath"] = newDrvPath;
job["outputs"]["out"] = store->printStorePath(outPath);
}
}
job.erase("namedConstituents");
/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
auto drvPath = job["drvPath"].get<std::string>();
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}
if (!brokenJobs.empty()) {
std::stringstream ss;
for (const auto& [jobName, error] : brokenJobs) {
ss << jobName << ": " << error << "\n";
}
job["error"] = ss.str();
}
}
std::cout << state->jobs.dump(2) << "\n";
});
}

View File

@ -1,8 +0,0 @@
hydra_eval_jobs = executable('hydra-eval-jobs',
'hydra-eval-jobs.cc',
dependencies: [
libhydra_dep,
nix_dep,
],
install: true,
)

View File

@ -7,70 +7,35 @@
#include "build-result.hh" #include "build-result.hh"
#include "path.hh" #include "path.hh"
#include "legacy-ssh-store.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh" #include "state.hh"
#include "current-process.hh" #include "current-process.hh"
#include "processes.hh" #include "processes.hh"
#include "util.hh" #include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "finally.hh" #include "finally.hh"
#include "url.hh" #include "url.hh"
using namespace nix; using namespace nix;
bool ::Machine::isLocalhost() const
{
return storeUri.params.empty() && std::visit(overloaded {
[](const StoreReference::Auto &) {
return true;
},
[](const StoreReference::Specified & s) {
return
(s.scheme == "local" || s.scheme == "unix") ||
((s.scheme == "ssh" || s.scheme == "ssh-ng") &&
s.authority == "localhost");
},
}, storeUri.variant);
}
namespace nix::build_remote { namespace nix::build_remote {
static Strings extraStoreArgs(std::string & machine)
{
Strings result;
try {
auto parsed = parseURL(machine);
if (parsed.scheme != "ssh") {
throw SysError("Currently, only (legacy-)ssh stores are supported!");
}
machine = parsed.authority.value_or("");
auto remoteStore = parsed.query.find("remote-store");
if (remoteStore != parsed.query.end()) {
result = {"--store", shellEscape(remoteStore->second)};
}
} catch (BadURL &) {
// We just try to continue with `machine->sshName` here for backwards compat.
}
return result;
}
static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
command.splice(command.end(), extraStoreArgs(machine->sshName));
}
auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024;
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
return ret;
}
static void copyClosureTo( static void copyClosureTo(
::Machine::Connection & conn, ::Machine::Connection & conn,
Store & destStore, Store & destStore,
@ -87,8 +52,8 @@ static void copyClosureTo(
// FIXME: substitute output pollutes our build log // FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote /* Get back the set of paths that are already valid on the remote
host. */ host. */
auto present = conn.queryValidPaths( auto present = conn.store->queryValidPaths(
destStore, true, closure, useSubstitutes); closure, true, useSubstitutes);
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
@ -103,12 +68,7 @@ static void copyClosureTo(
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock, std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600)); std::chrono::seconds(600));
conn.to << ServeProto::Command::ImportPaths; conn.store->addMultipleToStoreLegacy(destStore, missing);
destStore.exportPaths(missing, conn.to);
conn.to.flush();
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
} }
@ -198,7 +158,7 @@ static BasicDerivation sendInputs(
MaintainCount<counter> mc2(nrStepsCopyingTo); MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s", printMsg(lvlDebug, "sending closure of %s to %s",
localStore.printStorePath(step.drvPath), conn.machine->sshName); localStore.printStorePath(step.drvPath), conn.machine->storeUri.render());
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
@ -228,7 +188,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding counter & nrStepsBuilding
) )
{ {
conn.putBuildDerivationRequest(localStore, drvPath, drv, options); auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);
BuildResult result; BuildResult result;
@ -237,7 +197,10 @@ static BuildResult performBuild(
startTime = time(0); startTime = time(0);
{ {
MaintainCount<counter> mc(nrStepsBuilding); MaintainCount<counter> mc(nrStepsBuilding);
result = ServeProto::Serialise<BuildResult>::read(localStore, conn); result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
} }
stopTime = time(0); stopTime = time(0);
@ -253,7 +216,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize // If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation. // it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
{ {
// If the remote is too old to handle CA derivations, we cant get this // If the remote is too old to handle CA derivations, we cant get this
// far anyways // far anyways
@ -278,32 +241,6 @@ static BuildResult performBuild(
return result; return result;
} }
static std::map<StorePath, UnkeyedValidPathInfo> queryPathInfos(
::Machine::Connection & conn,
Store & localStore,
StorePathSet & outputs,
size_t & totalNarSize
)
{
/* Get info about each output path. */
std::map<StorePath, UnkeyedValidPathInfo> infos;
conn.to << ServeProto::Command::QueryPathInfos;
ServeProto::write(localStore, conn, outputs);
conn.to.flush();
while (true) {
auto storePathS = readString(conn.from);
if (storePathS == "") break;
auto storePath = localStore.parseStorePath(storePathS);
auto info = ServeProto::Serialise<UnkeyedValidPathInfo>::read(localStore, conn);
totalNarSize += info.narSize;
infos.insert_or_assign(std::move(storePath), std::move(info));
}
return infos;
}
static void copyPathFromRemote( static void copyPathFromRemote(
::Machine::Connection & conn, ::Machine::Connection & conn,
NarMemberDatas & narMembers, NarMemberDatas & narMembers,
@ -324,11 +261,10 @@ static void copyPathFromRemote(
lambda function only gets executed if someone tries to read lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather from source2, we will send the command from here rather
than outside the lambda. */ than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); conn.store->narFromPath(info.path, [&](Source & source) {
conn.to.flush(); TeeSource tee{source, sink};
extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
TeeSource tee(conn.from, sink); });
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
}); });
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
@ -430,22 +366,39 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting); updateStep(ssConnecting);
SSHMaster master {
machine->sshName,
machine->sshKey,
machine->sshPublicHostKey,
false, // no SSH master yet
false, // no compression yet
logFD.get(),
};
// FIXME: rewrite to use Store. // FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master); ::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
assert(remoteStore);
remoteStore->connPipeSize = 1024 * 1024;
if (machine->isLocalhost()) {
auto rp_new = remoteStore->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
}
remoteStore->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
return nix::ref{remoteStore};
}(),
};
{ {
auto activeStepState(activeStep->state_.lock()); auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled"); if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child->sshPid; activeStepState->pid = conn.store->getConnectionPid();
} }
Finally clearPid([&]() { Finally clearPid([&]() {
@ -460,41 +413,12 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */ process. Meh. */
}); });
::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};
Finally updateStats([&]() { Finally updateStats([&]() {
bytesReceived += conn.from.read; auto stats = conn.store->getConnectionStats();
bytesSent += conn.to.written; bytesReceived += stats.bytesReceived;
bytesSent += stats.bytesSent;
}); });
constexpr ServeProto::Version our_version = 0x206;
try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->sshName);
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s);
}
// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
{ {
auto info(machine->state->connectInfo.lock()); auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0; info->consecutiveFailures = 0;
@ -523,7 +447,7 @@ void State::buildRemote(ref<Store> destStore,
/* Do the build. */ /* Do the build. */
printMsg(lvlDebug, "building %s on %s", printMsg(lvlDebug, "building %s on %s",
localStore->printStorePath(step->drvPath), localStore->printStorePath(step->drvPath),
machine->sshName); machine->storeUri.render());
updateStep(ssBuilding); updateStep(ssBuilding);
@ -546,7 +470,7 @@ void State::buildRemote(ref<Store> destStore,
get a build log. */ get a build log. */
if (result.isCached) { if (result.isCached) {
printMsg(lvlInfo, "outputs of %s substituted or already valid on %s", printMsg(lvlInfo, "outputs of %s substituted or already valid on %s",
localStore->printStorePath(step->drvPath), machine->sshName); localStore->printStorePath(step->drvPath), machine->storeUri.render());
unlink(result.logFile.c_str()); unlink(result.logFile.c_str());
result.logFile = ""; result.logFile = "";
} }
@ -563,8 +487,10 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
auto infos = conn.store->queryPathInfosUncached(outputs);
size_t totalNarSize = 0; size_t totalNarSize = 0;
auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); for (auto & [_, info] : infos) totalNarSize += info.narSize;
if (totalNarSize > maxOutputSize) { if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded; result.stepStatus = bsNarSizeLimitExceeded;
@ -573,7 +499,7 @@ void State::buildRemote(ref<Store> destStore,
/* Copy each path. */ /* Copy each path. */
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)", printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);
build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();
@ -596,9 +522,11 @@ void State::buildRemote(ref<Store> destStore,
} }
} }
/* Shut down the connection. */ /* Shut down the connection done by RAII.
child->in = -1;
child->sshPid.wait(); Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/
} catch (Error & e) { } catch (Error & e) {
/* Disable this machine until a certain period of time has /* Disable this machine until a certain period of time has
@ -612,7 +540,7 @@ void State::buildRemote(ref<Store> destStore,
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now; info->lastFailure = now;
int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->sshName, delta); printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->storeUri.render(), delta);
info->disabledUntil = now + std::chrono::seconds(delta); info->disabledUntil = now + std::chrono::seconds(delta);
} }
throw; throw;

View File

@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation)
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, "uncaught exception building %s on %s: %s", printMsg(lvlError, "uncaught exception building %s on %s: %s",
localStore->printStorePath(reservation->step->drvPath), localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName, reservation->machine->storeUri.render(),
e.what()); e.what());
} }
} }
@ -150,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
buildOptions.buildTimeout = build->buildTimeout; buildOptions.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)", printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1)); localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
} }
if (!buildOneDone) if (!buildOneDone)
@ -178,7 +178,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
unlink(result.logFile.c_str()); unlink(result.logFile.c_str());
} }
} catch (...) { } catch (...) {
ignoreException(); ignoreExceptionInDestructor();
} }
} }
}); });
@ -196,7 +196,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{ {
auto mc = startDbUpdate(); auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy); stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy);
txn.commit(); txn.commit();
} }
@ -253,7 +253,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Finish the step in the database. */ /* Finish the step in the database. */
if (stepNr) { if (stepNr) {
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName); finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render());
txn.commit(); txn.commit();
} }
@ -261,7 +261,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
issue). Retry a number of times. */ issue). Retry a number of times. */
if (result.canRetry) { if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building %s on %s: %s", printMsg(lvlError, "possibly transient failure building %s on %s: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg); localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg);
assert(stepNr); assert(stepNr);
bool retry; bool retry;
{ {
@ -452,7 +452,7 @@ void State::failStep(
build->finishedInDB) build->finishedInDB)
continue; continue;
createBuildStep(txn, createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "", 0, build->id, step, machine ? machine->storeUri.render() : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId); result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
} }

View File

@ -256,7 +256,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */ /* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) { if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')", debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform); mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform);
continue; continue;
} }

View File

@ -135,65 +135,26 @@ void State::parseMachines(const std::string & contents)
oldMachines = *machines_; oldMachines = *machines_;
} }
for (auto line : tokenizeString<Strings>(contents, "\n")) { for (auto && machine_ : nix::Machine::parseConfig({}, contents)) {
line = trim(std::string(line, 0, line.find('#'))); auto machine = std::make_shared<::Machine>(std::move(machine_));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(8);
if (tokens[5] == "-") tokens[5] = "";
auto supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
if (tokens[6] == "-") tokens[6] = "";
auto mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : mandatoryFeatures)
supportedFeatures.insert(f);
using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;
auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `systemTypes`
tokenizeString<StringSet>(tokens[1], ","),
// `sshKey`
tokens[2] == "-" ? "" : tokens[2],
// `maxJobs`
tokens[3] != ""
? string2Int<MaxJobs>(tokens[3]).value()
: 1,
// `speedFactor`
std::stof(tokens[4].c_str()),
// `supportedFeatures`
std::move(supportedFeatures),
// `mandatoryFeatures`
std::move(mandatoryFeatures),
// `sshPublicHostKey`
tokens[7] != "" && tokens[7] != "-"
? tokens[7]
: "",
});
machine->sshName = tokens[0];
/* Re-use the State object of the previous machine with the /* Re-use the State object of the previous machine with the
same name. */ same name. */
auto i = oldMachines.find(machine->sshName); auto i = oldMachines.find(machine->storeUri.variant);
if (i == oldMachines.end()) if (i == oldMachines.end())
printMsg(lvlChatty, "adding new machine %1%", machine->sshName); printMsg(lvlChatty, "adding new machine %1%", machine->storeUri.render());
else else
printMsg(lvlChatty, "updating machine %1%", machine->sshName); printMsg(lvlChatty, "updating machine %1%", machine->storeUri.render());
machine->state = i == oldMachines.end() machine->state = i == oldMachines.end()
? std::make_shared<::Machine::State>() ? std::make_shared<::Machine::State>()
: i->second->state; : i->second->state;
newMachines[machine->sshName] = machine; newMachines[machine->storeUri.variant] = machine;
} }
for (auto & m : oldMachines) for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end()) { if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled) if (m.second->enabled)
printInfo("removing machine %1%", m.first); printInfo("removing machine %1%", m.second->storeUri.render());
/* Add a disabled ::Machine object to make sure stats are /* Add a disabled ::Machine object to make sure stats are
maintained. */ maintained. */
auto machine = std::make_shared<::Machine>(*(m.second)); auto machine = std::make_shared<::Machine>(*(m.second));
@ -657,7 +618,7 @@ void State::dumpStatus(Connection & conn)
machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone; machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone; machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
} }
statusJson["machines"][m->sshName] = machine; statusJson["machines"][m->storeUri.render()] = machine;
} }
} }

View File

@ -6,7 +6,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <regex>
#include <prometheus/counter.h> #include <prometheus/counter.h>
#include <prometheus/gauge.h> #include <prometheus/gauge.h>
@ -21,9 +20,7 @@
#include "store-api.hh" #include "store-api.hh"
#include "sync.hh" #include "sync.hh"
#include "nar-extractor.hh" #include "nar-extractor.hh"
#include "serve-protocol.hh" #include "legacy-ssh-store.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "machines.hh" #include "machines.hh"
@ -241,10 +238,6 @@ struct Machine : nix::Machine
{ {
typedef std::shared_ptr<Machine> ptr; typedef std::shared_ptr<Machine> ptr;
/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;
struct State { struct State {
typedef std::shared_ptr<State> ptr; typedef std::shared_ptr<State> ptr;
counter currentJobs{0}; counter currentJobs{0};
@ -294,16 +287,14 @@ struct Machine : nix::Machine
return true; return true;
} }
bool isLocalhost() bool isLocalhost() const;
{
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
// A connection to a machine // A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection { struct Connection {
// Backpointer to the machine // Backpointer to the machine
ptr machine; ptr machine;
// Opened store
nix::ref<nix::LegacySSHStore> store;
}; };
}; };
@ -358,7 +349,7 @@ private:
/* The build machines. */ /* The build machines. */
std::mutex machinesReadyLock; std::mutex machinesReadyLock;
typedef std::map<std::string, Machine::ptr> Machines; typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Various stats. */ /* Various stats. */

View File

@ -51,6 +51,7 @@ sub begin :Private {
$c->stash->{curUri} = $c->request->uri; $c->stash->{curUri} = $c->request->uri;
$c->stash->{version} = $ENV{"HYDRA_RELEASE"} || "<devel>"; $c->stash->{version} = $ENV{"HYDRA_RELEASE"} || "<devel>";
$c->stash->{nixVersion} = $ENV{"NIX_RELEASE"} || "<devel>"; $c->stash->{nixVersion} = $ENV{"NIX_RELEASE"} || "<devel>";
$c->stash->{nixEvalJobsVersion} = $ENV{"NIX_EVAL_JOBS_RELEASE"} || "<devel>";
$c->stash->{curTime} = time; $c->stash->{curTime} = time;
$c->stash->{logo} = defined $c->config->{hydra_logo} ? "/logo" : ""; $c->stash->{logo} = defined $c->config->{hydra_logo} ? "/logo" : "";
$c->stash->{tracker} = defined $c->config->{tracker} ? $c->config->{tracker} : ""; $c->stash->{tracker} = defined $c->config->{tracker} ? $c->config->{tracker} : "";

View File

@ -1,6 +1,5 @@
# Native code # Native code
subdir('libhydra') subdir('libhydra')
subdir('hydra-eval-jobs')
subdir('hydra-evaluator') subdir('hydra-evaluator')
subdir('hydra-queue-runner') subdir('hydra-queue-runner')

View File

@ -93,7 +93,7 @@
<footer class="navbar"> <footer class="navbar">
<hr /> <hr />
<small> <small>
<em><a href="http://nixos.org/hydra" target="_blank" class="squiggle">Hydra</a> [% HTML.escape(version) %] (using [% HTML.escape(nixVersion) %]).</em> <em><a href="http://nixos.org/hydra" target="_blank" class="squiggle">Hydra</a> [% HTML.escape(version) %] (using [% HTML.escape(nixVersion) %] and [% HTML.escape(nixEvalJobsVersion) %]).</em>
[% IF c.user_exists %] [% IF c.user_exists %]
You are signed in as <tt>[% HTML.escape(c.user.username) %]</tt> You are signed in as <tt>[% HTML.escape(c.user.username) %]</tt>
[%- IF c.user.type == 'google' %] via Google[% END %]. [%- IF c.user.type == 'google' %] via Google[% END %].

View File

@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
use Hydra::Model::DB; use Hydra::Model::DB;
use Hydra::Plugin; use Hydra::Plugin;
use Hydra::Schema; use Hydra::Schema;
use IPC::Run;
use JSON::MaybeXS; use JSON::MaybeXS;
use Net::Statsd; use Net::Statsd;
use Nix::Store; use Nix::Store;
@ -357,22 +358,32 @@ sub evalJobs {
my @cmd; my @cmd;
if (defined $flakeRef) { if (defined $flakeRef) {
@cmd = ("hydra-eval-jobs", my $nix_expr =
"--flake", $flakeRef, "let " .
"--gc-roots-dir", getGCRootsDir, "flake = builtins.getFlake (toString \"$flakeRef\"); " .
"--max-jobs", 1); "in " .
"flake.hydraJobs " .
"or flake.checks " .
"or (throw \"flake '$flakeRef' does not provide any Hydra jobs or checks\")";
@cmd = ("nix-eval-jobs", "--expr", $nix_expr);
} else { } else {
my $nixExprInput = $inputInfo->{$nixExprInputName}->[0] my $nixExprInput = $inputInfo->{$nixExprInputName}->[0]
or die "cannot find the input containing the job expression\n"; or die "cannot find the input containing the job expression\n";
@cmd = ("hydra-eval-jobs", @cmd = ("nix-eval-jobs",
"<" . $nixExprInputName . "/" . $nixExprPath . ">", "<" . $nixExprInputName . "/" . $nixExprPath . ">",
"--gc-roots-dir", getGCRootsDir,
"--max-jobs", 1,
inputsToArgs($inputInfo)); inputsToArgs($inputInfo));
} }
push @cmd, "--no-allow-import-from-derivation" if $config->{allow_import_from_derivation} // "true" ne "true"; push @cmd, ("--gc-roots-dir", getGCRootsDir);
push @cmd, ("--max-jobs", 1);
push @cmd, "--meta";
push @cmd, "--constituents";
push @cmd, "--force-recurse";
push @cmd, ("--option", "allow-import-from-derivation", "false") if $config->{allow_import_from_derivation} // "true" ne "true";
push @cmd, ("--workers", $config->{evaluator_workers} // 1);
push @cmd, ("--max-memory-size", $config->{evaluator_max_memory_size} // 4096);
if (defined $ENV{'HYDRA_DEBUG'}) { if (defined $ENV{'HYDRA_DEBUG'}) {
sub escape { sub escape {
@ -384,14 +395,40 @@ sub evalJobs {
print STDERR "evaluator: @escaped\n"; print STDERR "evaluator: @escaped\n";
} }
(my $res, my $jobsJSON, my $stderr) = captureStdoutStderr(21600, @cmd); my $evalProc = IPC::Run::start \@cmd,
die "hydra-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8)) '>', IPC::Run::new_chunker, \my $out,
. ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n") '2>', \my $err;
if $res;
print STDERR "$stderr"; return sub {
while (1) {
$evalProc->pump;
if (!defined $out && !defined $err) {
$evalProc->finish;
if ($?) {
die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
}
return;
}
return decode_json($jobsJSON); if (defined $err) {
print STDERR "$err";
undef $err;
}
if (defined $out && $out ne '') {
my $job;
try {
$job = decode_json($out);
} catch {
warn "nix-eval-jobs sent invalid JSON.\n parse error: $_\n invalid json: $out\n";
};
undef $out;
if (defined $job) {
return $job;
}
}
}
};
} }
@ -420,7 +457,7 @@ sub checkBuild {
my $firstOutputName = $outputNames[0]; my $firstOutputName = $outputNames[0];
my $firstOutputPath = $buildInfo->{outputs}->{$firstOutputName}; my $firstOutputPath = $buildInfo->{outputs}->{$firstOutputName};
my $jobName = $buildInfo->{jobName} or die; my $jobName = $buildInfo->{attr} or die;
my $drvPath = $buildInfo->{drvPath} or die; my $drvPath = $buildInfo->{drvPath} or die;
my $build; my $build;
@ -474,9 +511,30 @@ sub checkBuild {
my $time = time(); my $time = time();
sub null { sub getMeta {
my ($s) = @_; my ($s, $def) = @_;
return $s eq "" ? undef : $s; return ($s || "") eq "" ? $def : $s;
}
sub getMetaStrings {
my ($v, $k, $acc) = @_;
my $t = ref $v;
if ($t eq 'HASH') {
push @$acc, $v->{$k} if exists $v->{$k};
} elsif ($t eq 'ARRAY') {
getMetaStrings($_, $k, $acc) foreach @$v;
} elsif (defined $v) {
push @$acc, $v;
}
}
sub getMetaConcatStrings {
my ($v, $k) = @_;
my @strings;
getMetaStrings($v, $k, \@strings);
return join(", ", @strings) || undef;
} }
# Add the build to the database. # Add the build to the database.
@ -484,19 +542,19 @@ sub checkBuild {
{ timestamp => $time { timestamp => $time
, jobset_id => $jobset->id , jobset_id => $jobset->id
, job => $jobName , job => $jobName
, description => null($buildInfo->{description}) , description => getMeta($buildInfo->{meta}->{description}, undef)
, license => null($buildInfo->{license}) , license => getMetaConcatStrings($buildInfo->{meta}->{license}, "shortName")
, homepage => null($buildInfo->{homepage}) , homepage => getMeta($buildInfo->{meta}->{homepage}, undef)
, maintainers => null($buildInfo->{maintainers}) , maintainers => getMetaConcatStrings($buildInfo->{meta}->{maintainers}, "email")
, maxsilent => $buildInfo->{maxSilent} , maxsilent => getMeta($buildInfo->{meta}->{maxSilent}, 7200)
, timeout => $buildInfo->{timeout} , timeout => getMeta($buildInfo->{meta}->{timeout}, 36000)
, nixname => $buildInfo->{nixName} , nixname => $buildInfo->{name}
, drvpath => $drvPath , drvpath => $drvPath
, system => $buildInfo->{system} , system => $buildInfo->{system}
, priority => $buildInfo->{schedulingPriority} , priority => getMeta($buildInfo->{meta}->{schedulingPriority}, 100)
, finished => 0 , finished => 0
, iscurrent => 1 , iscurrent => 1
, ischannel => $buildInfo->{isChannel} , ischannel => getMeta($buildInfo->{meta}->{isChannel}, 0)
}); });
$build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} }) $build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} })
@ -665,7 +723,7 @@ sub checkJobsetWrapped {
return; return;
} }
# Hash the arguments to hydra-eval-jobs and check the # Hash the arguments to nix-eval-jobs and check the
# JobsetInputHashes to see if the previous evaluation had the same # JobsetInputHashes to see if the previous evaluation had the same
# inputs. If so, bail out. # inputs. If so, bail out.
my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo)); my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo));
@ -687,19 +745,12 @@ sub checkJobsetWrapped {
# Evaluate the job expression. # Evaluate the job expression.
my $evalStart = clock_gettime(CLOCK_MONOTONIC); my $evalStart = clock_gettime(CLOCK_MONOTONIC);
my $jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef); my $evalStop;
my $evalStop = clock_gettime(CLOCK_MONOTONIC); my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
if ($jobsetsJobset) {
my @keys = keys %$jobs;
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
}
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
if ($dryRun) { if ($dryRun) {
foreach my $name (keys %{$jobs}) { while (defined(my $job = $jobsIter->())) {
my $job = $jobs->{$name}; my $name = $job->{attr};
if (defined $job->{drvPath}) { if (defined $job->{drvPath}) {
print STDERR "good job $name: $job->{drvPath}\n"; print STDERR "good job $name: $job->{drvPath}\n";
} else { } else {
@ -709,36 +760,20 @@ sub checkJobsetWrapped {
return; return;
} }
die "Jobset contains a job with an empty name. Make sure the jobset evaluates to an attrset of jobs.\n"
if defined $jobs->{""};
$jobs->{$_}->{jobName} = $_ for keys %{$jobs};
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my $dbStart = clock_gettime(CLOCK_MONOTONIC);
# Store the error messages for jobs that failed to evaluate. # Store the error messages for jobs that failed to evaluate.
my $evaluationErrorTime = time; my $evaluationErrorTime = time;
my $evaluationErrorMsg = ""; my $evaluationErrorMsg = "";
foreach my $job (values %{$jobs}) {
next unless defined $job->{error};
$evaluationErrorMsg .=
($job->{jobName} ne "" ? "in job $job->{jobName}" : "at top-level") .
":\n" . $job->{error} . "\n\n";
}
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create( my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
{ errormsg => $evaluationErrorMsg { errormsg => $evaluationErrorMsg
, errortime => $evaluationErrorTime , errortime => $evaluationErrorTime
} }
); );
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my %buildMap; my %buildMap;
$db->txn_do(sub {
$db->txn_do(sub {
my $prevEval = getPrevJobsetEval($db, $jobset, 1); my $prevEval = getPrevJobsetEval($db, $jobset, 1);
# Clear the "current" flag on all builds. Since we're in a # Clear the "current" flag on all builds. Since we're in a
@ -751,7 +786,7 @@ sub checkJobsetWrapped {
, evaluationerror => $evaluationErrorRecord , evaluationerror => $evaluationErrorRecord
, timestamp => time , timestamp => time
, checkouttime => abs(int($checkoutStop - $checkoutStart)) , checkouttime => abs(int($checkoutStop - $checkoutStart))
, evaltime => abs(int($evalStop - $evalStart)) , evaltime => 0
, hasnewbuilds => 0 , hasnewbuilds => 0
, nrbuilds => 0 , nrbuilds => 0
, flake => $flakeRef , flake => $flakeRef
@ -759,11 +794,24 @@ sub checkJobsetWrapped {
, nixexprpath => $jobset->nixexprpath , nixexprpath => $jobset->nixexprpath
}); });
# Schedule each successfully evaluated job. my @jobsWithConstituents;
foreach my $job (permute(values %{$jobs})) {
next if defined $job->{error}; while (defined(my $job = $jobsIter->())) {
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n"; if ($jobsetsJobset) {
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins); die "The .jobsets jobset must only have a single job named 'jobsets'"
unless $job->{attr} eq "jobsets";
}
$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job $job->{attr}" : "at top-level") .
":\n" . $job->{error} . "\n\n" if defined $job->{error};
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
unless defined $job->{error};
if (defined $job->{constituents}) {
push @jobsWithConstituents, $job;
}
} }
# Have any builds been added or removed since last time? # Have any builds been added or removed since last time?
@ -801,21 +849,20 @@ sub checkJobsetWrapped {
$drvPathToId{$x->{drvPath}} = $x; $drvPathToId{$x->{drvPath}} = $x;
} }
foreach my $job (values %{$jobs}) { foreach my $job (values @jobsWithConstituents) {
next unless $job->{constituents}; next unless defined $job->{constituents};
if (defined $job->{error}) { if (defined $job->{error}) {
die "aggregate job $job->{jobName} failed with the error: $job->{error}\n"; die "aggregate job $job->{attr} failed with the error: $job->{error}\n";
} }
my $x = $drvPathToId{$job->{drvPath}} or my $x = $drvPathToId{$job->{drvPath}} or
die "aggregate job $job->{jobName} has no corresponding build record.\n"; die "aggregate job $job->{attr} has no corresponding build record.\n";
foreach my $drvPath (@{$job->{constituents}}) { foreach my $drvPath (@{$job->{constituents}}) {
my $constituent = $drvPathToId{$drvPath}; my $constituent = $drvPathToId{$drvPath};
if (defined $constituent) { if (defined $constituent) {
$db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}}); $db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
} else { } else {
warn "aggregate job $job->{jobName} has a constituent $drvPath that doesn't correspond to a Hydra build\n"; warn "aggregate job $job->{attr} has a constituent $drvPath that doesn't correspond to a Hydra build\n";
} }
} }
} }
@ -857,11 +904,15 @@ sub checkJobsetWrapped {
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
$jobset->update({ lastcheckedtime => time, forceeval => undef }); $jobset->update({ lastcheckedtime => time, forceeval => undef });
$evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
$evalStop = clock_gettime(CLOCK_MONOTONIC);
$ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
}); });
my $dbStop = clock_gettime(CLOCK_MONOTONIC); Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
Net::Statsd::increment("hydra.evaluator.evals"); Net::Statsd::increment("hydra.evaluator.evals");
Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged; Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
} }

View File

@ -18,14 +18,14 @@ isnt($res, 0, "hydra-eval-jobset exits non-zero");
ok(utf8::decode($stderr), "Stderr output is UTF8-clean"); ok(utf8::decode($stderr), "Stderr output is UTF8-clean");
like( like(
$stderr, $stderr,
qr/aggregate job mixed_aggregate failed with the error: constituentA: does not exist/, qr/aggregate job mixed_aggregate failed with the error: "constituentA": does not exist/,
"The stderr record includes a relevant error message" "The stderr record includes a relevant error message"
); );
$jobset->discard_changes; # refresh from DB $jobset->discard_changes({ '+columns' => {'errormsg' => 'errormsg'} }); # refresh from DB
like( like(
$jobset->errormsg, $jobset->errormsg,
qr/aggregate job mixed_aggregate failed with the error: constituentA: does not exist/, qr/aggregate job mixed_aggregate failed with the error: "constituentA": does not exist/,
"The jobset records a relevant error message" "The jobset records a relevant error message"
); );

View File

@ -5,13 +5,58 @@ use Test2::V0;
my $ctx = test_context(); my $ctx = test_context();
my $builds = $ctx->makeAndEvaluateJobset( my $expression = 'constituents.nix';
expression => 'constituents.nix', my $jobsetCtx = $ctx->makeJobset(
expression => $expression,
);
my $builds = $ctx->evaluateJobset(
jobset => $jobsetCtx->{"jobset"},
expression => $expression,
build => 0,
); );
my $constituentA = $builds->{"constituentA"}; my $constituentA = $builds->{"constituentA"};
my $directAggregate = $builds->{"direct_aggregate"}; my $directAggregate = $builds->{"direct_aggregate"};
my $indirectAggregate = $builds->{"indirect_aggregate"}; my $indirectAggregate = $builds->{"indirect_aggregate"};
my $mixedAggregate = $builds->{"mixed_aggregate"};
# Ensure that we get exactly the aggregates we expect
my %expected_constituents = (
'direct_aggregate' => {
'constituentA' => 1,
},
'indirect_aggregate' => {
'constituentA' => 1,
},
'mixed_aggregate' => {
# Note that `constituentA_alias` becomes `constituentA`, because
# the shorter name is preferred
'constituentA' => 1,
'constituentB' => 1,
},
);
my $rs = $ctx->db->resultset('AggregateConstituents')->search(
{},
{
join => [ 'aggregate', 'constituent' ], # Use correct relationship names
columns => [],
'+select' => [ 'aggregate.job', 'constituent.job' ],
'+as' => [ 'aggregate_job', 'constituent_job' ],
}
);
my %actual_constituents;
while (my $row = $rs->next) {
my $aggregate_job = $row->get_column('aggregate_job');
my $constituent_job = $row->get_column('constituent_job');
$actual_constituents{$aggregate_job} //= {};
$actual_constituents{$aggregate_job}{$constituent_job} = 1;
}
is(\%actual_constituents, \%expected_constituents, "Exact aggregate constituents as expected");
# Check that deletion also doesn't work accordingly
is(system('nix-store', '--delete', $constituentA->drvpath), 256, "Deleting a constituent derivation fails"); is(system('nix-store', '--delete', $constituentA->drvpath), 256, "Deleting a constituent derivation fails");
is(system('nix-store', '--delete', $directAggregate->drvpath), 256, "Deleting the direct aggregate derivation fails"); is(system('nix-store', '--delete', $directAggregate->drvpath), 256, "Deleting the direct aggregate derivation fails");

View File

@ -0,0 +1,67 @@
use feature 'unicode_strings';
use strict;
use warnings;
use Setup;
use Test2::V0;
use File::Copy qw(cp);
my $ctx = test_context(
nix_config => qq|
experimental-features = nix-command flakes
|,
hydra_config => q|
<runcommand>
evaluator_pure_eval = false
</runcommand>
|
);
sub checkFlake {
my ($flake) = @_;
cp($ctx->jobsdir . "/basic.nix", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/config.nix", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/empty-dir-builder.sh", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/fail.sh", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/succeed-with-failed.sh", $ctx->jobsdir . "/" . $flake);
chmod 0755, $ctx->jobsdir . "/" . $flake . "/empty-dir-builder.sh";
chmod 0755, $ctx->jobsdir . "/" . $flake . "/fail.sh";
chmod 0755, $ctx->jobsdir . "/" . $flake . "/succeed-with-failed.sh";
my $builds = $ctx->makeAndEvaluateJobset(
flake => 'path:' . $ctx->jobsdir . "/" . $flake,
build => 1
);
subtest "Build: succeed_with_failed" => sub {
my $build = $builds->{"succeed_with_failed"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 6, "succeeeded-but-failed should have buildstatus 6.");
};
subtest "Build: empty_dir" => sub {
my $build = $builds->{"empty_dir"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 0, "Should have succeeded.");
};
subtest "Build: fails" => sub {
my $build = $builds->{"fails"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 1, "Should have failed.");
};
}
subtest "Flake using `checks`" => sub {
checkFlake 'flake-checks'
};
subtest "Flake using `hydraJobs`" => sub {
checkFlake 'flake-hydraJobs'
};
done_testing;

View File

@ -0,0 +1,22 @@
use feature 'unicode_strings';
use strict;
use warnings;
use Setup;
use Test2::V0;
my $ctx = test_context();
my $builds = $ctx->makeAndEvaluateJobset(
expression => "meta.nix",
build => 1
);
my $build = $builds->{"full-of-meta"};
is($build->finished, 1, "Build should be finished.");
is($build->description, "This is the description of the job.", "Wrong description extracted from the build.");
is($build->license, "MIT, BSD", "Wrong licenses extracted from the build.");
is($build->homepage, "https://example.com/", "Wrong homepage extracted from the build.");
is($build->maintainers, 'alice@example.com, bob@not.found', "Wrong maintainers extracted from the build.");
done_testing;

View File

@ -5,6 +5,8 @@ rec {
builder = ./empty-dir-builder.sh; builder = ./empty-dir-builder.sh;
}; };
constituentA_alias = constituentA;
constituentB = mkDerivation { constituentB = mkDerivation {
name = "empty-dir-B"; name = "empty-dir-B";
builder = ./empty-dir-builder.sh; builder = ./empty-dir-builder.sh;
@ -32,7 +34,7 @@ rec {
name = "mixed_aggregate"; name = "mixed_aggregate";
_hydraAggregate = true; _hydraAggregate = true;
constituents = [ constituents = [
"constituentA" "constituentA_alias"
constituentB constituentB
]; ];
builder = ./empty-dir-builder.sh; builder = ./empty-dir-builder.sh;

View File

@ -0,0 +1,6 @@
{
outputs = { ... }: {
checks =
import ./basic.nix;
};
}

View File

@ -0,0 +1,6 @@
{
outputs = { ... }: {
hydraJobs =
import ./basic.nix;
};
}

17
t/jobs/meta.nix Normal file
View File

@ -0,0 +1,17 @@
with import ./config.nix;
{
full-of-meta =
mkDerivation {
name = "full-of-meta";
builder = ./empty-dir-builder.sh;
meta = {
description = "This is the description of the job.";
license = [ { shortName = "MIT"; } "BSD" ];
homepage = "https://example.com/";
maintainers = [ "alice@example.com" { email = "bob@not.found"; } ];
outPath = "${placeholder "out"}";
};
};
}

View File

@ -165,20 +165,46 @@ sub nix_state_dir {
sub makeAndEvaluateJobset { sub makeAndEvaluateJobset {
my ($self, %opts) = @_; my ($self, %opts) = @_;
my $expression = $opts{'expression'} || die "Mandatory 'expression' option not passed to makeAndEvaluateJobset.\n"; my $expression = $opts{'expression'};
my $jobsdir = $opts{'jobsdir'} // $self->jobsdir; my $flake = $opts{'flake'};
my $should_build = $opts{'build'} // 0; if (not $expression and not $flake) {
die "One of 'expression' or 'flake' must be passed to makeEvaluateJobset.\n";
}
my $jobsetCtx = $self->makeJobset( my $jobsdir = $opts{'jobsdir'} // $self->jobsdir;
expression => $expression,
my %args = (
jobsdir => $jobsdir, jobsdir => $jobsdir,
); );
my $jobset = $jobsetCtx->{"jobset"}; if ($expression) {
$args{expression} = $expression;
}
if ($flake) {
$args{flake} = $flake;
}
my $jobsetCtx = $self->makeJobset(%args);
return $self->evaluateJobset(
jobset => $jobsetCtx->{"jobset"},
expression => $expression,
flake => $flake,
build => $opts{"build"} // 0,
)
}
sub evaluateJobset {
my ($self, %opts) = @_;
my $jobset = $opts{'jobset'};
my $expression = $opts{'expression'} // $opts{'flake'};
evalSucceeds($jobset) or die "Evaluating jobs/$expression should exit with return code 0.\n"; evalSucceeds($jobset) or die "Evaluating jobs/$expression should exit with return code 0.\n";
my $builds = {}; my $builds = {};
my $should_build = $opts{'build'};
for my $build ($jobset->builds) { for my $build ($jobset->builds) {
if ($should_build) { if ($should_build) {
runBuild($build) or die "Build '".$build->job."' from jobs/$expression should exit with return code 0.\n"; runBuild($build) or die "Build '".$build->job."' from jobs/$expression should exit with return code 0.\n";
@ -195,7 +221,7 @@ sub makeAndEvaluateJobset {
# #
# In return, you get a hash of the user, project, and jobset records. # In return, you get a hash of the user, project, and jobset records.
# #
# This always uses an `expression` from the `jobsdir` directory. # This always uses an `expression` or `flake` from the `jobsdir` directory.
# #
# Hash Parameters: # Hash Parameters:
# #
@ -204,7 +230,12 @@ sub makeAndEvaluateJobset {
sub makeJobset { sub makeJobset {
my ($self, %opts) = @_; my ($self, %opts) = @_;
my $expression = $opts{'expression'} || die "Mandatory 'expression' option not passed to makeJobset.\n"; my $expression = $opts{'expression'};
my $flake = $opts{'flake'};
if (not $expression and not $flake) {
die "One of 'expression' or 'flake' must be passed to makeJobset.\n";
}
my $jobsdir = $opts{'jobsdir'} // $self->jobsdir; my $jobsdir = $opts{'jobsdir'} // $self->jobsdir;
# Create a new user for this test # Create a new user for this test
@ -222,12 +253,20 @@ sub makeJobset {
}); });
# Create a new jobset for this test and set up the inputs # Create a new jobset for this test and set up the inputs
my $jobset = $project->jobsets->create({ my %args = (
name => rand_chars(), name => rand_chars(),
nixexprinput => "jobs",
nixexprpath => $expression,
emailoverride => "" emailoverride => ""
}); );
if ($expression) {
$args{type} = 0;
$args{nixexprinput} = "jobs";
$args{nixexprpath} = $expression;
}
if ($flake) {
$args{type} = 1;
$args{flake} = $flake;
}
my $jobset = $project->jobsets->create(\%args);
my $jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"}); my $jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"});
$jobsetinput->jobsetinputalts->create({altnr => 0, value => $jobsdir}); $jobsetinput->jobsetinputalts->create({altnr => 0, value => $jobsdir});

View File

@ -27,7 +27,7 @@ testenv.prepend('PERL5LIB',
separator: ':' separator: ':'
) )
testenv.prepend('PATH', testenv.prepend('PATH',
fs.parent(hydra_eval_jobs.full_path()), fs.parent(find_program('nix').full_path()),
fs.parent(hydra_evaluator.full_path()), fs.parent(hydra_evaluator.full_path()),
fs.parent(hydra_queue_runner.full_path()), fs.parent(hydra_queue_runner.full_path()),
meson.project_source_root() / 'src/script', meson.project_source_root() / 'src/script',

View File

@ -22,11 +22,11 @@ is(nrQueuedBuildsForJobset($jobset), 0, "Evaluating jobs/broken-constituent.nix
like( like(
$jobset->errormsg, $jobset->errormsg,
qr/^does-not-exist: does not exist$/m, qr/^"does-not-exist": does not exist$/m,
"Evaluating jobs/broken-constituent.nix should log an error for does-not-exist"); "Evaluating jobs/broken-constituent.nix should log an error for does-not-exist");
like( like(
$jobset->errormsg, $jobset->errormsg,
qr/^does-not-evaluate: error: assertion 'false' failed$/m, qr/^"does-not-evaluate": "error: assertion 'false' failed/m,
"Evaluating jobs/broken-constituent.nix should log an error for does-not-evaluate"); "Evaluating jobs/broken-constituent.nix should log an error for does-not-evaluate");
done_testing; done_testing;