diff --git a/src/hydra-queue-runner/binary-cache-store.cc b/src/hydra-queue-runner/binary-cache-store.cc index c76670a5..6bea0e6c 100644 --- a/src/hydra-queue-runner/binary-cache-store.cc +++ b/src/hydra-queue-runner/binary-cache-store.cc @@ -12,9 +12,9 @@ namespace nix { -BinaryCacheStore::BinaryCacheStore(const StoreFactory & storeFactory, +BinaryCacheStore::BinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile) - : storeFactory(storeFactory) + : localStore(localStore) { if (secretKeyFile != "") secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile))); @@ -237,14 +237,14 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths, { PathSet left; - auto localStore = storeFactory(); + if (!localStore) return; for (auto & storePath : paths) { - if (!(*localStore)->isValidPath(storePath)) { + if (!localStore->isValidPath(storePath)) { left.insert(storePath); continue; } - ValidPathInfo info = (*localStore)->queryPathInfo(storePath); + ValidPathInfo info = localStore->queryPathInfo(storePath); SubstitutablePathInfo sub; sub.references = info.references; sub.downloadSize = 0; @@ -253,24 +253,25 @@ void BinaryCacheStore::querySubstitutablePathInfos(const PathSet & paths, } if (settings.useSubstitutes) - (*localStore)->querySubstitutablePathInfos(left, infos); + localStore->querySubstitutablePathInfos(left, infos); } void BinaryCacheStore::buildPaths(const PathSet & paths, BuildMode buildMode) { - auto localStore = storeFactory(); - for (auto & storePath : paths) { assert(!isDerivation(storePath)); if (isValidPath(storePath)) continue; - (*localStore)->addTempRoot(storePath); + if (!localStore) + throw Error(format("don't know how to realise path ‘%1%’ in a binary cache") % storePath); - if (!(*localStore)->isValidPath(storePath)) - (*localStore)->ensurePath(storePath); + localStore->addTempRoot(storePath); - ValidPathInfo info = (*localStore)->queryPathInfo(storePath); + if (!localStore->isValidPath(storePath)) + localStore->ensurePath(storePath); + + ValidPathInfo info = localStore->queryPathInfo(storePath); for (auto & ref : info.references) if (ref != storePath) diff --git a/src/hydra-queue-runner/binary-cache-store.hh b/src/hydra-queue-runner/binary-cache-store.hh index 4d02b3b2..d02f46de 100644 --- a/src/hydra-queue-runner/binary-cache-store.hh +++ b/src/hydra-queue-runner/binary-cache-store.hh @@ -13,12 +13,6 @@ namespace nix { struct NarInfo; -/* While BinaryCacheStore is thread-safe, LocalStore and RemoteStore - aren't. Until they are, use a factory to produce a thread-local - local store. */ -typedef Pool<nix::ref<nix::Store>> StorePool; -typedef std::function<StorePool::Handle()> StoreFactory; - class BinaryCacheStore : public Store { private: @@ -26,7 +20,7 @@ private: std::unique_ptr<SecretKey> secretKey; std::unique_ptr<PublicKeys> publicKeys; - StoreFactory storeFactory; + std::shared_ptr<Store> localStore; struct State { @@ -37,7 +31,7 @@ private: protected: - BinaryCacheStore(const StoreFactory & storeFactory, + BinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile); virtual bool fileExists(const std::string & path) = 0; diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index a3ca0221..2e4229c2 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -18,7 +18,6 @@ using namespace nix; State::State() - : localStorePool([]() { return std::make_shared<ref<Store>>(openStore()); }) { hydraData = getEnv("HYDRA_DATA"); if (hydraData == "") throw Error("$HYDRA_DATA must be set"); @@ -46,10 +45,9 @@ State::State() } -StorePool::Handle State::getLocalStore() +ref<Store> State::getLocalStore() { - auto conn(localStorePool.get()); - return conn; + return ref<Store>(_localStore); } @@ -748,8 +746,10 @@ void State::run(BuildID buildOne) auto storeMode = hydraConfig["store_mode"]; + _localStore = openStore(); + if (storeMode == "direct" || storeMode == "") { - _destStore = openStore(); + _destStore = _localStore; } else if (storeMode == "local-binary-cache") { @@ -757,9 +757,9 @@ void State::run(BuildID buildOne) if (dir == "") throw Error("you must set ‘binary_cache_dir’ in hydra.conf"); auto store = make_ref<LocalBinaryCacheStore>( - [this]() { return this->getLocalStore(); }, - "/home/eelco/Misc/Keys/test.nixos.org/secret", - "/home/eelco/Misc/Keys/test.nixos.org/public", + _localStore, + hydraConfig["binary_cache_secret_key_file"], + hydraConfig["binary_cache_public_key_file"], dir); store->init(); _destStore = std::shared_ptr<LocalBinaryCacheStore>(store); @@ -770,7 +770,7 @@ void State::run(BuildID buildOne) if (bucketName == "") throw Error("you must set ‘binary_cache_s3_bucket’ in hydra.conf"); auto store = make_ref<S3BinaryCacheStore>( - [this]() { return this->getLocalStore(); }, + _localStore, hydraConfig["binary_cache_secret_key_file"], hydraConfig["binary_cache_public_key_file"], bucketName); diff --git a/src/hydra-queue-runner/local-binary-cache-store.cc b/src/hydra-queue-runner/local-binary-cache-store.cc index a82e6597..5714688e 100644 --- a/src/hydra-queue-runner/local-binary-cache-store.cc +++ b/src/hydra-queue-runner/local-binary-cache-store.cc @@ -2,10 +2,10 @@ namespace nix { -LocalBinaryCacheStore::LocalBinaryCacheStore(const StoreFactory & storeFactory, +LocalBinaryCacheStore::LocalBinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile, const Path & binaryCacheDir) - : BinaryCacheStore(storeFactory, secretKeyFile, publicKeyFile) + : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile) , binaryCacheDir(binaryCacheDir) { } diff --git a/src/hydra-queue-runner/local-binary-cache-store.hh b/src/hydra-queue-runner/local-binary-cache-store.hh index e29d0ca2..0303ebe7 100644 --- a/src/hydra-queue-runner/local-binary-cache-store.hh +++ b/src/hydra-queue-runner/local-binary-cache-store.hh @@ -12,7 +12,7 @@ private: public: - LocalBinaryCacheStore(const StoreFactory & storeFactory, + LocalBinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile, const Path & binaryCacheDir); diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh deleted file mode 100644 index 83d947e3..00000000 --- a/src/hydra-queue-runner/pool.hh +++ /dev/null @@ -1,97 +0,0 @@ -#pragma once - -#include <memory> -#include <list> -#include <functional> - -#include "sync.hh" - -/* This template class implements a simple pool manager of resources - of some type R, such as database connections. It is used as - follows: - - class Connection { ... }; - - Pool<Connection> pool; - - { - auto conn(pool.get()); - conn->exec("select ..."); - } - - Here, the Connection object referenced by ‘conn’ is automatically - returned to the pool when ‘conn’ goes out of scope. -*/ - -template <class R> -class Pool -{ -public: - - typedef std::function<std::shared_ptr<R>()> Factory; - -private: - - Factory factory; - - struct State - { - unsigned int count = 0; - std::list<std::shared_ptr<R>> idle; - }; - - Sync<State> state; - -public: - - Pool(const Factory & factory = []() { return std::make_shared<R>(); }) - : factory(factory) - { } - - class Handle - { - private: - Pool & pool; - std::shared_ptr<R> r; - - friend Pool; - - Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { } - - public: - Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } - - Handle(const Handle & l) = delete; - - ~Handle() - { - auto state_(pool.state.lock()); - if (r) state_->idle.push_back(r); - } - - R * operator -> () { return r.get(); } - R & operator * () { return *r; } - }; - - Handle get() - { - { - auto state_(state.lock()); - if (!state_->idle.empty()) { - auto p = state_->idle.back(); - state_->idle.pop_back(); - return Handle(*this, p); - } - state_->count++; - } - /* Note: we don't hold the lock while creating a new instance, - because creation might take a long time. */ - return Handle(*this, factory()); - } - - unsigned int count() - { - auto state_(state.lock()); - return state_->count; - } -}; diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index f3970407..f7e40827 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -36,7 +36,7 @@ void State::queueMonitorLoop() unsigned int lastBuildId = 0; while (true) { - bool done = getQueuedBuilds(*conn, *localStore, destStore, lastBuildId); + bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId); /* Sleep until we get notification from the database about an event. */ diff --git a/src/hydra-queue-runner/s3-binary-cache-store.cc b/src/hydra-queue-runner/s3-binary-cache-store.cc index 3f6b6bf9..4a78033a 100644 --- a/src/hydra-queue-runner/s3-binary-cache-store.cc +++ b/src/hydra-queue-runner/s3-binary-cache-store.cc @@ -31,10 +31,10 @@ R && checkAws(Aws::Utils::Outcome<R, E> && outcome) return outcome.GetResultWithOwnership(); } -S3BinaryCacheStore::S3BinaryCacheStore(const StoreFactory & storeFactory, +S3BinaryCacheStore::S3BinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile, const std::string & bucketName) - : BinaryCacheStore(storeFactory, secretKeyFile, publicKeyFile) + : BinaryCacheStore(localStore, secretKeyFile, publicKeyFile) , bucketName(bucketName) , config(makeConfig()) , client(make_ref<Aws::S3::S3Client>(*config)) diff --git a/src/hydra-queue-runner/s3-binary-cache-store.hh b/src/hydra-queue-runner/s3-binary-cache-store.hh index a0dd7813..1ba78dce 100644 --- a/src/hydra-queue-runner/s3-binary-cache-store.hh +++ b/src/hydra-queue-runner/s3-binary-cache-store.hh @@ -20,7 +20,7 @@ private: public: - S3BinaryCacheStore(const StoreFactory & storeFactory, + S3BinaryCacheStore(std::shared_ptr<Store> localStore, const Path & secretKeyFile, const Path & publicKeyFile, const std::string & bucketName); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 58abe6ec..01dfd650 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -80,7 +80,7 @@ private: std::atomic<unsigned int> shares{1}; /* The start time and duration of the most recent build steps. */ - Sync<std::map<time_t, time_t>> steps; + nix::Sync<std::map<time_t, time_t>> steps; public: @@ -187,7 +187,7 @@ struct Step std::atomic_bool finished{false}; // debugging - Sync<State> state; + nix::Sync<State> state; ~Step() { @@ -227,7 +227,7 @@ struct Machine system_time lastFailure, disabledUntil; unsigned int consecutiveFailures; }; - Sync<ConnectInfo> connectInfo; + nix::Sync<ConnectInfo> connectInfo; /* Mutex to prevent multiple threads from sending data to the same machine (which would be inefficient). */ @@ -266,33 +266,33 @@ private: /* The queued builds. */ typedef std::map<BuildID, Build::ptr> Builds; - Sync<Builds> builds; + nix::Sync<Builds> builds; /* The jobsets. */ typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets; - Sync<Jobsets> jobsets; + nix::Sync<Jobsets> jobsets; /* All active or pending build steps (i.e. dependencies of the queued builds). Note that these are weak pointers. Steps are kept alive by being reachable from Builds or by being in progress. */ typedef std::map<nix::Path, Step::wptr> Steps; - Sync<Steps> steps; + nix::Sync<Steps> steps; /* Build steps that have no unbuilt dependencies. */ typedef std::list<Step::wptr> Runnable; - Sync<Runnable> runnable; + nix::Sync<Runnable> runnable; /* CV for waking up the dispatcher. */ - Sync<bool> dispatcherWakeup; - std::condition_variable_any dispatcherWakeupCV; + nix::Sync<bool> dispatcherWakeup; + std::condition_variable dispatcherWakeupCV; /* PostgreSQL connection pool. */ - Pool<Connection> dbPool; + nix::Pool<Connection> dbPool; /* The build machines. */ typedef std::map<std::string, Machine::ptr> Machines; - Sync<Machines> machines; // FIXME: use atomic_shared_ptr + nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr /* Various stats. */ time_t startedAt; @@ -314,16 +314,16 @@ private: counter bytesReceived{0}; /* Log compressor work queue. */ - Sync<std::queue<nix::Path>> logCompressorQueue; - std::condition_variable_any logCompressorWakeup; + nix::Sync<std::queue<nix::Path>> logCompressorQueue; + std::condition_variable logCompressorWakeup; /* Notification sender work queue. FIXME: if hydra-queue-runner is killed before it has finished sending notifications about a build, then the notifications may be lost. It would be better to mark builds with pending notification in the database. */ typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem; - Sync<std::queue<NotificationItem>> notificationSenderQueue; - std::condition_variable_any notificationSenderWakeup; + nix::Sync<std::queue<NotificationItem>> notificationSenderQueue; + std::condition_variable notificationSenderWakeup; /* Specific build to do for --build-one (testing only). */ BuildID buildOne; @@ -336,7 +336,7 @@ private: std::chrono::seconds waitTime; // time runnable steps have been waiting }; - Sync<std::map<std::string, MachineType>> machineTypes; + nix::Sync<std::map<std::string, MachineType>> machineTypes; struct MachineReservation { @@ -350,10 +350,7 @@ private: std::atomic<time_t> lastDispatcherCheck{0}; - /* Pool of local stores. */ - nix::StorePool localStorePool; - - /* Destination store. */ + std::shared_ptr<nix::Store> _localStore; std::shared_ptr<nix::Store> _destStore; public: @@ -363,7 +360,7 @@ private: /* Return a store object that can access derivations produced by hydra-evaluator. */ - nix::StorePool::Handle getLocalStore(); + nix::ref<nix::Store> getLocalStore(); /* Return a store object to store build results. */ nix::ref<nix::Store> getDestStore(); diff --git a/src/hydra-queue-runner/sync.hh b/src/hydra-queue-runner/sync.hh deleted file mode 100644 index 4d53a6ee..00000000 --- a/src/hydra-queue-runner/sync.hh +++ /dev/null @@ -1,74 +0,0 @@ -#pragma once - -#include <mutex> -#include <condition_variable> -#include <cassert> - -/* This template class ensures synchronized access to a value of type - T. It is used as follows: - - struct Data { int x; ... }; - - Sync<Data> data; - - { - auto data_(data.lock()); - data_->x = 123; - } - - Here, "data" is automatically unlocked when "data_" goes out of - scope. -*/ - -template<class T> -class Sync -{ -private: - std::mutex mutex; - T data; - -public: - - Sync() { } - Sync(const T & data) : data(data) { } - - class Lock - { - private: - Sync * s; - friend Sync; - Lock(Sync * s) : s(s) { s->mutex.lock(); } - public: - Lock(Lock && l) : s(l.s) { l.s = 0; } - Lock(const Lock & l) = delete; - ~Lock() { if (s) s->mutex.unlock(); } - T * operator -> () { return &s->data; } - T & operator * () { return s->data; } - - /* FIXME: performance impact of condition_variable_any? */ - void wait(std::condition_variable_any & cv) - { - assert(s); - cv.wait(s->mutex); - } - - template<class Rep, class Period, class Predicate> - bool wait_for(std::condition_variable_any & cv, - const std::chrono::duration<Rep, Period> & duration, - Predicate pred) - { - assert(s); - return cv.wait_for(s->mutex, duration, pred); - } - - template<class Clock, class Duration> - std::cv_status wait_until(std::condition_variable_any & cv, - const std::chrono::time_point<Clock, Duration> & duration) - { - assert(s); - return cv.wait_until(s->mutex, duration); - } - }; - - Lock lock() { return Lock(this); } -}; diff --git a/src/hydra-queue-runner/token-server.hh b/src/hydra-queue-runner/token-server.hh index 2ff748e3..d4f5f843 100644 --- a/src/hydra-queue-runner/token-server.hh +++ b/src/hydra-queue-runner/token-server.hh @@ -14,7 +14,7 @@ class TokenServer unsigned int maxTokens; Sync<unsigned int> curTokens{0}; - std::condition_variable_any wakeup; + std::condition_variable wakeup; public: TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }