diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc
index 79c32d46..0c8b3f10 100644
--- a/src/hydra-queue-runner/build-remote.cc
+++ b/src/hydra-queue-runner/build-remote.cc
@@ -386,6 +386,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
 }
 
+/* Utility guard object to auto-release a semaphore on destruction. */
+template <typename T>
+class SemaphoreReleaser {
+public:
+    SemaphoreReleaser(T* s) : sem(s) {}
+    ~SemaphoreReleaser() { sem->release(); }
+
+private:
+    T* sem;
+};
 
 void State::buildRemote(ref<Store> destStore,
     ::Machine::ptr machine, Step::ptr step,
@@ -527,6 +537,14 @@ void State::buildRemote(ref<Store> destStore,
             result.logFile = "";
         }
 
+        /* Throttle CPU-bound work. Opportunistically skip updating the current
+         * step, since this requires a DB roundtrip. */
+        if (!localWorkThrottler.try_acquire()) {
+            updateStep(ssWaitingForLocalSlot);
+            localWorkThrottler.acquire();
+        }
+        SemaphoreReleaser releaser(&localWorkThrottler);
+
         StorePathSet outputs;
         for (auto & [_, realisation] : buildResult.builtOutputs)
             outputs.insert(realisation.outPath);
diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc
index cbf982bf..11db0071 100644
--- a/src/hydra-queue-runner/dispatcher.cc
+++ b/src/hydra-queue-runner/dispatcher.cc
@@ -40,13 +40,15 @@ void State::dispatcher()
             printMsg(lvlDebug, "dispatcher woken up");
             nrDispatcherWakeups++;
 
-            auto now1 = std::chrono::steady_clock::now();
+            auto t_before_work = std::chrono::steady_clock::now();
 
             auto sleepUntil = doDispatch();
 
-            auto now2 = std::chrono::steady_clock::now();
+            auto t_after_work = std::chrono::steady_clock::now();
 
-            dispatchTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+            prom.dispatcher_time_spent_running.Increment(
+                std::chrono::duration_cast<std::chrono::microseconds>(t_after_work - t_before_work).count());
+            dispatchTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(t_after_work - t_before_work).count();
 
             /* Sleep until we're woken up (either because a runnable build
                is added, or because a build finishes). */
@@ -60,6 +62,10 @@ void State::dispatcher()
                 *dispatcherWakeup_ = false;
             }
 
+            auto t_after_sleep = std::chrono::steady_clock::now();
+            prom.dispatcher_time_spent_waiting.Increment(
+                std::chrono::duration_cast<std::chrono::microseconds>(t_after_sleep - t_after_work).count());
+
         } catch (std::exception & e) {
             printError("dispatcher: %s", e.what());
             sleep(1);
diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc
index 64a98797..405c44d1 100644
--- a/src/hydra-queue-runner/hydra-queue-runner.cc
+++ b/src/hydra-queue-runner/hydra-queue-runner.cc
@@ -70,10 +70,31 @@ State::PromMetrics::PromMetrics()
         .Register(*registry)
         .Add({})
     )
-    , queue_max_id(
-        prometheus::BuildGauge()
-            .Name("hydraqueuerunner_queue_max_build_id_info")
-            .Help("Maximum build record ID in the queue")
+    , dispatcher_time_spent_running(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_dispatcher_time_spent_running")
+            .Help("Time (in micros) spent running the dispatcher")
+            .Register(*registry)
+            .Add({})
+    )
+    , dispatcher_time_spent_waiting(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_dispatcher_time_spent_waiting")
+            .Help("Time (in micros) spent waiting for the dispatcher to obtain work")
+            .Register(*registry)
+            .Add({})
+    )
+    , queue_monitor_time_spent_running(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_queue_monitor_time_spent_running")
+            .Help("Time (in micros) spent running the queue monitor")
+            .Register(*registry)
+            .Add({})
+    )
+    , queue_monitor_time_spent_waiting(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_queue_monitor_time_spent_waiting")
+            .Help("Time (in micros) spent waiting for the queue monitor to obtain work")
         .Register(*registry)
         .Add({})
     )
@@ -85,6 +106,7 @@ State::State(std::optional<std::string> metricsAddrOpt)
     : config(std::make_unique<HydraConfig>())
     , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
     , dbPool(config->getIntOption("max_db_connections", 128))
+    , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2)))
     , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
     , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
     , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc
index 81bda873..bb15ac04 100644
--- a/src/hydra-queue-runner/queue-monitor.cc
+++ b/src/hydra-queue-runner/queue-monitor.cc
@@ -2,6 +2,7 @@
 #include "hydra-build-result.hh"
 #include
 #include
+#include
 
 #include
@@ -38,16 +39,21 @@ void State::queueMonitorLoop(Connection & conn)
 
     auto destStore = getDestStore();
 
-    unsigned int lastBuildId = 0;
-
     bool quit = false;
     while (!quit) {
+        auto t_before_work = std::chrono::steady_clock::now();
+
         localStore->clearPathInfoCache();
 
-        bool done = getQueuedBuilds(conn, destStore, lastBuildId);
+        bool done = getQueuedBuilds(conn, destStore);
 
         if (buildOne && buildOneDone) quit = true;
 
+        auto t_after_work = std::chrono::steady_clock::now();
+
+        prom.queue_monitor_time_spent_running.Increment(
+            std::chrono::duration_cast<std::chrono::microseconds>(t_after_work - t_before_work).count());
+
         /* Sleep until we get notification from the database about an
           event. */
         if (done && !quit) {
@@ -57,12 +63,10 @@ void State::queueMonitorLoop(Connection & conn)
             conn.get_notifs();
 
         if (auto lowestId = buildsAdded.get()) {
-            lastBuildId = std::min(lastBuildId, static_cast<unsigned int>(std::stoul(*lowestId) - 1));
             printMsg(lvlTalkative, "got notification: new builds added to the queue");
         }
         if (buildsRestarted.get()) {
             printMsg(lvlTalkative, "got notification: builds restarted");
-            lastBuildId = 0; // check all builds
         }
         if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
             printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
@@ -72,6 +76,10 @@ void State::queueMonitorLoop(Connection & conn)
            printMsg(lvlTalkative, "got notification: jobset shares changed");
            processJobsetSharesChange(conn);
        }
+
+        auto t_after_sleep = std::chrono::steady_clock::now();
+        prom.queue_monitor_time_spent_waiting.Increment(
+            std::chrono::duration_cast<std::chrono::microseconds>(t_after_sleep - t_after_work).count());
    }
 
    exit(0);
@@ -85,20 +93,18 @@ struct PreviousFailure : public std::exception {
 
 
 bool State::getQueuedBuilds(Connection & conn,
-    ref<Store> destStore, unsigned int & lastBuildId)
+    ref<Store> destStore)
 {
     prom.queue_checks_started.Increment();
 
-    printInfo("checking the queue for builds > %d...", lastBuildId);
+    printInfo("checking the queue for builds...");
 
     /* Grab the queued builds from the database, but don't process
        them yet (since we don't want a long-running transaction). */
     std::vector<BuildID> newIDs;
-    std::map<BuildID, Build::ptr> newBuildsByID;
+    std::unordered_map<BuildID, Build::ptr> newBuildsByID;
     std::multimap<StorePath, BuildID> newBuildsByPath;
 
-    unsigned int newLastBuildId = lastBuildId;
-
     {
         pqxx::work txn(conn);
@@ -107,17 +113,12 @@ bool State::getQueuedBuilds(Connection & conn,
             "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
             "globalPriority, priority from Builds "
             "inner join jobsets on builds.jobset_id = jobsets.id "
-            "where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id",
-            lastBuildId);
+            "where finished = 0 order by globalPriority desc, random()");
 
         for (auto const & row : res) {
             auto builds_(builds.lock());
             BuildID id = row["id"].as<BuildID>();
             if (buildOne && id != buildOne) continue;
-            if (id > newLastBuildId) {
-                newLastBuildId = id;
-                prom.queue_max_id.Set(id);
-            }
             if (builds_->count(id)) continue;
 
             auto build = std::make_shared<Build>(
@@ -319,15 +320,13 @@
 
         /* Stop after a certain time to allow priority bumps to be
            processed. */
-        if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) {
+        if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) {
            prom.queue_checks_early_exits.Increment();
            break;
        }
    }
 
    prom.queue_checks_finished.Increment();
-
-    lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
 
     return newBuildsByID.empty();
 }
@@ -406,6 +405,34 @@ void State::processQueueChange(Connection & conn)
 }
 
 
+std::map<DrvOutput, std::optional<StorePath>> State::getMissingRemotePaths(
+    ref<Store> destStore,
+    const std::map<DrvOutput, std::optional<StorePath>> & paths)
+{
+    Sync<std::map<DrvOutput, std::optional<StorePath>>> missing_;
+    ThreadPool tp;
+
+    for (auto & [output, maybeOutputPath] : paths) {
+        if (!maybeOutputPath) {
+            auto missing(missing_.lock());
+            missing->insert({output, maybeOutputPath});
+        } else {
+            tp.enqueue([&] {
+                if (!destStore->isValidPath(*maybeOutputPath)) {
+                    auto missing(missing_.lock());
+                    missing->insert({output, maybeOutputPath});
+                }
+            });
+        }
+    }
+
+    tp.process();
+
+    auto missing(missing_.lock());
+    return *missing;
+}
+
+
 Step::ptr State::createStep(ref<Store> destStore,
     Connection & conn, Build::ptr build, const StorePath & drvPath,
     Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
@@ -489,16 +516,15 @@ Step::ptr State::createStep(ref<Store> destStore,
 
     /* Are all outputs valid? */
     auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
-    bool valid = true;
-    std::map<DrvOutput, std::optional<StorePath>> missing;
+    std::map<DrvOutput, std::optional<StorePath>> paths;
     for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) {
         auto outputHash = outputHashes.at(outputName);
-        if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath))
-            continue;
-        valid = false;
-        missing.insert({{outputHash, outputName}, maybeOutputPath});
+        paths.insert({{outputHash, outputName}, maybeOutputPath});
     }
 
+    auto missing = getMissingRemotePaths(destStore, paths);
+    bool valid = missing.empty();
+
     /* Try to copy the missing paths from the local store or from
        substitutes. */
     if (!missing.empty()) {
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index 8933720d..18101a0a 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -6,6 +6,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
@@ -58,6 +60,7 @@ typedef enum {
     ssConnecting = 10,
     ssSendingInputs = 20,
     ssBuilding = 30,
+    ssWaitingForLocalSlot = 35,
     ssReceivingOutputs = 40,
     ssPostProcessing = 50,
 } StepState;
@@ -353,6 +356,10 @@ private:
     typedef std::map<std::string, Machine::ptr> Machines;
     nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
 
+    /* Throttler for CPU-bound local work. */
+    static constexpr unsigned int maxSupportedLocalWorkers = 1024;
+    std::counting_semaphore<maxSupportedLocalWorkers> localWorkThrottler;
+
     /* Various stats. */
     time_t startedAt;
     counter nrBuildsRead{0};
@@ -450,7 +457,12 @@ private:
         prometheus::Counter& queue_steps_created;
         prometheus::Counter& queue_checks_early_exits;
         prometheus::Counter& queue_checks_finished;
-        prometheus::Gauge& queue_max_id;
+
+        prometheus::Counter& dispatcher_time_spent_running;
+        prometheus::Counter& dispatcher_time_spent_waiting;
+
+        prometheus::Counter& queue_monitor_time_spent_running;
+        prometheus::Counter& queue_monitor_time_spent_waiting;
 
         PromMetrics();
     };
@@ -494,8 +506,7 @@ private:
     void queueMonitorLoop(Connection & conn);
 
     /* Check the queue for new builds. */
-    bool getQueuedBuilds(Connection & conn,
-        nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
+    bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
 
     /* Handle cancellation, deletion and priority bumps. */
     void processQueueChange(Connection & conn);
@@ -503,6 +514,12 @@ private:
     BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
         const nix::StorePath & drvPath);
 
+    /* Returns paths missing from the remote store. Paths are processed in
+     * parallel to work around the possible latency of remote stores. */
+    std::map<nix::DrvOutput, std::optional<nix::StorePath>> getMissingRemotePaths(
+        nix::ref<nix::Store> destStore,
+        const std::map<nix::DrvOutput, std::optional<nix::StorePath>> & paths);
+
     Step::ptr createStep(nix::ref<nix::Store> store, Connection & conn,
         Build::ptr build, const nix::StorePath & drvPath,
         Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
diff --git a/src/root/common.tt b/src/root/common.tt
index 842ad109..6348bee7 100644
--- a/src/root/common.tt
+++ b/src/root/common.tt
@@ -270,6 +270,8 @@ BLOCK renderBusyStatus;
       Sending inputs
     [% ELSIF step.busy == 30 %]
       Building
+    [% ELSIF step.busy == 35 %]
+      Waiting to receive outputs
     [% ELSIF step.busy == 40 %]
       Receiving outputs
     [% ELSIF step.busy == 50 %]
diff --git a/src/root/machine-status.tt b/src/root/machine-status.tt
index 3af5073c..07a2359d 100644
--- a/src/root/machine-status.tt
+++ b/src/root/machine-status.tt
@@ -6,7 +6,6 @@
         Job
-        System
         Build
         Step
         What
@@ -41,7 +40,6 @@
       [% idle = 0 %]
       [% INCLUDE renderFullJobName project=step.project jobset=step.jobset job=step.job %]
-      [% step.system %]
      [% step.build %]
      [% IF step.busy >= 30 %][% step.stepnr %][% ELSE; step.stepnr; END %]
      [% step.drvpath.match('-(.*)').0 %]