From 08bf31b71a59c6e7f1dae0fb62d6a4888ad7cc53 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Thu, 11 Apr 2024 15:03:23 +0200 Subject: [PATCH] queue-runner: limit parallelism of CPU intensive operations My current theory is that running more parallel xz than available CPU cores is reducing our overall throughput by requiring more scheduling overhead and more cache thrashing. --- src/hydra-queue-runner/build-remote.cc | 18 ++++++++++++++++++ src/hydra-queue-runner/hydra-queue-runner.cc | 1 + src/hydra-queue-runner/state.hh | 7 +++++++ src/root/common.tt | 2 ++ 4 files changed, 28 insertions(+) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 79c32d46..0c8b3f10 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -386,6 +386,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) } +/* Utility guard object to auto-release a semaphore on destruction. */ +template +class SemaphoreReleaser { +public: + SemaphoreReleaser(T* s) : sem(s) {} + ~SemaphoreReleaser() { sem->release(); } + +private: + T* sem; +}; void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, @@ -527,6 +537,14 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + /* Throttle CPU-bound work. Opportunistically skip updating the current + * step, since this requires a DB roundtrip. */ + if (!localWorkThrottler.try_acquire()) { + updateStep(ssWaitingForLocalSlot); + localWorkThrottler.acquire(); + } + SemaphoreReleaser releaser(&localWorkThrottler); + StorePathSet outputs; for (auto & [_, realisation] : buildResult.builtOutputs) outputs.insert(realisation.outPath); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 64a98797..cf7d4056 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -85,6 +85,7 @@ State::State(std::optional metricsAddrOpt) : config(std::make_unique()) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , dbPool(config->getIntOption("max_db_connections", 128)) + , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2))) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 8933720d..34b7a676 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include @@ -58,6 +60,7 @@ typedef enum { ssConnecting = 10, ssSendingInputs = 20, ssBuilding = 30, + ssWaitingForLocalSlot = 35, ssReceivingOutputs = 40, ssPostProcessing = 50, } StepState; @@ -353,6 +356,10 @@ private: typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr + /* Throttler for CPU-bound local work. */ + static constexpr unsigned int maxSupportedLocalWorkers = 1024; + std::counting_semaphore localWorkThrottler; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; diff --git a/src/root/common.tt b/src/root/common.tt index 842ad109..6348bee7 100644 --- a/src/root/common.tt +++ b/src/root/common.tt @@ -270,6 +270,8 @@ BLOCK renderBusyStatus; Sending inputs [% ELSIF step.busy == 30 %] Building + [% ELSIF step.busy == 35 %] + Waiting to receive outputs [% ELSIF step.busy == 40 %] Receiving outputs [% ELSIF step.busy == 50 %]