2 Commits

Author SHA1 Message Date
John Ericson
9ebc15e709 Upgrade Nix to 2.32 2025-10-20 21:20:16 -04:00
John Ericson
dbae951443 Deduplicate protocol code more with ServeProto::BasicClientConnection
I did this in Nix for this purpose, but didn't get around to actually
taking advantage of it here, until now.
2025-10-20 21:20:16 -04:00
3 changed files with 105 additions and 86 deletions

16
flake.lock generated
View File

@@ -3,16 +3,16 @@
"nix": { "nix": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1759956402, "lastModified": 1760573252,
"narHash": "sha256-CM27YK+KMi3HLRXqjPaJwkTabmKW+CDXOE3kMMtXH3s=", "narHash": "sha256-mcvNeNdJP5R7huOc8Neg0qZESx/0DMg8Fq6lsdx0x8U=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nix", "repo": "nix",
"rev": "3019db2c87006817b6201113ad4ceee0c53c3b62", "rev": "3c39583e5512729f9c5a44c3b03b6467a2acd963",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "2.31-maintenance", "ref": "2.32-maintenance",
"repo": "nix", "repo": "nix",
"type": "github" "type": "github"
} }
@@ -20,16 +20,16 @@
"nix-eval-jobs": { "nix-eval-jobs": {
"flake": false, "flake": false,
"locked": { "locked": {
"lastModified": 1757626891, "lastModified": 1760478325,
"narHash": "sha256-VrHPtHxVIboqgnw+tlCQepgtBOhBvU5hxbMHsPo8LAc=", "narHash": "sha256-hA+NOH8KDcsuvH7vJqSwk74PyZP3MtvI/l+CggZcnTc=",
"owner": "nix-community", "owner": "nix-community",
"repo": "nix-eval-jobs", "repo": "nix-eval-jobs",
"rev": "c975efc5b2bec0c1ff93c67de4a03306af258ff7", "rev": "daa42f9e9c84aeff1e325dd50fda321f53dfd02c",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "nix-community", "owner": "nix-community",
"ref": "v2.31.0", "ref": "v2.32.1",
"repo": "nix-eval-jobs", "repo": "nix-eval-jobs",
"type": "github" "type": "github"
} }

View File

@@ -4,13 +4,13 @@
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05-small"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05-small";
inputs.nix = { inputs.nix = {
url = "github:NixOS/nix/2.31-maintenance"; url = "github:NixOS/nix/2.32-maintenance";
# We want to control the deps precisely # We want to control the deps precisely
flake = false; flake = false;
}; };
inputs.nix-eval-jobs = { inputs.nix-eval-jobs = {
url = "github:nix-community/nix-eval-jobs/v2.31.0"; url = "github:nix-community/nix-eval-jobs/v2.32.1";
# We want to control the deps precisely # We want to control the deps precisely
flake = false; flake = false;
}; };

View File

@@ -14,6 +14,7 @@
#include <nix/util/current-process.hh> #include <nix/util/current-process.hh>
#include <nix/util/processes.hh> #include <nix/util/processes.hh>
#include <nix/util/util.hh> #include <nix/util/util.hh>
#include <nix/store/export-import.hh>
#include <nix/store/serve-protocol.hh> #include <nix/store/serve-protocol.hh>
#include <nix/store/serve-protocol-impl.hh> #include <nix/store/serve-protocol-impl.hh>
#include <nix/store/ssh.hh> #include <nix/store/ssh.hh>
@@ -103,9 +104,9 @@ static void copyClosureTo(
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock, std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600)); std::chrono::seconds(600));
conn.to << ServeProto::Command::ImportPaths; conn.importPaths(destStore, [&](Sink & sink) {
destStore.exportPaths(missing, conn.to); exportPaths(destStore, missing, sink);
conn.to.flush(); });
if (readInt(conn.from) != 1) if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure"); throw Error("remote machine failed to import closure");
@@ -262,16 +263,18 @@ static BuildResult performBuild(
// Since this a `BasicDerivation`, `staticOutputHashes` will not // Since this a `BasicDerivation`, `staticOutputHashes` will not
// do any real work. // do any real work.
auto outputHashes = staticOutputHashes(localStore, drv); auto outputHashes = staticOutputHashes(localStore, drv);
for (auto & [outputName, output] : drvOutputs) { if (auto * successP = result.tryGetSuccess()) {
auto outputPath = output.second; for (auto & [outputName, output] : drvOutputs) {
// Weve just asserted that the output paths of the derivation auto outputPath = output.second;
// were known // Weve just asserted that the output paths of the derivation
assert(outputPath); // were known
auto outputHash = outputHashes.at(outputName); assert(outputPath);
auto drvOutput = DrvOutput { outputHash, outputName }; auto outputHash = outputHashes.at(outputName);
result.builtOutputs.insert_or_assign( auto drvOutput = DrvOutput { outputHash, outputName };
std::move(outputName), successP->builtOutputs.insert_or_assign(
Realisation { drvOutput, *outputPath }); std::move(outputName),
Realisation { drvOutput, *outputPath });
}
} }
} }
@@ -298,11 +301,10 @@ static void copyPathFromRemote(
lambda function only gets executed if someone tries to read lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather from source2, we will send the command from here rather
than outside the lambda. */ than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); conn.narFromPath(localStore, info.path, [&](Source & source) {
conn.to.flush(); TeeSource tee(source, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
TeeSource tee(conn.from, sink); });
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
}); });
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
@@ -336,54 +338,68 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
startTime = buildResult.startTime; startTime = buildResult.startTime;
stopTime = buildResult.stopTime; stopTime = buildResult.stopTime;
timesBuilt = buildResult.timesBuilt; timesBuilt = buildResult.timesBuilt;
errorMsg = buildResult.errorMsg;
isNonDeterministic = buildResult.isNonDeterministic;
switch ((BuildResult::Status) buildResult.status) { std::visit(overloaded{
case BuildResult::Built: [&](const BuildResult::Success & success) {
stepStatus = bsSuccess; stepStatus = bsSuccess;
break; switch (success.status) {
case BuildResult::Substituted: case BuildResult::Success::Built:
case BuildResult::AlreadyValid: break;
stepStatus = bsSuccess; case BuildResult::Success::Substituted:
isCached = true; case BuildResult::Success::AlreadyValid:
break; case BuildResult::Success::ResolvesToAlreadyValid:
case BuildResult::PermanentFailure: isCached = true;
stepStatus = bsFailed; break;
canCache = true; default:
errorMsg = ""; assert(false);
break; }
case BuildResult::InputRejected: },
case BuildResult::OutputRejected: [&](const BuildResult::Failure & failure) {
stepStatus = bsFailed; errorMsg = failure.errorMsg;
canCache = true; isNonDeterministic = failure.isNonDeterministic;
break; switch (failure.status) {
case BuildResult::TransientFailure: case BuildResult::Failure::PermanentFailure:
stepStatus = bsFailed; stepStatus = bsFailed;
canRetry = true; canCache = true;
errorMsg = ""; errorMsg = "";
break; break;
case BuildResult::TimedOut: case BuildResult::Failure::InputRejected:
stepStatus = bsTimedOut; case BuildResult::Failure::OutputRejected:
errorMsg = ""; stepStatus = bsFailed;
break; canCache = true;
case BuildResult::MiscFailure: break;
stepStatus = bsAborted; case BuildResult::Failure::TransientFailure:
canRetry = true; stepStatus = bsFailed;
break; canRetry = true;
case BuildResult::LogLimitExceeded: errorMsg = "";
stepStatus = bsLogLimitExceeded; break;
break; case BuildResult::Failure::TimedOut:
case BuildResult::NotDeterministic: stepStatus = bsTimedOut;
stepStatus = bsNotDeterministic; errorMsg = "";
canRetry = false; break;
canCache = true; case BuildResult::Failure::MiscFailure:
break; stepStatus = bsAborted;
default: canRetry = true;
stepStatus = bsAborted; break;
break; case BuildResult::Failure::LogLimitExceeded:
} stepStatus = bsLogLimitExceeded;
break;
case BuildResult::Failure::NotDeterministic:
stepStatus = bsNotDeterministic;
canRetry = false;
canCache = true;
break;
case BuildResult::Failure::CachedFailure:
case BuildResult::Failure::DependencyFailed:
case BuildResult::Failure::NoSubstituters:
case BuildResult::Failure::HashMismatch:
stepStatus = bsAborted;
break;
default:
assert(false);
}
},
}, buildResult.inner);
} }
/* Utility guard object to auto-release a semaphore on destruction. */ /* Utility guard object to auto-release a semaphore on destruction. */
@@ -405,7 +421,7 @@ void State::buildRemote(ref<Store> destStore,
std::function<void(StepState)> updateStep, std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers) NarMemberDatas & narMembers)
{ {
assert(BuildResult::TimedOut == 8); assert(BuildResult::Failure::TimedOut == 8);
auto [logFile, logFD] = build_remote::openLogFile(logDir, step->drvPath); auto [logFile, logFD] = build_remote::openLogFile(logDir, step->drvPath);
AutoDelete logFileDel(logFile, false); AutoDelete logFileDel(logFile, false);
@@ -514,7 +530,7 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssBuilding); updateStep(ssBuilding);
BuildResult buildResult = build_remote::performBuild( auto buildResult = build_remote::performBuild(
conn, conn,
*localStore, *localStore,
step->drvPath, step->drvPath,
@@ -556,8 +572,9 @@ void State::buildRemote(ref<Store> destStore,
wakeDispatcher(); wakeDispatcher();
StorePathSet outputs; StorePathSet outputs;
for (auto & [_, realisation] : buildResult.builtOutputs) if (auto * successP = buildResult.tryGetSuccess())
outputs.insert(realisation.outPath); for (auto & [_, realisation] : successP->builtOutputs)
outputs.insert(realisation.outPath);
/* Copy the output paths. */ /* Copy the output paths. */
if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) { if (!machine->isLocalhost() || localStore != std::shared_ptr<Store>(destStore)) {
@@ -590,15 +607,17 @@ void State::buildRemote(ref<Store> destStore,
/* Register the outputs of the newly built drv */ /* Register the outputs of the newly built drv */
if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations)) { if (experimentalFeatureSettings.isEnabled(Xp::CaDerivations)) {
auto outputHashes = staticOutputHashes(*localStore, *step->drv); auto outputHashes = staticOutputHashes(*localStore, *step->drv);
for (auto & [outputName, realisation] : buildResult.builtOutputs) { if (auto * successP = buildResult.tryGetSuccess()) {
// Register the resolved drv output for (auto & [outputName, realisation] : successP->builtOutputs) {
destStore->registerDrvOutput(realisation); // Register the resolved drv output
destStore->registerDrvOutput(realisation);
// Also register the unresolved one // Also register the unresolved one
auto unresolvedRealisation = realisation; auto unresolvedRealisation = realisation;
unresolvedRealisation.signatures.clear(); unresolvedRealisation.signatures.clear();
unresolvedRealisation.id.drvHash = outputHashes.at(outputName); unresolvedRealisation.id.drvHash = outputHashes.at(outputName);
destStore->registerDrvOutput(unresolvedRealisation); destStore->registerDrvOutput(unresolvedRealisation);
}
} }
} }