Merge branch 'master' into libpqxx_undeprecate

This commit is contained in:
Kevin Quick
2020-04-01 11:54:41 -07:00
committed by GitHub
88 changed files with 1667 additions and 1309 deletions

View File

@ -3,5 +3,5 @@ bin_PROGRAMS = hydra-queue-runner
hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh token-server.hh state.hh db.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx -lnixrust
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra -Wno-deprecated-declarations

View File

@ -82,10 +82,10 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil
static void copyClosureTo(std::timed_mutex & sendMutex, ref<Store> destStore,
FdSource & from, FdSink & to, const PathSet & paths,
FdSource & from, FdSink & to, const StorePathSet & paths,
bool useSubstitutes = false)
{
PathSet closure;
StorePathSet closure;
for (auto & path : paths)
destStore->computeFSClosure(path, closure);
@ -94,20 +94,21 @@ static void copyClosureTo(std::timed_mutex & sendMutex, ref<Store> destStore,
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
to << cmdQueryValidPaths << 1 << useSubstitutes << closure;
to << cmdQueryValidPaths << 1 << useSubstitutes;
writeStorePaths(*destStore, to, closure);
to.flush();
/* Get back the set of paths that are already valid on the remote
host. */
auto present = readStorePaths<PathSet>(*destStore, from);
auto present = readStorePaths<StorePathSet>(*destStore, from);
if (present.size() == closure.size()) return;
Paths sorted = destStore->topoSortPaths(closure);
auto sorted = destStore->topoSortPaths(closure);
Paths missing;
StorePathSet missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (present.find(*i) == present.end()) missing.push_back(*i);
if (!present.count(*i)) missing.insert(i->clone());
printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
@ -131,7 +132,7 @@ void State::buildRemote(ref<Store> destStore,
{
assert(BuildResult::TimedOut == 8);
string base = baseNameOf(step->drvPath);
string base(step->drvPath.to_string());
result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
AutoDelete autoDelete(result.logFile, false);
@ -217,22 +218,22 @@ void State::buildRemote(ref<Store> destStore,
outputs of the input derivations. */
updateStep(ssSendingInputs);
PathSet inputs;
BasicDerivation basicDrv(step->drv);
StorePathSet inputs;
BasicDerivation basicDrv(*step->drv);
if (sendDerivation)
inputs.insert(step->drvPath);
inputs.insert(step->drvPath.clone());
else
for (auto & p : step->drv.inputSrcs)
inputs.insert(p);
for (auto & p : step->drv->inputSrcs)
inputs.insert(p.clone());
for (auto & input : step->drv.inputDrvs) {
Derivation drv2 = readDerivation(input.first);
for (auto & input : step->drv->inputDrvs) {
Derivation drv2 = readDerivation(*localStore, localStore->printStorePath(input.first));
for (auto & name : input.second) {
auto i = drv2.outputs.find(name);
if (i == drv2.outputs.end()) continue;
inputs.insert(i->second.path);
basicDrv.inputSrcs.insert(i->second.path);
inputs.insert(i->second.path.clone());
basicDrv.inputSrcs.insert(i->second.path.clone());
}
}
@ -241,14 +242,15 @@ void State::buildRemote(ref<Store> destStore,
this will copy the inputs to the binary cache from the local
store. */
if (localStore != std::shared_ptr<Store>(destStore))
copyClosure(ref<Store>(localStore), destStore, step->drv.inputSrcs, NoRepair, NoCheckSigs);
copyClosure(ref<Store>(localStore), destStore, step->drv->inputSrcs, NoRepair, NoCheckSigs);
/* Copy the input closure. */
if (!machine->isLocalhost()) {
auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting);
mc1.reset();
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, format("sending closure of %1% to %2%") % step->drvPath % machine->sshName);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore->printStorePath(step->drvPath), machine->sshName);
auto now1 = std::chrono::steady_clock::now();
@ -272,14 +274,19 @@ void State::buildRemote(ref<Store> destStore,
logFD = -1;
/* Do the build. */
printMsg(lvlDebug, format("building %1% on %2%") % step->drvPath % machine->sshName);
printMsg(lvlDebug, "building %s on %s",
localStore->printStorePath(step->drvPath),
machine->sshName);
updateStep(ssBuilding);
if (sendDerivation)
to << cmdBuildPaths << PathSet({step->drvPath});
else
to << cmdBuildDerivation << step->drvPath << basicDrv;
if (sendDerivation) {
to << cmdBuildPaths;
writeStorePaths(*localStore, to, singleton(step->drvPath));
} else {
to << cmdBuildDerivation << localStore->printStorePath(step->drvPath);
writeDerivation(to, *localStore, basicDrv);
}
to << maxSilentTime << buildTimeout;
if (GET_PROTOCOL_MINOR(remoteVersion) >= 2)
to << maxLogSize;
@ -380,7 +387,8 @@ void State::buildRemote(ref<Store> destStore,
/* If the path was substituted or already valid, then we didn't
get a build log. */
if (result.isCached) {
printMsg(lvlInfo, format("outputs of %1% substituted or already valid on %2%") % step->drvPath % machine->sshName);
printMsg(lvlInfo, "outputs of %s substituted or already valid on %s",
localStore->printStorePath(step->drvPath), machine->sshName);
unlink(result.logFile.c_str());
result.logFile = "";
}
@ -395,13 +403,12 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now();
PathSet outputs;
for (auto & output : step->drv.outputs)
outputs.insert(output.second.path);
auto outputs = step->drv->outputPaths();
/* Query the size of the output paths. */
size_t totalNarSize = 0;
to << cmdQueryPathInfos << outputs;
to << cmdQueryPathInfos;
writeStorePaths(*localStore, to, outputs);
to.flush();
while (true) {
if (readString(from) == "") break;
@ -416,8 +423,8 @@ void State::buildRemote(ref<Store> destStore,
return;
}
printMsg(lvlDebug, format("copying outputs of %s from %s (%d bytes)")
% step->drvPath % machine->sshName % totalNarSize);
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
/* Block until we have the required amount of memory
available, which is twice the NAR size (namely the
@ -431,10 +438,11 @@ void State::buildRemote(ref<Store> destStore,
auto resMs = std::chrono::duration_cast<std::chrono::milliseconds>(resStop - resStart).count();
if (resMs >= 1000)
printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s")
% resMs % totalNarSize % step->drvPath);
printMsg(lvlError, "warning: had to wait %d ms for %d memory tokens for %s",
resMs, totalNarSize, localStore->printStorePath(step->drvPath));
to << cmdExportPaths << 0 << outputs;
to << cmdExportPaths << 0;
writeStorePaths(*localStore, to, outputs);
to.flush();
destStore->importPaths(from, result.accessor, NoCheckSigs);

View File

@ -14,16 +14,14 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
BuildOutput res;
/* Compute the closure size. */
PathSet outputs;
for (auto & output : drv.outputs)
outputs.insert(output.second.path);
PathSet closure;
auto outputs = drv.outputPaths();
StorePathSet closure;
for (auto & output : outputs)
store->computeFSClosure(output, closure);
store->computeFSClosure(singleton(output), closure);
for (auto & path : closure) {
auto info = store->queryPathInfo(path);
res.closureSize += info->narSize;
if (outputs.find(path) != outputs.end()) res.size += info->narSize;
if (outputs.count(path)) res.size += info->narSize;
}
/* Get build products. */
@ -39,11 +37,13 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
, std::regex::extended);
for (auto & output : outputs) {
Path failedFile = output + "/nix-support/failed";
auto outputS = store->printStorePath(output);
Path failedFile = outputS + "/nix-support/failed";
if (accessor->stat(failedFile).type == FSAccessor::Type::tRegular)
res.failed = true;
Path productsFile = output + "/nix-support/hydra-build-products";
Path productsFile = outputS + "/nix-support/hydra-build-products";
if (accessor->stat(productsFile).type != FSAccessor::Type::tRegular)
continue;
@ -72,7 +72,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
auto st = accessor->stat(product.path);
if (st.type == FSAccessor::Type::tMissing) continue;
product.name = product.path == output ? "" : baseNameOf(product.path);
product.name = product.path == store->printStorePath(output) ? "" : baseNameOf(product.path);
if (st.type == FSAccessor::Type::tRegular) {
product.isRegular = true;
@ -91,14 +91,14 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
if (!explicitProducts) {
for (auto & output : drv.outputs) {
BuildProduct product;
product.path = output.second.path;
product.path = store->printStorePath(output.second.path);
product.type = "nix-build";
product.subtype = output.first == "out" ? "" : output.first;
product.name = storePathToName(product.path);
product.name = output.second.path.name();
auto st = accessor->stat(product.path);
if (st.type == FSAccessor::Type::tMissing)
throw Error(format("getting status of %1%") % product.path);
throw Error("getting status of %s", product.path);
if (st.type == FSAccessor::Type::tDirectory)
res.products.push_back(product);
}
@ -106,7 +106,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
/* Get the release name from $output/nix-support/hydra-release-name. */
for (auto & output : outputs) {
Path p = output + "/nix-support/hydra-release-name";
auto p = store->printStorePath(output) + "/nix-support/hydra-release-name";
if (accessor->stat(p).type != FSAccessor::Type::tRegular) continue;
try {
res.releaseName = trim(accessor->readFile(p));
@ -116,7 +116,7 @@ BuildOutput getBuildOutput(nix::ref<Store> store,
/* Get metrics. */
for (auto & output : outputs) {
Path metricsFile = output + "/nix-support/hydra-metrics";
auto metricsFile = store->printStorePath(output) + "/nix-support/hydra-metrics";
if (accessor->stat(metricsFile).type != FSAccessor::Type::tRegular) continue;
for (auto & line : tokenizeString<Strings>(accessor->readFile(metricsFile), "\n")) {
auto fields = tokenizeString<std::vector<std::string>>(line);

View File

@ -18,7 +18,7 @@ void setThreadName(const std::string & name)
void State::builder(MachineReservation::ptr reservation)
{
setThreadName("bld~" + baseNameOf(reservation->step->drvPath));
setThreadName("bld~" + std::string(reservation->step->drvPath.to_string()));
StepResult res = sRetry;
@ -39,8 +39,10 @@ void State::builder(MachineReservation::ptr reservation)
auto destStore = getDestStore();
res = doBuildStep(destStore, reservation, activeStep);
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%")
% reservation->step->drvPath % reservation->machine->sshName % e.what());
printMsg(lvlError, "uncaught exception building %s on %s: %s",
localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName,
e.what());
}
}
@ -60,7 +62,7 @@ void State::builder(MachineReservation::ptr reservation)
nrRetries++;
if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
int delta = retryInterval * std::pow(retryBackoff, step_->tries - 1) + (rand() % 10);
printMsg(lvlInfo, format("will retry %1% after %2%s") % step->drvPath % delta);
printMsg(lvlInfo, "will retry %s after %ss", localStore->printStorePath(step->drvPath), delta);
step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
}
@ -95,7 +97,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
cancelled (namely if there are no more Builds referring to
it). */
BuildID buildId;
Path buildDrvPath;
std::optional<StorePath> buildDrvPath;
unsigned int maxSilentTime, buildTimeout;
unsigned int repeats = step->isDeterministic ? 1 : 0;
@ -116,7 +118,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
possibility, we retry this step (putting it back in
the runnable queue). If there are really no strong
pointers to the step, it will be deleted. */
printMsg(lvlInfo, format("maybe cancelling build step %1%") % step->drvPath);
printMsg(lvlInfo, "maybe cancelling build step %s", localStore->printStorePath(step->drvPath));
return sMaybeCancelled;
}
@ -138,15 +140,15 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
if (!build) build = *dependents.begin();
buildId = build->id;
buildDrvPath = build->drvPath;
buildDrvPath = build->drvPath.clone();
maxSilentTime = build->maxSilentTime;
buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
step->drvPath, repeats + 1, machine->sshName, buildId, (dependents.size() - 1));
localStore->printStorePath(step->drvPath), repeats + 1, machine->sshName, buildId, (dependents.size() - 1));
}
bool quit = buildId == buildOne && step->drvPath == buildDrvPath;
bool quit = buildId == buildOne && step->drvPath == *buildDrvPath;
RemoteResult result;
BuildOutput res;
@ -166,7 +168,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try {
auto store = destStore.dynamic_pointer_cast<BinaryCacheStore>();
if (uploadLogsToBinaryCache && store && pathExists(result.logFile)) {
store->upsertFile("log/" + baseNameOf(step->drvPath), readFile(result.logFile), "text/plain; charset=utf-8");
store->upsertFile("log/" + std::string(step->drvPath.to_string()), readFile(result.logFile), "text/plain; charset=utf-8");
unlink(result.logFile.c_str());
}
} catch (...) {
@ -218,7 +220,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
if (result.stepStatus == bsSuccess) {
updateStep(ssPostProcessing);
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), step->drv);
res = getBuildOutput(destStore, ref<FSAccessor>(result.accessor), *step->drv);
}
result.accessor = 0;
@ -255,8 +257,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry) {
printMsg(lvlError, format("possibly transient failure building %1% on %2%: %3%")
% step->drvPath % machine->sshName % result.errorMsg);
printMsg(lvlError, "possibly transient failure building %s on %s: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg);
assert(stepNr);
bool retry;
{
@ -275,7 +277,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
assert(stepNr);
for (auto & path : step->drv.outputPaths())
for (auto & path : step->drv->outputPaths())
addRoot(path);
/* Register success in the database for all Build objects that
@ -308,7 +310,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
no new referrers can have been added in the
meantime or be added afterwards. */
if (direct.empty()) {
printMsg(lvlDebug, format("finishing build step %1%") % step->drvPath);
printMsg(lvlDebug, "finishing build step %s",
localStore->printStorePath(step->drvPath));
steps_->erase(step->drvPath);
}
}
@ -373,96 +376,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
}
}
} else {
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
std::vector<BuildID> dependentIDs;
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);
/* If there are no builds left, delete all referring
steps from steps. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, format("finishing build step %1%") % s->drvPath);
steps_->erase(s->drvPath);
}
}
}
if (indirect.empty() && stepFinished) break;
/* Update the database. */
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
/* Create failed build steps for every build that
depends on this, except when this step is cached
and is the top-level of that build (since then it's
redundant with the build's isCachedBuild field). */
for (auto & build2 : indirect) {
if ((result.stepStatus == bsCachedFailure && build2->drvPath == step->drvPath) ||
(result.stepStatus != bsCachedFailure && buildId == build2->id) ||
build2->finishedInDB)
continue;
createBuildStep(txn, 0, build2->id, step, machine->sshName,
result.stepStatus, result.errorMsg, buildId == build2->id ? 0 : buildId);
}
/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
if (build2->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.exec_params0
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0",
build2->id,
(int) (build2->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()),
result.startTime,
result.stopTime,
result.stepStatus == bsCachedFailure ? 1 : 0);
nrBuildsDone++;
}
/* Remember failed paths in the database so that they
won't be built again. */
if (result.stepStatus != bsCachedFailure && result.canCache)
for (auto & path : step->drv.outputPaths())
txn.exec_params0("insert into FailedPaths values ($1)", path);
txn.commit();
}
stepFinished = true;
/* Remove the indirect dependencies from builds. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
}
}
/* Send notification about this build and its dependents. */
{
pqxx::work txn(*conn);
notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
}
}
} else
failStep(*conn, step, buildId, result, machine, stepFinished, quit);
// FIXME: keep stats about aborted steps?
nrStepsDone++;
@ -478,8 +393,109 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
}
void State::addRoot(const Path & storePath)
/* Record the failure of 'step' in the database for every build that
   depends on it, drop those builds (and, once no builds remain, the
   referring steps) from the in-memory state, and finally send a
   build-finished notification.

   conn         - database connection used for all transactions here.
   step         - the build step that failed.
   buildId      - the build on whose behalf the step was being handled.
   result       - status, error message and start/stop times to record.
   machine      - machine the step ran on; may be null, in which case an
                  empty machine name is recorded.
   stepFinished - in/out: set to true after the first database update;
                  the loop only terminates once it is true and no
                  dependent builds remain.
   quit         - out: set to true when one of the removed builds is
                  'buildOne'. */
void State::failStep(
Connection & conn,
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
bool & stepFinished,
bool & quit)
{
// NOTE(review): the next line looks like the removed body line of the old
// addRoot(const Path &) that this diff view interleaves here; 'storePath'
// is not a parameter of failStep -- confirm against the real file.
auto root = rootsDir + "/" + baseNameOf(storePath);
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
std::vector<BuildID> dependentIDs;
// Repeat until a pass finds no dependent builds after at least one
// database update has been made.
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
// Local set; shadows the member 'steps' that was locked just above.
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);
/* If there are no builds left, delete all referring
steps from steps. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, "finishing build step %s",
localStore->printStorePath(s->drvPath));
steps_->erase(s->drvPath);
}
}
}
if (indirect.empty() && stepFinished) break;
/* Update the database. */
{
auto mc = startDbUpdate();
pqxx::work txn(conn);
/* Create failed build steps for every build that
depends on this, except when this step is cached
and is the top-level of that build (since then it's
redundant with the build's isCachedBuild field). */
for (auto & build : indirect) {
// Also skipped: the build identified by 'buildId' itself, unless the
// status is bsCachedFailure or bsUnsupported, and builds already
// marked finished in the database.
if ((result.stepStatus == bsCachedFailure && build->drvPath == step->drvPath) ||
((result.stepStatus != bsCachedFailure && result.stepStatus != bsUnsupported) && buildId == build->id) ||
build->finishedInDB)
continue;
// The last argument is 0 for the build 'buildId' itself and 'buildId'
// for every other dependent build.
createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
}
/* Mark all builds that depend on this derivation as failed. */
for (auto & build : indirect) {
if (build->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build->id);
// A build whose top-level derivation is not this step is recorded as
// bsDepFailed when the step's build status is bsFailed; otherwise the
// step's build status is stored unchanged.
txn.exec_params0
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0",
build->id,
(int) (build->drvPath != step->drvPath && result.buildStatus() == bsFailed ? bsDepFailed : result.buildStatus()),
result.startTime,
result.stopTime,
result.stepStatus == bsCachedFailure ? 1 : 0);
nrBuildsDone++;
}
/* Remember failed paths in the database so that they
won't be built again. */
// NOTE(review): other updated lines in this change access the derivation
// as 'step->drv->...'; confirm whether 'step->drv.' is still valid here.
if (result.stepStatus != bsCachedFailure && result.canCache)
for (auto & path : step->drv.outputPaths())
txn.exec_params0("insert into FailedPaths values ($1)", localStore->printStorePath(path));
txn.commit();
}
stepFinished = true;
/* Remove the indirect dependencies from builds. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
}
}
/* Send notification about this build and its dependents. */
{
pqxx::work txn(conn);
notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
}
}
/* Make sure a root file named after 'storePath' exists under rootsDir,
   creating an empty one when it is missing. */
void State::addRoot(const StorePath & storePath)
{
    const auto rootFile = rootsDir + "/" + std::string(storePath.to_string());

    /* Nothing to do when the root is already registered. */
    if (pathExists(rootFile)) return;

    writeFile(rootFile, "");
}

View File

@ -10,7 +10,7 @@ using namespace nix;
void State::makeRunnable(Step::ptr step)
{
printMsg(lvlChatty, format("step %1% is now runnable") % step->drvPath);
printMsg(lvlChatty, "step %s is now runnable", localStore->printStorePath(step->drvPath));
{
auto step_(step->state.lock());
@ -248,7 +248,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, step->drvPath, step->drv.platform);
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform);
continue;
}
@ -300,6 +300,8 @@ system_time State::doDispatch()
} while (keepGoing);
abortUnsupported();
return sleepUntil;
}
@ -314,6 +316,96 @@ void State::wakeDispatcher()
}
/* Fail runnable steps that no known machine supports.

   For every step in a snapshot of 'runnable', check whether any machine
   in a snapshot of 'machines' supports it. Supported steps get their
   'lastSupported' timestamp refreshed. A step that has been unsupported
   for at least 'maxUnsupportedTime' seconds is failed via failStep()
   with status bsUnsupported and afterwards removed from 'runnable'.
   The number of currently unsupported steps is stored in
   'nrUnsupportedSteps'. */
void State::abortUnsupported()
{
/* Make a copy of 'runnable' and 'machines' so we don't block them
very long. */
auto runnable2 = *runnable.lock();
auto machines2 = *machines.lock();
// 'now' is compared against lastSupported; 'now2' is used as both the
// start and stop time of the synthesized failure result.
system_time now = std::chrono::system_clock::now();
auto now2 = time(0);
std::unordered_set<Step::ptr> aborted;
size_t count = 0;
for (auto & wstep : runnable2) {
// Skip entries whose step can no longer be locked (presumably a weak
// reference to a step that has already been destroyed).
auto step(wstep.lock());
if (!step) continue;
bool supported = false;
for (auto & machine : machines2) {
if (machine.second->supportsStep(step)) {
step->state.lock()->lastSupported = now;
supported = true;
break;
}
}
// Counted even when the step has not yet been unsupported long enough
// to be aborted.
if (!supported)
count++;
if (!supported
&& std::chrono::duration_cast<std::chrono::seconds>(now - step->state.lock()->lastSupported).count() >= maxUnsupportedTime)
{
printError("aborting unsupported build step '%s' (type '%s')",
localStore->printStorePath(step->drvPath),
step->systemType);
// Recorded before the dependents check below, so the step is removed
// from 'runnable' even when that check skips failStep().
aborted.insert(step);
auto conn(dbPool.get());
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
/* Maybe the step got cancelled. */
if (dependents.empty()) continue;
/* Find the build that has this step as the top-level (if
any). */
Build::ptr build;
for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath)
build = build2;
}
// Fall back to an arbitrary dependent build.
if (!build) build = *dependents.begin();
bool stepFinished = false;
bool quit = false;
// No machine is associated with this failure, hence the nullptr.
failStep(
*conn, step, build->id,
RemoteResult {
.stepStatus = bsUnsupported,
.errorMsg = fmt("unsupported system type '%s'",
step->systemType),
.startTime = now2,
.stopTime = now2,
},
nullptr, stepFinished, quit);
// failStep() sets 'quit' when it removed the build 'buildOne'.
if (quit) exit(1);
}
}
/* Clean up 'runnable'. */
{
auto runnable_(runnable.lock());
// Only entries that lock to a step in 'aborted' are erased; entries
// that lock to null are left in place since null is never inserted.
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
if (aborted.count(i->lock()))
i = runnable_->erase(i);
else
++i;
}
}
nrUnsupportedSteps = count;
}
void Jobset::addStep(time_t startTime, time_t duration)
{
auto steps_(steps.lock());

View File

@ -39,14 +39,15 @@ static uint64_t getMemSize()
std::string getEnvOrDie(const std::string & key)
{
char * value = getenv(key.c_str());
auto value = getEnv(key);
if (!value) throw Error("environment variable '%s' is not set", key);
return value;
return *value;
}
State::State()
: config(std::make_unique<::Config>())
, maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
, dbPool(config->getIntOption("max_db_connections", 128))
, memoryTokens(config->getIntOption("nar_buffer_size", getMemSize() / 2))
, maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
@ -161,7 +162,7 @@ void State::monitorMachinesFile()
{
string defaultMachinesFile = "/etc/nix/machines";
auto machinesFiles = tokenizeString<std::vector<Path>>(
getEnv("NIX_REMOTE_SYSTEMS", pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
if (machinesFiles.empty()) {
parseMachines("localhost " +
@ -219,6 +220,7 @@ void State::monitorMachinesFile()
sleep(30);
} catch (std::exception & e) {
printMsg(lvlError, format("reloading machines file: %1%") % e.what());
sleep(5);
}
}
}
@ -253,7 +255,7 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID
buildId,
stepNr,
0, // == build
step->drvPath,
localStore->printStorePath(step->drvPath),
status == bsBusy ? 1 : 0,
startTime != 0 ? std::make_optional(startTime) : std::nullopt,
step->drv.platform,
@ -268,7 +270,7 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID
for (auto & output : step->drv.outputs)
txn.exec_params0
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
buildId, stepNr, output.first, output.second.path);
buildId, stepNr, output.first, localStore->printStorePath(output.second.path));
if (status == bsBusy)
txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr));
@ -309,7 +311,7 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const Path & drvPath, const string & outputName, const Path & storePath)
Build::ptr build, const StorePath & drvPath, const string & outputName, const StorePath & storePath)
{
restart:
auto stepNr = allocBuildStep(txn, build->id);
@ -319,7 +321,7 @@ int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t sto
build->id,
stepNr,
1, // == substitution
drvPath,
(localStore->printStorePath(drvPath)),
0,
0,
startTime,
@ -329,7 +331,8 @@ int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t sto
txn.exec_params0
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
build->id, stepNr, outputName, storePath);
build->id, stepNr, outputName,
localStore->printStorePath(storePath));
return stepNr;
}
@ -450,7 +453,7 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
{
pqxx::work txn(conn);
for (auto & path : step->drv.outputPaths())
if (!txn.exec_params("select 1 from FailedPaths where path = $1", path).empty())
if (!txn.exec_params("select 1 from FailedPaths where path = $1", localStore->printStorePath(path)).empty())
return true;
return false;
}
@ -486,7 +489,7 @@ std::shared_ptr<PathLocks> State::acquireGlobalLock()
}
void State::dumpStatus(Connection & conn, bool log)
void State::dumpStatus(Connection & conn)
{
std::ostringstream out;
@ -518,6 +521,7 @@ void State::dumpStatus(Connection & conn, bool log)
root.attr("nrStepsCopyingTo", nrStepsCopyingTo);
root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom);
root.attr("nrStepsWaiting", nrStepsWaiting);
root.attr("nrUnsupportedSteps", nrUnsupportedSteps);
root.attr("bytesSent", bytesSent);
root.attr("bytesReceived", bytesReceived);
root.attr("nrBuildsRead", nrBuildsRead);
@ -666,11 +670,6 @@ void State::dumpStatus(Connection & conn, bool log)
}
}
if (log && time(0) >= lastStatusLogged + statusLogInterval) {
printMsg(lvlInfo, format("status: %1%") % out.str());
lastStatusLogged = time(0);
}
{
auto mc = startDbUpdate();
pqxx::work txn(conn);
@ -762,7 +761,7 @@ void State::run(BuildID buildOne)
Store::Params localParams;
localParams["max-connections"] = "16";
localParams["max-connection-age"] = "600";
localStore = openStore(getEnv("NIX_REMOTE"), localParams);
localStore = openStore(getEnv("NIX_REMOTE").value_or(""), localParams);
auto storeUri = config->getStrOption("store_uri");
_destStore = storeUri == "" ? localStore : openStore(storeUri);
@ -779,7 +778,7 @@ void State::run(BuildID buildOne)
{
auto conn(dbPool.get());
clearBusy(*conn, 0);
dumpStatus(*conn, false);
dumpStatus(*conn);
}
std::thread(&State::monitorMachinesFile, this).detach();
@ -842,8 +841,8 @@ void State::run(BuildID buildOne)
auto conn(dbPool.get());
receiver dumpStatus_(*conn, "dump_status");
while (true) {
conn->await_notification(statusLogInterval / 2 + 1, 0);
dumpStatus(*conn, true);
conn->await_notification();
dumpStatus(*conn);
}
} catch (std::exception & e) {
printMsg(lvlError, format("main thread: %1%") % e.what());

View File

@ -83,7 +83,7 @@ bool State::getQueuedBuilds(Connection & conn,
them yet (since we don't want a long-running transaction). */
std::vector<BuildID> newIDs;
std::map<BuildID, Build::ptr> newBuildsByID;
std::multimap<Path, BuildID> newBuildsByPath;
std::multimap<StorePath, BuildID> newBuildsByPath;
unsigned int newLastBuildId = lastBuildId;
@ -102,9 +102,9 @@ bool State::getQueuedBuilds(Connection & conn,
if (id > newLastBuildId) newLastBuildId = id;
if (builds_->count(id)) continue;
auto build = std::make_shared<Build>();
auto build = std::make_shared<Build>(
localStore->parseStorePath(row["drvPath"].as<string>()));
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->projectName = row["project"].as<string>();
build->jobsetName = row["jobset"].as<string>();
build->jobName = row["job"].as<string>();
@ -117,14 +117,14 @@ bool State::getQueuedBuilds(Connection & conn,
newIDs.push_back(id);
newBuildsByID[id] = build;
newBuildsByPath.emplace(std::make_pair(build->drvPath, id));
newBuildsByPath.emplace(std::make_pair(build->drvPath.clone(), id));
}
}
std::set<Step::ptr> newRunnable;
unsigned int nrAdded;
std::function<void(Build::ptr)> createBuild;
std::set<Path> finishedDrvs;
std::set<StorePath> finishedDrvs;
createBuild = [&](Build::ptr build) {
printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName());
@ -160,7 +160,8 @@ bool State::getQueuedBuilds(Connection & conn,
/* Some step previously failed, so mark the build as
failed right away. */
printMsg(lvlError, format("marking build %d as cached failure due to %s") % build->id % ex.step->drvPath);
printMsg(lvlError, "marking build %d as cached failure due to %s",
build->id, localStore->printStorePath(ex.step->drvPath));
if (!build->finishedInDB) {
auto mc = startDbUpdate();
pqxx::work txn(conn);
@ -171,14 +172,14 @@ bool State::getQueuedBuilds(Connection & conn,
auto res = txn.exec_params1
("select max(build) from BuildSteps where drvPath = $1 and startTime != 0 and stopTime != 0 and status = 1",
ex.step->drvPath);
localStore->printStorePath(ex.step->drvPath));
if (!res[0].is_null()) propagatedFrom = res[0].as<BuildID>();
if (!propagatedFrom) {
for (auto & output : ex.step->drv.outputs) {
auto res = txn.exec_params
("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where path = $1 and startTime != 0 and stopTime != 0 and status = 1",
output.second.path);
localStore->printStorePath(output.second.path));
if (!res[0][0].is_null()) {
propagatedFrom = res[0][0].as<BuildID>();
break;
@ -217,7 +218,7 @@ bool State::getQueuedBuilds(Connection & conn,
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
if (!step) {
Derivation drv = readDerivation(build->drvPath);
Derivation drv = readDerivation(*localStore, localStore->printStorePath(build->drvPath));
BuildOutput res = getBuildOutputCached(conn, destStore, drv);
for (auto & path : drv.outputPaths())
@ -227,7 +228,7 @@ bool State::getQueuedBuilds(Connection & conn,
auto mc = startDbUpdate();
pqxx::work txn(conn);
time_t now = time(0);
printMsg(lvlInfo, format("marking build %1% as succeeded (cached)") % build->id);
printMsg(lvlInfo, "marking build %1% as succeeded (cached)", build->id);
markSucceededBuild(txn, build, res, true, now, now);
notifyBuildFinished(txn, build->id, {});
txn.commit();
@ -250,8 +251,8 @@ bool State::getQueuedBuilds(Connection & conn,
build->propagatePriorities();
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
% build->id % step->drvPath % newSteps.size());
printMsg(lvlChatty, "added build %1% (top-level step %2%, %3% new steps)",
build->id, localStore->printStorePath(step->drvPath), newSteps.size());
};
/* Now instantiate build steps for each new build. The builder
@ -271,7 +272,7 @@ bool State::getQueuedBuilds(Connection & conn,
try {
createBuild(build);
} catch (Error & e) {
e.addPrefix(format("while loading build %1%: ") % build->id);
e.addPrefix(fmt("while loading build %1%: ", build->id));
throw;
}
@ -358,10 +359,12 @@ void State::processQueueChange(Connection & conn)
activeStepState->cancelled = true;
if (activeStepState->pid != -1) {
printInfo("killing builder process %d of build step %s",
activeStepState->pid, activeStep->step->drvPath);
activeStepState->pid,
localStore->printStorePath(activeStep->step->drvPath));
if (kill(activeStepState->pid, SIGINT) == -1)
printError("error killing build step %s: %s",
activeStep->step->drvPath, strerror(errno));
localStore->printStorePath(activeStep->step->drvPath),
strerror(errno));
}
}
}
@ -370,8 +373,8 @@ void State::processQueueChange(Connection & conn)
Step::ptr State::createStep(ref<Store> destStore,
Connection & conn, Build::ptr build, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
Connection & conn, Build::ptr build, const StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<StorePath> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;
@ -399,8 +402,7 @@ Step::ptr State::createStep(ref<Store> destStore,
/* If it doesn't exist, create it. */
if (!step) {
step = std::make_shared<Step>();
step->drvPath = drvPath;
step = std::make_shared<Step>(drvPath.clone());
isNew = true;
}
@ -414,28 +416,28 @@ Step::ptr State::createStep(ref<Store> destStore,
if (referringStep)
step_->rdeps.push_back(referringStep);
(*steps_)[drvPath] = step;
steps_->insert_or_assign(drvPath.clone(), step);
}
if (!isNew) return step;
printMsg(lvlDebug, format("considering derivation %1%") % drvPath);
printMsg(lvlDebug, "considering derivation %1%", localStore->printStorePath(drvPath));
/* Initialize the step. Note that the step may be visible in
steps before this point, but that doesn't matter because
it's not runnable yet, and other threads won't make it
runnable while step->created == false. */
step->drv = readDerivation(drvPath);
step->parsedDrv = std::make_unique<ParsedDerivation>(drvPath, step->drv);
step->drv = std::make_unique<Derivation>(readDerivation(*localStore, localStore->printStorePath(drvPath)));
step->parsedDrv = std::make_unique<ParsedDerivation>(drvPath.clone(), *step->drv);
step->preferLocalBuild = step->parsedDrv->willBuildLocally();
step->isDeterministic = get(step->drv.env, "isDetermistic", "0") == "1";
step->isDeterministic = get(step->drv->env, "isDetermistic").value_or("0") == "1";
step->systemType = step->drv.platform;
step->systemType = step->drv->platform;
{
auto i = step->drv.env.find("requiredSystemFeatures");
auto i = step->drv->env.find("requiredSystemFeatures");
StringSet features;
if (i != step->drv.env.end())
if (i != step->drv->env.end())
features = step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
if (step->preferLocalBuild)
features.insert("local");
@ -451,12 +453,13 @@ Step::ptr State::createStep(ref<Store> destStore,
/* Are all outputs valid? */
bool valid = true;
PathSet outputs = step->drv.outputPaths();
auto outputs = step->drv->outputPaths();
DerivationOutputs missing;
for (auto & i : step->drv.outputs)
for (auto & i : step->drv->outputs)
if (!destStore->isValidPath(i.second.path)) {
valid = false;
missing[i.first] = i.second;
missing.insert_or_assign(i.first,
DerivationOutput(i.second.path.clone(), std::string(i.second.hashAlgo), std::string(i.second.hash)));
}
/* Try to copy the missing paths from the local store or from
@ -469,7 +472,7 @@ Step::ptr State::createStep(ref<Store> destStore,
avail++;
else if (useSubstitutes) {
SubstitutablePathInfos infos;
localStore->querySubstitutablePathInfos({i.second.path}, infos);
localStore->querySubstitutablePathInfos(singleton(i.second.path), infos);
if (infos.size() == 1)
avail++;
}
@ -482,14 +485,18 @@ Step::ptr State::createStep(ref<Store> destStore,
time_t startTime = time(0);
if (localStore->isValidPath(i.second.path))
printInfo("copying output %1% of %2% from local store", i.second.path, drvPath);
printInfo("copying output %1% of %2% from local store",
localStore->printStorePath(i.second.path),
localStore->printStorePath(drvPath));
else {
printInfo("substituting output %1% of %2%", i.second.path, drvPath);
printInfo("substituting output %1% of %2%",
localStore->printStorePath(i.second.path),
localStore->printStorePath(drvPath));
localStore->ensurePath(i.second.path);
// FIXME: should copy directly from substituter to destStore.
}
copyClosure(ref<Store>(localStore), destStore, {i.second.path});
copyClosure(ref<Store>(localStore), destStore, singleton(i.second.path));
time_t stopTime = time(0);
@ -501,7 +508,10 @@ Step::ptr State::createStep(ref<Store> destStore,
}
} catch (Error & e) {
printError("while copying/substituting output %s of %s: %s", i.second.path, drvPath, e.what());
printError("while copying/substituting output %s of %s: %s",
localStore->printStorePath(i.second.path),
localStore->printStorePath(drvPath),
e.what());
valid = false;
break;
}
@ -511,15 +521,15 @@ Step::ptr State::createStep(ref<Store> destStore,
// FIXME: check whether all outputs are in the binary cache.
if (valid) {
finishedDrvs.insert(drvPath);
finishedDrvs.insert(drvPath.clone());
return 0;
}
/* No, we need to build. */
printMsg(lvlDebug, format("creating build step %1%") % drvPath);
printMsg(lvlDebug, "creating build step %1%", localStore->printStorePath(drvPath));
/* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) {
for (auto & i : step->drv->inputDrvs) {
auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) {
auto step_(step->state.lock());
@ -610,7 +620,7 @@ BuildOutput State::getBuildOutputCached(Connection & conn, nix::ref<nix::Store>
("select id, buildStatus, releaseName, closureSize, size from Builds b "
"join BuildOutputs o on b.id = o.build "
"where finished = 1 and (buildStatus = 0 or buildStatus = 6) and path = $1",
output.second.path);
localStore->printStorePath(output.second.path));
if (r.empty()) continue;
BuildID id = r[0][0].as<BuildID>();

View File

@ -68,7 +68,7 @@ struct RemoteResult
std::unique_ptr<nix::TokenServer::Token> tokens;
std::shared_ptr<nix::FSAccessor> accessor;
BuildStatus buildStatus()
BuildStatus buildStatus() const
{
return stepStatus == bsCachedFailure ? bsFailed : stepStatus;
}
@ -123,8 +123,8 @@ struct Build
typedef std::weak_ptr<Build> wptr;
BuildID id;
nix::Path drvPath;
std::map<std::string, nix::Path> outputs;
nix::StorePath drvPath;
std::map<std::string, nix::StorePath> outputs;
std::string projectName, jobsetName, jobName;
time_t timestamp;
unsigned int maxSilentTime, buildTimeout;
@ -136,6 +136,9 @@ struct Build
std::atomic_bool finishedInDB{false};
Build(nix::StorePath && drvPath) : drvPath(std::move(drvPath))
{ }
std::string fullJobName()
{
return projectName + ":" + jobsetName + ":" + jobName;
@ -150,8 +153,8 @@ struct Step
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
nix::Path drvPath;
nix::Derivation drv;
nix::StorePath drvPath;
std::unique_ptr<nix::Derivation> drv;
std::unique_ptr<nix::ParsedDerivation> parsedDrv;
std::set<std::string> requiredSystemFeatures;
bool preferLocalBuild;
@ -195,12 +198,19 @@ struct Step
/* The time at which this step became runnable. */
system_time runnableSince;
/* The time that we last saw a machine that supports this
step. */
system_time lastSupported = std::chrono::system_clock::now();
};
std::atomic_bool finished{false}; // debugging
nix::Sync<State> state;
Step(nix::StorePath && drvPath) : drvPath(std::move(drvPath))
{ }
~Step()
{
//printMsg(lvlError, format("destroying step %1%") % drvPath);
@ -252,7 +262,7 @@ struct Machine
{
/* Check that this machine is of the type required by the
step. */
if (!systemTypes.count(step->drv.platform == "builtin" ? nix::settings.thisSystem : step->drv.platform))
if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
return false;
/* Check that the step requires all mandatory features of this
@ -297,6 +307,9 @@ private:
const float retryBackoff = 3.0;
const unsigned int maxParallelCopyClosure = 4;
/* Time in seconds before unsupported build steps are aborted. */
const unsigned int maxUnsupportedTime = 0;
nix::Path hydraData, logDir;
bool useSubstitutes = false;
@ -313,7 +326,7 @@ private:
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<nix::Path, Step::wptr> Steps;
typedef std::map<nix::StorePath, Step::wptr> Steps;
nix::Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
@ -342,6 +355,7 @@ private:
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrStepsWaiting{0};
counter nrUnsupportedSteps{0};
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
@ -406,9 +420,6 @@ private:
size_t maxOutputSize;
size_t maxLogSize;
time_t lastStatusLogged = 0;
const int statusLogInterval = 300;
/* Steps that were busy while we encountered a PostgreSQL
error. These need to be cleared at a later time to prevent them
from showing up as busy until the queue runner is restarted. */
@ -454,7 +465,7 @@ private:
const std::string & machine);
int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const nix::Path & drvPath, const std::string & outputName, const nix::Path & storePath);
Build::ptr build, const nix::StorePath & drvPath, const std::string & outputName, const nix::StorePath & storePath);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
@ -473,10 +484,19 @@ private:
const nix::Derivation & drv);
Step::ptr createStep(nix::ref<nix::Store> store,
Connection & conn, Build::ptr build, const nix::Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs,
Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void failStep(
Connection & conn,
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
bool & stepFinished,
bool & quit);
Jobset::ptr createJobset(pqxx::work & txn,
const std::string & projectName, const std::string & jobsetName);
@ -491,6 +511,8 @@ private:
void wakeDispatcher();
void abortUnsupported();
void builder(MachineReservation::ptr reservation);
/* Perform the given build step. Return true if the step is to be
@ -521,9 +543,9 @@ private:
has it. */
std::shared_ptr<nix::PathLocks> acquireGlobalLock();
void dumpStatus(Connection & conn, bool log);
void dumpStatus(Connection & conn);
void addRoot(const nix::Path & storePath);
void addRoot(const nix::StorePath & storePath);
public:

View File

@ -7,7 +7,7 @@
namespace nix {
MakeError(NoTokens, Error)
MakeError(NoTokens, Error);
/* This class hands out tokens. There are only maxTokens tokens
available. Calling get(N) will return a Token object, representing