When using a binary cache store, the queue runner receives NARs from the build machines, compresses them, and uploads them to the cache. However, keeping multiple large NARs in memory can cause the queue runner to run out of memory. This can happen, for instance, when it is processing multiple ISO images concurrently. The fix is to use a TokenServer to prevent the builder threads from storing more than a certain total size of NARs concurrently (at the moment, this is hard-coded at 4 GiB). Builder threads that would cause the limit to be exceeded block until other threads have finished. The 4 GiB limit does not include certain other allocations, such as those for xz compression or for FSAccessor::readFile(). But since these are unlikely to be larger than the NARs themselves and hydra.nixos.org has 32 GiB of RAM, it should be fine.
87 lines
2.0 KiB
C++
87 lines
2.0 KiB
C++
#pragma once
|
||
|
||
#include <atomic>
|
||
|
||
#include "sync.hh"
|
||
#include "types.hh"
|
||
|
||
namespace nix {
|
||
|
||
/* Exception type raised by TokenServer when a token request is
   rejected for being too large relative to the server's maxTokens.
   NOTE(review): MakeError is presumably the exception-declaring
   macro from "types.hh" -- confirm. */
MakeError(NoTokens, Error)
|
||
|
||
/* This class hands out tokens. There are only ‘maxTokens’ tokens
   available. Calling get(N) will return a Token object, representing
   ownership of N tokens. If the requested number of tokens is
   unavailable, get() will sleep until other threads have returned
   enough tokens. */
|
||
|
||
class TokenServer
|
||
{
|
||
const size_t maxTokens;
|
||
|
||
Sync<size_t> inUse{0};
|
||
std::condition_variable wakeup;
|
||
|
||
public:
|
||
TokenServer(size_t maxTokens) : maxTokens(maxTokens) { }
|
||
|
||
class Token
|
||
{
|
||
friend TokenServer;
|
||
|
||
TokenServer * ts;
|
||
|
||
size_t tokens;
|
||
|
||
bool acquired = false;
|
||
|
||
Token(TokenServer * ts, size_t tokens, unsigned int timeout)
|
||
: ts(ts), tokens(tokens)
|
||
{
|
||
if (tokens >= ts->maxTokens)
|
||
throw NoTokens(format("requesting more tokens (%d) than exist (%d)") % tokens);
|
||
auto inUse(ts->inUse.lock());
|
||
while (*inUse + tokens > ts->maxTokens)
|
||
if (timeout) {
|
||
if (!inUse.wait_for(ts->wakeup, std::chrono::seconds(timeout),
|
||
[&]() { return *inUse + tokens <= ts->maxTokens; }))
|
||
return;
|
||
} else
|
||
inUse.wait(ts->wakeup);
|
||
*inUse += tokens;
|
||
acquired = true;
|
||
}
|
||
|
||
public:
|
||
|
||
Token(Token && t) : ts(t.ts) { t.ts = 0; }
|
||
Token(const Token & l) = delete;
|
||
|
||
~Token()
|
||
{
|
||
if (!ts || !acquired) return;
|
||
{
|
||
auto inUse(ts->inUse.lock());
|
||
assert(*inUse >= tokens);
|
||
*inUse -= tokens;
|
||
}
|
||
ts->wakeup.notify_one();
|
||
}
|
||
|
||
bool operator ()() { return acquired; }
|
||
};
|
||
|
||
Token get(size_t tokens = 1, unsigned int timeout = 0)
|
||
{
|
||
return Token(this, tokens, timeout);
|
||
}
|
||
|
||
size_t currentUse()
|
||
{
|
||
auto inUse_(inUse.lock());
|
||
return *inUse_;
|
||
}
|
||
};
|
||
|
||
}
|