Merge remote-tracking branch 'upstream/master' into split-buildRemote

This commit is contained in:
John Ericson
2022-10-25 11:07:51 -04:00
69 changed files with 1914 additions and 764 deletions

View File

@ -2,7 +2,7 @@ bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh state.hh db.hh \
hydra-build-result.hh counter.hh state.hh db.hh \
nar-extractor.cc nar-extractor.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx -lprometheus-cpp-pull -lprometheus-cpp-core
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations

View File

@ -5,6 +5,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include "build-result.hh"
#include "serve-protocol.hh"
#include "state.hh"
#include "util.hh"
@ -51,13 +52,35 @@ static Strings extraStoreArgs(std::string & machine)
static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Child & child)
{
string pgmName;
std::string pgmName;
Pipe to, from;
to.create();
from.create();
child.pid = startProcess([&]() {
Strings argv;
if (machine->isLocalhost()) {
pgmName = "nix-store";
argv = {"nix-store", "--builders", "", "--serve", "--write"};
} else {
pgmName = "ssh";
auto sshName = machine->sshName;
Strings extraArgs = extraStoreArgs(sshName);
argv = {"ssh", sshName};
if (machine->sshKey != "") append(argv, {"-i", machine->sshKey});
if (machine->sshPublicHostKey != "") {
Path fileName = tmpDir + "/host-key";
auto p = machine->sshName.find("@");
std::string host = p != std::string::npos ? std::string(machine->sshName, p + 1) : machine->sshName;
writeFile(fileName, host + " " + machine->sshPublicHostKey + "\n");
append(argv, {"-oUserKnownHostsFile=" + fileName});
}
append(argv,
{ "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
, "--", "nix-store", "--serve", "--write" });
append(argv, extraArgs);
}
child.pid = startProcess([&]() {
restoreProcessContext();
if (dup2(to.readSide.get(), STDIN_FILENO) == -1)
@ -69,30 +92,6 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
if (dup2(stderrFD, STDERR_FILENO) == -1)
throw SysError("cannot dup stderr");
Strings argv;
if (machine->isLocalhost()) {
pgmName = "nix-store";
argv = {"nix-store", "--builders", "", "--serve", "--write"};
}
else {
pgmName = "ssh";
auto sshName = machine->sshName;
Strings extraArgs = extraStoreArgs(sshName);
argv = {"ssh", sshName};
if (machine->sshKey != "") append(argv, {"-i", machine->sshKey});
if (machine->sshPublicHostKey != "") {
Path fileName = tmpDir + "/host-key";
auto p = machine->sshName.find("@");
string host = p != string::npos ? string(machine->sshName, p + 1) : machine->sshName;
writeFile(fileName, host + " " + machine->sshPublicHostKey + "\n");
append(argv, {"-oUserKnownHostsFile=" + fileName});
}
append(argv,
{ "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
, "--", "nix-store", "--serve", "--write" });
append(argv, extraArgs);
}
execvp(argv.front().c_str(), (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start %s", pgmName);
@ -179,7 +178,7 @@ StorePaths reverseTopoSortPaths(const std::map<StorePath, ValidPathInfo> & paths
std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
string base(drvPath.to_string());
std::string base(drvPath.to_string());
auto logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
createDirs(dirOf(logFile));
@ -192,7 +191,7 @@ std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const Store
void handshake(Machine::Connection & conn, unsigned int repeats)
{
conn.to << SERVE_MAGIC_1 << 0x204;
conn.to << SERVE_MAGIC_1 << 0x206;
conn.to.flush();
unsigned int magic = readInt(conn.from);
@ -232,10 +231,10 @@ BasicDerivation sendInputs(
a no-op for regular stores, but for the binary cache store,
this will copy the inputs to the binary cache from the local
store. */
if (localStore.getUri() != destStore.getUri()) {
StorePathSet closure;
localStore.computeFSClosure(step.drv->inputSrcs, closure);
copyPaths(localStore, destStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
if (localStore != destStore) {
copyClosure(localStore, destStore,
step.drv->inputSrcs,
NoRepair, NoCheckSigs, NoSubstitute);
}
{

View File

@ -1,4 +1,4 @@
#include "build-result.hh"
#include "hydra-build-result.hh"
#include "store-api.hh"
#include "util.hh"
#include "fs-accessor.hh"
@ -78,7 +78,7 @@ BuildOutput getBuildOutput(
product.type = match[1];
product.subtype = match[2];
std::string s(match[3]);
product.path = s[0] == '"' ? string(s, 1, s.size() - 2) : s;
product.path = s[0] == '"' ? std::string(s, 1, s.size() - 2) : s;
product.defaultPath = match[5];
/* Ensure that the path exists and points into the Nix

View File

@ -1,7 +1,7 @@
#include <cmath>
#include "state.hh"
#include "build-result.hh"
#include "hydra-build-result.hh"
#include "finally.hh"
#include "binary-cache-store.hh"

View File

@ -6,8 +6,10 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <prometheus/exposer.h>
#include "state.hh"
#include "build-result.hh"
#include "hydra-build-result.hh"
#include "store-api.hh"
#include "remote-store.hh"
@ -36,8 +38,55 @@ std::string getEnvOrDie(const std::string & key)
return *value;
}
State::PromMetrics::PromMetrics()
: registry(std::make_shared<prometheus::Registry>())
, queue_checks_started(
prometheus::BuildCounter()
.Name("hydraqueuerunner_queue_checks_started_total")
.Help("Number of times State::getQueuedBuilds() was started")
.Register(*registry)
.Add({})
)
, queue_build_loads(
prometheus::BuildCounter()
.Name("hydraqueuerunner_queue_build_loads_total")
.Help("Number of builds loaded")
.Register(*registry)
.Add({})
)
, queue_steps_created(
prometheus::BuildCounter()
.Name("hydraqueuerunner_queue_steps_created_total")
.Help("Number of steps created")
.Register(*registry)
.Add({})
)
, queue_checks_early_exits(
prometheus::BuildCounter()
.Name("hydraqueuerunner_queue_checks_early_exits_total")
.Help("Number of times State::getQueuedBuilds() yielded to potential bumps")
.Register(*registry)
.Add({})
)
, queue_checks_finished(
prometheus::BuildCounter()
.Name("hydraqueuerunner_queue_checks_finished_total")
.Help("Number of times State::getQueuedBuilds() was completed")
.Register(*registry)
.Add({})
)
, queue_max_id(
prometheus::BuildGauge()
.Name("hydraqueuerunner_queue_max_build_id_info")
.Help("Maximum build record ID in the queue")
.Register(*registry)
.Add({})
)
{
State::State()
}
State::State(std::optional<std::string> metricsAddrOpt)
: config(std::make_unique<HydraConfig>())
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
, dbPool(config->getIntOption("max_db_connections", 128))
@ -45,11 +94,16 @@ State::State()
, maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
, uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
, rootsDir(config->getStrOption("gc_roots_dir", fmt("%s/gcroots/per-user/%s/hydra-roots", settings.nixStateDir, getEnvOrDie("LOGNAME"))))
, metricsAddr(config->getStrOption("queue_runner_metrics_address", std::string{"127.0.0.1:9198"}))
{
hydraData = getEnvOrDie("HYDRA_DATA");
logDir = canonPath(hydraData + "/build-logs");
if (metricsAddrOpt.has_value()) {
metricsAddr = metricsAddrOpt.value();
}
/* handle deprecated store specification */
if (config->getStrOption("store_mode") != "")
throw Error("store_mode in hydra.conf is deprecated, please use store_uri");
@ -87,7 +141,7 @@ void State::parseMachines(const std::string & contents)
}
for (auto line : tokenizeString<Strings>(contents, "\n")) {
line = trim(string(line, 0, line.find('#')));
line = trim(std::string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(8);
@ -95,7 +149,7 @@ void State::parseMachines(const std::string & contents)
auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2] == "-" ? string("") : tokens[2];
machine->sshKey = tokens[2] == "-" ? std::string("") : tokens[2];
if (tokens[3] != "")
machine->maxJobs = string2Int<decltype(machine->maxJobs)>(tokens[3]).value();
else
@ -149,7 +203,7 @@ void State::parseMachines(const std::string & contents)
void State::monitorMachinesFile()
{
string defaultMachinesFile = "/etc/nix/machines";
std::string defaultMachinesFile = "/etc/nix/machines";
auto machinesFiles = tokenizeString<std::vector<Path>>(
getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
@ -191,7 +245,7 @@ void State::monitorMachinesFile()
debug("reloading machines files");
string contents;
std::string contents;
for (auto & machinesFile : machinesFiles) {
try {
contents += readFile(machinesFile);
@ -308,7 +362,7 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const StorePath & drvPath, const string & outputName, const StorePath & storePath)
Build::ptr build, const StorePath & drvPath, const std::string & outputName, const StorePath & storePath)
{
restart:
auto stepNr = allocBuildStep(txn, build->id);
@ -683,14 +737,14 @@ void State::showStatus()
auto conn(dbPool.get());
receiver statusDumped(*conn, "status_dumped");
string status;
std::string status;
bool barf = false;
/* Get the last JSON status dump from the database. */
{
pqxx::work txn(*conn);
auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
if (res.size()) status = res[0][0].as<string>();
if (res.size()) status = res[0][0].as<std::string>();
}
if (status != "") {
@ -710,7 +764,7 @@ void State::showStatus()
{
pqxx::work txn(*conn);
auto res = txn.exec("select status from SystemStatus where what = 'queue-runner'");
if (res.size()) status = res[0][0].as<string>();
if (res.size()) status = res[0][0].as<std::string>();
}
}
@ -754,6 +808,18 @@ void State::run(BuildID buildOne)
if (!lock)
throw Error("hydra-queue-runner is already running");
std::cout << "Starting the Prometheus exporter on " << metricsAddr << std::endl;
/* Set up simple exporter, to show that we're still alive. */
prometheus::Exposer promExposer{metricsAddr};
auto exposerPort = promExposer.GetListeningPorts().front();
promExposer.RegisterCollectable(prom.registry);
std::cout << "Started the Prometheus exporter, listening on "
<< metricsAddr << "/metrics (port " << exposerPort << ")"
<< std::endl;
Store::Params localParams;
localParams["max-connections"] = "16";
localParams["max-connection-age"] = "600";
@ -864,6 +930,7 @@ int main(int argc, char * * argv)
bool unlock = false;
bool status = false;
BuildID buildOne = 0;
std::optional<std::string> metricsAddrOpt = std::nullopt;
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
if (*arg == "--unlock")
@ -875,6 +942,8 @@ int main(int argc, char * * argv)
buildOne = *b;
else
throw Error("--build-one requires a build ID");
} else if (*arg == "--prometheus-address") {
metricsAddrOpt = getArg(*arg, arg, end);
} else
return false;
return true;
@ -883,7 +952,7 @@ int main(int argc, char * * argv)
settings.verboseBuild = true;
settings.lockCPU = false;
State state;
State state{metricsAddrOpt};
if (status)
state.showStatus();
else if (unlock)

View File

@ -64,7 +64,7 @@ struct Extractor : ParseSink
}
}
void createSymlink(const Path & path, const string & target) override
void createSymlink(const Path & path, const std::string & target) override
{
members.insert_or_assign(prefix + path, NarMemberData { .type = FSAccessor::Type::tSymlink });
}

View File

@ -1,5 +1,5 @@
#include "state.hh"
#include "build-result.hh"
#include "hydra-build-result.hh"
#include "globals.hh"
#include <cstring>
@ -82,6 +82,8 @@ struct PreviousFailure : public std::exception {
bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId)
{
prom.queue_checks_started.Increment();
printInfo("checking the queue for builds > %d...", lastBuildId);
/* Grab the queued builds from the database, but don't process
@ -107,16 +109,19 @@ bool State::getQueuedBuilds(Connection & conn,
auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue;
if (id > newLastBuildId) newLastBuildId = id;
if (id > newLastBuildId) {
newLastBuildId = id;
prom.queue_max_id.Set(id);
}
if (builds_->count(id)) continue;
auto build = std::make_shared<Build>(
localStore->parseStorePath(row["drvPath"].as<string>()));
localStore->parseStorePath(row["drvPath"].as<std::string>()));
build->id = id;
build->jobsetId = row["jobset_id"].as<JobsetID>();
build->projectName = row["project"].as<string>();
build->jobsetName = row["jobset"].as<string>();
build->jobName = row["job"].as<string>();
build->projectName = row["project"].as<std::string>();
build->jobsetName = row["jobset"].as<std::string>();
build->jobName = row["job"].as<std::string>();
build->maxSilentTime = row["maxsilent"].as<int>();
build->buildTimeout = row["timeout"].as<int>();
build->timestamp = row["timestamp"].as<time_t>();
@ -136,6 +141,7 @@ bool State::getQueuedBuilds(Connection & conn,
std::set<StorePath> finishedDrvs;
createBuild = [&](Build::ptr build) {
prom.queue_build_loads.Increment();
printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName());
nrAdded++;
newBuildsByID.erase(build->id);
@ -306,9 +312,14 @@ bool State::getQueuedBuilds(Connection & conn,
/* Stop after a certain time to allow priority bumps to be
processed. */
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) break;
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) {
prom.queue_checks_early_exits.Increment();
break;
}
}
prom.queue_checks_finished.Increment();
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
return newBuildsByID.empty();
}
@ -437,6 +448,8 @@ Step::ptr State::createStep(ref<Store> destStore,
if (!isNew) return step;
prom.queue_steps_created.Increment();
printMsg(lvlDebug, "considering derivation %1%", localStore->printStorePath(drvPath));
/* Initialize the step. Note that the step may be visible in
@ -447,7 +460,7 @@ Step::ptr State::createStep(ref<Store> destStore,
step->parsedDrv = std::make_unique<ParsedDerivation>(drvPath, *step->drv);
step->preferLocalBuild = step->parsedDrv->willBuildLocally(*localStore);
step->isDeterministic = get(step->drv->env, "isDetermistic").value_or("0") == "1";
step->isDeterministic = getOr(step->drv->env, "isDetermistic", "0") == "1";
step->systemType = step->drv->platform;
{
@ -513,9 +526,9 @@ Step::ptr State::createStep(ref<Store> destStore,
// FIXME: should copy directly from substituter to destStore.
}
StorePathSet closure;
localStore->computeFSClosure({*path}, closure);
copyPaths(*localStore, *destStore, closure, NoRepair, CheckSigs, NoSubstitute);
copyClosure(*localStore, *destStore,
StorePathSet { *path },
NoRepair, CheckSigs, NoSubstitute);
time_t stopTime = time(0);
@ -620,7 +633,7 @@ void State::processJobsetSharesChange(Connection & conn)
auto res = txn.exec("select project, name, schedulingShares from Jobsets");
for (auto const & row : res) {
auto jobsets_(jobsets.lock());
auto i = jobsets_->find(std::make_pair(row["project"].as<string>(), row["name"].as<string>()));
auto i = jobsets_->find(std::make_pair(row["project"].as<std::string>(), row["name"].as<std::string>()));
if (i == jobsets_->end()) continue;
i->second->setShares(row["schedulingShares"].as<unsigned int>());
}

View File

@ -6,12 +6,18 @@
#include <map>
#include <memory>
#include <queue>
#include <regex>
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/registry.h>
#include "db.hh"
#include "parsed-derivations.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "build-result.hh"
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
@ -290,7 +296,8 @@ struct Machine
bool isLocalhost()
{
return sshName == "localhost";
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
// A connection to a machine
@ -444,8 +451,25 @@ private:
via gc_roots_dir. */
nix::Path rootsDir;
std::string metricsAddr;
struct PromMetrics
{
std::shared_ptr<prometheus::Registry> registry;
prometheus::Counter& queue_checks_started;
prometheus::Counter& queue_build_loads;
prometheus::Counter& queue_steps_created;
prometheus::Counter& queue_checks_early_exits;
prometheus::Counter& queue_checks_finished;
prometheus::Gauge& queue_max_id;
PromMetrics();
};
PromMetrics prom;
public:
State();
State(std::optional<std::string> metricsAddrOpt);
struct BuildOptions {
unsigned int maxSilentTime, buildTimeout, repeats;
@ -560,6 +584,8 @@ private:
void addRoot(const nix::StorePath & storePath);
void runMetricsExporter();
public:
void showStatus();