From 1e2d3211d9fec6dd9e00667bc5c12203ee0bdaa4 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Thu, 11 Apr 2024 15:03:23 +0200 Subject: [PATCH] queue-runner: limit parallelism of CPU intensive operations My current theory is that running more parallel xz than available CPU cores is reducing our overall throughput by requiring more scheduling overhead and more cache thrashing. --- src/hydra-queue-runner/build-remote.cc | 18 ++++++++++++++++++ src/hydra-queue-runner/hydra-queue-runner.cc | 1 + src/hydra-queue-runner/state.hh | 7 +++++++ src/root/common.tt | 2 ++ 4 files changed, 28 insertions(+) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 79c32d46..0c8b3f10 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -386,6 +386,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) } +/* Utility guard object to auto-release a semaphore on destruction. */ +template +class SemaphoreReleaser { +public: + SemaphoreReleaser(T* s) : sem(s) {} + ~SemaphoreReleaser() { sem->release(); } + +private: + T* sem; +}; void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, @@ -527,6 +537,14 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + /* Throttle CPU-bound work. Opportunistically skip updating the current + * step, since this requires a DB roundtrip. */ + if (!localWorkThrottler.try_acquire()) { + updateStep(ssWaitingForLocalSlot); + localWorkThrottler.acquire(); + } + SemaphoreReleaser releaser(&localWorkThrottler); + StorePathSet outputs; for (auto & [_, realisation] : buildResult.builtOutputs) outputs.insert(realisation.outPath); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 64a98797..cf7d4056 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -85,6 +85,7 @@ State::State(std::optional metricsAddrOpt) : config(std::make_unique()) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , dbPool(config->getIntOption("max_db_connections", 128)) + , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2))) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 8933720d..34b7a676 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include @@ -58,6 +60,7 @@ typedef enum { ssConnecting = 10, ssSendingInputs = 20, ssBuilding = 30, + ssWaitingForLocalSlot = 35, ssReceivingOutputs = 40, ssPostProcessing = 50, } StepState; @@ -353,6 +356,10 @@ private: typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr + /* Throttler for CPU-bound local work. */ + static constexpr unsigned int maxSupportedLocalWorkers = 1024; + std::counting_semaphore localWorkThrottler; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; diff --git a/src/root/common.tt b/src/root/common.tt index 842ad109..6348bee7 100644 --- a/src/root/common.tt +++ b/src/root/common.tt @@ -270,6 +270,8 @@ BLOCK renderBusyStatus; Sending inputs [% ELSIF step.busy == 30 %] Building + [% ELSIF step.busy == 35 %] + Waiting to receive outputs [% ELSIF step.busy == 40 %] Receiving outputs [% ELSIF step.busy == 50 %]