Allow build to be bumped to the front of the queue via the web interface
Builds now have a "Bump up" action. This will cause the queue runner to prioritise the steps of the build above all other steps.
This commit is contained in:
		| @@ -144,7 +144,9 @@ system_time State::doDispatch() | ||||
|             { | ||||
|                 auto a_(a->state.lock()); | ||||
|                 auto b_(b->state.lock()); // FIXME: deadlock? | ||||
|                 return a_->lowestBuildID < b_->lowestBuildID; | ||||
|                 return | ||||
|                     a_->highestGlobalPriority != b_->highestGlobalPriority ? a_->highestGlobalPriority > b_->highestGlobalPriority : | ||||
|                     a_->lowestBuildID < b_->lowestBuildID; | ||||
|             }); | ||||
|  | ||||
|         /* Find a machine with a free slot and find a step to run | ||||
|   | ||||
| @@ -25,6 +25,7 @@ void State::queueMonitorLoop() | ||||
|     receiver buildsRestarted(*conn, "builds_restarted"); | ||||
|     receiver buildsCancelled(*conn, "builds_cancelled"); | ||||
|     receiver buildsDeleted(*conn, "builds_deleted"); | ||||
|     receiver buildsBumped(*conn, "builds_bumped"); | ||||
|  | ||||
|     auto store = openStore(); // FIXME: pool | ||||
|  | ||||
| @@ -44,9 +45,9 @@ void State::queueMonitorLoop() | ||||
|             printMsg(lvlTalkative, "got notification: builds restarted"); | ||||
|             lastBuildId = 0; // check all builds | ||||
|         } | ||||
|         if (buildsCancelled.get() || buildsDeleted.get()) { | ||||
|             printMsg(lvlTalkative, "got notification: builds cancelled"); | ||||
|             removeCancelledBuilds(*conn); | ||||
|         if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { | ||||
|             printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); | ||||
|             processQueueChange(*conn); | ||||
|         } | ||||
|  | ||||
|     } | ||||
| @@ -64,7 +65,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, | ||||
|     { | ||||
|         pqxx::work txn(conn); | ||||
|  | ||||
|         auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); | ||||
|         auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); | ||||
|  | ||||
|         for (auto const & row : res) { | ||||
|             auto builds_(builds.lock()); | ||||
| @@ -82,6 +83,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, | ||||
|             build->maxSilentTime = row["maxsilent"].as<int>(); | ||||
|             build->buildTimeout = row["timeout"].as<int>(); | ||||
|             build->timestamp = row["timestamp"].as<time_t>(); | ||||
|             build->globalPriority = row["globalPriority"].as<int>(); | ||||
|  | ||||
|             newBuilds.emplace(std::make_pair(build->drvPath, build)); | ||||
|         } | ||||
| @@ -228,13 +230,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, | ||||
|             throw; | ||||
|         } | ||||
|  | ||||
|         /* Update the lowest build ID field of each dependency. This | ||||
|            is used by the dispatcher to start steps in order of build | ||||
|            ID. */ | ||||
|         visitDependencies([&](const Step::ptr & step) { | ||||
|             auto step_(step->state.lock()); | ||||
|             step_->lowestBuildID = std::min(step_->lowestBuildID, build->id); | ||||
|         }, build->toplevel); | ||||
|         build->propagatePriorities(); | ||||
|  | ||||
|         /* Add the new runnable build steps to ‘runnable’ and wake up | ||||
|            the builder threads. */ | ||||
| @@ -247,26 +243,47 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, | ||||
| } | ||||
|  | ||||
|  | ||||
| void State::removeCancelledBuilds(Connection & conn) | ||||
| void Build::propagatePriorities() | ||||
| { | ||||
|     /* Update the highest global priority and lowest build ID fields | ||||
|        of each dependency. This is used by the dispatcher to start | ||||
|        steps in order of descending global priority and ascending | ||||
|        build ID. */ | ||||
|     visitDependencies([&](const Step::ptr & step) { | ||||
|         auto step_(step->state.lock()); | ||||
|         step_->highestGlobalPriority = std::max(step_->highestGlobalPriority, globalPriority); | ||||
|         step_->lowestBuildID = std::min(step_->lowestBuildID, id); | ||||
|     }, toplevel); | ||||
| } | ||||
|  | ||||
|  | ||||
| void State::processQueueChange(Connection & conn) | ||||
| { | ||||
|     /* Get the current set of queued builds. */ | ||||
|     std::set<BuildID> currentIds; | ||||
|     std::map<BuildID, int> currentIds; | ||||
|     { | ||||
|         pqxx::work txn(conn); | ||||
|         auto res = txn.exec("select id from Builds where finished = 0"); | ||||
|         auto res = txn.exec("select id, globalPriority from Builds where finished = 0"); | ||||
|         for (auto const & row : res) | ||||
|             currentIds.insert(row["id"].as<BuildID>()); | ||||
|             currentIds[row["id"].as<BuildID>()] = row["globalPriority"].as<BuildID>(); | ||||
|     } | ||||
|  | ||||
|     auto builds_(builds.lock()); | ||||
|  | ||||
|     for (auto i = builds_->begin(); i != builds_->end(); ) { | ||||
|         if (currentIds.find(i->first) == currentIds.end()) { | ||||
|         auto b = currentIds.find(i->first); | ||||
|         if (b == currentIds.end()) { | ||||
|             printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); | ||||
|             i = builds_->erase(i); | ||||
|             // FIXME: ideally we would interrupt active build steps here. | ||||
|         } else | ||||
|             ++i; | ||||
|             continue; | ||||
|         } | ||||
|         if (i->second->globalPriority < b->second) { | ||||
|             printMsg(lvlInfo, format("priority of build %1% increased") % i->first); | ||||
|             i->second->globalPriority = b->second; | ||||
|             i->second->propagatePriorities(); | ||||
|         } | ||||
|         ++i; | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -71,6 +71,7 @@ struct Build | ||||
|     std::string projectName, jobsetName, jobName; | ||||
|     time_t timestamp; | ||||
|     unsigned int maxSilentTime, buildTimeout; | ||||
|     int globalPriority; | ||||
|  | ||||
|     std::shared_ptr<Step> toplevel; | ||||
|  | ||||
| @@ -80,6 +81,8 @@ struct Build | ||||
|     { | ||||
|         return projectName + ":" + jobsetName + ":" + jobName; | ||||
|     } | ||||
|  | ||||
|     void propagatePriorities(); | ||||
| }; | ||||
|  | ||||
|  | ||||
| @@ -113,7 +116,11 @@ struct Step | ||||
|         /* Point in time after which the step can be retried. */ | ||||
|         system_time after; | ||||
|  | ||||
|         /* The lowest build ID depending on this step. */ | ||||
|         /* The highest global priority of any build depending on this | ||||
|            step. */ | ||||
|         int highestGlobalPriority{0}; | ||||
|  | ||||
|         /* The lowest ID of any build depending on this step. */ | ||||
|         BuildID lowestBuildID{std::numeric_limits<BuildID>::max()}; | ||||
|     }; | ||||
|  | ||||
| @@ -282,9 +289,11 @@ private: | ||||
|  | ||||
|     void queueMonitorLoop(); | ||||
|  | ||||
|     /* Check the queue for new builds. */ | ||||
|     void getQueuedBuilds(Connection & conn, std::shared_ptr<nix::StoreAPI> store, unsigned int & lastBuildId); | ||||
|  | ||||
|     void removeCancelledBuilds(Connection & conn); | ||||
|     /* Handle cancellation, deletion and priority bumps. */ | ||||
|     void processQueueChange(Connection & conn); | ||||
|  | ||||
|     Step::ptr createStep(std::shared_ptr<nix::StoreAPI> store, const nix::Path & drvPath, | ||||
|         Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user