resync hydra with upstream #4

Merged
ahuston-0 merged 36 commits from feature/resync into add-gitea-pulls 2025-04-09 11:38:41 -04:00
2 changed files with 116 additions and 66 deletions
Showing only changes of commit 847a8ae6cd - Show all commits

View File

@ -9,10 +9,13 @@
#include "path.hh" #include "path.hh"
#include "legacy-ssh-store.hh" #include "legacy-ssh-store.hh"
#include "serve-protocol.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh" #include "state.hh"
#include "current-process.hh" #include "current-process.hh"
#include "processes.hh" #include "processes.hh"
#include "util.hh" #include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh" #include "ssh.hh"
#include "finally.hh" #include "finally.hh"
#include "url.hh" #include "url.hh"
@ -36,6 +39,38 @@ bool ::Machine::isLocalhost() const
namespace nix::build_remote { namespace nix::build_remote {
static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}
auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024;
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
return ret;
}
static void copyClosureTo( static void copyClosureTo(
::Machine::Connection & conn, ::Machine::Connection & conn,
Store & destStore, Store & destStore,
@ -52,8 +87,8 @@ static void copyClosureTo(
// FIXME: substitute output pollutes our build log // FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote /* Get back the set of paths that are already valid on the remote
host. */ host. */
auto present = conn.store->queryValidPaths( auto present = conn.queryValidPaths(
closure, true, useSubstitutes); destStore, true, closure, useSubstitutes);
if (present.size() == closure.size()) return; if (present.size() == closure.size()) return;
@ -68,7 +103,12 @@ static void copyClosureTo(
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock, std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600)); std::chrono::seconds(600));
conn.store->addMultipleToStoreLegacy(destStore, missing); conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
} }
@ -188,7 +228,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding counter & nrStepsBuilding
) )
{ {
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options); conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
BuildResult result; BuildResult result;
@ -197,10 +237,7 @@ static BuildResult performBuild(
startTime = time(0); startTime = time(0);
{ {
MaintainCount<counter> mc(nrStepsBuilding); MaintainCount<counter> mc(nrStepsBuilding);
result = kont(); result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
} }
stopTime = time(0); stopTime = time(0);
@ -216,7 +253,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize // If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation. // it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
{ {
// If the remote is too old to handle CA derivations, we cant get this // If the remote is too old to handle CA derivations, we cant get this
// far anyways // far anyways
@ -249,25 +286,26 @@ static void copyPathFromRemote(
const ValidPathInfo & info const ValidPathInfo & info
) )
{ {
/* Receive the NAR from the remote and add it to the /* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */ NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink) auto source2 = sinkToSource([&](Sink & sink)
{ {
/* Note: we should only send the command to dump the store /* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path by the destination store, which won't happen if this path
is already valid on the destination store. Since this is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather from source2, we will send the command from here rather
than outside the lambda. */ than outside the lambda. */
conn.store->narFromPath(info.path, [&](Source & source) { conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
TeeSource tee{source, sink}; conn.to.flush();
extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
});
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
} }
static void copyPathsFromRemote( static void copyPathsFromRemote(
@ -366,39 +404,30 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting); updateStep(ssConnecting);
// FIXME: rewrite to use Store. auto storeRef = machine->completeStoreReference();
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>(); auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
assert(remoteStore); if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
remoteStore->connPipeSize = 1024 * 1024; LegacySSHStoreConfig storeConfig {
pSpecified->scheme,
if (machine->isLocalhost()) { pSpecified->authority,
auto rp_new = remoteStore->remoteProgram.get(); storeRef.params
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
}
remoteStore->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
return nix::ref{remoteStore};
}(),
}; };
auto master = storeConfig.createSSHMaster(
false, // no SSH master yet
logFD.get());
// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
{ {
auto activeStepState(activeStep->state_.lock()); auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled"); if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = conn.store->getConnectionPid(); activeStepState->pid = child->sshPid;
} }
Finally clearPid([&]() { Finally clearPid([&]() {
@ -413,12 +442,35 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */ process. Meh. */
}); });
::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};
Finally updateStats([&]() { Finally updateStats([&]() {
auto stats = conn.store->getConnectionStats(); bytesReceived += conn.from.read;
bytesReceived += stats.bytesReceived; bytesSent += conn.to.written;
bytesSent += stats.bytesSent;
}); });
constexpr ServeProto::Version our_version = 0x206;
try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->storeUri.render(), s);
}
{ {
auto info(machine->state->connectInfo.lock()); auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0; info->consecutiveFailures = 0;
@ -487,7 +539,7 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now(); auto now1 = std::chrono::steady_clock::now();
auto infos = conn.store->queryPathInfosUncached(outputs); auto infos = conn.queryPathInfos(*localStore, outputs);
size_t totalNarSize = 0; size_t totalNarSize = 0;
for (auto & [_, info] : infos) totalNarSize += info.narSize; for (auto & [_, info] : infos) totalNarSize += info.narSize;
@ -522,11 +574,9 @@ void State::buildRemote(ref<Store> destStore,
} }
} }
/* Shut down the connection done by RAII. /* Shut down the connection. */
child->in = -1;
Only difference is kill() instead of wait() (i.e. send signal child->sshPid.wait();
then wait())
*/
} catch (Error & e) { } catch (Error & e) {
/* Disable this machine until a certain period of time has /* Disable this machine until a certain period of time has

View File

@ -20,7 +20,9 @@
#include "store-api.hh" #include "store-api.hh"
#include "sync.hh" #include "sync.hh"
#include "nar-extractor.hh" #include "nar-extractor.hh"
#include "legacy-ssh-store.hh" #include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "machines.hh" #include "machines.hh"
@ -290,11 +292,9 @@ struct Machine : nix::Machine
bool isLocalhost() const; bool isLocalhost() const;
// A connection to a machine // A connection to a machine
struct Connection { struct Connection : nix::ServeProto::BasicClientConnection {
// Backpointer to the machine // Backpointer to the machine
ptr machine; ptr machine;
// Opened store
nix::ref<nix::LegacySSHStore> store;
}; };
}; };