2016-10-13 15:53:05 +02:00
# include "db.hh"
2018-05-16 14:22:00 +02:00
# include "hydra-config.hh"
2016-10-13 15:53:05 +02:00
# include "pool.hh"
2018-05-16 14:22:00 +02:00
# include "shared.hh"
2016-10-13 15:53:05 +02:00
# include <algorithm>
# include <thread>
# include <cstring>
2020-01-07 12:38:06 +13:00
# include <optional>
2016-10-13 15:53:05 +02:00
# include <sys/types.h>
# include <sys/wait.h>
using namespace nix ;
2017-03-13 16:19:22 +01:00
/* Identifies a jobset by its (project name, jobset name) pair. */
using JobsetName = std::pair<std::string, std::string>;

/* Scheduling behaviour of a jobset. The numeric values correspond to the
   `Jobsets.enabled` column values 1, 2 and 3 as decoded by
   Evaluator::readJobsets(). */
enum class EvaluationStyle { SCHEDULE = 1, ONESHOT = 2, ONE_AT_A_TIME = 3, };
2016-10-13 15:53:05 +02:00
struct Evaluator
{
2020-07-08 12:50:02 +02:00
std : : unique_ptr < HydraConfig > config ;
2018-05-16 14:22:00 +02:00
2016-10-13 15:53:05 +02:00
nix : : Pool < Connection > dbPool ;
struct Jobset
{
JobsetName name ;
2020-03-03 19:53:18 -05:00
std : : optional < EvaluationStyle > evaluation_style ;
2016-10-13 15:53:05 +02:00
time_t lastCheckedTime , triggerTime ;
int checkInterval ;
Pid pid ;
} ;
typedef std : : map < JobsetName , Jobset > Jobsets ;
2020-01-07 12:38:06 +13:00
std : : optional < JobsetName > evalOne ;
2017-03-13 16:19:22 +01:00
2018-05-16 14:22:00 +02:00
const size_t maxEvals ;
2016-10-13 15:53:05 +02:00
struct State
{
size_t runningEvals = 0 ;
Jobsets jobsets ;
} ;
Sync < State > state_ ;
std : : condition_variable childStarted ;
std : : condition_variable maybeDoWork ;
const time_t notTriggered = std : : numeric_limits < time_t > : : max ( ) ;
2018-05-16 14:22:00 +02:00
Evaluator ( )
2020-07-08 12:50:02 +02:00
: config ( std : : make_unique < HydraConfig > ( ) )
2018-05-16 14:22:00 +02:00
, maxEvals ( std : : max ( ( size_t ) 1 , ( size_t ) config - > getIntOption ( " max_concurrent_evals " , 4 ) ) )
{ }
2016-10-13 15:53:05 +02:00
void readJobsets ( )
{
auto conn ( dbPool . get ( ) ) ;
pqxx : : work txn ( * conn ) ;
2020-01-11 22:38:40 -08:00
auto res = txn . exec
2020-03-03 19:53:18 -05:00
( " select project, j.name, lastCheckedTime, triggerTime, checkInterval, j.enabled as jobset_enabled from Jobsets j join Projects p on j.project = p.name "
2020-01-11 22:38:40 -08:00
" where j.enabled != 0 and p.enabled != 0 " ) ;
2016-10-13 15:53:05 +02:00
auto state ( state_ . lock ( ) ) ;
std : : set < JobsetName > seen ;
for ( auto const & row : res ) {
auto name = JobsetName { row [ " project " ] . as < std : : string > ( ) , row [ " name " ] . as < std : : string > ( ) } ;
2017-03-13 16:19:22 +01:00
if ( evalOne & & name ! = * evalOne ) continue ;
2016-10-13 15:53:05 +02:00
auto res = state - > jobsets . try_emplace ( name , Jobset { name } ) ;
auto & jobset = res . first - > second ;
jobset . lastCheckedTime = row [ " lastCheckedTime " ] . as < time_t > ( 0 ) ;
jobset . triggerTime = row [ " triggerTime " ] . as < time_t > ( notTriggered ) ;
jobset . checkInterval = row [ " checkInterval " ] . as < time_t > ( ) ;
2020-03-03 19:53:18 -05:00
switch ( row [ " jobset_enabled " ] . as < int > ( 0 ) ) {
case 1 :
jobset . evaluation_style = EvaluationStyle : : SCHEDULE ;
break ;
case 2 :
jobset . evaluation_style = EvaluationStyle : : ONESHOT ;
break ;
case 3 :
jobset . evaluation_style = EvaluationStyle : : ONE_AT_A_TIME ;
break ;
}
2016-10-13 15:53:05 +02:00
seen . insert ( name ) ;
}
2017-03-13 16:19:22 +01:00
if ( evalOne & & seen . empty ( ) ) {
2019-05-11 00:40:40 +02:00
printError ( " the specified jobset does not exist or is disabled " ) ;
2017-03-13 16:19:22 +01:00
std : : _Exit ( 1 ) ;
}
2016-10-13 15:53:05 +02:00
for ( auto i = state - > jobsets . begin ( ) ; i ! = state - > jobsets . end ( ) ; )
if ( seen . count ( i - > first ) )
+ + i ;
else {
printInfo ( " forgetting jobset ‘ %s:%s’ " , i - > first . first , i - > first . second ) ;
i = state - > jobsets . erase ( i ) ;
}
}
void startEval ( State & state , Jobset & jobset )
{
time_t now = time ( 0 ) ;
2017-03-13 16:19:22 +01:00
printInfo ( " starting evaluation of jobset ‘ %s:%s’ (last checked %d s ago) " ,
jobset . name . first , jobset . name . second ,
now - jobset . lastCheckedTime ) ;
2016-10-13 15:53:05 +02:00
{
auto conn ( dbPool . get ( ) ) ;
pqxx : : work txn ( * conn ) ;
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " update Jobsets set startTime = $1 where project = $2 and name = $3 " ,
now ,
jobset . name . first ,
jobset . name . second ) ;
2016-10-13 15:53:05 +02:00
txn . commit ( ) ;
}
2017-03-13 16:19:22 +01:00
assert ( jobset . pid = = - 1 ) ;
jobset . pid = startProcess ( [ & ] ( ) {
Strings args = { " hydra-eval-jobset " , jobset . name . first , jobset . name . second } ;
execvp ( args . front ( ) . c_str ( ) , stringsToCharPtrs ( args ) . data ( ) ) ;
2020-06-23 13:43:54 +02:00
throw SysError ( " executing ‘ %1%’ " , args . front ( ) ) ;
2017-03-13 16:19:22 +01:00
} ) ;
state . runningEvals + + ;
childStarted . notify_one ( ) ;
2016-10-13 15:53:05 +02:00
}
2020-03-03 18:17:21 -05:00
bool shouldEvaluate ( Jobset & jobset )
{
if ( jobset . pid ! = - 1 ) {
// Already running.
2020-03-03 19:53:18 -05:00
debug ( " shouldEvaluate %s:%s? no: already running " ,
jobset . name . first , jobset . name . second ) ;
2020-03-03 18:17:21 -05:00
return false ;
}
2020-03-03 19:53:18 -05:00
if ( jobset . triggerTime ! = std : : numeric_limits < time_t > : : max ( ) ) {
2020-03-03 18:17:21 -05:00
// An evaluation of this Jobset is requested
2020-03-03 19:53:18 -05:00
debug ( " shouldEvaluate %s:%s? yes: requested " ,
jobset . name . first , jobset . name . second ) ;
2020-03-03 18:17:21 -05:00
return true ;
}
if ( jobset . checkInterval < = 0 ) {
// Automatic scheduling is disabled. We allow requested
// evaluations, but never schedule start one.
2020-03-03 19:53:18 -05:00
debug ( " shouldEvaluate %s:%s? no: checkInterval <= 0 " ,
jobset . name . first , jobset . name . second ) ;
2020-03-03 18:17:21 -05:00
return false ;
}
if ( jobset . lastCheckedTime + jobset . checkInterval < = time ( 0 ) ) {
2020-03-03 19:53:18 -05:00
// Time to schedule a fresh evaluation. If the jobset
// is a ONE_AT_A_TIME jobset, ensure the previous jobset
// has no remaining, unfinished work.
auto conn ( dbPool . get ( ) ) ;
pqxx : : work txn ( * conn ) ;
if ( jobset . evaluation_style = = EvaluationStyle : : ONE_AT_A_TIME ) {
auto evaluation_res = txn . parameterized
( " select id from JobsetEvals "
" where project = $1 and jobset = $2 "
" order by id desc limit 1 " )
( jobset . name . first )
( jobset . name . second )
. exec ( ) ;
if ( evaluation_res . empty ( ) ) {
// First evaluation, so allow scheduling.
debug ( " shouldEvaluate(one-at-a-time) %s:%s? yes: no prior eval " ,
jobset . name . first , jobset . name . second ) ;
return true ;
}
auto evaluation_id = evaluation_res [ 0 ] [ 0 ] . as < int > ( ) ;
auto unfinished_build_res = txn . parameterized
( " select id from Builds "
" join JobsetEvalMembers "
" on (JobsetEvalMembers.build = Builds.id) "
" where JobsetEvalMembers.eval = $1 "
" and builds.finished = 0 "
" limit 1 " )
( evaluation_id )
. exec ( ) ;
// If the previous evaluation has no unfinished builds
// schedule!
if ( unfinished_build_res . empty ( ) ) {
debug ( " shouldEvaluate(one-at-a-time) %s:%s? yes: no unfinished builds " ,
jobset . name . first , jobset . name . second ) ;
return true ;
} else {
debug ( " shouldEvaluate(one-at-a-time) %s:%s? no: at least one unfinished build " ,
jobset . name . first , jobset . name . second ) ;
return false ;
}
} else {
// EvaluationStyle::ONESHOT, EvaluationStyle::SCHEDULED
debug ( " shouldEvaluate(oneshot/scheduled) %s:%s? yes: checkInterval elapsed " ,
jobset . name . first , jobset . name . second ) ;
return true ;
}
2020-03-03 18:17:21 -05:00
}
return false ;
}
2016-10-13 15:53:05 +02:00
void startEvals ( State & state )
{
std : : vector < Jobsets : : iterator > sorted ;
/* Filter out jobsets that have been evaluated recently and have
not been triggered . */
for ( auto i = state . jobsets . begin ( ) ; i ! = state . jobsets . end ( ) ; + + i )
2017-03-13 16:19:22 +01:00
if ( evalOne | |
2020-03-03 18:17:21 -05:00
( i - > second . evaluation_style & & shouldEvaluate ( i - > second ) ) )
2016-10-13 15:53:05 +02:00
sorted . push_back ( i ) ;
/* Put jobsets in order of ascending trigger time, last checked
time , and name . */
std : : sort ( sorted . begin ( ) , sorted . end ( ) ,
[ ] ( const Jobsets : : iterator & a , const Jobsets : : iterator & b ) {
return
a - > second . triggerTime ! = b - > second . triggerTime
? a - > second . triggerTime < b - > second . triggerTime
: a - > second . lastCheckedTime ! = b - > second . lastCheckedTime
? a - > second . lastCheckedTime < b - > second . lastCheckedTime
: a - > first < b - > first ;
} ) ;
/* Start jobset evaluations up to the concurrency limit.*/
for ( auto & i : sorted ) {
if ( state . runningEvals > = maxEvals ) break ;
startEval ( state , i - > second ) ;
}
}
void loop ( )
{
auto state ( state_ . lock ( ) ) ;
while ( true ) {
time_t now = time ( 0 ) ;
std : : chrono : : seconds sleepTime = std : : chrono : : seconds : : max ( ) ;
if ( state - > runningEvals < maxEvals ) {
for ( auto & i : state - > jobsets )
if ( i . second . pid = = - 1 & &
i . second . checkInterval > 0 )
sleepTime = std : : min ( sleepTime , std : : chrono : : seconds (
std : : max ( ( time_t ) 1 , i . second . lastCheckedTime - now + i . second . checkInterval ) ) ) ;
}
debug ( " waiting for %d s " , sleepTime . count ( ) ) ;
if ( sleepTime = = std : : chrono : : seconds : : max ( ) )
state . wait ( maybeDoWork ) ;
else
state . wait_for ( maybeDoWork , sleepTime ) ;
startEvals ( * state ) ;
}
}
/* A thread that listens to PostgreSQL notifications about jobset
changes , updates the jobsets map , and signals the main thread
to start evaluations . */
void databaseMonitor ( )
{
while ( true ) {
try {
auto conn ( dbPool . get ( ) ) ;
receiver jobsetsAdded ( * conn , " jobsets_added " ) ;
receiver jobsetsDeleted ( * conn , " jobsets_deleted " ) ;
receiver jobsetsChanged ( * conn , " jobset_scheduling_changed " ) ;
while ( true ) {
/* Note: we read/notify before
await_notification ( ) to ensure we don ' t miss a
state change . */
readJobsets ( ) ;
maybeDoWork . notify_one ( ) ;
conn - > await_notification ( ) ;
printInfo ( " received jobset event " ) ;
}
} catch ( std : : exception & e ) {
printError ( " exception in database monitor thread: %s " , e . what ( ) ) ;
sleep ( 30 ) ;
}
}
}
/* A thread that reaps child processes.*/
void reaper ( )
{
while ( true ) {
{
auto state ( state_ . lock ( ) ) ;
while ( ! state - > runningEvals )
state . wait ( childStarted ) ;
}
int status ;
pid_t pid = waitpid ( - 1 , & status , 0 ) ;
if ( pid = = - 1 ) {
if ( errno = = EINTR ) continue ;
throw SysError ( " waiting for children " ) ;
}
{
auto state ( state_ . lock ( ) ) ;
assert ( state - > runningEvals ) ;
state - > runningEvals - - ;
2017-03-13 16:19:22 +01:00
// FIXME: should use a map.
for ( auto & i : state - > jobsets ) {
auto & jobset ( i . second ) ;
if ( jobset . pid = = pid ) {
2016-11-08 17:00:17 +01:00
printInfo ( " evaluation of jobset ‘ %s:%s’ %s " ,
2017-03-13 16:19:22 +01:00
jobset . name . first , jobset . name . second , statusToString ( status ) ) ;
auto now = time ( 0 ) ;
jobset . triggerTime = notTriggered ;
jobset . lastCheckedTime = now ;
2016-11-08 17:08:54 +01:00
try {
2017-03-13 16:19:22 +01:00
auto conn ( dbPool . get ( ) ) ;
pqxx : : work txn ( * conn ) ;
/* Clear the trigger time to prevent this
jobset from getting stuck in an endless
failing eval loop . */
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " update Jobsets set triggerTime = null where project = $1 and name = $2 and startTime is not null and triggerTime <= startTime " ,
jobset . name . first ,
jobset . name . second ) ;
2017-03-13 16:19:22 +01:00
/* Clear the start time. */
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " update Jobsets set startTime = null where project = $1 and name = $2 " ,
jobset . name . first ,
jobset . name . second ) ;
2017-03-13 16:19:22 +01:00
2016-11-08 17:08:54 +01:00
if ( ! WIFEXITED ( status ) | | WEXITSTATUS ( status ) > 1 ) {
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " update Jobsets set errorMsg = $1, lastCheckedTime = $2, errorTime = $2, fetchErrorMsg = null where project = $3 and name = $4 " ,
fmt ( " evaluation %s " , statusToString ( status ) ) ,
now ,
jobset . name . first ,
jobset . name . second ) ;
2016-11-08 17:08:54 +01:00
}
2017-03-13 16:19:22 +01:00
txn . commit ( ) ;
2016-11-08 17:08:54 +01:00
} catch ( std : : exception & e ) {
printError ( " exception setting jobset error: %s " , e . what ( ) ) ;
}
2017-03-13 16:19:22 +01:00
jobset . pid . release ( ) ;
2016-10-13 15:53:05 +02:00
maybeDoWork . notify_one ( ) ;
2017-03-13 16:19:22 +01:00
if ( evalOne ) std : : _Exit ( 0 ) ;
2016-10-13 15:53:05 +02:00
break ;
}
2017-03-13 16:19:22 +01:00
}
2016-10-13 15:53:05 +02:00
}
}
}
2017-03-13 16:19:22 +01:00
void unlock ( )
{
auto conn ( dbPool . get ( ) ) ;
pqxx : : work txn ( * conn ) ;
2020-01-11 22:38:40 -08:00
txn . exec ( " update Jobsets set startTime = null " ) ;
2017-03-13 16:19:22 +01:00
txn . commit ( ) ;
}
2016-10-13 15:53:05 +02:00
void run ( )
{
2017-03-13 16:19:22 +01:00
unlock ( ) ;
2017-02-21 17:54:31 +01:00
/* Can't be bothered to shut down cleanly. Goodbye! */
2017-03-13 16:19:22 +01:00
auto callback = createInterruptCallback ( [ & ] ( ) { std : : _Exit ( 1 ) ; } ) ;
2017-02-21 17:54:31 +01:00
2016-10-13 15:53:05 +02:00
std : : thread reaperThread ( [ & ] ( ) { reaper ( ) ; } ) ;
std : : thread monitorThread ( [ & ] ( ) { databaseMonitor ( ) ; } ) ;
while ( true ) {
try {
loop ( ) ;
} catch ( std : : exception & e ) {
printError ( " exception in main loop: %s " , e . what ( ) ) ;
sleep ( 30 ) ;
}
}
}
} ;
int main ( int argc , char * * argv )
{
return handleExceptions ( argv [ 0 ] , [ & ] ( ) {
initNix ( ) ;
signal ( SIGINT , SIG_DFL ) ;
signal ( SIGTERM , SIG_DFL ) ;
signal ( SIGHUP , SIG_DFL ) ;
2017-03-13 16:19:22 +01:00
bool unlock = false ;
Evaluator evaluator ;
std : : vector < std : : string > args ;
2016-10-13 15:53:05 +02:00
parseCmdLine ( argc , argv , [ & ] ( Strings : : iterator & arg , const Strings : : iterator & end ) {
2017-03-13 16:19:22 +01:00
if ( * arg = = " --unlock " )
unlock = true ;
else if ( hasPrefix ( * arg , " - " ) )
return false ;
args . push_back ( * arg ) ;
return true ;
2016-10-13 15:53:05 +02:00
} ) ;
2017-03-13 16:19:22 +01:00
if ( unlock )
evaluator . unlock ( ) ;
2020-02-15 16:40:16 +01:00
else {
if ( ! args . empty ( ) ) {
if ( args . size ( ) ! = 2 ) throw UsageError ( " Syntax: hydra-evaluator [<project> <jobset>] " ) ;
evaluator . evalOne = JobsetName ( args [ 0 ] , args [ 1 ] ) ;
}
2017-03-13 16:19:22 +01:00
evaluator . run ( ) ;
2020-02-15 16:40:16 +01:00
}
2016-10-13 15:53:05 +02:00
} ) ;
}