diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 1cabd291..5b7b81b7 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -21,28 +21,23 @@ using namespace nix; -namespace nix::build_remote { - -static Strings extraStoreArgs(std::string & machine) +bool ::Machine::isLocalhost() const { - Strings result; - try { - auto parsed = parseURL(machine); - if (parsed.scheme != "ssh") { - throw SysError("Currently, only (legacy-)ssh stores are supported!"); - } - machine = parsed.authority.value_or(""); - auto remoteStore = parsed.query.find("remote-store"); - if (remoteStore != parsed.query.end()) { - result = {"--store", shellEscape(remoteStore->second)}; - } - } catch (BadURL &) { - // We just try to continue with `machine->sshName` here for backwards compat. - } - - return result; + return storeUri.params.empty() && std::visit(overloaded { + [](const StoreReference::Auto &) { + return true; + }, + [](const StoreReference::Specified & s) { + return + (s.scheme == "local" || s.scheme == "unix") || + ((s.scheme == "ssh" || s.scheme == "ssh-ng") && + s.authority == "localhost"); + }, + }, storeUri.variant); } +namespace nix::build_remote { + static std::unique_ptr openConnection( ::Machine::ptr machine, SSHMaster & master) { @@ -51,7 +46,11 @@ static std::unique_ptr openConnection( command.push_back("--builders"); command.push_back(""); } else { - command.splice(command.end(), extraStoreArgs(machine->sshName)); + auto remoteStore = machine->storeUri.params.find("remote-store"); + if (remoteStore != machine->storeUri.params.end()) { + command.push_back("--store"); + command.push_back(shellEscape(remoteStore->second)); + } } auto ret = master.startCommand(std::move(command), { @@ -198,7 +197,7 @@ static BasicDerivation sendInputs( MaintainCount mc2(nrStepsCopyingTo); printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", - localStore.printStorePath(step.drvPath), conn.machine->sshName); + localStore.printStorePath(step.drvPath), conn.machine->storeUri.render()); auto now1 = std::chrono::steady_clock::now(); @@ -278,32 +277,6 @@ static BuildResult performBuild( return result; } -static std::map queryPathInfos( - ::Machine::Connection & conn, - Store & localStore, - StorePathSet & outputs, - size_t & totalNarSize -) -{ - - /* Get info about each output path. */ - std::map infos; - conn.to << ServeProto::Command::QueryPathInfos; - ServeProto::write(localStore, conn, outputs); - conn.to.flush(); - while (true) { - auto storePathS = readString(conn.from); - if (storePathS == "") break; - - auto storePath = localStore.parseStorePath(storePathS); - auto info = ServeProto::Serialise::read(localStore, conn); - totalNarSize += info.narSize; - infos.insert_or_assign(std::move(storePath), std::move(info)); - } - - return infos; -} - static void copyPathFromRemote( ::Machine::Connection & conn, NarMemberDatas & narMembers, @@ -430,8 +403,13 @@ void State::buildRemote(ref destStore, updateStep(ssConnecting); + auto * pSpecified = std::get_if(&machine->storeUri.variant); + if (!pSpecified || pSpecified->scheme != "ssh") { + throw Error("Currently, only (legacy-)ssh stores are supported!"); + } + SSHMaster master { - machine->sshName, + pSpecified->authority, machine->sshKey, machine->sshPublicHostKey, false, // no SSH master yet @@ -482,19 +460,13 @@ void State::buildRemote(ref destStore, conn.to, conn.from, our_version, - machine->sshName); + machine->storeUri.render()); } catch (EndOfFile & e) { child->sshPid.wait(); std::string s = chomp(readFile(result.logFile)); - throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); + throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); } - // Do not attempt to speak a newer version of the protocol. - // - // Per https://github.com/NixOS/nix/issues/9584 should be handled as - // part of `handshake` in upstream nix. - conn.remoteVersion = std::min(conn.remoteVersion, our_version); - { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; @@ -523,7 +495,7 @@ void State::buildRemote(ref destStore, /* Do the build. */ printMsg(lvlDebug, "building ‘%s’ on ‘%s’", localStore->printStorePath(step->drvPath), - machine->sshName); + machine->storeUri.render()); updateStep(ssBuilding); @@ -546,7 +518,7 @@ void State::buildRemote(ref destStore, get a build log. */ if (result.isCached) { printMsg(lvlInfo, "outputs of ‘%s’ substituted or already valid on ‘%s’", - localStore->printStorePath(step->drvPath), machine->sshName); + localStore->printStorePath(step->drvPath), machine->storeUri.render()); unlink(result.logFile.c_str()); result.logFile = ""; } @@ -563,8 +535,10 @@ void State::buildRemote(ref destStore, auto now1 = std::chrono::steady_clock::now(); + auto infos = conn.queryPathInfos(*localStore, outputs); + size_t totalNarSize = 0; - auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); + for (auto & [_, info] : infos) totalNarSize += info.narSize; if (totalNarSize > maxOutputSize) { result.stepStatus = bsNarSizeLimitExceeded; @@ -573,7 +547,7 @@ void State::buildRemote(ref destStore, /* Copy each path. */ printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", - localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); + localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize); build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); auto now2 = std::chrono::steady_clock::now(); @@ -612,7 +586,7 @@ void State::buildRemote(ref destStore, info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4); info->lastFailure = now; int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30); - printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->sshName, delta); + printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->storeUri.render(), delta); info->disabledUntil = now + std::chrono::seconds(delta); } throw; diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 5269febd..a0773511 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation) } catch (std::exception & e) { printMsg(lvlError, "uncaught exception building ‘%s’ on ‘%s’: %s", localStore->printStorePath(reservation->step->drvPath), - reservation->machine->sshName, + reservation->machine->storeUri.render(), e.what()); } } @@ -150,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, buildOptions.buildTimeout = build->buildTimeout; printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)", - localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1)); + localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1)); } if (!buildOneDone) @@ -196,7 +196,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, { auto mc = startDbUpdate(); pqxx::work txn(*conn); - stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy); + stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy); txn.commit(); } @@ -253,7 +253,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, /* Finish the step in the database. */ if (stepNr) { pqxx::work txn(*conn); - finishBuildStep(txn, result, buildId, stepNr, machine->sshName); + finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render()); txn.commit(); } @@ -261,7 +261,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, issue). Retry a number of times. */ if (result.canRetry) { printMsg(lvlError, "possibly transient failure building ‘%s’ on ‘%s’: %s", - localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg); + localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg); assert(stepNr); bool retry; { @@ -452,7 +452,7 @@ void State::failStep( build->finishedInDB) continue; createBuildStep(txn, - 0, build->id, step, machine ? machine->sshName : "", + 0, build->id, step, machine ? machine->storeUri.render() : "", result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId); } diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index a4c84252..cbf982bf 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -256,7 +256,7 @@ system_time State::doDispatch() /* Can this machine do this step? */ if (!mi.machine->supportsStep(step)) { debug("machine '%s' does not support step '%s' (system type '%s')", - mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform); + mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform); continue; } diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 28ed6deb..99411f9f 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -135,65 +135,26 @@ void State::parseMachines(const std::string & contents) oldMachines = *machines_; } - for (auto line : tokenizeString(contents, "\n")) { - line = trim(std::string(line, 0, line.find('#'))); - auto tokens = tokenizeString>(line); - if (tokens.size() < 3) continue; - tokens.resize(8); - - if (tokens[5] == "-") tokens[5] = ""; - auto supportedFeatures = tokenizeString(tokens[5], ","); - - if (tokens[6] == "-") tokens[6] = ""; - auto mandatoryFeatures = tokenizeString(tokens[6], ","); - - for (auto & f : mandatoryFeatures) - supportedFeatures.insert(f); - - using MaxJobs = std::remove_const::type; - - auto machine = std::make_shared<::Machine>(nix::Machine { - // `storeUri`, not yet used - "", - // `systemTypes` - tokenizeString(tokens[1], ","), - // `sshKey` - tokens[2] == "-" ? "" : tokens[2], - // `maxJobs` - tokens[3] != "" - ? string2Int(tokens[3]).value() - : 1, - // `speedFactor` - std::stof(tokens[4].c_str()), - // `supportedFeatures` - std::move(supportedFeatures), - // `mandatoryFeatures` - std::move(mandatoryFeatures), - // `sshPublicHostKey` - tokens[7] != "" && tokens[7] != "-" - ? tokens[7] - : "", - }); - - machine->sshName = tokens[0]; + for (auto && machine_ : nix::Machine::parseConfig({}, contents)) { + auto machine = std::make_shared<::Machine>(std::move(machine_)); /* Re-use the State object of the previous machine with the same name. */ - auto i = oldMachines.find(machine->sshName); + auto i = oldMachines.find(machine->storeUri.variant); if (i == oldMachines.end()) - printMsg(lvlChatty, "adding new machine ‘%1%’", machine->sshName); + printMsg(lvlChatty, "adding new machine ‘%1%’", machine->storeUri.render()); else - printMsg(lvlChatty, "updating machine ‘%1%’", machine->sshName); + printMsg(lvlChatty, "updating machine ‘%1%’", machine->storeUri.render()); machine->state = i == oldMachines.end() ? std::make_shared<::Machine::State>() : i->second->state; - newMachines[machine->sshName] = machine; + newMachines[machine->storeUri.variant] = machine; } for (auto & m : oldMachines) if (newMachines.find(m.first) == newMachines.end()) { if (m.second->enabled) - printInfo("removing machine ‘%1%’", m.first); + printInfo("removing machine ‘%1%’", m.second->storeUri.render()); /* Add a disabled ::Machine object to make sure stats are maintained. */ auto machine = std::make_shared<::Machine>(*(m.second)); @@ -657,7 +618,7 @@ void State::dumpStatus(Connection & conn) machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone; machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone; } - statusJson["machines"][m->sshName] = machine; + statusJson["machines"][m->storeUri.render()] = machine; } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 839239fe..30e01c74 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -241,10 +240,6 @@ struct Machine : nix::Machine { typedef std::shared_ptr ptr; - /* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way - we are not yet used to, but once we are, we don't need this. */ - std::string sshName; - struct State { typedef std::shared_ptr ptr; counter currentJobs{0}; @@ -294,11 +289,7 @@ struct Machine : nix::Machine return true; } - bool isLocalhost() - { - std::regex r("^(ssh://|ssh-ng://)?localhost$"); - return std::regex_search(sshName, r); - } + bool isLocalhost() const; // A connection to a machine struct Connection : nix::ServeProto::BasicClientConnection { @@ -358,7 +349,7 @@ private: /* The build machines. */ std::mutex machinesReadyLock; - typedef std::map Machines; + typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr /* Various stats. */