Merge pull request #1464 from NixOS/more-hydra.nixos.org-changes

More hydra.nixos.org changes
Jörg Thalheim 2025-04-07 16:50:36 +00:00 committed by GitHub
commit 56170dd117
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 126 additions and 37 deletions

View File

@@ -386,6 +386,16 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
 }
 
+/* Utility guard object to auto-release a semaphore on destruction. */
+template <typename T>
+class SemaphoreReleaser {
+public:
+    SemaphoreReleaser(T* s) : sem(s) {}
+    ~SemaphoreReleaser() { sem->release(); }
+private:
+    T* sem;
+};
+
 void State::buildRemote(ref<Store> destStore,
     ::Machine::ptr machine, Step::ptr step,
@@ -527,6 +537,14 @@ void State::buildRemote(ref<Store> destStore,
             result.logFile = "";
         }
 
+        /* Throttle CPU-bound work. Opportunistically skip updating the current
+         * step, since this requires a DB roundtrip. */
+        if (!localWorkThrottler.try_acquire()) {
+            updateStep(ssWaitingForLocalSlot);
+            localWorkThrottler.acquire();
+        }
+        SemaphoreReleaser releaser(&localWorkThrottler);
+
         StorePathSet outputs;
         for (auto & [_, realisation] : buildResult.builtOutputs)
             outputs.insert(realisation.outPath);
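The two hunks above cooperate: localWorkThrottler caps how many steps may do CPU-bound local work at once, and the RAII guard guarantees the slot is returned on every exit path, including exceptions. A minimal self-contained sketch of the same pattern, assuming C++20 <semaphore>; throttler, work, and the slot count of 4 are illustrative, not Hydra's:

    #include <cstdio>
    #include <semaphore>
    #include <thread>
    #include <vector>

    // At most 4 threads hold a slot at once; 1024 is only the compile-time ceiling.
    static std::counting_semaphore<1024> throttler(4);

    // Same guard shape as the SemaphoreReleaser added above.
    template <typename T>
    class SemaphoreReleaser {
    public:
        SemaphoreReleaser(T* s) : sem(s) {}
        ~SemaphoreReleaser() { sem->release(); } // runs even if work() throws
    private:
        T* sem;
    };

    void work(int i)
    {
        if (!throttler.try_acquire()) {
            // Slow path: announce that we are blocked, then wait for a slot.
            std::printf("worker %d waiting for a local slot\n", i);
            throttler.acquire();
        }
        SemaphoreReleaser releaser(&throttler);
        // ... CPU-bound work, bounded to 4 concurrent threads ...
    }

    int main()
    {
        std::vector<std::jthread> threads;
        for (int i = 0; i < 16; i++)
            threads.emplace_back(work, i);
        // std::jthread joins automatically on destruction.
    }

The try_acquire fast path exists because, per the comment in the diff, updateStep costs a database roundtrip; it is only worth paying when the step will actually block.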

View File

@@ -40,13 +40,15 @@ void State::dispatcher()
             printMsg(lvlDebug, "dispatcher woken up");
             nrDispatcherWakeups++;
 
-            auto now1 = std::chrono::steady_clock::now();
+            auto t_before_work = std::chrono::steady_clock::now();
 
             auto sleepUntil = doDispatch();
 
-            auto now2 = std::chrono::steady_clock::now();
+            auto t_after_work = std::chrono::steady_clock::now();
 
-            dispatchTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+            prom.dispatcher_time_spent_running.Increment(
+                std::chrono::duration_cast<std::chrono::microseconds>(t_after_work - t_before_work).count());
+            dispatchTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(t_after_work - t_before_work).count();
 
             /* Sleep until we're woken up (either because a runnable build
                is added, or because a build finishes). */
@@ -60,6 +62,10 @@ void State::dispatcher()
                 *dispatcherWakeup_ = false;
             }
 
+            auto t_after_sleep = std::chrono::steady_clock::now();
+            prom.dispatcher_time_spent_waiting.Increment(
+                std::chrono::duration_cast<std::chrono::microseconds>(t_after_sleep - t_after_work).count());
+
         } catch (std::exception & e) {
             printError("dispatcher: %s", e.what());
             sleep(1);
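The change above is a three-timestamp split of each dispatcher iteration into time spent running versus time spent waiting, exported as monotonic microsecond counters. A compilable sketch of the split, assuming only some counter type with an Increment(double) method (prometheus::Counter has one); Counter, loopOnce, doWork, and waitForWakeup are stand-ins:

    #include <chrono>

    // Stand-in for prometheus::Counter: monotonically increasing.
    struct Counter { double value = 0; void Increment(double d) { value += d; } };

    Counter time_spent_running, time_spent_waiting;

    void loopOnce(void (*doWork)(), void (*waitForWakeup)())
    {
        using namespace std::chrono;

        auto t_before_work = steady_clock::now();
        doWork();                                  // the dispatch pass
        auto t_after_work = steady_clock::now();
        time_spent_running.Increment(
            duration_cast<microseconds>(t_after_work - t_before_work).count());

        waitForWakeup();                           // blocked until woken up
        auto t_after_sleep = steady_clock::now();
        time_spent_waiting.Increment(
            duration_cast<microseconds>(t_after_sleep - t_after_work).count());
    }

Because both series are counters, rate(running) / (rate(running) + rate(waiting)) over any window yields the dispatcher's duty cycle without the queue runner keeping any windowed state itself.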

View File

@@ -70,10 +70,31 @@ State::PromMetrics::PromMetrics()
             .Register(*registry)
             .Add({})
     )
-    , queue_max_id(
-        prometheus::BuildGauge()
-            .Name("hydraqueuerunner_queue_max_build_id_info")
-            .Help("Maximum build record ID in the queue")
+    , dispatcher_time_spent_running(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_dispatcher_time_spent_running")
+            .Help("Time (in micros) spent running the dispatcher")
+            .Register(*registry)
+            .Add({})
+    )
+    , dispatcher_time_spent_waiting(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_dispatcher_time_spent_waiting")
+            .Help("Time (in micros) spent waiting for the dispatcher to obtain work")
+            .Register(*registry)
+            .Add({})
+    )
+    , queue_monitor_time_spent_running(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_queue_monitor_time_spent_running")
+            .Help("Time (in micros) spent running the queue monitor")
+            .Register(*registry)
+            .Add({})
+    )
+    , queue_monitor_time_spent_waiting(
+        prometheus::BuildCounter()
+            .Name("hydraqueuerunner_queue_monitor_time_spent_waiting")
+            .Help("Time (in micros) spent waiting for the queue monitor to obtain work")
             .Register(*registry)
             .Add({})
     )
@@ -85,6 +106,7 @@ State::State(std::optional<std::string> metricsAddrOpt)
     : config(std::make_unique<HydraConfig>())
     , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
     , dbPool(config->getIntOption("max_db_connections", 128))
+    , localWorkThrottler(config->getIntOption("max_local_worker_threads", std::min(maxSupportedLocalWorkers, std::max(4u, std::thread::hardware_concurrency()) - 2)))
     , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
     , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
     , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
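The new default for max_local_worker_threads leaves two hardware threads free for the rest of the queue runner and clamps to the semaphore's compile-time ceiling of 1024. A worked example of just that arithmetic (illustrative, not Hydra API):

    #include <algorithm>
    #include <cstdio>
    #include <thread>

    int main()
    {
        constexpr unsigned maxSupportedLocalWorkers = 1024;
        // hardware_concurrency() may return 0 when unknown; max(4u, ...) floors it.
        unsigned hw = std::thread::hardware_concurrency();
        unsigned def = std::min(maxSupportedLocalWorkers, std::max(4u, hw) - 2);
        // hw = 0 or 2 -> 2;  hw = 8 -> 6;  hw = 64 -> 62;  hw = 4096 -> 1024
        std::printf("default max_local_worker_threads = %u\n", def);
    }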

View File

@@ -2,6 +2,7 @@
 #include "hydra-build-result.hh"
 #include <nix/store/globals.hh>
 #include <nix/store/parsed-derivations.hh>
+#include <nix/util/thread-pool.hh>
 
 #include <cstring>
@@ -38,16 +39,21 @@ void State::queueMonitorLoop(Connection & conn)
     auto destStore = getDestStore();
 
-    unsigned int lastBuildId = 0;
-
     bool quit = false;
     while (!quit) {
+        auto t_before_work = std::chrono::steady_clock::now();
+
         localStore->clearPathInfoCache();
 
-        bool done = getQueuedBuilds(conn, destStore, lastBuildId);
+        bool done = getQueuedBuilds(conn, destStore);
 
         if (buildOne && buildOneDone) quit = true;
 
+        auto t_after_work = std::chrono::steady_clock::now();
+        prom.queue_monitor_time_spent_running.Increment(
+            std::chrono::duration_cast<std::chrono::microseconds>(t_after_work - t_before_work).count());
+
         /* Sleep until we get notification from the database about an
            event. */
         if (done && !quit) {
@@ -57,12 +63,10 @@ void State::queueMonitorLoop(Connection & conn)
             conn.get_notifs();
 
             if (auto lowestId = buildsAdded.get()) {
-                lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
                 printMsg(lvlTalkative, "got notification: new builds added to the queue");
             }
             if (buildsRestarted.get()) {
                 printMsg(lvlTalkative, "got notification: builds restarted");
-                lastBuildId = 0; // check all builds
             }
             if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
                 printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
@@ -72,6 +76,10 @@ void State::queueMonitorLoop(Connection & conn)
             printMsg(lvlTalkative, "got notification: jobset shares changed");
             processJobsetSharesChange(conn);
         }
+
+        auto t_after_sleep = std::chrono::steady_clock::now();
+        prom.queue_monitor_time_spent_waiting.Increment(
+            std::chrono::duration_cast<std::chrono::microseconds>(t_after_sleep - t_after_work).count());
     }
 
     exit(0);
@@ -85,20 +93,18 @@ struct PreviousFailure : public std::exception {
 bool State::getQueuedBuilds(Connection & conn,
-    ref<Store> destStore, unsigned int & lastBuildId)
+    ref<Store> destStore)
 {
     prom.queue_checks_started.Increment();
 
-    printInfo("checking the queue for builds > %d...", lastBuildId);
+    printInfo("checking the queue for builds...");
 
     /* Grab the queued builds from the database, but don't process
        them yet (since we don't want a long-running transaction). */
     std::vector<BuildID> newIDs;
-    std::map<BuildID, Build::ptr> newBuildsByID;
+    std::unordered_map<BuildID, Build::ptr> newBuildsByID;
     std::multimap<StorePath, BuildID> newBuildsByPath;
 
-    unsigned int newLastBuildId = lastBuildId;
-
     {
         pqxx::work txn(conn);
@@ -107,17 +113,12 @@ bool State::getQueuedBuilds(Connection & conn,
             "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
             "globalPriority, priority from Builds "
             "inner join jobsets on builds.jobset_id = jobsets.id "
-            "where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id",
-            lastBuildId);
+            "where finished = 0 order by globalPriority desc, random()");
 
         for (auto const & row : res) {
             auto builds_(builds.lock());
             BuildID id = row["id"].as<BuildID>();
             if (buildOne && id != buildOne) continue;
-            if (id > newLastBuildId) {
-                newLastBuildId = id;
-                prom.queue_max_id.Set(id);
-            }
             if (builds_->count(id)) continue;
 
             auto build = std::make_shared<Build>(
@@ -319,15 +320,13 @@ bool State::getQueuedBuilds(Connection & conn,
 
         /* Stop after a certain time to allow priority bumps to be
            processed. */
-        if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) {
+        if (std::chrono::system_clock::now() > start + std::chrono::seconds(60)) {
             prom.queue_checks_early_exits.Increment();
             break;
         }
     }
 
     prom.queue_checks_finished.Increment();
-
-    lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
     return newBuildsByID.empty();
 }
@@ -406,6 +405,34 @@ void State::processQueueChange(Connection & conn)
     }
 }
 
+std::map<DrvOutput, std::optional<StorePath>> State::getMissingRemotePaths(
+    ref<Store> destStore,
+    const std::map<DrvOutput, std::optional<StorePath>> & paths)
+{
+    Sync<std::map<DrvOutput, std::optional<StorePath>>> missing_;
+    ThreadPool tp;
+
+    for (auto & [output, maybeOutputPath] : paths) {
+        if (!maybeOutputPath) {
+            auto missing(missing_.lock());
+            missing->insert({output, maybeOutputPath});
+        } else {
+            tp.enqueue([&] {
+                if (!destStore->isValidPath(*maybeOutputPath)) {
+                    auto missing(missing_.lock());
+                    missing->insert({output, maybeOutputPath});
+                }
+            });
+        }
+    }
+
+    tp.process();
+
+    auto missing(missing_.lock());
+    return *missing;
+}
+
 Step::ptr State::createStep(ref<Store> destStore,
     Connection & conn, Build::ptr build, const StorePath & drvPath,
     Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
@@ -489,16 +516,15 @@ Step::ptr State::createStep(ref<Store> destStore,
 
     /* Are all outputs valid? */
     auto outputHashes = staticOutputHashes(*localStore, *(step->drv));
-    bool valid = true;
-    std::map<DrvOutput, std::optional<StorePath>> missing;
+    std::map<DrvOutput, std::optional<StorePath>> paths;
     for (auto & [outputName, maybeOutputPath] : destStore->queryPartialDerivationOutputMap(drvPath, &*localStore)) {
         auto outputHash = outputHashes.at(outputName);
-        if (maybeOutputPath && destStore->isValidPath(*maybeOutputPath))
-            continue;
-        valid = false;
-        missing.insert({{outputHash, outputName}, maybeOutputPath});
+        paths.insert({{outputHash, outputName}, maybeOutputPath});
     }
 
+    auto missing = getMissingRemotePaths(destStore, paths);
+    bool valid = missing.empty();
+
     /* Try to copy the missing paths from the local store or from
        substitutes. */
     if (!missing.empty()) {
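getMissingRemotePaths replaces N sequential isValidPath round-trips in createStep with N concurrent ones, which is the point when destStore is a remote binary cache. nix::ThreadPool and nix::Sync are nix internals; here is a standard-library-only sketch of the same fan-out, with string keys and a checkValid callback standing in for DrvOutput, StorePath, and the store query:

    #include <future>
    #include <map>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <vector>

    using Key  = std::string;   // stands in for DrvOutput
    using Path = std::string;   // stands in for StorePath

    std::map<Key, std::optional<Path>> getMissingPaths(
        bool (*checkValid)(const Path &),
        const std::map<Key, std::optional<Path>> & paths)
    {
        std::mutex m;                              // plays the role of nix::Sync
        std::map<Key, std::optional<Path>> missing;
        std::vector<std::future<void>> futures;

        for (auto & [key, maybePath] : paths) {
            if (!maybePath) {
                // No known output path: trivially missing, no remote query needed.
                std::lock_guard<std::mutex> lock(m);
                missing.insert({key, maybePath});
            } else {
                auto k = key;
                auto p = *maybePath;
                futures.push_back(std::async(std::launch::async, [&, k, p] {
                    if (!checkValid(p)) {          // each query may be a slow round-trip
                        std::lock_guard<std::mutex> lock(m);
                        missing.insert({k, p});
                    }
                }));
            }
        }
        for (auto & f : futures)
            f.get();                               // wait for all, propagate exceptions
        return missing;
    }

The original keeps the same shape: one closure per candidate path is enqueued, tp.process() blocks until all have run, and the Sync wrapper serializes inserts into the shared result map.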

View File

@@ -6,6 +6,8 @@
 #include <map>
 #include <memory>
 #include <queue>
+#include <regex>
+#include <semaphore>
 
 #include <prometheus/counter.h>
 #include <prometheus/gauge.h>
@@ -58,6 +60,7 @@ typedef enum {
     ssConnecting = 10,
     ssSendingInputs = 20,
     ssBuilding = 30,
+    ssWaitingForLocalSlot = 35,
     ssReceivingOutputs = 40,
     ssPostProcessing = 50,
 } StepState;
@@ -353,6 +356,10 @@ private:
     typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
     nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
 
+    /* Throttler for CPU-bound local work. */
+    static constexpr unsigned int maxSupportedLocalWorkers = 1024;
+    std::counting_semaphore<maxSupportedLocalWorkers> localWorkThrottler;
+
     /* Various stats. */
     time_t startedAt;
     counter nrBuildsRead{0};
@@ -450,7 +457,12 @@ private:
         prometheus::Counter& queue_steps_created;
         prometheus::Counter& queue_checks_early_exits;
         prometheus::Counter& queue_checks_finished;
-        prometheus::Gauge& queue_max_id;
+
+        prometheus::Counter& dispatcher_time_spent_running;
+        prometheus::Counter& dispatcher_time_spent_waiting;
+
+        prometheus::Counter& queue_monitor_time_spent_running;
+        prometheus::Counter& queue_monitor_time_spent_waiting;
 
         PromMetrics();
     };
@@ -494,8 +506,7 @@ private:
     void queueMonitorLoop(Connection & conn);
 
     /* Check the queue for new builds. */
-    bool getQueuedBuilds(Connection & conn,
-        nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
+    bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
 
     /* Handle cancellation, deletion and priority bumps. */
     void processQueueChange(Connection & conn);
@@ -503,6 +514,12 @@ private:
     BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
         const nix::StorePath & drvPath);
 
+    /* Returns paths missing from the remote store. Paths are processed in
+     * parallel to work around the possible latency of remote stores. */
+    std::map<nix::DrvOutput, std::optional<nix::StorePath>> getMissingRemotePaths(
+        nix::ref<nix::Store> destStore,
+        const std::map<nix::DrvOutput, std::optional<nix::StorePath>> & paths);
+
     Step::ptr createStep(nix::ref<nix::Store> store,
         Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
         Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
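One subtlety in the localWorkThrottler declaration above: std::counting_semaphore's template argument is only the compile-time least maximum, not the slot count; the actual number of slots is the constructor argument, which is how State's initializer list (earlier in this diff) can size it from the max_local_worker_threads option at runtime. A two-line illustration, assuming C++20:

    #include <semaphore>

    static constexpr unsigned maxSupportedLocalWorkers = 1024;   // compile-time ceiling
    std::counting_semaphore<maxSupportedLocalWorkers> localWorkThrottler{8}; // 8 real slots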

View File

@@ -270,6 +270,8 @@ BLOCK renderBusyStatus;
     <strong>Sending inputs</strong>
   [% ELSIF step.busy == 30 %]
     <strong>Building</strong>
+  [% ELSIF step.busy == 35 %]
+    <strong>Waiting to receive outputs</strong>
   [% ELSIF step.busy == 40 %]
     <strong>Receiving outputs</strong>
   [% ELSIF step.busy == 50 %]

View File

@@ -6,7 +6,6 @@
   <thead>
     <tr>
       <th>Job</th>
-      <th>System</th>
       <th>Build</th>
       <th>Step</th>
       <th>What</th>
@@ -41,7 +40,6 @@
       [% idle = 0 %]
       <tr>
         <td><tt>[% INCLUDE renderFullJobName project=step.project jobset=step.jobset job=step.job %]</tt></td>
-        <td><tt>[% step.system %]</tt></td>
         <td><a href="[% c.uri_for('/build' step.build) %]">[% step.build %]</a></td>
         <td>[% IF step.busy >= 30 %]<a class="row-link" href="[% c.uri_for('/build' step.build 'nixlog' step.stepnr 'tail') %]">[% step.stepnr %]</a>[% ELSE; step.stepnr; END %]</td>
         <td><tt>[% step.drvpath.match('-(.*)').0 %]</tt></td>