From 5a9985f96cd00278a137bbbca8ee2789acfae093 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Thu, 11 Apr 2024 17:12:47 +0200 Subject: [PATCH 1/7] web: Skip System on /machines It is redundant --- src/root/machine-status.tt | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/root/machine-status.tt b/src/root/machine-status.tt index 3af5073c..07a2359d 100644 --- a/src/root/machine-status.tt +++ b/src/root/machine-status.tt @@ -6,7 +6,6 @@ Job - System Build Step What @@ -41,7 +40,6 @@ [% idle = 0 %] [% INCLUDE renderFullJobName project=step.project jobset=step.jobset job=step.job %] - [% step.system %] [% step.build %] [% IF step.busy >= 30 %][% step.stepnr %][% ELSE; step.stepnr; END %] [% step.drvpath.match('-(.*)').0 %] From 1e2d3211d9fec6dd9e00667bc5c12203ee0bdaa4 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Thu, 11 Apr 2024 15:03:23 +0200 Subject: [PATCH 2/7] queue-runner: limit parallelism of CPU intensive operations My current theory is that running more parallel xz than available CPU cores is reducing our overall throughput by requiring more scheduling overhead and more cache thrashing. --- src/hydra-queue-runner/build-remote.cc | 18 ++++++++++++++++++ src/hydra-queue-runner/hydra-queue-runner.cc | 1 + src/hydra-queue-runner/state.hh | 7 +++++++ src/root/common.tt | 2 ++ 4 files changed, 28 insertions(+) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 79c32d46..0c8b3f10 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -386,6 +386,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) } +/* Utility guard object to auto-release a semaphore on destruction. */ +template +class SemaphoreReleaser { +public: + SemaphoreReleaser(T* s) : sem(s) {} + ~SemaphoreReleaser() { sem->release(); } + +private: + T* sem; +}; void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, @@ -527,6 +537,14 @@ void State::buildRemote(ref destStore, result.logFile = ""; } + /* Throttle CPU-bound work. Opportunistically skip updating the current + * step, since this requires a DB roundtrip. */ + if (!localWorkThrottler.try_acquire()) { + updateStep(ssWaitingForLocalSlot); + localWorkThrottler.acquire(); + } + SemaphoreReleaser releaser(&localWorkThrottler); + StorePathSet outputs; for (auto & [_, realisation] : buildResult.builtOutputs) outputs.insert(realisation.outPath); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 64a98797..cf7d4056 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -85,6 +85,7 @@ State::State(std::optional metricsAddrOpt) : config(std::make_unique()) , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0)) , dbPool(config->getIntOption("max_db_connections", 128)) + , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2))) , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30)) , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20)) , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false)) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 8933720d..34b7a676 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include @@ -58,6 +60,7 @@ typedef enum { ssConnecting = 10, ssSendingInputs = 20, ssBuilding = 30, + ssWaitingForLocalSlot = 35, ssReceivingOutputs = 40, ssPostProcessing = 50, } StepState; @@ -353,6 +356,10 @@ private: typedef std::map Machines; nix::Sync machines; // FIXME: use atomic_shared_ptr + /* Throttler for CPU-bound local work. */ + static constexpr unsigned int maxSupportedLocalWorkers = 1024; + std::counting_semaphore localWorkThrottler; + /* Various stats. */ time_t startedAt; counter nrBuildsRead{0}; diff --git a/src/root/common.tt b/src/root/common.tt index 842ad109..6348bee7 100644 --- a/src/root/common.tt +++ b/src/root/common.tt @@ -270,6 +270,8 @@ BLOCK renderBusyStatus; Sending inputs [% ELSIF step.busy == 30 %] Building + [% ELSIF step.busy == 35 %] + Waiting to receive outputs [% ELSIF step.busy == 40 %] Receiving outputs [% ELSIF step.busy == 50 %] From efcf6815d95134050b0f280668af7655aae9ef72 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Sat, 20 Apr 2024 16:48:03 +0200 Subject: [PATCH 3/7] queue-runner: add prom metrics to allow detecting internal bottlenecks By looking at the ratio of running vs. waiting for the dispatcher and the queue monitor, we should get better visibility into what hydra is currently bottlenecked on. There are other side effects we can try to measure to get to the same result, but having a simple way doesn't cost us much. --- src/hydra-queue-runner/dispatcher.cc | 12 ++++++--- src/hydra-queue-runner/hydra-queue-runner.cc | 28 ++++++++++++++++++++ src/hydra-queue-runner/queue-monitor.cc | 11 ++++++++ src/hydra-queue-runner/state.hh | 6 +++++ 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index cbf982bf..11db0071 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -40,13 +40,15 @@ void State::dispatcher() printMsg(lvlDebug, "dispatcher woken up"); nrDispatcherWakeups++; - auto now1 = std::chrono::steady_clock::now(); + auto t_before_work = std::chrono::steady_clock::now(); auto sleepUntil = doDispatch(); - auto now2 = std::chrono::steady_clock::now(); + auto t_after_work = std::chrono::steady_clock::now(); - dispatchTimeMs += std::chrono::duration_cast(now2 - now1).count(); + prom.dispatcher_time_spent_running.Increment( + std::chrono::duration_cast(t_after_work - t_before_work).count()); + dispatchTimeMs += std::chrono::duration_cast(t_after_work - t_before_work).count(); /* Sleep until we're woken up (either because a runnable build is added, or because a build finishes). */ @@ -60,6 +62,10 @@ void State::dispatcher() *dispatcherWakeup_ = false; } + auto t_after_sleep = std::chrono::steady_clock::now(); + prom.dispatcher_time_spent_waiting.Increment( + std::chrono::duration_cast(t_after_sleep - t_after_work).count()); + } catch (std::exception & e) { printError("dispatcher: %s", e.what()); sleep(1); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index cf7d4056..8123fd39 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -77,6 +77,34 @@ State::PromMetrics::PromMetrics() .Register(*registry) .Add({}) ) + , dispatcher_time_spent_running( + prometheus::BuildCounter() + .Name("hydraqueuerunner_dispatcher_time_spent_running") + .Help("Time (in micros) spent running the dispatcher") + .Register(*registry) + .Add({}) + ) + , dispatcher_time_spent_waiting( + prometheus::BuildCounter() + .Name("hydraqueuerunner_dispatcher_time_spent_waiting") + .Help("Time (in micros) spent waiting for the dispatcher to obtain work") + .Register(*registry) + .Add({}) + ) + , queue_monitor_time_spent_running( + prometheus::BuildCounter() + .Name("hydraqueuerunner_queue_monitor_time_spent_running") + .Help("Time (in micros) spent running the queue monitor") + .Register(*registry) + .Add({}) + ) + , queue_monitor_time_spent_waiting( + prometheus::BuildCounter() + .Name("hydraqueuerunner_queue_monitor_time_spent_waiting") + .Help("Time (in micros) spent waiting for the queue monitor to obtain work") + .Register(*registry) + .Add({}) + ) { } diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 81bda873..3af0752a 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -42,12 +42,19 @@ void State::queueMonitorLoop(Connection & conn) bool quit = false; while (!quit) { + auto t_before_work = std::chrono::steady_clock::now(); + localStore->clearPathInfoCache(); bool done = getQueuedBuilds(conn, destStore, lastBuildId); if (buildOne && buildOneDone) quit = true; + auto t_after_work = std::chrono::steady_clock::now(); + + prom.queue_monitor_time_spent_running.Increment( + std::chrono::duration_cast(t_after_work - t_before_work).count()); + /* Sleep until we get notification from the database about an event. */ if (done && !quit) { @@ -72,6 +79,10 @@ void State::queueMonitorLoop(Connection & conn) printMsg(lvlTalkative, "got notification: jobset shares changed"); processJobsetSharesChange(conn); } + + auto t_after_sleep = std::chrono::steady_clock::now(); + prom.queue_monitor_time_spent_waiting.Increment( + std::chrono::duration_cast(t_after_sleep - t_after_work).count()); } exit(0); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 34b7a676..5e05157b 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -459,6 +459,12 @@ private: prometheus::Counter& queue_checks_finished; prometheus::Gauge& queue_max_id; + prometheus::Counter& dispatcher_time_spent_running; + prometheus::Counter& dispatcher_time_spent_waiting; + + prometheus::Counter& queue_monitor_time_spent_running; + prometheus::Counter& queue_monitor_time_spent_waiting; + PromMetrics(); }; PromMetrics prom; From d8ffa6b56a49dc6705b1fc3a59f6acfe64dd106a Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Sat, 20 Apr 2024 16:53:52 +0200 Subject: [PATCH 4/7] queue-runner: remove id > X from new builds query Running the query with/without it shows that it makes no difference to postgres, since there's an index on finished=0 already. This allows a few simplifications, but also paves the way towards running multiple parallel monitor threads in the future. --- src/hydra-queue-runner/hydra-queue-runner.cc | 7 ------- src/hydra-queue-runner/queue-monitor.cc | 21 ++++---------------- src/hydra-queue-runner/state.hh | 4 +--- 3 files changed, 5 insertions(+), 27 deletions(-) diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 8123fd39..405c44d1 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -70,13 +70,6 @@ State::PromMetrics::PromMetrics() .Register(*registry) .Add({}) ) - , queue_max_id( - prometheus::BuildGauge() - .Name("hydraqueuerunner_queue_max_build_id_info") - .Help("Maximum build record ID in the queue") - .Register(*registry) - .Add({}) - ) , dispatcher_time_spent_running( prometheus::BuildCounter() .Name("hydraqueuerunner_dispatcher_time_spent_running") diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 3af0752a..a9d386d0 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -38,15 +38,13 @@ void State::queueMonitorLoop(Connection & conn) auto destStore = getDestStore(); - unsigned int lastBuildId = 0; - bool quit = false; while (!quit) { auto t_before_work = std::chrono::steady_clock::now(); localStore->clearPathInfoCache(); - bool done = getQueuedBuilds(conn, destStore, lastBuildId); + bool done = getQueuedBuilds(conn, destStore); if (buildOne && buildOneDone) quit = true; @@ -64,12 +62,10 @@ void State::queueMonitorLoop(Connection & conn) conn.get_notifs(); if (auto lowestId = buildsAdded.get()) { - lastBuildId = std::min(lastBuildId, static_cast(std::stoul(*lowestId) - 1)); printMsg(lvlTalkative, "got notification: new builds added to the queue"); } if (buildsRestarted.get()) { printMsg(lvlTalkative, "got notification: builds restarted"); - lastBuildId = 0; // check all builds } if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); @@ -96,11 +92,11 @@ struct PreviousFailure : public std::exception { bool State::getQueuedBuilds(Connection & conn, - ref destStore, unsigned int & lastBuildId) + ref destStore) { prom.queue_checks_started.Increment(); - printInfo("checking the queue for builds > %d...", lastBuildId); + printInfo("checking the queue for builds..."); /* Grab the queued builds from the database, but don't process them yet (since we don't want a long-running transaction). */ @@ -108,8 +104,6 @@ bool State::getQueuedBuilds(Connection & conn, std::map newBuildsByID; std::multimap newBuildsByPath; - unsigned int newLastBuildId = lastBuildId; - { pqxx::work txn(conn); @@ -118,17 +112,12 @@ bool State::getQueuedBuilds(Connection & conn, "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, " "globalPriority, priority from Builds " "inner join jobsets on builds.jobset_id = jobsets.id " - "where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id", - lastBuildId); + "where finished = 0 order by globalPriority desc, builds.id"); for (auto const & row : res) { auto builds_(builds.lock()); BuildID id = row["id"].as(); if (buildOne && id != buildOne) continue; - if (id > newLastBuildId) { - newLastBuildId = id; - prom.queue_max_id.Set(id); - } if (builds_->count(id)) continue; auto build = std::make_shared( @@ -337,8 +326,6 @@ bool State::getQueuedBuilds(Connection & conn, } prom.queue_checks_finished.Increment(); - - lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1; return newBuildsByID.empty(); } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 5e05157b..4cb295e7 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -457,7 +457,6 @@ private: prometheus::Counter& queue_steps_created; prometheus::Counter& queue_checks_early_exits; prometheus::Counter& queue_checks_finished; - prometheus::Gauge& queue_max_id; prometheus::Counter& dispatcher_time_spent_running; prometheus::Counter& dispatcher_time_spent_waiting; @@ -507,8 +506,7 @@ private: void queueMonitorLoop(Connection & conn); /* Check the queue for new builds. */ - bool getQueuedBuilds(Connection & conn, - nix::ref destStore, unsigned int & lastBuildId); + bool getQueuedBuilds(Connection & conn, nix::ref destStore); /* Handle cancellation, deletion and priority bumps. */ void processQueueChange(Connection & conn); From 9265fc5002f6c1073d32c0ced999f334fefd4bc6 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Sat, 20 Apr 2024 16:58:10 +0200 Subject: [PATCH 5/7] queue-runner: reduce the time between queue monitor restarts This will induce more DB queries (though these are fairly cheap), but at the benefit of processing bumps within 1m instead of within 10m. --- src/hydra-queue-runner/queue-monitor.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index a9d386d0..2049a6c1 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -319,7 +319,7 @@ bool State::getQueuedBuilds(Connection & conn, /* Stop after a certain time to allow priority bumps to be processed. */ - if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) { + if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) { prom.queue_checks_early_exits.Increment(); break; } From 52a0199a9bfbb499b564a77d33fe615f39899f6b Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Sat, 20 Apr 2024 22:18:13 +0200 Subject: [PATCH 6/7] queue runner: introduce some parallelism for remote paths lookup Each output for a given step being ingested is looked up in parallel, which should basically multiply the speed of builds ingestion by the average number of outputs per derivation. --- src/hydra-queue-runner/queue-monitor.cc | 40 +++++++++++++++++++++---- src/hydra-queue-runner/state.hh | 6 ++++ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 2049a6c1..9eab6e90 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -2,6 +2,7 @@ #include "hydra-build-result.hh" #include #include +#include #include @@ -404,6 +405,34 @@ void State::processQueueChange(Connection & conn) } +std::map> State::getMissingRemotePaths( + ref destStore, + const std::map> & paths) +{ + Sync>> missing_; + ThreadPool tp; + + for (auto & [output, maybeOutputPath] : paths) { + if (!maybeOutputPath) { + auto missing(missing_.lock()); + missing->insert({output, maybeOutputPath}); + } else { + tp.enqueue([&] { + if (!destStore->isValidPath(*maybeOutputPath)) { + auto missing(missing_.lock()); + missing->insert({output, maybeOutputPath}); + } + }); + } + } + + tp.process(); + + auto missing(missing_.lock()); + return *missing; +} + + Step::ptr State::createStep(ref destStore, Connection & conn, Build::ptr build, const StorePath & drvPath, Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, @@ -487,16 +516,15 @@ Step::ptr State::createStep(ref destStore, /* Are all outputs valid? */ auto outputHashes = staticOutputHashes(*localStore, *(step->drv)); - bool valid = true; - std::map> missing; + std::map> paths; for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) { auto outputHash = outputHashes.at(outputName); - if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath)) - continue; - valid = false; - missing.insert({{outputHash, outputName}, maybeOutputPath}); + paths.insert({{outputHash, outputName}, maybeOutputPath}); } + auto missing = getMissingRemotePaths(destStore, paths); + bool valid = missing.empty(); + /* Try to copy the missing paths from the local store or from substitutes. */ if (!missing.empty()) { diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 4cb295e7..18101a0a 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -514,6 +514,12 @@ private: BuildOutput getBuildOutputCached(Connection & conn, nix::ref destStore, const nix::StorePath & drvPath); + /* Returns paths missing from the remote store. Paths are processed in + * parallel to work around the possible latency of remote stores. */ + std::map> getMissingRemotePaths( + nix::ref destStore, + const std::map> & paths); + Step::ptr createStep(nix::ref store, Connection & conn, Build::ptr build, const nix::StorePath & drvPath, Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, From 8e02589ac8d6c0c71d55d3a0fa75d321b0af2e50 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Sat, 20 Apr 2024 22:49:24 +0200 Subject: [PATCH 7/7] queue-runner: switch to pseudorandom ordering of builds processing We don't rely on sequential / monotonic build IDs processing anymore, so randomizing actually has the advantage of mixing builds for different systems together, to avoid only one chunk of builds for a single system getting processed while builders for other systems are starved. --- src/hydra-queue-runner/queue-monitor.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 9eab6e90..bb15ac04 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -102,7 +102,7 @@ bool State::getQueuedBuilds(Connection & conn, /* Grab the queued builds from the database, but don't process them yet (since we don't want a long-running transaction). */ std::vector newIDs; - std::map newBuildsByID; + std::unordered_map newBuildsByID; std::multimap newBuildsByPath; { @@ -113,7 +113,7 @@ bool State::getQueuedBuilds(Connection & conn, "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, " "globalPriority, priority from Builds " "inner join jobsets on builds.jobset_id = jobsets.id " - "where finished = 0 order by globalPriority desc, builds.id"); + "where finished = 0 order by globalPriority desc, random()"); for (auto const & row : res) { auto builds_(builds.lock());