diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index d2edf32c..6b9d4aae 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -8,6 +8,8 @@ #include #include "build-result.hh" +#include "sync.hh" + #include "store-api.hh" #include "derivations.hh" #include "shared.hh" @@ -16,6 +18,13 @@ using namespace nix; +template +bool has(const C & c, const V & v) +{ + return c.find(v) != c.end(); +} + + std::mutex exitRequestMutex; std::condition_variable exitRequest; bool exitRequested(false); @@ -76,6 +85,9 @@ struct Connection : pqxx::connection typedef unsigned int BuildID; +struct Step; + + struct Build { typedef std::shared_ptr ptr; @@ -84,10 +96,18 @@ struct Build BuildID id; Path drvPath; std::map outputs; + std::string fullJobName; + + std::shared_ptr toplevel; bool finishedInDB; Build() : finishedInDB(false) { } + + ~Build() + { + printMsg(lvlError, format("destroying build %1%") % id); + } }; @@ -95,17 +115,28 @@ struct Step { typedef std::shared_ptr ptr; typedef std::weak_ptr wptr; + Path drvPath; Derivation drv; - /* The build steps on which this step depends. */ - std::set deps; + struct State + { + /* The build steps on which this step depends. */ + std::set deps; - /* The build steps that depend on this step. */ - std::vector rdeps; + /* The build steps that depend on this step. */ + std::vector rdeps; - /* Builds that have this step as the top-level derivation. */ - std::vector builds; + /* Builds that have this step as the top-level derivation. */ + std::vector builds; + }; + + Sync state; + + ~Step() + { + printMsg(lvlError, format("destroying step %1%") % drvPath); + } }; @@ -116,17 +147,21 @@ private: std::thread queueMonitorThread; /* The queued builds. */ - std::map builds; + typedef std::map Builds; + Sync builds; /* All active or pending build steps (i.e. dependencies of the - queued builds). */ - std::map steps; + queued builds). Note that these are weak pointers. Steps are + kept alive by being reachable from Builds or by being in + progress. */ + typedef std::map Steps; + Sync steps; /* Build steps that have no unbuilt dependencies. */ - std::set runnable; + typedef std::list Runnable; + Sync runnable; - std::mutex runnableMutex; - std::condition_variable runnableCV; + std::condition_variable_any runnableCV; public: State(); @@ -147,7 +182,8 @@ public: void getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn); - Step::ptr createStep(std::shared_ptr store, const Path & drvPath); + Step::ptr createStep(std::shared_ptr store, const Path & drvPath, + std::set & newRunnable); void destroyStep(Step::ptr step, bool proceed); @@ -254,26 +290,55 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & { printMsg(lvlError, "checking the queue..."); - pqxx::work txn(conn); +#if 0 + { + auto runnable_(runnable.lock()); + auto builds_(builds.lock()); + auto steps_(steps.lock()); + printMsg(lvlError, format("%1% builds, %2% steps, %3% runnable steps") + % builds_->size() + % steps_->size() + % runnable_->size()); + } +#endif - // FIXME: query only builds with ID higher than the previous - // highest. - auto res = txn.exec("select * from Builds where finished = 0"); + /* Grab the queued builds from the database, but don't process + them yet (since we don't want a long-running transaction). */ + std::list newBuilds; // FIXME: use queue - // FIXME: don't process inside a txn. - for (auto const & row : res) { - BuildID id = row["id"].as(); - if (builds.find(id) != builds.end()) continue; + { + pqxx::work txn(conn); - Build::ptr build(new Build); - build->id = id; - build->drvPath = row["drvPath"].as(); + // FIXME: query only builds with ID higher than the previous + // highest. + auto res = txn.exec("select * from Builds where finished = 0 order by id"); - printMsg(lvlInfo, format("loading build %1% (%2%:%3%:%4%)") % id % row["project"] % row["jobset"] % row["job"]); + auto builds_(builds.lock()); + + for (auto const & row : res) { + BuildID id = row["id"].as(); + if (has(*builds_, id)) continue; + + auto build = std::make_shared(); + build->id = id; + build->drvPath = row["drvPath"].as(); + build->fullJobName = row["project"].as() + ":" + row["jobset"].as() + ":" + row["job"].as(); + + newBuilds.push_back(build); + } + } + + /* Now instantiate build steps for each new build. The builder + threads can start building the runnable build steps right away, + even while we're still processing other new builds. */ + for (auto & build : newBuilds) { + // FIXME: remove build from newBuilds to ensure quick destruction + // FIXME: exception handling + + printMsg(lvlInfo, format("loading build %1% (%2%)") % build->id % build->fullJobName); if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ - Connection conn; pqxx::work txn(conn); txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") @@ -285,12 +350,15 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & continue; } - Step::ptr step = createStep(store, build->drvPath); + std::set newRunnable; + Step::ptr step = createStep(store, build->drvPath, newRunnable); + + /* If we didn't get a step, it means the step's outputs are + all valid. So we mark this as a finished, cached build. */ if (!step) { Derivation drv = readDerivation(build->drvPath); BuildResult res = getBuildResult(store, drv); - Connection conn; pqxx::work txn(conn); time_t now = time(0); markSucceededBuild(txn, build, res, true, now, now); @@ -299,21 +367,49 @@ void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & continue; } - step->builds.push_back(build); + /* Note: if we exit this scope prior to this, the build and + all newly created steps are destroyed. */ - builds[id] = build; + { + auto builds_(builds.lock()); + auto step_(step->state.lock()); + (*builds_)[build->id] = build; + step_->builds.push_back(build); + build->toplevel = step; + } + + /* Prior to this, the build is not visible to + getDependentBuilds(). Now it is, so the build can be + failed if a dependency fails. (It can't succeed right away + because its top-level is not runnable yet). */ + + /* Add the new runnable build steps to ‘runnable’ and wake up + the builder threads. */ + for (auto & r : newRunnable) + makeRunnable(r); } } -Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath) +Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath, + std::set & newRunnable) { - auto prev = steps.find(drvPath); - if (prev != steps.end()) return prev->second; + /* Check if the requested step already exists. */ + { + auto steps_(steps.lock()); + auto prev = steps_->find(drvPath); + if (prev != steps_->end()) { + auto step = prev->second.lock(); + /* Since ‘step’ is a strong pointer, the referred Step + object won't be deleted after this. */ + if (step) return step; + steps_->erase(drvPath); // remove stale entry + } + } printMsg(lvlInfo, format("considering derivation ‘%1%’") % drvPath); - Step::ptr step(new Step); + auto step = std::make_shared(); step->drvPath = drvPath; step->drv = readDerivation(drvPath); @@ -333,17 +429,25 @@ Step::ptr State::createStep(std::shared_ptr store, const Path & drvPat printMsg(lvlInfo, format("creating build step ‘%1%’") % drvPath); /* Create steps for the dependencies. */ + bool hasDeps = false; for (auto & i : step->drv.inputDrvs) { - Step::ptr dep = createStep(store, i.first); + Step::ptr dep = createStep(store, i.first, newRunnable); if (dep) { - step->deps.insert(dep); - dep->rdeps.push_back(step); + hasDeps = true; + auto step_(step->state.lock()); + auto dep_(dep->state.lock()); + step_->deps.insert(dep); + dep_->rdeps.push_back(step); } } - steps[drvPath] = step; + { + auto steps_(steps.lock()); + assert(steps_->find(drvPath) == steps_->end()); + (*steps_)[drvPath] = step; + } - if (step->deps.empty()) makeRunnable(step); + if (!hasDeps) newRunnable.insert(step); return step; } @@ -351,30 +455,48 @@ Step::ptr State::createStep(std::shared_ptr store, const Path & drvPat void State::destroyStep(Step::ptr step, bool proceed) { - steps.erase(step->drvPath); + printMsg(lvlInfo, format("destroying build step ‘%1%’") % step->drvPath); - for (auto & rdep_ : step->rdeps) { + { + auto steps_(steps.lock()); + steps_->erase(step->drvPath); + } + + std::vector rdeps; + + { + auto step_(step->state.lock()); + rdeps = step_->rdeps; + + /* Sanity checks. */ + for (auto & build_ : step_->builds) { + auto build = build_.lock(); + if (!build) continue; + assert(build->drvPath == step->drvPath); + assert(build->finishedInDB); + } + } + + for (auto & rdep_ : rdeps) { auto rdep = rdep_.lock(); if (!rdep) continue; - assert(rdep->deps.find(step) != rdep->deps.end()); - rdep->deps.erase(step); + bool runnable = false; + { + auto rdep_(rdep->state.lock()); + assert(has(rdep_->deps, step)); + rdep_->deps.erase(step); + if (rdep_->deps.empty()) runnable = true; + } if (proceed) { /* If this rdep has no other dependencies, then we can now build it. */ - if (rdep->deps.empty()) + if (runnable) makeRunnable(rdep); } else - /* If ‘step’ failed, then delete all dependent steps as - well. */ + /* If ‘step’ failed or was cancelled, then delete all + dependent steps as well. */ destroyStep(rdep, false); } - - for (auto & build_ : step->builds) { - auto build = build_.lock(); - if (!build) continue; - assert(build->drvPath == step->drvPath); - assert(build->finishedInDB); - } } @@ -386,17 +508,27 @@ std::set State::getDependentBuilds(Step::ptr step) std::function visit; visit = [&](Step::ptr step) { - if (done.find(step) != done.end()) return; + if (has(done, step)) return; done.insert(step); - for (auto & build : step->builds) { - auto build2 = build.lock(); - if (build2) res.insert(build2); + std::vector rdeps; + + { + auto step_(step->state.lock()); + + for (auto & build : step_->builds) { + auto build_ = build.lock(); + if (build_) res.insert(build_); + } + + /* Make a copy of rdeps so that we don't hold the lock for + very long. */ + rdeps = step_->rdeps; } - for (auto & rdep : step->rdeps) { - auto rdep2 = rdep.lock(); - if (rdep2) visit(rdep2); + for (auto & rdep : rdeps) { + auto rdep_ = rdep.lock(); + if (rdep_) visit(rdep_); } }; @@ -408,11 +540,14 @@ std::set State::getDependentBuilds(Step::ptr step) void State::makeRunnable(Step::ptr step) { - assert(step->deps.empty()); + { + auto step_(step->state.lock()); + assert(step_->deps.empty()); + } { - std::lock_guard lock(runnableMutex); - runnable.insert(step); + auto runnable_(runnable.lock()); + runnable_->push_back(step); } runnableCV.notify_one(); @@ -424,17 +559,20 @@ void State::builderThreadEntry(int slot) auto store = openStore(); // FIXME: pool while (true) { + /* Sleep until a runnable build step becomes available. */ Step::ptr step; { - std::unique_lock lock(runnableMutex); - while (runnable.empty()) - runnableCV.wait(lock); - step = *runnable.begin(); - runnable.erase(step); + auto runnable_(runnable.lock()); + while (runnable_->empty()) + runnable_.wait(runnableCV); + auto weak = *runnable_->begin(); + runnable_->pop_front(); + step = weak.lock(); + if (!step) continue; } + /* Build it. */ printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath); - doBuildStep(store, step); } @@ -444,34 +582,38 @@ void State::builderThreadEntry(int slot) void State::doBuildStep(std::shared_ptr store, Step::ptr step) { - assert(step->deps.empty()); - /* There can be any number of builds in the database that depend - on this derivation. Arbitrarily pick one (though preferring - those build of which this is the top-level derivation) for the + on this derivation. Arbitrarily pick one (though preferring a + build of which this is the top-level derivation) for the purpose of creating build steps. We could create a build step record for every build, but that could be very expensive (e.g. a stdenv derivation can be a dependency of tens of thousands of builds), so we don't. */ Build::ptr build; - auto builds = getDependentBuilds(step); + { + auto dependents = getDependentBuilds(step); - if (builds.empty()) { - /* Apparently all builds that depend on this derivation are - gone (e.g. cancelled). So don't bother. */ - printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath); - destroyStep(step, true); - return; + if (dependents.empty()) { + /* Apparently all builds that depend on this derivation + are gone (e.g. cancelled). So don't bother. (This is + very unlikely to happen, because normally Steps are + only kept alive by being reachable from a + Build). FIXME: what if a new Build gets a reference to + this step? */ + printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath); + destroyStep(step, false); + return; + } + + for (auto build2 : dependents) + if (build2->drvPath == step->drvPath) { build = build2; break; } + + if (!build) build = *dependents.begin(); + + printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % dependents.size()); } - for (auto build2 : builds) - if (build2->drvPath == step->drvPath) { build = build2; break; } - - if (!build) build = *builds.begin(); - - printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % builds.size()); - /* Create a build step record indicating that we started building. */ Connection conn; @@ -499,8 +641,30 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) // FIXME: handle failed-with-output - // FIXME: handle new builds having been added in the meantime. + /* Remove this step. After this, incoming builds that depend on + drvPath will either see that the output paths exist, or will + create a new build step for drvPath. The latter is fine - it + won't conflict with this one, because we're removing it. In any + case, the set of dependent builds for ‘step’ can't increase + anymore because ‘step’ is no longer visible to createStep(). */ + { + auto steps_(steps.lock()); + steps_->erase(step->drvPath); + } + /* Get the final set of dependent builds. */ + auto dependents = getDependentBuilds(step); + + std::set direct; + { + auto step_(step->state.lock()); + for (auto & build : step_->builds) { + auto build_ = build.lock(); + if (build_) direct.insert(build_); + } + } + + /* Update the database. */ { pqxx::work txn(conn); @@ -510,24 +674,21 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) /* Mark all builds of which this derivation is the top level as succeeded. */ - for (auto build2_ : step->builds) { - auto build2 = build2_.lock(); - if (!build2) continue; + for (auto build2 : direct) markSucceededBuild(txn, build2, res, false, startTime, stopTime); - } } else { /* Create failed build steps for every build that depends on this. */ finishBuildStep(txn, stopTime, build->id, stepNr, bssFailed, errorMsg); - for (auto build2 : builds) { + for (auto build2 : dependents) { if (build == build2) continue; createBuildStep(txn, stopTime, build2, step, bssFailed, errorMsg, build->id); } /* Mark all builds that depend on this derivation as failed. */ - for (auto build2 : builds) { + for (auto build2 : dependents) { txn.parameterized ("update Builds set finished = 1, isCachedBuild = 0, buildStatus = $2, startTime = $3, stopTime = $4 where id = $1") (build2->id) @@ -539,10 +700,21 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) } txn.commit(); - } - /* Remove the build step from the graph. */ + /* In case of success, destroy all Build objects of which ‘step’ + is the top-level derivation. In case of failure, destroy all + dependent Build objects. Any Steps not referenced by other + Builds will be destroyed as well. */ + for (auto build2 : dependents) + if (build2->toplevel == step || !success) { + auto builds_(builds.lock()); + builds_->erase(build2->id); + } + + /* Remove the step from the graph. In case of success, make + dependent build steps runnable if they have no other + dependencies. */ destroyStep(step, success); } @@ -590,8 +762,6 @@ void State::run() queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this); - sleep(1); - for (int n = 0; n < 4; n++) std::thread(&State::builderThreadEntry, this, n).detach(); diff --git a/src/hydra-queue-runner/sync.hh b/src/hydra-queue-runner/sync.hh new file mode 100644 index 00000000..6f5f9e6a --- /dev/null +++ b/src/hydra-queue-runner/sync.hh @@ -0,0 +1,53 @@ +#pragma once + +#include +#include + +/* This template class ensures synchronized access to a value of type + T. It is used as follows: + + struct Data { int x; ... }; + + Sync data; + + { + auto data_(data.lock()); + data_->x = 123; + } + + Here, "data" is automatically unlocked when "data_" goes out of + scope. +*/ + +template +class Sync +{ +private: + std::mutex mutex; + T data; + +public: + + class Lock + { + private: + Sync * s; + friend Sync; + Lock(Sync * s) : s(s) { s->mutex.lock(); } + public: + Lock(Lock && l) : s(l.s) { l.s = 0; } + Lock(const Lock & l) = delete; + ~Lock() { if (s) s->mutex.unlock(); } + T * operator -> () { return &s->data; } + T & operator * () { return s->data; } + + /* FIXME: performance impact of condition_variable_any? */ + void wait(std::condition_variable_any & cv) + { + assert(s); + cv.wait(s->mutex); + } + }; + + Lock lock() { return Lock(this); } +};