586 lines
16 KiB
C++
Raw Normal View History

2015-07-07 10:17:21 +02:00
#pragma once
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <functional>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <regex>
#include <semaphore>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>

#include "db.hh"

#include <nix/store/derivations.hh>
#include <nix/store/derivation-options.hh>
#include <nix/store/pathlocks.hh>
#include <nix/util/pool.hh>
#include <nix/store/build-result.hh>
#include <nix/store/store-api.hh>
#include <nix/util/sync.hh>
#include "nar-extractor.hh"

#include <nix/store/serve-protocol.hh>
#include <nix/store/serve-protocol-impl.hh>
#include <nix/store/serve-protocol-connection.hh>
#include <nix/store/machines.hh>
2016-02-20 00:04:08 +01:00
2015-07-07 10:17:21 +02:00
/* Identifier of a build; key type of State::builds and the build-id
   argument of the pqxx-based DB helpers on State. */
using BuildID = unsigned int;

/* Identifier of a jobset. */
using JobsetID = unsigned int;

/* Wall-clock time point (system_clock), used for retry and
   bookkeeping timestamps. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;

/* Statistics counter that can be bumped from several threads. */
using counter = std::atomic<unsigned long>;
2015-07-07 10:17:21 +02:00
/* Status of a build or of a build step. Every entry carries an
   explicit numeric value and bsBusy is marked "not stored", which
   suggests the other values are persisted — so never renumber existing
   entries. (5 is not used.) */
enum BuildStatus {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2, // builds only
    bsAborted = 3,
    bsCancelled = 4,
    bsFailedWithOutput = 6, // builds only
    bsTimedOut = 7,
    bsCachedFailure = 8, // steps only
    bsUnsupported = 9,
    bsLogLimitExceeded = 10,
    bsNarSizeLimitExceeded = 11,
    bsNotDeterministic = 12,
    bsBusy = 100, // not stored
};
/* Phase a build step is currently in. The values increase in the
   order the phases are listed, with gaps between them. */
enum StepState {
    ssPreparing = 1,
    ssConnecting = 10,
    ssSendingInputs = 20,
    ssBuilding = 30,
    ssWaitingForLocalSlot = 35,
    ssReceivingOutputs = 40,
    ssPostProcessing = 50,
};
struct RemoteResult
2015-07-07 10:25:33 +02:00
{
BuildStatus stepStatus = bsAborted;
bool canRetry = false; // for bsAborted
bool isCached = false; // for bsSucceed
bool canCache = false; // for bsFailed
std::string errorMsg; // for bsAborted
unsigned int timesBuilt = 0;
bool isNonDeterministic = false;
2015-07-07 10:25:33 +02:00
time_t startTime = 0, stopTime = 0;
unsigned int overhead = 0;
2015-07-07 10:25:33 +02:00
nix::Path logFile;
BuildStatus buildStatus() const
{
return stepStatus == bsCachedFailure ? bsFailed : stepStatus;
}
2022-03-21 12:14:37 +01:00
void updateWithBuildResult(const nix::BuildResult &);
2015-07-07 10:25:33 +02:00
};
2015-07-07 10:17:21 +02:00
/* Forward declarations. Step is defined further down in this header;
   BuildOutput is not defined in this header (it is used as the return
   type of State::getBuildOutputCached()). */
struct Step;
struct BuildOutput;
2015-07-07 10:17:21 +02:00
class Jobset
{
public:
typedef std::shared_ptr<Jobset> ptr;
typedef std::weak_ptr<Jobset> wptr;
static const time_t schedulingWindow = 24 * 60 * 60;
private:
std::atomic<time_t> seconds{0};
std::atomic<unsigned int> shares{1};
/* The start time and duration of the most recent build steps. */
2016-02-24 14:04:31 +01:00
nix::Sync<std::map<time_t, time_t>> steps;
public:
double shareUsed()
{
return (double) seconds / shares;
}
void setShares(int shares_)
{
assert(shares_ > 0);
shares = shares_;
}
time_t getSeconds() { return seconds; }
void addStep(time_t startTime, time_t duration);
void pruneSteps();
};
2015-07-07 10:17:21 +02:00
struct Build
{
typedef std::shared_ptr<Build> ptr;
typedef std::weak_ptr<Build> wptr;
BuildID id;
nix::StorePath drvPath;
std::map<std::string, nix::StorePath> outputs;
2022-01-09 08:58:36 -05:00
JobsetID jobsetId;
std::string projectName, jobsetName, jobName;
time_t timestamp;
2015-07-07 10:17:21 +02:00
unsigned int maxSilentTime, buildTimeout;
int localPriority, globalPriority;
2015-07-07 10:17:21 +02:00
std::shared_ptr<Step> toplevel;
Jobset::ptr jobset;
2015-07-07 10:17:21 +02:00
std::atomic_bool finishedInDB{false};
Build(nix::StorePath && drvPath) : drvPath(std::move(drvPath))
{ }
std::string fullJobName()
{
return projectName + ":" + jobsetName + ":" + jobName;
}
void propagatePriorities();
2015-07-07 10:17:21 +02:00
};
struct Step
{
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
nix::StorePath drvPath;
std::unique_ptr<nix::Derivation> drv;
2025-04-07 11:12:12 -04:00
std::unique_ptr<nix::DerivationOptions> drvOptions;
2015-07-07 10:17:21 +02:00
std::set<std::string> requiredSystemFeatures;
bool preferLocalBuild;
bool isDeterministic;
std::string systemType; // concatenation of drv.platform and requiredSystemFeatures
2015-07-07 10:17:21 +02:00
struct State
{
/* Whether the step has finished initialisation. */
bool created = false;
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
/* The build steps that depend on this step. */
std::vector<Step::wptr> rdeps;
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
/* Jobsets to which this step belongs. Used for determining
scheduling priority. */
std::set<Jobset::ptr> jobsets;
2015-07-07 10:17:21 +02:00
/* Number of times we've tried this step. */
unsigned int tries = 0;
/* Point in time after which the step can be retried. */
system_time after;
/* The highest global priority of any build depending on this
step. */
int highestGlobalPriority{0};
/* The highest local priority of any build depending on this
step. */
int highestLocalPriority{0};
/* The lowest ID of any build depending on this step. */
BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};
/* The time at which this step became runnable. */
system_time runnableSince;
/* The time that we last saw a machine that supports this
step. */
system_time lastSupported = std::chrono::system_clock::now();
2015-07-07 10:17:21 +02:00
};
std::atomic_bool finished{false}; // debugging
2016-02-24 14:04:31 +01:00
nix::Sync<State> state;
2015-07-07 10:17:21 +02:00
2020-06-23 13:43:54 +02:00
Step(const nix::StorePath & drvPath) : drvPath(drvPath)
{ }
2015-07-07 10:17:21 +02:00
~Step()
{
//printMsg(lvlError, format("destroying step %1%") % drvPath);
}
};
2015-07-21 15:14:17 +02:00
/* Fill the output sets 'builds' and 'steps' with what depends on
   'step' (meaning inferred from the name — confirm against the
   definition whether it includes 'step' itself / is transitive). */
void getDependents(Step::ptr step,
    std::set<Build::ptr> & builds,
    std::set<Step::ptr> & steps);

/* Invoke 'visitor' on 'step' and on each of its dependencies. */
void visitDependencies(std::function<void(Step::ptr)> visitor,
    Step::ptr step);
2015-07-21 15:14:17 +02:00
struct Machine : nix::Machine
2015-07-07 10:17:21 +02:00
{
typedef std::shared_ptr<Machine> ptr;
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
std::atomic<time_t> idleSince{0};
struct ConnectInfo
{
system_time lastFailure, disabledUntil;
unsigned int consecutiveFailures;
};
2016-02-24 14:04:31 +01:00
nix::Sync<ConnectInfo> connectInfo;
/* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */
std::timed_mutex sendLock;
2015-07-07 10:17:21 +02:00
};
State::ptr state;
bool supportsStep(Step::ptr step)
{
2017-07-21 17:22:11 +02:00
/* Check that this machine is of the type required by the
step. */
if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
return false;
2017-07-21 17:22:11 +02:00
/* Check that the step requires all mandatory features of this
machine. (Thus, a machine with the mandatory "benchmark"
feature will *only* execute steps that require
"benchmark".) The "preferLocalBuild" bit of a step is
mapped to the "local" feature; thus machines that have
"local" as a mandatory feature will only do
preferLocalBuild steps. */
2015-07-07 10:17:21 +02:00
for (auto & f : mandatoryFeatures)
2017-07-21 17:22:11 +02:00
if (!step->requiredSystemFeatures.count(f)
&& !(f == "local" && step->preferLocalBuild))
2015-07-07 10:17:21 +02:00
return false;
2017-07-21 17:22:11 +02:00
/* Check that the machine supports all features required by
the step. */
2015-07-07 10:17:21 +02:00
for (auto & f : step->requiredSystemFeatures)
2017-07-21 17:22:11 +02:00
if (!supportedFeatures.count(f)) return false;
2015-07-07 10:17:21 +02:00
return true;
}
bool isLocalhost() const;
// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
// Backpointer to the machine
ptr machine;
};
2015-07-07 10:17:21 +02:00
};
2020-07-08 12:50:02 +02:00
/* Forward declaration; State holds its configuration through a
   std::unique_ptr<HydraConfig>. */
class HydraConfig;
2015-07-07 10:17:21 +02:00
class State
{
private:
2020-07-08 12:50:02 +02:00
std::unique_ptr<HydraConfig> config;
2015-07-21 15:14:17 +02:00
// FIXME: Make configurable.
const unsigned int maxTries = 5;
const unsigned int retryInterval = 60; // seconds
const float retryBackoff = 3.0;
const unsigned int maxParallelCopyClosure = 4;
/* Time in seconds before unsupported build steps are aborted. */
const unsigned int maxUnsupportedTime = 0;
2015-07-07 10:29:43 +02:00
nix::Path hydraData, logDir;
2015-07-07 10:17:21 +02:00
bool useSubstitutes = false;
2015-07-07 10:17:21 +02:00
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
2016-02-24 14:04:31 +01:00
nix::Sync<Builds> builds;
2015-07-07 10:17:21 +02:00
/* The jobsets. */
typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
2016-02-24 14:04:31 +01:00
nix::Sync<Jobsets> jobsets;
2015-07-07 10:17:21 +02:00
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<nix::StorePath, Step::wptr> Steps;
2016-02-24 14:04:31 +01:00
nix::Sync<Steps> steps;
2015-07-07 10:17:21 +02:00
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
2016-02-24 14:04:31 +01:00
nix::Sync<Runnable> runnable;
2015-07-07 10:17:21 +02:00
/* CV for waking up the dispatcher. */
2016-02-24 14:04:31 +01:00
nix::Sync<bool> dispatcherWakeup;
std::condition_variable dispatcherWakeupCV;
2015-07-07 10:17:21 +02:00
/* PostgreSQL connection pool. */
2016-02-24 14:04:31 +01:00
nix::Pool<Connection> dbPool;
2015-07-07 10:17:21 +02:00
/* The build machines. */
std::mutex machinesReadyLock;
typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
2016-02-24 14:04:31 +01:00
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
2015-07-07 10:17:21 +02:00
/* Throttler for CPU-bound local work. */
static constexpr unsigned int maxSupportedLocalWorkers = 1024;
std::counting_semaphore<maxSupportedLocalWorkers> localWorkThrottler;
2015-07-07 10:17:21 +02:00
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter buildReadTimeMs{0};
2015-07-07 10:17:21 +02:00
counter nrBuildsDone{0};
counter nrStepsStarted{0};
2015-07-07 10:17:21 +02:00
counter nrStepsDone{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrStepsWaiting{0};
counter nrUnsupportedSteps{0};
2015-07-07 10:17:21 +02:00
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter dispatchTimeMs{0};
2015-07-07 10:17:21 +02:00
counter bytesSent{0};
counter bytesReceived{0};
2016-02-29 15:10:30 +01:00
counter nrActiveDbUpdates{0};
2015-07-07 10:17:21 +02:00
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
bool buildOneDone = false;
2015-07-07 10:17:21 +02:00
/* Statistics per machine type for the Hydra auto-scaler. */
struct MachineType
{
unsigned int runnable{0}, running{0};
system_time lastActive;
std::chrono::seconds waitTime; // time runnable steps have been waiting
};
2016-02-24 14:04:31 +01:00
nix::Sync<std::map<std::string, MachineType>> machineTypes;
struct MachineReservation
{
typedef std::shared_ptr<MachineReservation> ptr;
State & state;
Step::ptr step;
Machine::ptr machine;
MachineReservation(State & state, Step::ptr step, Machine::ptr machine);
~MachineReservation();
};
struct ActiveStep
{
Step::ptr step;
struct State
{
pid_t pid = -1;
bool cancelled = false;
};
nix::Sync<State> state_;
};
nix::Sync<std::set<std::shared_ptr<ActiveStep>>> activeSteps_;
std::atomic<time_t> lastDispatcherCheck{0};
std::shared_ptr<nix::Store> localStore;
std::shared_ptr<nix::Store> _destStore;
size_t maxOutputSize;
2017-09-22 15:23:58 +02:00
size_t maxLogSize;
/* Steps that were busy while we encounted a PostgreSQL
error. These need to be cleared at a later time to prevent them
from showing up as busy until the queue runner is restarted. */
nix::Sync<std::set<std::pair<BuildID, int>>> orphanedSteps;
/* How often the build steps of a jobset should be repeated in
order to detect non-determinism. */
std::map<std::pair<std::string, std::string>, size_t> jobsetRepeats;
2017-03-15 16:43:54 +01:00
bool uploadLogsToBinaryCache;
/* Where to store GC roots. Defaults to
/nix/var/nix/gcroots/per-user/$USER/hydra-roots, overridable
via gc_roots_dir. */
nix::Path rootsDir;
std::string metricsAddr;
struct PromMetrics
{
std::shared_ptr<prometheus::Registry> registry;
prometheus::Counter& queue_checks_started;
prometheus::Counter& queue_build_loads;
2022-04-06 20:23:02 -04:00
prometheus::Counter& queue_steps_created;
prometheus::Counter& queue_checks_early_exits;
prometheus::Counter& queue_checks_finished;
prometheus::Gauge& queue_max_id;
PromMetrics();
};
PromMetrics prom;
2015-07-07 10:17:21 +02:00
public:
State(std::optional<std::string> metricsAddrOpt);
2015-07-07 10:17:21 +02:00
private:
2017-09-14 17:22:48 +02:00
nix::MaintainCount<counter> startDbUpdate();
2016-02-29 15:10:30 +01:00
/* Return a store object to store build results. */
nix::ref<nix::Store> getDestStore();
2015-07-07 10:17:21 +02:00
void clearBusy(Connection & conn, time_t stopTime);
void parseMachines(const std::string & contents);
2015-07-07 10:17:21 +02:00
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
unsigned int allocBuildStep(pqxx::work & txn, BuildID buildId);
unsigned int createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
2015-07-07 10:17:21 +02:00
BuildID propagatedFrom = 0);
void updateBuildStep(pqxx::work & txn, BuildID buildId, unsigned int stepNr, StepState stepState);
void finishBuildStep(pqxx::work & txn, const RemoteResult & result, BuildID buildId, unsigned int stepNr,
const std::string & machine);
2015-07-07 10:17:21 +02:00
int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const nix::StorePath & drvPath, const nix::Derivation drv, const std::string & outputName, const nix::StorePath & storePath);
2015-07-07 10:17:21 +02:00
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitor();
hydra-queue-runner: drop broken connections from pool Closes #1336 When restarting postgresql, the connections are still reused in `hydra-queue-runner` causing errors like this main thread: Lost connection to the database server. queue monitor: Lost connection to the database server. and no more builds being processed. `hydra-evaluator` doesn't have that issue since it crashes right away. We could let it retry indefinitely as well (see below), but I don't want to change too much. If the DB is still unreachable 10s later, the process will stop with a non-zero exit code because of a missing DB connection. This however isn't such a big deal because it will be immediately restarted afterwards. With the current configuration, Hydra will never give up, but restart (and retry) infinitely. To me that seems reasonable, i.e. to retry DB connections on a long-running process. If this doesn't work out, the monitoring should fire anyways because the queue fills up, but I'm open to discuss that. Please note that this isn't reproducible with the DB and the queue runner on the same machine when using `services.hydra-dev`, because of the `Requires=` dependency `hydra-queue-runner.service` -> `hydra-init.service` -> `postgresql.service` that causes the queue runner to be restarted on `systemctl restart postgresql`. Internally, Hydra uses Nix's pool data structure: it basically has N slots (here DB connections) and whenever a new one is requested, an idle slot is provided or a new one is created (when N slots are active, it'll be waited until one slot is free). The issue in the code here is however that whenever an error is encountered, the slot is released, however the same broken connection will be reused the next time. By using `Pool::Handle::markBad`, Nix will drop a broken slot. This is now being done when `pqxx::broken_connection` was caught.
2024-03-14 22:47:37 +01:00
void queueMonitorLoop(Connection & conn);
2015-07-07 10:17:21 +02:00
/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn,
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
2015-07-07 10:17:21 +02:00
/* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn);
2015-07-07 10:17:21 +02:00
BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
const nix::StorePath & drvPath);
2016-02-11 15:59:47 +01:00
Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
2015-07-07 10:17:21 +02:00
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void failStep(
Connection & conn,
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
bool & stepFinished);
Jobset::ptr createJobset(pqxx::work & txn,
2022-01-09 08:58:36 -05:00
const std::string & projectName, const std::string & jobsetName, const JobsetID);
void processJobsetSharesChange(Connection & conn);
2015-07-07 10:17:21 +02:00
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
2015-08-10 11:26:30 +02:00
system_time doDispatch();
2015-07-07 10:17:21 +02:00
void wakeDispatcher();
void abortUnsupported();
void builder(MachineReservation::ptr reservation);
2015-07-07 10:17:21 +02:00
/* Perform the given build step. Return true if the step is to be
retried. */
enum StepResult { sDone, sRetry, sMaybeCancelled };
StepResult doBuildStep(nix::ref<nix::Store> destStore,
MachineReservation::ptr reservation,
std::shared_ptr<ActiveStep> activeStep);
2015-07-07 10:17:21 +02:00
void buildRemote(nix::ref<nix::Store> destStore,
2015-07-07 10:25:33 +02:00
Machine::ptr machine, Step::ptr step,
const nix::ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers);
2015-07-07 10:25:33 +02:00
2015-07-07 10:17:21 +02:00
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);
2015-07-07 10:17:21 +02:00
bool checkCachedFailure(Step::ptr step, Connection & conn);
void notifyBuildStarted(pqxx::work & txn, BuildID buildId);
void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds);
2015-07-07 10:17:21 +02:00
/* Acquire the global queue runner lock, or null if somebody else
has it. */
2015-07-07 10:29:43 +02:00
std::shared_ptr<nix::PathLocks> acquireGlobalLock();
2015-07-07 10:17:21 +02:00
void dumpStatus(Connection & conn);
2015-07-07 10:17:21 +02:00
void addRoot(const nix::StorePath & storePath);
void runMetricsExporter();
2015-07-07 10:17:21 +02:00
public:
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};