From 27182c7c1d54a71a333f20a132d580c007e663b5 Mon Sep 17 00:00:00 2001
From: Eelco Dolstra <eelco.dolstra@logicblox.com>
Date: Mon, 10 Aug 2015 14:50:22 +0200
Subject: [PATCH] Start steps in order of ascending build ID

---
 src/hydra-queue-runner/dispatcher.cc         | 72 +++++++++++++-------
 src/hydra-queue-runner/hydra-queue-runner.cc | 22 ++++++
 src/hydra-queue-runner/queue-monitor.cc      |  8 +++
 src/hydra-queue-runner/state.hh              |  6 ++
 4 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc
index 3f3776c4..4903a8c1 100644
--- a/src/hydra-queue-runner/dispatcher.cc
+++ b/src/hydra-queue-runner/dispatcher.cc
@@ -1,3 +1,4 @@
+#include <iostream>
 #include <algorithm>
 #include <thread>
 
@@ -53,8 +54,8 @@ void State::dispatcher()
 
 system_time State::doDispatch()
 {
+    /* Start steps until we're out of steps or slots. */
     auto sleepUntil = system_time::max();
-
     bool keepGoing;
 
     do {
@@ -105,35 +106,23 @@ system_time State::doDispatch()
                     a.currentJobs > b.currentJobs;
             });
 
-        /* Find a machine with a free slot and find a step to run
-           on it. Once we find such a pair, we restart the outer
-           loop because the machine sorting will have changed. */
-        keepGoing = false;
-
-        for (auto & mi : machinesSorted) {
-            // FIXME: can we lose a wakeup if a builder exits concurrently?
-            if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
-
+        /* Sort the runnable steps by priority. FIXME: O(n lg n);
+           obviously, it would be better to keep a runnable queue sorted
+           by priority. */
+        std::vector<Step::ptr> runnableSorted;
+        {
             auto runnable_(runnable.lock());
-            //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
-
-            /* FIXME: we're holding the runnable lock too long
-               here. This could be more efficient. */
-
+            runnableSorted.reserve(runnable_->size());
             for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                 auto step = i->lock();
 
-                /* Delete dead steps. */
+                /* Remove dead steps. */
                 if (!step) {
                     i = runnable_->erase(i);
                     continue;
                 }
 
-                /* Can this machine do this step? */
-                if (!mi.machine->supportsStep(step)) {
-                    ++i;
-                    continue;
-                }
+                ++i;
 
                 /* Skip previously failed steps that aren't ready
                    to be retried. */
@@ -142,15 +131,52 @@ system_time State::doDispatch()
                     if (step_->tries > 0 && step_->after > now) {
                         if (step_->after < sleepUntil)
                             sleepUntil = step_->after;
-                        ++i;
                         continue;
                     }
                 }
 
+                runnableSorted.push_back(step);
+            }
+        }
+
+        sort(runnableSorted.begin(), runnableSorted.end(),
+            [](const Step::ptr & a, const Step::ptr & b)
+            {
+                /* Ascending build ID; lock one step at a time so two locks are never held. */
+                auto idA = a->state.lock()->lowestBuildID;
+                return idA < b->state.lock()->lowestBuildID;
+            });
+
+        /* Find a machine with a free slot and find a step to run
+           on it. Once we find such a pair, we restart the outer
+           loop because the machine sorting will have changed. */
+        keepGoing = false;
+
+        for (auto & mi : machinesSorted) {
+            if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
+
+            for (auto & step : runnableSorted) {
+
+                /* Can this machine do this step? */
+                if (!mi.machine->supportsStep(step)) continue;
+
+                /* Let's do this step. Remove it from the runnable
+                   list. FIXME: O(n). */
+                {
+                    auto runnable_(runnable.lock());
+                    bool removed = false;
+                    for (auto i = runnable_->begin(); i != runnable_->end(); )
+                        if (i->lock() == step) {
+                            i = runnable_->erase(i);
+                            removed = true;
+                            break;
+                        } else ++i;
+                    assert(removed);
+                }
+
                 /* Make a slot reservation and start a thread to
                    do the build. */
                 auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
-                i = runnable_->erase(i);
 
                 auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
                 builderThread.detach(); // FIXME?
diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc
index e5a8d868..9a7bdcbc 100644
--- a/src/hydra-queue-runner/hydra-queue-runner.cc
+++ b/src/hydra-queue-runner/hydra-queue-runner.cc
@@ -204,6 +204,28 @@ void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step:
 }
 
 
+void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr start)
+{
+    /* Breadth-first walk over ‘start’ and its transitive dependencies.
+       ‘queued’ holds every step ever scheduled, so none is visited twice. */
+    std::set<Step::ptr> queued{start};
+    std::queue<Step::ptr> todo;
+    todo.push(start);
+
+    while (!todo.empty()) {
+        auto step = todo.front();
+        todo.pop();
+
+        /* Visit before locking: the visitor may lock step->state itself. */
+        visitor(step);
+
+        auto state(step->state.lock());
+        for (auto & dep : state->deps)
+            if (queued.insert(dep).second) todo.push(dep);
+    }
+}
+
+
 void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
     const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime)
 {
diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc
index 8448cb68..ea676bf0 100644
--- a/src/hydra-queue-runner/queue-monitor.cc
+++ b/src/hydra-queue-runner/queue-monitor.cc
@@ -228,6 +228,14 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
             throw;
         }
 
+        /* Update the lowest build ID field of each dependency. This
+           is used by the dispatcher to start steps in order of build
+           ID. */
+        visitDependencies([&](const Step::ptr & step) {
+            auto step_(step->state.lock());
+            step_->lowestBuildID = std::min(step_->lowestBuildID, build->id);
+        }, build->toplevel);
+
         /* Add the new runnable build steps to ‘runnable’ and wake up
            the builder threads. */
         printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index f5043975..3f055230 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -112,6 +112,9 @@ struct Step
 
         /* Point in time after which the step can be retried. */
         system_time after;
+
+        /* The lowest build ID depending on this step. */
+        BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};
     };
 
     std::atomic_bool finished{false}; // debugging
@@ -127,6 +130,9 @@ struct Step
 
 void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps);
 
+/* Call ‘visitor’ for ‘step’ and then, breadth-first, for all of its transitive dependencies. */
+void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr step);
+
 
 struct Machine
 {