Revert "Use LegacySSHStore"
				
					
				
			There were some hangs caused by this. Need to fix them, ideally
reproducing the issue in a test, before trying this again.
This reverts commit 4a4a0f901c.
			
			
This commit is contained in:
		| @@ -9,10 +9,13 @@ | ||||
| #include "path.hh" | ||||
| #include "legacy-ssh-store.hh" | ||||
| #include "serve-protocol.hh" | ||||
| #include "serve-protocol-impl.hh" | ||||
| #include "state.hh" | ||||
| #include "current-process.hh" | ||||
| #include "processes.hh" | ||||
| #include "util.hh" | ||||
| #include "serve-protocol.hh" | ||||
| #include "serve-protocol-impl.hh" | ||||
| #include "ssh.hh" | ||||
| #include "finally.hh" | ||||
| #include "url.hh" | ||||
| @@ -36,6 +39,38 @@ bool ::Machine::isLocalhost() const | ||||
|  | ||||
| namespace nix::build_remote { | ||||
|  | ||||
| static std::unique_ptr<SSHMaster::Connection> openConnection( | ||||
|     ::Machine::ptr machine, SSHMaster & master) | ||||
| { | ||||
|     Strings command = {"nix-store", "--serve", "--write"}; | ||||
|     if (machine->isLocalhost()) { | ||||
|         command.push_back("--builders"); | ||||
|         command.push_back(""); | ||||
|     } else { | ||||
|         auto remoteStore = machine->storeUri.params.find("remote-store"); | ||||
|         if (remoteStore != machine->storeUri.params.end()) { | ||||
|             command.push_back("--store"); | ||||
|             command.push_back(shellEscape(remoteStore->second)); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     auto ret = master.startCommand(std::move(command), { | ||||
|         "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" | ||||
|     }); | ||||
|  | ||||
|     // XXX: determine the actual max value we can use from /proc. | ||||
|  | ||||
|     // FIXME: Should this be upstreamed into `startCommand` in Nix? | ||||
|  | ||||
|     int pipesize = 1024 * 1024; | ||||
|  | ||||
|     fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize); | ||||
|     fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize); | ||||
|  | ||||
|     return ret; | ||||
| } | ||||
|  | ||||
|  | ||||
| static void copyClosureTo( | ||||
|     ::Machine::Connection & conn, | ||||
|     Store & destStore, | ||||
| @@ -52,8 +87,8 @@ static void copyClosureTo( | ||||
|     // FIXME: substitute output pollutes our build log | ||||
|     /* Get back the set of paths that are already valid on the remote | ||||
|        host. */ | ||||
|     auto present = conn.store->queryValidPaths( | ||||
|         closure, true, useSubstitutes); | ||||
|     auto present = conn.queryValidPaths( | ||||
|         destStore, true, closure, useSubstitutes); | ||||
|  | ||||
|     if (present.size() == closure.size()) return; | ||||
|  | ||||
| @@ -68,7 +103,12 @@ static void copyClosureTo( | ||||
|     std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock, | ||||
|         std::chrono::seconds(600)); | ||||
|  | ||||
|     conn.store->addMultipleToStoreLegacy(destStore, missing); | ||||
|     conn.to << ServeProto::Command::ImportPaths; | ||||
|     destStore.exportPaths(missing, conn.to); | ||||
|     conn.to.flush(); | ||||
|  | ||||
|     if (readInt(conn.from) != 1) | ||||
|         throw Error("remote machine failed to import closure"); | ||||
| } | ||||
|  | ||||
|  | ||||
| @@ -188,7 +228,7 @@ static BuildResult performBuild( | ||||
|     counter & nrStepsBuilding | ||||
| ) | ||||
| { | ||||
|     auto kont = conn.store->buildDerivationAsync(drvPath, drv, options); | ||||
|     conn.putBuildDerivationRequest(localStore, drvPath, drv, options); | ||||
|  | ||||
|     BuildResult result; | ||||
|  | ||||
| @@ -197,10 +237,7 @@ static BuildResult performBuild( | ||||
|     startTime = time(0); | ||||
|     { | ||||
|         MaintainCount<counter> mc(nrStepsBuilding); | ||||
|         result = kont(); | ||||
|         // Without proper call-once functions, we need to manually | ||||
|         // delete after calling. | ||||
|         kont = {}; | ||||
|         result = ServeProto::Serialise<BuildResult>::read(localStore, conn); | ||||
|     } | ||||
|     stopTime = time(0); | ||||
|  | ||||
| @@ -216,7 +253,7 @@ static BuildResult performBuild( | ||||
|  | ||||
|     // If the protocol was too old to give us `builtOutputs`, initialize | ||||
|     // it manually by introspecting the derivation. | ||||
|     if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) | ||||
|     if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) | ||||
|     { | ||||
|         // If the remote is too old to handle CA derivations, we can’t get this | ||||
|         // far anyways | ||||
| @@ -249,25 +286,26 @@ static void copyPathFromRemote( | ||||
|     const ValidPathInfo & info | ||||
| ) | ||||
| { | ||||
|     /* Receive the NAR from the remote and add it to the | ||||
|         destination store. Meanwhile, extract all the info from the | ||||
|         NAR that getBuildOutput() needs. */ | ||||
|     auto source2 = sinkToSource([&](Sink & sink) | ||||
|     { | ||||
|         /* Note: we should only send the command to dump the store | ||||
|             path to the remote if the NAR is actually going to get read | ||||
|             by the destination store, which won't happen if this path | ||||
|             is already valid on the destination store. Since this | ||||
|             lambda function only gets executed if someone tries to read | ||||
|             from source2, we will send the command from here rather | ||||
|             than outside the lambda. */ | ||||
|         conn.store->narFromPath(info.path, [&](Source & source) { | ||||
|             TeeSource tee{source, sink}; | ||||
|             extractNarData(tee, conn.store->printStorePath(info.path), narMembers); | ||||
|         }); | ||||
|     }); | ||||
|       /* Receive the NAR from the remote and add it to the | ||||
|           destination store. Meanwhile, extract all the info from the | ||||
|           NAR that getBuildOutput() needs. */ | ||||
|       auto source2 = sinkToSource([&](Sink & sink) | ||||
|       { | ||||
|           /* Note: we should only send the command to dump the store | ||||
|               path to the remote if the NAR is actually going to get read | ||||
|               by the destination store, which won't happen if this path | ||||
|               is already valid on the destination store. Since this | ||||
|               lambda function only gets executed if someone tries to read | ||||
|               from source2, we will send the command from here rather | ||||
|               than outside the lambda. */ | ||||
|           conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); | ||||
|           conn.to.flush(); | ||||
|  | ||||
|     destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); | ||||
|           TeeSource tee(conn.from, sink); | ||||
|           extractNarData(tee, localStore.printStorePath(info.path), narMembers); | ||||
|       }); | ||||
|  | ||||
|       destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); | ||||
| } | ||||
|  | ||||
| static void copyPathsFromRemote( | ||||
| @@ -366,39 +404,30 @@ void State::buildRemote(ref<Store> destStore, | ||||
|  | ||||
|         updateStep(ssConnecting); | ||||
|  | ||||
|         // FIXME: rewrite to use Store. | ||||
|         ::Machine::Connection conn { | ||||
|             .machine = machine, | ||||
|             .store = [&]{ | ||||
|                 auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant); | ||||
|                 if (!pSpecified || pSpecified->scheme != "ssh") { | ||||
|                     throw Error("Currently, only (legacy-)ssh stores are supported!"); | ||||
|                 } | ||||
|         auto storeRef = machine->completeStoreReference(); | ||||
|  | ||||
|                 auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>(); | ||||
|                 assert(remoteStore); | ||||
|         auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant); | ||||
|         if (!pSpecified || pSpecified->scheme != "ssh") { | ||||
|             throw Error("Currently, only (legacy-)ssh stores are supported!"); | ||||
|         } | ||||
|  | ||||
|                 remoteStore->connPipeSize = 1024 * 1024; | ||||
|  | ||||
|                 if (machine->isLocalhost()) { | ||||
|                     auto rp_new = remoteStore->remoteProgram.get(); | ||||
|                     rp_new.push_back("--builders"); | ||||
|                     rp_new.push_back(""); | ||||
|                     const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new); | ||||
|                 } | ||||
|                 remoteStore->extraSshArgs = { | ||||
|                     "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" | ||||
|                 }; | ||||
|                 const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get()); | ||||
|  | ||||
|                 return nix::ref{remoteStore}; | ||||
|             }(), | ||||
|         LegacySSHStoreConfig storeConfig { | ||||
|             pSpecified->scheme, | ||||
|             pSpecified->authority, | ||||
|             storeRef.params | ||||
|         }; | ||||
|  | ||||
|         auto master = storeConfig.createSSHMaster( | ||||
|             false, // no SSH master yet | ||||
|             logFD.get()); | ||||
|  | ||||
|         // FIXME: rewrite to use Store. | ||||
|         auto child = build_remote::openConnection(machine, master); | ||||
|  | ||||
|         { | ||||
|             auto activeStepState(activeStep->state_.lock()); | ||||
|             if (activeStepState->cancelled) throw Error("step cancelled"); | ||||
|             activeStepState->pid = conn.store->getConnectionPid(); | ||||
|             activeStepState->pid = child->sshPid; | ||||
|         } | ||||
|  | ||||
|         Finally clearPid([&]() { | ||||
| @@ -413,12 +442,35 @@ void State::buildRemote(ref<Store> destStore, | ||||
|                process. Meh. */ | ||||
|         }); | ||||
|  | ||||
|         ::Machine::Connection conn { | ||||
|             { | ||||
|                 .to = child->in.get(), | ||||
|                 .from = child->out.get(), | ||||
|                 /* Handshake. */ | ||||
|                 .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize | ||||
|             }, | ||||
|             /*.machine =*/ machine, | ||||
|         }; | ||||
|  | ||||
|         Finally updateStats([&]() { | ||||
|             auto stats = conn.store->getConnectionStats(); | ||||
|             bytesReceived += stats.bytesReceived; | ||||
|             bytesSent += stats.bytesSent; | ||||
|             bytesReceived += conn.from.read; | ||||
|             bytesSent += conn.to.written; | ||||
|         }); | ||||
|  | ||||
|         constexpr ServeProto::Version our_version = 0x206; | ||||
|  | ||||
|         try { | ||||
|             conn.remoteVersion = decltype(conn)::handshake( | ||||
|                 conn.to, | ||||
|                 conn.from, | ||||
|                 our_version, | ||||
|                 machine->storeUri.render()); | ||||
|         } catch (EndOfFile & e) { | ||||
|             child->sshPid.wait(); | ||||
|             std::string s = chomp(readFile(result.logFile)); | ||||
|             throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             auto info(machine->state->connectInfo.lock()); | ||||
|             info->consecutiveFailures = 0; | ||||
| @@ -487,7 +539,7 @@ void State::buildRemote(ref<Store> destStore, | ||||
|  | ||||
|             auto now1 = std::chrono::steady_clock::now(); | ||||
|  | ||||
|             auto infos = conn.store->queryPathInfosUncached(outputs); | ||||
|             auto infos = conn.queryPathInfos(*localStore, outputs); | ||||
|  | ||||
|             size_t totalNarSize = 0; | ||||
|             for (auto & [_, info] : infos) totalNarSize += info.narSize; | ||||
| @@ -522,11 +574,9 @@ void State::buildRemote(ref<Store> destStore, | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         /* Shut down the connection done by RAII. | ||||
|  | ||||
|            Only difference is kill() instead of wait() (i.e. send signal | ||||
|            then wait()) | ||||
|          */ | ||||
|         /* Shut down the connection. */ | ||||
|         child->in = -1; | ||||
|         child->sshPid.wait(); | ||||
|  | ||||
|     } catch (Error & e) { | ||||
|         /* Disable this machine until a certain period of time has | ||||
|   | ||||
| @@ -20,7 +20,9 @@ | ||||
| #include "store-api.hh" | ||||
| #include "sync.hh" | ||||
| #include "nar-extractor.hh" | ||||
| #include "legacy-ssh-store.hh" | ||||
| #include "serve-protocol.hh" | ||||
| #include "serve-protocol-impl.hh" | ||||
| #include "serve-protocol-connection.hh" | ||||
| #include "machines.hh" | ||||
|  | ||||
|  | ||||
| @@ -290,11 +292,9 @@ struct Machine : nix::Machine | ||||
|     bool isLocalhost() const; | ||||
|  | ||||
|     // A connection to a machine | ||||
|     struct Connection { | ||||
|     struct Connection : nix::ServeProto::BasicClientConnection { | ||||
|         // Backpointer to the machine | ||||
|         ptr machine; | ||||
|         // Opened store | ||||
|         nix::ref<nix::LegacySSHStore> store; | ||||
|     }; | ||||
| }; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user