Merge pull request #1439 from NixOS/nix-next

Dedup machine file parsing, and other improvements
This commit is contained in:
John Ericson 2025-02-13 18:10:08 -05:00 committed by GitHub
commit 810781a802
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 52 additions and 126 deletions

View File

@ -21,28 +21,23 @@
using namespace nix; using namespace nix;
namespace nix::build_remote { bool ::Machine::isLocalhost() const
static Strings extraStoreArgs(std::string & machine)
{ {
Strings result; return storeUri.params.empty() && std::visit(overloaded {
try { [](const StoreReference::Auto &) {
auto parsed = parseURL(machine); return true;
if (parsed.scheme != "ssh") { },
throw SysError("Currently, only (legacy-)ssh stores are supported!"); [](const StoreReference::Specified & s) {
} return
machine = parsed.authority.value_or(""); (s.scheme == "local" || s.scheme == "unix") ||
auto remoteStore = parsed.query.find("remote-store"); ((s.scheme == "ssh" || s.scheme == "ssh-ng") &&
if (remoteStore != parsed.query.end()) { s.authority == "localhost");
result = {"--store", shellEscape(remoteStore->second)}; },
} }, storeUri.variant);
} catch (BadURL &) {
// We just try to continue with `machine->sshName` here for backwards compat.
}
return result;
} }
namespace nix::build_remote {
static std::unique_ptr<SSHMaster::Connection> openConnection( static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master) ::Machine::ptr machine, SSHMaster & master)
{ {
@ -51,7 +46,11 @@ static std::unique_ptr<SSHMaster::Connection> openConnection(
command.push_back("--builders"); command.push_back("--builders");
command.push_back(""); command.push_back("");
} else { } else {
command.splice(command.end(), extraStoreArgs(machine->sshName)); auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
} }
auto ret = master.startCommand(std::move(command), { auto ret = master.startCommand(std::move(command), {
@ -198,7 +197,7 @@ static BasicDerivation sendInputs(
MaintainCount<counter> mc2(nrStepsCopyingTo); MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s", printMsg(lvlDebug, "sending closure of %s to %s",
localStore.printStorePath(step.drvPath), conn.machine->sshName); localStore.printStorePath(step.drvPath), conn.machine->storeUri.render());
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
@ -278,32 +277,6 @@ static BuildResult performBuild(
return result; return result;
} }
static std::map<StorePath, UnkeyedValidPathInfo> queryPathInfos(
::Machine::Connection & conn,
Store & localStore,
StorePathSet & outputs,
size_t & totalNarSize
)
{
/* Get info about each output path. */
std::map<StorePath, UnkeyedValidPathInfo> infos;
conn.to << ServeProto::Command::QueryPathInfos;
ServeProto::write(localStore, conn, outputs);
conn.to.flush();
while (true) {
auto storePathS = readString(conn.from);
if (storePathS == "") break;
auto storePath = localStore.parseStorePath(storePathS);
auto info = ServeProto::Serialise<UnkeyedValidPathInfo>::read(localStore, conn);
totalNarSize += info.narSize;
infos.insert_or_assign(std::move(storePath), std::move(info));
}
return infos;
}
static void copyPathFromRemote( static void copyPathFromRemote(
::Machine::Connection & conn, ::Machine::Connection & conn,
NarMemberDatas & narMembers, NarMemberDatas & narMembers,
@ -430,8 +403,13 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting); updateStep(ssConnecting);
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
SSHMaster master { SSHMaster master {
machine->sshName, pSpecified->authority,
machine->sshKey, machine->sshKey,
machine->sshPublicHostKey, machine->sshPublicHostKey,
false, // no SSH master yet false, // no SSH master yet
@ -482,19 +460,13 @@ void State::buildRemote(ref<Store> destStore,
conn.to, conn.to,
conn.from, conn.from,
our_version, our_version,
machine->sshName); machine->storeUri.render());
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
child->sshPid.wait(); child->sshPid.wait();
std::string s = chomp(readFile(result.logFile)); std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s); throw Error("cannot connect to %1%: %2%", machine->storeUri.render(), s);
} }
// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
{ {
auto info(machine->state->connectInfo.lock()); auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0; info->consecutiveFailures = 0;
@ -523,7 +495,7 @@ void State::buildRemote(ref<Store> destStore,
/* Do the build. */ /* Do the build. */
printMsg(lvlDebug, "building %s on %s", printMsg(lvlDebug, "building %s on %s",
localStore->printStorePath(step->drvPath), localStore->printStorePath(step->drvPath),
machine->sshName); machine->storeUri.render());
updateStep(ssBuilding); updateStep(ssBuilding);
@ -546,7 +518,7 @@ void State::buildRemote(ref<Store> destStore,
get a build log. */ get a build log. */
if (result.isCached) { if (result.isCached) {
printMsg(lvlInfo, "outputs of %s substituted or already valid on %s", printMsg(lvlInfo, "outputs of %s substituted or already valid on %s",
localStore->printStorePath(step->drvPath), machine->sshName); localStore->printStorePath(step->drvPath), machine->storeUri.render());
unlink(result.logFile.c_str()); unlink(result.logFile.c_str());
result.logFile = ""; result.logFile = "";
} }
@ -563,8 +535,10 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
auto infos = conn.queryPathInfos(*localStore, outputs);
size_t totalNarSize = 0; size_t totalNarSize = 0;
auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); for (auto & [_, info] : infos) totalNarSize += info.narSize;
if (totalNarSize > maxOutputSize) { if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded; result.stepStatus = bsNarSizeLimitExceeded;
@ -573,7 +547,7 @@ void State::buildRemote(ref<Store> destStore,
/* Copy each path. */ /* Copy each path. */
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)", printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);
build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now(); auto now2 = std::chrono::steady_clock::now();
@ -612,7 +586,7 @@ void State::buildRemote(ref<Store> destStore,
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now; info->lastFailure = now;
int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->sshName, delta); printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->storeUri.render(), delta);
info->disabledUntil = now + std::chrono::seconds(delta); info->disabledUntil = now + std::chrono::seconds(delta);
} }
throw; throw;

View File

@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation)
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, "uncaught exception building %s on %s: %s", printMsg(lvlError, "uncaught exception building %s on %s: %s",
localStore->printStorePath(reservation->step->drvPath), localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName, reservation->machine->storeUri.render(),
e.what()); e.what());
} }
} }
@ -150,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
buildOptions.buildTimeout = build->buildTimeout; buildOptions.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)", printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1)); localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
} }
if (!buildOneDone) if (!buildOneDone)
@ -196,7 +196,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{ {
auto mc = startDbUpdate(); auto mc = startDbUpdate();
pqxx::work txn(*conn); pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy); stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy);
txn.commit(); txn.commit();
} }
@ -253,7 +253,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Finish the step in the database. */ /* Finish the step in the database. */
if (stepNr) { if (stepNr) {
pqxx::work txn(*conn); pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName); finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render());
txn.commit(); txn.commit();
} }
@ -261,7 +261,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
issue). Retry a number of times. */ issue). Retry a number of times. */
if (result.canRetry) { if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building %s on %s: %s", printMsg(lvlError, "possibly transient failure building %s on %s: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg); localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg);
assert(stepNr); assert(stepNr);
bool retry; bool retry;
{ {
@ -452,7 +452,7 @@ void State::failStep(
build->finishedInDB) build->finishedInDB)
continue; continue;
createBuildStep(txn, createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "", 0, build->id, step, machine ? machine->storeUri.render() : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId); result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
} }

View File

@ -256,7 +256,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */ /* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) { if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')", debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform); mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform);
continue; continue;
} }

View File

@ -135,65 +135,26 @@ void State::parseMachines(const std::string & contents)
oldMachines = *machines_; oldMachines = *machines_;
} }
for (auto line : tokenizeString<Strings>(contents, "\n")) { for (auto && machine_ : nix::Machine::parseConfig({}, contents)) {
line = trim(std::string(line, 0, line.find('#'))); auto machine = std::make_shared<::Machine>(std::move(machine_));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(8);
if (tokens[5] == "-") tokens[5] = "";
auto supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
if (tokens[6] == "-") tokens[6] = "";
auto mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : mandatoryFeatures)
supportedFeatures.insert(f);
using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;
auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `systemTypes`
tokenizeString<StringSet>(tokens[1], ","),
// `sshKey`
tokens[2] == "-" ? "" : tokens[2],
// `maxJobs`
tokens[3] != ""
? string2Int<MaxJobs>(tokens[3]).value()
: 1,
// `speedFactor`
std::stof(tokens[4].c_str()),
// `supportedFeatures`
std::move(supportedFeatures),
// `mandatoryFeatures`
std::move(mandatoryFeatures),
// `sshPublicHostKey`
tokens[7] != "" && tokens[7] != "-"
? tokens[7]
: "",
});
machine->sshName = tokens[0];
/* Re-use the State object of the previous machine with the /* Re-use the State object of the previous machine with the
same name. */ same name. */
auto i = oldMachines.find(machine->sshName); auto i = oldMachines.find(machine->storeUri.variant);
if (i == oldMachines.end()) if (i == oldMachines.end())
printMsg(lvlChatty, "adding new machine %1%", machine->sshName); printMsg(lvlChatty, "adding new machine %1%", machine->storeUri.render());
else else
printMsg(lvlChatty, "updating machine %1%", machine->sshName); printMsg(lvlChatty, "updating machine %1%", machine->storeUri.render());
machine->state = i == oldMachines.end() machine->state = i == oldMachines.end()
? std::make_shared<::Machine::State>() ? std::make_shared<::Machine::State>()
: i->second->state; : i->second->state;
newMachines[machine->sshName] = machine; newMachines[machine->storeUri.variant] = machine;
} }
for (auto & m : oldMachines) for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end()) { if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled) if (m.second->enabled)
printInfo("removing machine %1%", m.first); printInfo("removing machine %1%", m.second->storeUri.render());
/* Add a disabled ::Machine object to make sure stats are /* Add a disabled ::Machine object to make sure stats are
maintained. */ maintained. */
auto machine = std::make_shared<::Machine>(*(m.second)); auto machine = std::make_shared<::Machine>(*(m.second));
@ -657,7 +618,7 @@ void State::dumpStatus(Connection & conn)
machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone; machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone; machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
} }
statusJson["machines"][m->sshName] = machine; statusJson["machines"][m->storeUri.render()] = machine;
} }
} }

View File

@ -6,7 +6,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <regex>
#include <prometheus/counter.h> #include <prometheus/counter.h>
#include <prometheus/gauge.h> #include <prometheus/gauge.h>
@ -241,10 +240,6 @@ struct Machine : nix::Machine
{ {
typedef std::shared_ptr<Machine> ptr; typedef std::shared_ptr<Machine> ptr;
/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;
struct State { struct State {
typedef std::shared_ptr<State> ptr; typedef std::shared_ptr<State> ptr;
counter currentJobs{0}; counter currentJobs{0};
@ -294,11 +289,7 @@ struct Machine : nix::Machine
return true; return true;
} }
bool isLocalhost() bool isLocalhost() const;
{
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
// A connection to a machine // A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection { struct Connection : nix::ServeProto::BasicClientConnection {
@ -358,7 +349,7 @@ private:
/* The build machines. */ /* The build machines. */
std::mutex machinesReadyLock; std::mutex machinesReadyLock;
typedef std::map<std::string, Machine::ptr> Machines; typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Various stats. */ /* Various stats. */