From 4a4a0f901c70676ee47f830d2ff6a72789ba1baf Mon Sep 17 00:00:00 2001
From: John Ericson <John.Ericson@Obsidian.Systems>
Date: Mon, 20 May 2024 16:22:19 -0400
Subject: [PATCH] Use `LegacySSHStore`

In https://github.com/NixOS/nix/pull/10748 it is extended with
everything we need.
---
 src/hydra-queue-runner/build-remote.cc | 178 +++++++++----------------
 src/hydra-queue-runner/state.hh        |   8 +-
 2 files changed, 68 insertions(+), 118 deletions(-)

diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc
index 77bde2c4..39970bd3 100644
--- a/src/hydra-queue-runner/build-remote.cc
+++ b/src/hydra-queue-runner/build-remote.cc
@@ -9,13 +9,10 @@
 #include "path.hh"
 #include "legacy-ssh-store.hh"
 #include "serve-protocol.hh"
-#include "serve-protocol-impl.hh"
 #include "state.hh"
 #include "current-process.hh"
 #include "processes.hh"
 #include "util.hh"
-#include "serve-protocol.hh"
-#include "serve-protocol-impl.hh"
 #include "ssh.hh"
 #include "finally.hh"
 #include "url.hh"
@@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const
 
 namespace nix::build_remote {
 
-static std::unique_ptr<SSHMaster::Connection> openConnection(
-    ::Machine::ptr machine, SSHMaster & master)
-{
-    Strings command = {"nix-store", "--serve", "--write"};
-    if (machine->isLocalhost()) {
-        command.push_back("--builders");
-        command.push_back("");
-    } else {
-        auto remoteStore = machine->storeUri.params.find("remote-store");
-        if (remoteStore != machine->storeUri.params.end()) {
-            command.push_back("--store");
-            command.push_back(shellEscape(remoteStore->second));
-        }
-    }
-
-    auto ret = master.startCommand(std::move(command), {
-        "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
-    });
-
-    // XXX: determine the actual max value we can use from /proc.
-
-    // FIXME: Should this be upstreamed into `startCommand` in Nix?
-
-    int pipesize = 1024 * 1024;
-
-    fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
-    fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
-
-    return ret;
-}
-
-
 static void copyClosureTo(
     ::Machine::Connection & conn,
     Store & destStore,
@@ -87,8 +52,8 @@ static void copyClosureTo(
     // FIXME: substitute output pollutes our build log
     /* Get back the set of paths that are already valid on the remote
        host. */
-    auto present = conn.queryValidPaths(
-        destStore, true, closure, useSubstitutes);
+    auto present = conn.store->queryValidPaths(
+        closure, true, useSubstitutes);
 
     if (present.size() == closure.size()) return;
 
@@ -103,12 +68,7 @@ static void copyClosureTo(
     std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
         std::chrono::seconds(600));
 
-    conn.to << ServeProto::Command::ImportPaths;
-    destStore.exportPaths(missing, conn.to);
-    conn.to.flush();
-
-    if (readInt(conn.from) != 1)
-        throw Error("remote machine failed to import closure");
+    conn.store->addMultipleToStoreLegacy(destStore, missing);
 }
 
 
@@ -228,7 +188,7 @@ static BuildResult performBuild(
     counter & nrStepsBuilding
 )
 {
-    conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
+    auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);
 
     BuildResult result;
 
@@ -237,7 +197,10 @@ static BuildResult performBuild(
     startTime = time(0);
     {
         MaintainCount<counter> mc(nrStepsBuilding);
-        result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
+        result = kont();
+        // Without proper call-once functions, we need to manually
+        // delete after calling.
+        kont = {};
     }
     stopTime = time(0);
 
@@ -253,7 +216,7 @@ static BuildResult performBuild(
 
     // If the protocol was too old to give us `builtOutputs`, initialize
     // it manually by introspecting the derivation.
-    if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
+    if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
     {
         // If the remote is too old to handle CA derivations, we can’t get this
         // far anyways
@@ -286,26 +249,25 @@ static void copyPathFromRemote(
     const ValidPathInfo & info
 )
 {
-      /* Receive the NAR from the remote and add it to the
-          destination store. Meanwhile, extract all the info from the
-          NAR that getBuildOutput() needs. */
-      auto source2 = sinkToSource([&](Sink & sink)
-      {
-          /* Note: we should only send the command to dump the store
-              path to the remote if the NAR is actually going to get read
-              by the destination store, which won't happen if this path
-              is already valid on the destination store. Since this
-              lambda function only gets executed if someone tries to read
-              from source2, we will send the command from here rather
-              than outside the lambda. */
-          conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
-          conn.to.flush();
+    /* Receive the NAR from the remote and add it to the
+        destination store. Meanwhile, extract all the info from the
+        NAR that getBuildOutput() needs. */
+    auto source2 = sinkToSource([&](Sink & sink)
+    {
+        /* Note: we should only send the command to dump the store
+            path to the remote if the NAR is actually going to get read
+            by the destination store, which won't happen if this path
+            is already valid on the destination store. Since this
+            lambda function only gets executed if someone tries to read
+            from source2, we will send the command from here rather
+            than outside the lambda. */
+        conn.store->narFromPath(info.path, [&](Source & source) {
+            TeeSource tee{source, sink};
+            extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
+        });
+    });
 
-          TeeSource tee(conn.from, sink);
-          extractNarData(tee, localStore.printStorePath(info.path), narMembers);
-      });
-
-      destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
+    destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
 }
 
 static void copyPathsFromRemote(
@@ -404,30 +366,39 @@ void State::buildRemote(ref<Store> destStore,
 
         updateStep(ssConnecting);
 
-        auto storeRef = machine->completeStoreReference();
-
-        auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
-        if (!pSpecified || pSpecified->scheme != "ssh") {
-            throw Error("Currently, only (legacy-)ssh stores are supported!");
-        }
-
-        LegacySSHStoreConfig storeConfig {
-            pSpecified->scheme,
-            pSpecified->authority,
-            storeRef.params
-        };
-
-        auto master = storeConfig.createSSHMaster(
-            false, // no SSH master yet
-            logFD.get());
-
         // FIXME: rewrite to use Store.
-        auto child = build_remote::openConnection(machine, master);
+        ::Machine::Connection conn {
+            .machine = machine,
+            .store = [&]{
+                auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
+                if (!pSpecified || pSpecified->scheme != "ssh") {
+                    throw Error("Currently, only (legacy-)ssh stores are supported!");
+                }
+
+                auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
+                assert(remoteStore);
+
+                remoteStore->connPipeSize = 1024 * 1024;
+
+                if (machine->isLocalhost()) {
+                    auto rp_new = remoteStore->remoteProgram.get();
+                    rp_new.push_back("--builders");
+                    rp_new.push_back("");
+                    const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
+                }
+                remoteStore->extraSshArgs = {
+                    "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
+                };
+                const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
+
+                return nix::ref{remoteStore};
+            }(),
+        };
 
         {
             auto activeStepState(activeStep->state_.lock());
             if (activeStepState->cancelled) throw Error("step cancelled");
-            activeStepState->pid = child->sshPid;
+            activeStepState->pid = conn.store->getConnectionPid();
         }
 
         Finally clearPid([&]() {
@@ -442,35 +413,12 @@ void State::buildRemote(ref<Store> destStore,
                process. Meh. */
         });
 
-        ::Machine::Connection conn {
-            {
-                .to = child->in.get(),
-                .from = child->out.get(),
-                /* Handshake. */
-                .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
-            },
-            /*.machine =*/ machine,
-        };
-
         Finally updateStats([&]() {
-            bytesReceived += conn.from.read;
-            bytesSent += conn.to.written;
+            auto stats = conn.store->getConnectionStats();
+            bytesReceived += stats.bytesReceived;
+            bytesSent += stats.bytesSent;
         });
 
-        constexpr ServeProto::Version our_version = 0x206;
-
-        try {
-            conn.remoteVersion = decltype(conn)::handshake(
-                conn.to,
-                conn.from,
-                our_version,
-                machine->storeUri.render());
-        } catch (EndOfFile & e) {
-            child->sshPid.wait();
-            std::string s = chomp(readFile(result.logFile));
-            throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
-        }
-
         {
             auto info(machine->state->connectInfo.lock());
             info->consecutiveFailures = 0;
@@ -539,7 +487,7 @@ void State::buildRemote(ref<Store> destStore,
 
             auto now1 = std::chrono::steady_clock::now();
 
-            auto infos = conn.queryPathInfos(*localStore, outputs);
+            auto infos = conn.store->queryPathInfosUncached(outputs);
 
             size_t totalNarSize = 0;
             for (auto & [_, info] : infos) totalNarSize += info.narSize;
@@ -574,9 +522,11 @@ void State::buildRemote(ref<Store> destStore,
             }
         }
 
-        /* Shut down the connection. */
-        child->in = -1;
-        child->sshPid.wait();
+        /* Shut down the connection done by RAII.
+
+           Only difference is kill() instead of wait() (i.e. send signal
+           then wait())
+         */
 
     } catch (Error & e) {
         /* Disable this machine until a certain period of time has
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index 30e01c74..e2d31434 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -20,9 +20,7 @@
 #include "store-api.hh"
 #include "sync.hh"
 #include "nar-extractor.hh"
-#include "serve-protocol.hh"
-#include "serve-protocol-impl.hh"
-#include "serve-protocol-connection.hh"
+#include "legacy-ssh-store.hh"
 #include "machines.hh"
 
 
@@ -292,9 +290,11 @@ struct Machine : nix::Machine
     bool isLocalhost() const;
 
     // A connection to a machine
-    struct Connection : nix::ServeProto::BasicClientConnection {
+    struct Connection {
         // Backpointer to the machine
         ptr machine;
+        // Opened store
+        nix::ref<nix::LegacySSHStore> store;
     };
 };