From 29468995040ae21e0e1c14c1bdbb16ccb514caa8 Mon Sep 17 00:00:00 2001
From: Eelco Dolstra <edolstra@gmail.com>
Date: Fri, 9 Aug 2019 19:11:38 +0200
Subject: [PATCH] Turn hydra-notify into a daemon

It now receives notifications about started/finished builds/steps via
PostgreSQL. This gets rid of the (substantial) overhead of starting
hydra-notify for every event. It also allows other programs (even on
other machines) to listen to Hydra notifications.
---
 src/hydra-queue-runner/builder.cc            |  30 +++---
 src/hydra-queue-runner/hydra-queue-runner.cc | 104 +++----------------
 src/hydra-queue-runner/queue-monitor.cc      |   6 +-
 src/hydra-queue-runner/state.hh              |  40 +------
 src/script/hydra-notify                      |  99 ++++++++++++------
 5 files changed, 105 insertions(+), 174 deletions(-)

diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc
index d9a7cbbb..ce65f117 100644
--- a/src/hydra-queue-runner/builder.cc
+++ b/src/hydra-queue-runner/builder.cc
@@ -99,6 +99,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
     unsigned int maxSilentTime, buildTimeout;
     unsigned int repeats = step->isDeterministic ? 1 : 0;
 
+    auto conn(dbPool.get());
+
     {
         std::set<Build::ptr> dependents;
         std::set<Step::ptr> steps;
@@ -122,8 +124,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
 
         for (auto build2 : dependents) {
             if (build2->drvPath == step->drvPath) {
-              build = build2;
-              enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id});
+                build = build2;
+                pqxx::work txn(*conn);
+                notifyBuildStarted(txn, build->id);
+                txn.commit();
             }
             {
                 auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
@@ -144,8 +148,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
 
     bool quit = buildId == buildOne && step->drvPath == buildDrvPath;
 
-    auto conn(dbPool.get());
-
     RemoteResult result;
     BuildOutput res;
     unsigned int stepNr = 0;
@@ -170,11 +172,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
             } catch (...) {
                 ignoreException();
             }
-
-            /* Asynchronously run plugins. FIXME: if we're killed,
-               plugin actions might not be run. Need to ensure
-               at-least-once semantics. */
-            enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
         }
     });
 
@@ -342,8 +339,12 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
 
         /* Send notification about the builds that have this step as
            the top-level. */
-        for (auto id : buildIDs)
-            enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
+        {
+            pqxx::work txn(*conn);
+            for (auto id : buildIDs)
+                notifyBuildFinished(txn, id, {});
+            txn.commit();
+        }
 
         /* Wake up any dependent steps that have no other
            dependencies. */
@@ -462,11 +463,10 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
 
         /* Send notification about this build and its dependents. */
         {
-            auto notificationSenderQueue_(notificationSenderQueue.lock());
-            notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
+            pqxx::work txn(*conn);
+            notifyBuildFinished(txn, buildId, dependentIDs);
+            txn.commit();
         }
-        notificationSenderWakeup.notify_one();
-
     }
 
     // FIXME: keep stats about aborted steps?
diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc
index 1c82cab3..27d5bdcf 100644
--- a/src/hydra-queue-runner/hydra-queue-runner.cc
+++ b/src/hydra-queue-runner/hydra-queue-runner.cc
@@ -299,6 +299,9 @@ void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
         (result.timesBuilt, result.timesBuilt > 0)
         (result.isNonDeterministic, result.timesBuilt > 1)
         .exec();
+    assert(result.logFile.find('\'') == std::string::npos);
+    txn.exec(fmt("notify step_finished, '%d %d %s'", buildId, stepNr,
+            result.logFile.empty() ? "-" : result.logFile));
 }
 
 
@@ -450,74 +453,20 @@ bool State::checkCachedFailure(Step::ptr step, Connection & conn)
 }
 
 
-void State::notificationSender()
+void State::notifyBuildStarted(pqxx::work & txn, BuildID buildId)
 {
-    while (true) {
-        try {
+    txn.exec(fmt("notify build_started, '%s'", buildId));
+}
 
-            NotificationItem item;
-            {
-                auto notificationSenderQueue_(notificationSenderQueue.lock());
-                while (notificationSenderQueue_->empty())
-                    notificationSenderQueue_.wait(notificationSenderWakeup);
-                item = notificationSenderQueue_->front();
-                notificationSenderQueue_->pop();
-            }
 
-            MaintainCount<counter> mc(nrNotificationsInProgress);
-
-            printMsg(lvlChatty, format("sending notification about build %1%") % item.id);
-
-            auto now1 = std::chrono::steady_clock::now();
-
-            Pid pid = startProcess([&]() {
-                Strings argv;
-                switch (item.type) {
-                    case NotificationItem::Type::BuildStarted:
-                        argv = {"hydra-notify", "build-started", std::to_string(item.id)};
-                        for (auto id : item.dependentIds)
-                            argv.push_back(std::to_string(id));
-                        break;
-                    case NotificationItem::Type::BuildFinished:
-                        argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
-                        for (auto id : item.dependentIds)
-                            argv.push_back(std::to_string(id));
-                        break;
-                    case NotificationItem::Type::StepFinished:
-                        argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
-                        break;
-                };
-                printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));
-                execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
-                throw SysError("cannot start hydra-notify");
-            });
-
-            int res = pid.wait();
-
-            if (!statusOk(res))
-                throw Error("notification about build %d failed: %s", item.id, statusToString(res));
-
-            auto now2 = std::chrono::steady_clock::now();
-
-            if (item.type == NotificationItem::Type::BuildFinished) {
-                auto conn(dbPool.get());
-                pqxx::work txn(*conn);
-                txn.parameterized
-                    ("update Builds set notificationPendingSince = null where id = $1")
-                    (item.id)
-                    .exec();
-                txn.commit();
-            }
-
-            nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
-            nrNotificationsDone++;
-
-        } catch (std::exception & e) {
-            nrNotificationsFailed++;
-            printMsg(lvlError, format("notification sender: %1%") % e.what());
-            sleep(5);
-        }
-    }
+void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,
+    const std::vector<BuildID> & dependentIds)
+{
+    auto payload = fmt("%d ", buildId);
+    for (auto & d : dependentIds)
+        payload += fmt("%d ", d);
+    // FIXME: apparently parameterized() doesn't support NOTIFY.
+    txn.exec(fmt("notify build_finished, '%s'", payload));
 }
 
 
@@ -589,13 +538,6 @@ void State::dumpStatus(Connection & conn, bool log)
         root.attr("nrDbConnections", dbPool.count());
         root.attr("nrActiveDbUpdates", nrActiveDbUpdates);
         root.attr("memoryTokensInUse", memoryTokens.currentUse());
-        root.attr("nrNotificationsDone", nrNotificationsDone);
-        root.attr("nrNotificationsFailed", nrNotificationsFailed);
-        root.attr("nrNotificationsInProgress", nrNotificationsInProgress);
-        root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());
-        root.attr("nrNotificationTimeMs", nrNotificationTimeMs);
-        uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;
-        root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);
 
         {
             auto nested = root.object("machines");
@@ -843,24 +785,6 @@ void State::run(BuildID buildOne)
 
     std::thread(&State::dispatcher, this).detach();
 
-    /* Idem for notification sending. */
-    auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);
-    for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
-        std::thread(&State::notificationSender, this).detach();
-
-    /* Enqueue notification items for builds that were finished
-       previously, but for which we didn't manage to send
-       notifications. */
-    {
-        auto conn(dbPool.get());
-        pqxx::work txn(*conn);
-        auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
-        for (auto const & row : res) {
-            auto id = row["id"].as<BuildID>();
-            enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
-        }
-    }
-
     /* Periodically clean up orphaned busy steps in the database. */
     std::thread([&]() {
         while (true) {
diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc
index c10f895b..e657a4b8 100644
--- a/src/hydra-queue-runner/queue-monitor.cc
+++ b/src/hydra-queue-runner/queue-monitor.cc
@@ -193,13 +193,12 @@ bool State::getQueuedBuilds(Connection & conn,
                     (build->id)
                     ((int) (ex.step->drvPath == build->drvPath ? bsFailed : bsDepFailed))
                     (time(0)).exec();
+                notifyBuildFinished(txn, build->id, {});
                 txn.commit();
                 build->finishedInDB = true;
                 nrBuildsDone++;
             }
 
-            enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
-
             return;
         }
 
@@ -230,13 +229,12 @@ bool State::getQueuedBuilds(Connection & conn,
             time_t now = time(0);
             printMsg(lvlInfo, format("marking build %1% as succeeded (cached)") % build->id);
             markSucceededBuild(txn, build, res, true, now, now);
+            notifyBuildFinished(txn, build->id, {});
             txn.commit();
             }
 
             build->finishedInDB = true;
 
-            enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
-
             return;
         }
 
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index 79bbe355..fedca088 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -347,39 +347,6 @@ private:
     counter bytesSent{0};
     counter bytesReceived{0};
     counter nrActiveDbUpdates{0};
-    counter nrNotificationsDone{0};
-    counter nrNotificationsFailed{0};
-    counter nrNotificationsInProgress{0};
-    counter nrNotificationTimeMs{0};
-
-    /* Notification sender work queue. FIXME: if hydra-queue-runner is
-       killed before it has finished sending notifications about a
-       build, then the notifications may be lost. It would be better
-       to mark builds with pending notification in the database. */
-    struct NotificationItem
-    {
-        enum class Type : char {
-           BuildStarted,
-           BuildFinished,
-           StepFinished,
-        };
-        Type type;
-        BuildID id;
-        std::vector<BuildID> dependentIds;
-        unsigned int stepNr;
-        nix::Path logPath;
-    };
-    nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
-    std::condition_variable notificationSenderWakeup;
-
-    void enqueueNotificationItem(const NotificationItem && item)
-    {
-        {
-            auto notificationSenderQueue_(notificationSenderQueue.lock());
-            notificationSenderQueue_->emplace(item);
-        }
-        notificationSenderWakeup.notify_one();
-    }
 
     /* Specific build to do for --build-one (testing only). */
     BuildID buildOne;
@@ -540,9 +507,10 @@ private:
 
     bool checkCachedFailure(Step::ptr step, Connection & conn);
 
-    /* Thread that asynchronously invokes hydra-notify to send build
-       notifications. */
-    void notificationSender();
+    void notifyBuildStarted(pqxx::work & txn, BuildID buildId);
+
+    void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
+        const std::vector<BuildID> & dependentIds);
 
     /* Acquire the global queue runner lock, or null if somebody else
        has it. */
diff --git a/src/script/hydra-notify b/src/script/hydra-notify
index 83963731..4da648ba 100755
--- a/src/script/hydra-notify
+++ b/src/script/hydra-notify
@@ -5,6 +5,7 @@ use utf8;
 use Hydra::Plugin;
 use Hydra::Helper::Nix;
 use Hydra::Helper::AddBuilds;
+use IO::Select;
 
 STDERR->autoflush(1);
 binmode STDERR, ":encoding(utf8)";
@@ -15,20 +16,37 @@ my $db = Hydra::Model::DB->new();
 
 my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
 
-my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n";
+my $dbh = $db->storage->dbh;
 
-my $buildId = shift @ARGV or die;
-my $build = $db->resultset('Builds')->find($buildId)
-    or die "build $buildId does not exist\n";
+$dbh->do("listen build_started");
+$dbh->do("listen build_finished");
+$dbh->do("listen step_finished");
+
+sub buildStarted {
+    my ($buildId) = @_;
+
+    my $build = $db->resultset('Builds')->find($buildId)
+        or die "build $buildId does not exist\n";
+
+    foreach my $plugin (@plugins) {
+        eval { $plugin->buildStarted($build); };
+        if ($@) {
+            print STDERR "$plugin->buildStarted: $@\n";
+        }
+    }
+}
+
+sub buildFinished {
+    my ($build, @deps) = @_;
 
-if ($cmd eq "build-finished") {
     my $project = $build->project;
     my $jobset = $build->jobset;
     if (length($project->declfile) && $jobset->name eq ".jobsets" && $build->iscurrent) {
         handleDeclarativeJobsetBuild($db, $project, $build);
     }
+
     my @dependents;
-    foreach my $id (@ARGV) {
+    foreach my $id (@deps) {
         my $dep = $db->resultset('Builds')->find($id)
             or die "build $id does not exist\n";
         push @dependents, $dep;
@@ -40,33 +58,20 @@ if ($cmd eq "build-finished") {
             print STDERR "$plugin->buildFinished: $@\n";
         }
     }
+
+    $build->update({ notificationpendingsince => undef });
 }
 
-elsif ($cmd eq "build-queued") {
-    foreach my $plugin (@plugins) {
-        eval { $plugin->buildQueued($build); };
-        if ($@) {
-            print STDERR "$plugin->buildQueued: $@\n";
-        }
-    }
-}
+sub stepFinished {
+    my ($buildId, $stepNr, $logPath) = @_;
 
-elsif ($cmd eq "build-started") {
-    foreach my $plugin (@plugins) {
-        eval { $plugin->buildStarted($build); };
-        if ($@) {
-            print STDERR "$plugin->buildStarted: $@\n";
-        }
-    }
-}
+    my $build = $db->resultset('Builds')->find($buildId)
+        or die "build $buildId does not exist\n";
 
-elsif ($cmd eq "step-finished") {
-    die if scalar @ARGV < 2;
-    my $stepNr = shift @ARGV;
     my $step = $build->buildsteps->find({stepnr => $stepNr})
         or die "step $stepNr does not exist\n";
-    my $logPath = shift @ARGV;
-    $logPath = undef if $logPath eq "";
+
+    $logPath = undef if $logPath eq "-";
 
     foreach my $plugin (@plugins) {
         eval { $plugin->stepFinished($step, $logPath); };
@@ -76,6 +81,42 @@ elsif ($cmd eq "step-finished") {
     }
 }
 
-else {
-    die "unknown action ‘$cmd’";
+# Process builds that finished while hydra-notify wasn't running.
+for my $build ($db->resultset('Builds')->search(
+                   { notificationpendingsince => { '!=', undef } }))
+{
+    my $buildId = $build->id;
+    print STDERR "sending notifications for build ${\$buildId}...\n";
+    buildFinished($build);
+}
+
+# Process incoming notifications.
+my $fd = $dbh->func("getfd");
+my $sel = IO::Select->new($fd);
+
+while (1) {
+    $sel->can_read;
+    my $notify = $dbh->func("pg_notifies");
+    next if !$notify;
+
+    my ($channelName, $pid, $payload) = @$notify;
+    #print STDERR "got '$channelName' from $pid: $payload\n";
+
+    my @payload = split / /, $payload;
+
+    eval {
+        if ($channelName eq "build_started") {
+            buildStarted(int($payload[0]));
+        } elsif ($channelName eq "build_finished") {
+            my $buildId = int($payload[0]);
+            my $build = $db->resultset('Builds')->find($buildId)
+                or die "build $buildId does not exist\n";
+            buildFinished($build, @payload[1..$#payload]);
+        } elsif ($channelName eq "step_finished") {
+            stepFinished(int($payload[0]), int($payload[1]));
+        }
+    };
+    if ($@) {
+        print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
+    }
 }