#! /usr/bin/env perl

use strict;
use utf8;
use Hydra::Schema;
use Hydra::Plugin;
use Hydra::Helper::Nix;
use Hydra::Helper::AddBuilds;
use Hydra::Helper::Email;
use Hydra::Model::DB;
use Digest::SHA qw(sha256_hex);
use Config::General;
use Data::Dump qw(dump);
use Try::Tiny;
use Net::Statsd;
use Time::HiRes qw(clock_gettime CLOCK_REALTIME);
use JSON;
use File::Slurp;

STDOUT->autoflush();
STDERR->autoflush(1);
binmode STDERR, ":encoding(utf8)";

my $db = Hydra::Model::DB->new();
my $config = getHydraConfig();

my $plugins = [Hydra::Plugin->instantiate(db => $db, config => $config)];

my $dryRun = defined $ENV{'HYDRA_DRY_RUN'};


sub fetchInputs {
    my ($project, $jobset, $inputInfo) = @_;
    foreach my $input ($jobset->jobsetinputs->all) {
        foreach my $alt ($input->jobsetinputalts->all) {
            push @{$$inputInfo{$input->name}}, $_
                foreach fetchInput($plugins, $db, $project, $jobset, $input->name, $input->type, $alt->value, $input->emailresponsible);
        }
    }
}


sub setJobsetError {
    my ($jobset, $errorMsg) = @_;
    my $prevError = $jobset->errormsg;

    eval {
        txn_do($db, sub {
            $jobset->update({ errormsg => $errorMsg, errortime => time, fetcherrormsg => undef });
        });
    };
    if (defined $errorMsg && $errorMsg ne ($prevError // "") || $ENV{'HYDRA_MAIL_TEST'}) {
        sendJobsetErrorNotification($jobset, $errorMsg);
    }
}


sub sendJobsetErrorNotification() {
    my ($jobset, $errorMsg) = @_;

    chomp $errorMsg;

    return if $jobset->project->owner->emailonerror == 0;
    return if $errorMsg eq "";

    my $projectName = $jobset->project->name;
    my $jobsetName = $jobset->name;
    my $body = "Hi,\n"
        . "\n"
        . "This is to let you know that evaluation of the Hydra jobset ‘$projectName:$jobsetName’\n"
        . "resulted in the following error:\n"
        . "\n"
        . "$errorMsg"
        . "\n"
        . "Regards,\n\nThe Hydra build daemon.\n";

    try {
        sendEmail(
            $config,
            $jobset->project->owner->emailaddress,
            "Hydra $projectName:$jobsetName evaluation error",
            $body,
            [ 'X-Hydra-Project' => $projectName
            , 'X-Hydra-Jobset'  => $jobsetName
            ]);
    } catch {
        warn "error sending email: $_\n";
    };
}


sub permute {
    my @list = @_;
    for (my $n = scalar @list - 1; $n > 0; $n--) {
        my $k = int(rand($n + 1)); # 0 <= $k <= $n
        @list[$n, $k] = @list[$k, $n];
    }
    return @list;
}


sub checkJobsetWrapped {
    my ($jobset) = @_;
    my $project = $jobset->project;
    my $jobsetsJobset = length($project->declfile) && $jobset->name eq ".jobsets";
    my $inputInfo = {};
    if ($jobsetsJobset) {
        my @declInputs = fetchInput($plugins, $db, $project, $jobset, "decl", $project->decltype, $project->declvalue, 0);
        my $declInput = @declInputs[0] or die "cannot find the input containing the declarative project specification\n";
        die "multiple alternatives for the input containing the declarative project specificaiton are not supported\n"
            if scalar @declInputs != 1;
        my $declFile = $declInput->{storePath} . "/" . $project->declfile;
        my $declText = read_file($declFile)
            or die "Couldn't read declarative specification file $declFile: $!\n";
        my $declSpec;
        eval {
            $declSpec = decode_json($declText);
        };
        die "Declarative specification file $declFile not valid JSON: $@\n" if $@;
        updateDeclarativeJobset($db, $project, ".jobsets", $declSpec);
        $jobset->discard_changes;
        $inputInfo->{"declInput"} = [ $declInput ];
    }
    my $exprType = $jobset->nixexprpath =~ /.scm$/ ? "guile" : "nix";

    # Fetch all values for all inputs.
    my $checkoutStart = clock_gettime(CLOCK_REALTIME);
    eval {
        fetchInputs($project, $jobset, $inputInfo);
    };
    my $fetchError = $@;

    Net::Statsd::increment("hydra.evaluator.checkouts");
    my $checkoutStop = clock_gettime(CLOCK_REALTIME);
    Net::Statsd::timing("hydra.evaluator.checkout_time", int(($checkoutStop - $checkoutStart) * 1000));

    if ($fetchError) {
        Net::Statsd::increment("hydra.evaluator.failed_checkouts");
        print STDERR $fetchError;
        txn_do($db, sub {
            $jobset->update({ lastcheckedtime => time, fetcherrormsg => $fetchError }) if !$dryRun;
        });
        return;
    }

    # Hash the arguments to hydra-eval-jobs and check the
    # JobsetInputHashes to see if the previous evaluation had the same
    # inputs.  If so, bail out.
    my @args = ($jobset->nixexprinput, $jobset->nixexprpath, inputsToArgs($inputInfo, $exprType));
    my $argsHash = sha256_hex("@args");
    my $prevEval = getPrevJobsetEval($db, $jobset, 0);
    if (defined $prevEval && $prevEval->hash eq $argsHash && !$dryRun) {
        print STDERR "  jobset is unchanged, skipping\n";
        Net::Statsd::increment("hydra.evaluator.unchanged_checkouts");
        txn_do($db, sub {
            $jobset->update({ lastcheckedtime => time, fetcherrormsg => undef });
        });
        return;
    }

    # Evaluate the job expression.
    my $evalStart = clock_gettime(CLOCK_REALTIME);
    my ($jobs, $nixExprInput) = evalJobs($inputInfo, $exprType, $jobset->nixexprinput, $jobset->nixexprpath);
    my $evalStop = clock_gettime(CLOCK_REALTIME);

    if ($jobsetsJobset) {
        my @keys = keys %$jobs;
        die "The .jobsets jobset must only have a single job named 'jobsets'"
            unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
    }
    Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));

    if ($dryRun) {
        foreach my $name (keys %{$jobs}) {
            my $job = $jobs->{$name};
            if (defined $job->{drvPath}) {
                print STDERR "good job $name: $job->{drvPath}\n";
            } else {
                print STDERR "failed job $name: $job->{error}\n";
            }
        }
        return;
    }

    $jobs->{$_}->{jobName} = $_ for keys %{$jobs};

    my $jobOutPathMap = {};
    my $jobsetChanged = 0;
    my $dbStart = clock_gettime(CLOCK_REALTIME);

    txn_do($db, sub {

        my $prevEval = getPrevJobsetEval($db, $jobset, 1);

        # Clear the "current" flag on all builds.  Since we're in a
        # transaction this will only become visible after the new
        # current builds have been added.
        $jobset->builds->search({iscurrent => 1})->update({iscurrent => 0});

        # Schedule each successfully evaluated job.
        my %buildMap;
        foreach my $job (permute(values %{$jobs})) {
            next if defined $job->{error};
            #print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
            checkBuild($db, $jobset, $inputInfo, $nixExprInput, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
        }

        # Have any builds been added or removed since last time?
        $jobsetChanged =
            (scalar(grep { $_->{new} } values(%buildMap)) > 0)
            || (defined $prevEval && $prevEval->jobsetevalmembers->count != scalar(keys %buildMap));

        my $ev = $jobset->jobsetevals->create(
            { hash => $argsHash
            , timestamp => time
            , checkouttime => abs(int($checkoutStop - $checkoutStart))
            , evaltime => abs(int($evalStop - $evalStart))
            , hasnewbuilds => $jobsetChanged ? 1 : 0
            , nrbuilds => $jobsetChanged ? scalar(keys %buildMap) : undef
            });

        if ($jobsetChanged) {
            # Create JobsetEvalMembers mappings.
            while (my ($id, $x) = each %buildMap) {
                $ev->jobsetevalmembers->create({ build => $id, isnew => $x->{new} });
            }

            # Create AggregateConstituents mappings.  Since there can
            # be jobs that alias each other, if there are multiple
            # builds for the same derivation, pick the one with the
            # shortest name.
            my %drvPathToId;
            while (my ($id, $x) = each %buildMap) {
                my $y = $drvPathToId{$x->{drvPath}};
                if (defined $y) {
                    next if length $x->{jobName} > length $y->{jobName};
                    next if length $x->{jobName} == length $y->{jobName} && $x->{jobName} ge $y->{jobName};
                }
                $drvPathToId{$x->{drvPath}} = $x;
            }

            foreach my $job (values %{$jobs}) {
                next unless $job->{constituents};
                my $x = $drvPathToId{$job->{drvPath}} or die;
                foreach my $drvPath (split / /, $job->{constituents}) {
                    my $constituent = $drvPathToId{$drvPath};
                    if (defined $constituent) {
                        $db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
                    } else {
                        warn "aggregate job ‘$job->{jobName}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
                    }
                }
            }

            foreach my $name (keys %{$inputInfo}) {
                for (my $n = 0; $n < scalar(@{$inputInfo->{$name}}); $n++) {
                    my $input = $inputInfo->{$name}->[$n];
                    $ev->jobsetevalinputs->create(
                        { name => $name
                        , altnr => $n
                        , type => $input->{type}
                        , uri => $input->{uri}
                        , revision => $input->{revision}
                        , value => $input->{value}
                        , dependency => $input->{id}
                        , path => $input->{storePath} || "" # !!! temporary hack
                        , sha256hash => $input->{sha256hash}
                        });
                }
            }

            print STDERR "  created new eval ", $ev->id, "\n";
            $ev->builds->update({iscurrent => 1});
        } else {
            print STDERR "  created cached eval ", $ev->id, "\n";
            $prevEval->builds->update({iscurrent => 1}) if defined $prevEval;
        }

        # If this is a one-shot jobset, disable it now.
        $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;

        $jobset->update({ lastcheckedtime => time });
    });

    my $dbStop = clock_gettime(CLOCK_REALTIME);

    Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
    Net::Statsd::increment("hydra.evaluator.evals");
    Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;

    # Store the error messages for jobs that failed to evaluate.
    my $msg = "";
    foreach my $job (values %{$jobs}) {
        next unless defined $job->{error};
        $msg .=
            ($job->{jobName} ne "" ? "in job ‘$job->{jobName}’" : "at top-level") .
            ":\n" . $job->{error} . "\n\n";
    }
    setJobsetError($jobset, $msg);
}


sub checkJobset {
    my ($jobset) = @_;

    print STDERR "considering jobset ", $jobset->project->name, ":", $jobset->name,
      $jobset->lastcheckedtime
          ? " (last checked " . (time() - $jobset->lastcheckedtime) . "s ago)\n"
          : " (never checked)\n";

    my $triggerTime = $jobset->triggertime;

    my $startTime = clock_gettime(CLOCK_REALTIME);

    eval {
        checkJobsetWrapped($jobset);
    };
    my $checkError = $@;

    my $stopTime = clock_gettime(CLOCK_REALTIME);
    Net::Statsd::timing("hydra.evaluator.total_time", int(($stopTime - $startTime) * 1000));

    my $failed = 0;
    if ($checkError) {
        print STDERR $checkError;
        txn_do($db, sub {
            $jobset->update({lastcheckedtime => time});
            setJobsetError($jobset, $checkError);
        }) if !$dryRun;
        $failed = 1;
    }

    if (defined $triggerTime) {
        txn_do($db, sub {
            # Only clear the trigger time if the jobset hasn't been
            # triggered in the meantime.  In that case, we need to
            # evaluate again.
            my $new = $jobset->get_from_storage();
            $jobset->update({ triggertime => undef })
                if $new->triggertime == $triggerTime;
        }) if !$dryRun;
    }

    return $failed;
}


sub checkSomeJobset {
    # If any jobset has been triggered by a push, check it.
    my ($jobset) = $db->resultset('Jobsets')->search(
        { 'triggertime' => { '!=', undef } },
        { join => 'project', order_by => [ 'triggertime' ], rows => 1 });

    # Otherwise, check the jobset that hasn't been checked for the
    # longest time (but don't check more often than the jobset's
    # minimal check interval).
    ($jobset) = $db->resultset('Jobsets')->search(
        { 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 },
        , 'checkinterval' => { '!=', 0 }
        , -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
        { join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
        unless defined $jobset;

    return 0 unless defined $jobset;

    return system($0, $jobset->project->name, $jobset->name) == 0;
}


if (scalar @ARGV == 2) {
    my $projectName = $ARGV[0];
    my $jobsetName = $ARGV[1];
    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or 
      die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
    exit checkJobset($jobset);
}


while (1) {
    eval {
        if (checkSomeJobset) {
            # Just so we don't go completely crazy if lastcheckedtime
            # isn't updated properly.
            sleep 1;
        } else {
            # print STDERR "sleeping...\n";
            sleep 30;
        }
    };
    if ($@) { print STDERR "$@"; }
}