2015-05-28 17:39:29 +02:00
# include <iostream>
2015-05-29 01:31:12 +02:00
# include <thread>
2020-01-11 22:38:40 -08:00
# include <optional>
2015-05-29 01:31:12 +02:00
2015-06-19 14:51:59 +02:00
# include <sys/types.h>
# include <sys/stat.h>
# include <fcntl.h>
2022-03-10 12:21:30 -08:00
# include <prometheus/exposer.h>
2015-07-07 10:17:21 +02:00
# include "state.hh"
2015-07-21 15:14:17 +02:00
# include "build-result.hh"
2016-05-12 10:15:15 +02:00
# include "store-api.hh"
2017-09-14 18:16:33 +02:00
# include "remote-store.hh"
2015-05-29 17:14:20 +02:00
2015-05-28 17:39:29 +02:00
# include "globals.hh"
2018-05-16 14:14:53 +02:00
# include "hydra-config.hh"
2016-10-06 15:24:09 +02:00
# include "json.hh"
# include "s3-binary-cache-store.hh"
2018-05-16 14:14:53 +02:00
# include "shared.hh"
2015-05-28 17:39:29 +02:00
using namespace nix ;
2017-04-26 15:11:12 +02:00
namespace nix {

/* JSON serializers for the statistics counters dumped by
   State::dumpStatus().  Atomic counters and doubles are emitted as
   plain JSON numbers. */
template<> void toJSON<std::atomic<long>>(std::ostream & str, const std::atomic<long> & n) { str << n; }

template<> void toJSON<std::atomic<uint64_t>>(std::ostream & str, const std::atomic<uint64_t> & n) { str << n; }

template<> void toJSON<double>(std::ostream & str, const double & n) { str << n; }

}
2017-10-18 11:23:00 +02:00
std : : string getEnvOrDie ( const std : : string & key )
{
2019-12-30 22:49:26 +01:00
auto value = getEnv ( key ) ;
2017-10-18 11:23:00 +02:00
if ( ! value ) throw Error ( " environment variable '%s' is not set " , key ) ;
2019-12-30 22:49:26 +01:00
return * value ;
2017-10-18 11:23:00 +02:00
}
2022-04-06 10:58:57 -07:00
/* Construct the queue runner's global state: read hydra.conf
   options, set up the Prometheus metrics registry, and resolve the
   Hydra data and log directories.

   `metricsAddrOpt', when set (e.g. from the command line), overrides
   the `queue_runner_metrics_address' setting from hydra.conf. */
State::State(std::optional<std::string> metricsAddrOpt)
    : config(std::make_unique<HydraConfig>())
    , maxUnsupportedTime(config->getIntOption("max_unsupported_time", 0))
    , dbPool(config->getIntOption("max_db_connections", 128))
    , maxOutputSize(config->getIntOption("max_output_size", 2ULL << 30))
    , maxLogSize(config->getIntOption("max_log_size", 64ULL << 20))
    , uploadLogsToBinaryCache(config->getBoolOption("upload_logs_to_binary_cache", false))
    , rootsDir(config->getStrOption("gc_roots_dir", fmt("%s/gcroots/per-user/%s/hydra-roots", settings.nixStateDir, getEnvOrDie("LOGNAME"))))
    , metricsAddr(config->getStrOption("queue_runner_metrics_address", std::string{"127.0.0.1:9198"}))
    , registry(std::make_shared<prometheus::Registry>())
{
    /* Register the getQueuedBuilds() call counter.  The registry
       keeps the counter family alive, so the reference returned by
       Add() does not need to be stored here. */
    prometheus::BuildCounter()
        .Name("queue_queued_builds_calls_total")
        .Help("Number of times State::getQueuedBuilds() was called")
        .Register(*registry)
        .Add({});

    hydraData = getEnvOrDie("HYDRA_DATA");

    logDir = canonPath(hydraData + "/build-logs");

    /* A metrics address given explicitly takes precedence over the
       configuration file setting used in the initializer list. */
    if (metricsAddrOpt.has_value()) {
        metricsAddr = metricsAddrOpt.value();
    }

    /* Handle deprecated store specification. */
    if (config->getStrOption("store_mode") != "")
        throw Error("store_mode in hydra.conf is deprecated, please use store_uri");
    if (config->getStrOption("binary_cache_dir") != "")
        printMsg(lvlError, "hydra.conf: binary_cache_dir is deprecated and ignored. use store_uri=file:// instead");
    if (config->getStrOption("binary_cache_s3_bucket") != "")
        printMsg(lvlError, "hydra.conf: binary_cache_s3_bucket is deprecated and ignored. use store_uri=s3:// instead");
    if (config->getStrOption("binary_cache_secret_key_file") != "")
        printMsg(lvlError, "hydra.conf: binary_cache_secret_key_file is deprecated and ignored. use store_uri=...?secret-key= instead");

    createDirs(rootsDir);
}
2017-09-14 17:22:48 +02:00
/* Track the number of in-flight database update transactions; warns
   when too many pile up, which usually means PostgreSQL is stalled. */
nix::MaintainCount<counter> State::startDbUpdate()
{
    auto inFlight = nrActiveDbUpdates.load();
    if (inFlight > 6)
        printError("warning: %d concurrent database updates; PostgreSQL may be stalled", inFlight);
    return MaintainCount<counter>(nrActiveDbUpdates);
}
2016-02-15 21:10:29 +01:00
/* Return the destination (binary cache) store.  _destStore is
   initialised during startup, so the ref conversion is expected to
   succeed here. */
ref<Store> State::getDestStore()
{
    return ref<Store>(_destStore);
}
2015-08-25 14:11:50 +02:00
/* Parse the contents of one or more Nix machines files and replace
   the current machine set.  Each non-comment line has the form:

     sshName systemTypes sshKey [maxJobs [speedFactor [supportedFeatures [mandatoryFeatures [sshPublicHostKey]]]]]

   Machines that disappeared from the file are kept as disabled
   entries so that their statistics are preserved. */
void State::parseMachines(const std::string & contents)
{
    /* Snapshot the current machines under the lock, then release it
       while parsing. */
    Machines newMachines, oldMachines;
    {
        auto machines_(machines.lock());
        oldMachines = *machines_;
    }

    for (auto line : tokenizeString<Strings>(contents, "\n")) {
        /* Strip comments (everything after '#') and whitespace. */
        line = trim(string(line, 0, line.find('#')));
        auto tokens = tokenizeString<std::vector<std::string>>(line);
        if (tokens.size() < 3) continue;
        /* Pad with empty strings so the optional fields can be
           indexed unconditionally. */
        tokens.resize(8);

        auto machine = std::make_shared<Machine>();
        machine->sshName = tokens[0];
        machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
        /* "-" means "no SSH key". */
        machine->sshKey = tokens[2] == "-" ? string("") : tokens[2];
        if (tokens[3] != "")
            machine->maxJobs = string2Int<decltype(machine->maxJobs)>(tokens[3]).value();
        else
            machine->maxJobs = 1;
        machine->speedFactor = atof(tokens[4].c_str());
        if (tokens[5] == "-") tokens[5] = "";
        machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
        if (tokens[6] == "-") tokens[6] = "";
        machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
        /* Mandatory features are implicitly supported. */
        for (auto & f : machine->mandatoryFeatures)
            machine->supportedFeatures.insert(f);
        if (tokens[7] != "" && tokens[7] != "-")
            machine->sshPublicHostKey = base64Decode(tokens[7]);

        /* Re-use the State object of the previous machine with the
           same name. */
        auto i = oldMachines.find(machine->sshName);
        if (i == oldMachines.end())
            printMsg(lvlChatty, format("adding new machine ‘%1%’") % machine->sshName);
        else
            printMsg(lvlChatty, format("updating machine ‘%1%’") % machine->sshName);
        machine->state = i == oldMachines.end()
            ? std::make_shared<Machine::State>()
            : i->second->state;
        newMachines[machine->sshName] = machine;
    }

    for (auto & m : oldMachines)
        if (newMachines.find(m.first) == newMachines.end()) {
            if (m.second->enabled)
                printMsg(lvlInfo, format("removing machine ‘%1%’") % m.first);
            /* Add a disabled Machine object to make sure stats are
               maintained. */
            auto machine = std::make_shared<Machine>(*(m.second));
            machine->enabled = false;
            newMachines[m.first] = machine;
        }

    /* Warn only once per process about an empty machine set. */
    static bool warned = false;
    if (newMachines.empty() && !warned) {
        printError("warning: no build machines are defined");
        warned = true;
    }

    auto machines_(machines.lock());
    *machines_ = newMachines;

    /* The dispatcher may now have new machines to assign steps to. */
    wakeDispatcher();
}
2015-06-25 12:24:11 +02:00
/* Thread body: monitor the Nix machines file(s) for changes and
   re-parse them.  The files come from $NIX_REMOTE_SYSTEMS (a
   colon-separated list) or /etc/nix/machines; when neither exists, a
   single localhost machine is synthesised from the local Nix
   settings.  Unblocks `machinesReadyLock' after the first successful
   parse so queue processing can start. */
void State::monitorMachinesFile()
{
    string defaultMachinesFile = "/etc/nix/machines";
    auto machinesFiles = tokenizeString<std::vector<Path>>(
        getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");

    if (machinesFiles.empty()) {
        /* No machines file: build locally only.  x86_64-linux hosts
           can also build i686-linux. */
        parseMachines("localhost " +
            (settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem.get())
            + " - " + std::to_string(settings.maxBuildJobs) + " 1 "
            + concatStringsSep(",", settings.systemFeatures.get()));
        machinesReadyLock.unlock();
        return;
    }

    /* Remember inode/mtime per file so we only re-parse on change. */
    std::vector<struct stat> fileStats;
    fileStats.resize(machinesFiles.size());
    for (unsigned int n = 0; n < machinesFiles.size(); ++n) {
        auto & st(fileStats[n]);
        st.st_ino = st.st_mtime = 0;
    }

    auto readMachinesFiles = [&]() {
        /* Check if any of the machines files changed. */
        bool anyChanged = false;
        for (unsigned int n = 0; n < machinesFiles.size(); ++n) {
            Path machinesFile = machinesFiles[n];
            struct stat st;
            if (stat(machinesFile.c_str(), &st) != 0) {
                if (errno != ENOENT)
                    throw SysError("getting stats about ‘%s’", machinesFile);
                /* Missing file counts as "empty"; record sentinel
                   values so its (re)appearance is detected. */
                st.st_ino = st.st_mtime = 0;
            }
            auto & old(fileStats[n]);
            if (old.st_ino != st.st_ino || old.st_mtime != st.st_mtime)
                anyChanged = true;
            old = st;
        }

        if (!anyChanged) return;

        debug("reloading machines files");

        /* Concatenate all files; missing ones are silently skipped. */
        string contents;
        for (auto & machinesFile : machinesFiles) {
            try {
                contents += readFile(machinesFile);
                contents += '\n';
            } catch (SysError & e) {
                if (e.errNo != ENOENT) throw;
            }
        }

        parseMachines(contents);
    };

    auto firstParse = true;

    while (true) {
        try {
            readMachinesFiles();
            /* Signal that the initial machine set is available. */
            if (firstParse) {
                machinesReadyLock.unlock();
                firstParse = false;
            }
            // FIXME: use inotify.
            sleep(30);
        } catch (std::exception & e) {
            printMsg(lvlError, "reloading machines file: %s", e.what());
            sleep(5);
        }
    }
}
2015-06-22 14:06:44 +02:00
void State : : clearBusy ( Connection & conn , time_t stopTime )
2015-05-28 17:39:29 +02:00
{
2015-06-22 14:06:44 +02:00
pqxx : : work txn ( conn ) ;
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy != 0 " ,
( int ) bsAborted ,
stopTime ! = 0 ? std : : make_optional ( stopTime ) : std : : nullopt ) ;
2015-05-28 17:39:29 +02:00
txn . commit ( ) ;
}
2016-10-31 14:58:29 +01:00
unsigned int State : : allocBuildStep ( pqxx : : work & txn , BuildID buildId )
2015-05-28 17:39:29 +02:00
{
2020-01-11 22:38:40 -08:00
auto res = txn . exec_params1 ( " select max(stepnr) from BuildSteps where build = $1 " , buildId ) ;
return res [ 0 ] . is_null ( ) ? 1 : res [ 0 ] . as < int > ( ) + 1 ;
2015-10-05 14:57:44 +02:00
}
2016-10-31 14:58:29 +01:00
/* Insert a new row into BuildSteps (type 0 = build) together with
   its BuildStepOutputs rows, and return the allocated step number.
   On a concurrent insert of the same (build, stepnr) pair the insert
   is a no-op ("on conflict do nothing"), in which case we retry with
   a freshly allocated step number. */
unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
    const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
 restart:
    auto stepNr = allocBuildStep(txn, buildId);

    auto r = txn.exec_params
        ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) on conflict do nothing",
        buildId,
        stepNr,
        0, // == build
        localStore->printStorePath(step->drvPath),
        status == bsBusy ? 1 : 0,
        startTime != 0 ? std::make_optional(startTime) : std::nullopt,
        step->drv->platform,
        /* Busy steps have no status yet; it is filled in by
           finishBuildStep(). */
        status != bsBusy ? std::make_optional((int) status) : std::nullopt,
        propagatedFrom != 0 ? std::make_optional(propagatedFrom) : std::nullopt, // internal::params
        errorMsg != "" ? std::make_optional(errorMsg) : std::nullopt,
        /* A step created in a finished state stops when it started. */
        startTime != 0 && status != bsBusy ? std::make_optional(startTime) : std::nullopt,
        machine);

    /* Somebody else took this step number; try the next one. */
    if (r.affected_rows() == 0) goto restart;

    for (auto & [name, output] : step->drv->outputs)
        txn.exec_params0
            ("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)",
            buildId, stepNr, name, localStore->printStorePath(*output.path(*localStore, step->drv->name, name)));

    /* Notify listeners (e.g. the web UI) that the step is running. */
    if (status == bsBusy)
        txn.exec(fmt("notify step_started, '%d\t%d'", buildId, stepNr));

    return stepNr;
}
2017-12-07 15:35:31 +01:00
void State : : updateBuildStep ( pqxx : : work & txn , BuildID buildId , unsigned int stepNr , StepState stepState )
{
2020-01-11 22:38:40 -08:00
if ( txn . exec_params
( " update BuildSteps set busy = $1 where build = $2 and stepnr = $3 and busy != 0 and status is null " ,
( int ) stepState ,
buildId ,
stepNr ) . affected_rows ( ) ! = 1 )
2017-12-07 15:35:31 +01:00
throw Error ( " step %d of build %d is in an unexpected state " , stepNr , buildId ) ;
}
2016-12-07 15:57:13 +01:00
/* Record the final result of a build step and notify listeners via
   the `step_finished' channel.  Empty/zero fields are stored as SQL
   NULL. */
void State::finishBuildStep(pqxx::work & txn, const RemoteResult & result,
    BuildID buildId, unsigned int stepNr, const std::string & machine)
{
    assert(result.startTime);
    assert(result.stopTime);
    txn.exec_params0
        ("update BuildSteps set busy = 0, status = $1, errorMsg = $4, startTime = $5, stopTime = $6, machine = $7, overhead = $8, timesBuilt = $9, isNonDeterministic = $10 where build = $2 and stepnr = $3",
        (int) result.stepStatus, buildId, stepNr,
        result.errorMsg != "" ? std::make_optional(result.errorMsg) : std::nullopt,
        result.startTime, result.stopTime,
        machine != "" ? std::make_optional(machine) : std::nullopt,
        result.overhead != 0 ? std::make_optional(result.overhead) : std::nullopt,
        result.timesBuilt > 0 ? std::make_optional(result.timesBuilt) : std::nullopt,
        /* Determinism is only meaningful when built more than once. */
        result.timesBuilt > 1 ? std::make_optional(result.isNonDeterministic) : std::nullopt);

    /* The notify payload is tab-separated, so the log file name must
       not contain tabs. */
    assert(result.logFile.find('\t') == std::string::npos);
    txn.exec(fmt("notify step_finished, '%d\t%d\t%s'",
        buildId, stepNr, result.logFile));
}
2015-10-05 14:57:44 +02:00
int State : : createSubstitutionStep ( pqxx : : work & txn , time_t startTime , time_t stopTime ,
2019-12-30 22:49:26 +01:00
Build : : ptr build , const StorePath & drvPath , const string & outputName , const StorePath & storePath )
2015-10-05 14:57:44 +02:00
{
2017-12-07 14:41:29 +01:00
restart :
2016-10-31 14:58:29 +01:00
auto stepNr = allocBuildStep ( txn , build - > id ) ;
2015-10-05 14:57:44 +02:00
2020-01-11 22:38:40 -08:00
auto r = txn . exec_params
( " insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8) on conflict do nothing " ,
build - > id ,
stepNr ,
1 , // == substitution
2020-04-01 11:54:41 -07:00
( localStore - > printStorePath ( drvPath ) ) ,
2020-01-11 22:38:40 -08:00
0 ,
0 ,
startTime ,
stopTime ) ;
2015-10-05 14:57:44 +02:00
2017-12-07 14:41:29 +01:00
if ( r . affected_rows ( ) = = 0 ) goto restart ;
2020-01-11 22:38:40 -08:00
txn . exec_params0
( " insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4) " ,
2022-02-10 10:51:12 -05:00
build - > id , stepNr , outputName ,
2020-04-01 11:54:41 -07:00
localStore - > printStorePath ( storePath ) ) ;
2015-10-05 14:57:44 +02:00
return stepNr ;
}
2015-06-18 16:30:28 +02:00
/* Get the steps and unfinished builds that depend on the given step. */
void getDependents ( Step : : ptr step , std : : set < Build : : ptr > & builds , std : : set < Step : : ptr > & steps )
2015-05-28 17:39:29 +02:00
{
std : : function < void ( Step : : ptr ) > visit ;
visit = [ & ] ( Step : : ptr step ) {
2015-11-02 14:29:12 +01:00
if ( steps . count ( step ) ) return ;
2015-06-18 16:30:28 +02:00
steps . insert ( step ) ;
2015-05-28 17:39:29 +02:00
2015-05-29 17:14:20 +02:00
std : : vector < Step : : wptr > rdeps ;
{
auto step_ ( step - > state . lock ( ) ) ;
for ( auto & build : step_ - > builds ) {
auto build_ = build . lock ( ) ;
2015-06-18 16:30:28 +02:00
if ( build_ & & ! build_ - > finishedInDB ) builds . insert ( build_ ) ;
2015-05-29 17:14:20 +02:00
}
/* Make a copy of rdeps so that we don't hold the lock for
very long . */
rdeps = step_ - > rdeps ;
2015-05-28 17:39:29 +02:00
}
2015-05-29 17:14:20 +02:00
for ( auto & rdep : rdeps ) {
auto rdep_ = rdep . lock ( ) ;
if ( rdep_ ) visit ( rdep_ ) ;
2015-05-28 17:39:29 +02:00
}
} ;
visit ( step ) ;
}
2015-08-10 14:50:22 +02:00
void visitDependencies ( std : : function < void ( Step : : ptr ) > visitor , Step : : ptr start )
{
std : : set < Step : : ptr > queued ;
std : : queue < Step : : ptr > todo ;
todo . push ( start ) ;
while ( ! todo . empty ( ) ) {
auto step = todo . front ( ) ;
todo . pop ( ) ;
visitor ( step ) ;
auto state ( step - > state . lock ( ) ) ;
for ( auto & dep : state - > deps )
if ( queued . find ( dep ) = = queued . end ( ) ) {
queued . insert ( dep ) ;
todo . push ( dep ) ;
}
}
}
2015-05-28 17:39:29 +02:00
/* Record a successful (or cached) build in the database: mark the
   Builds row finished and replace its BuildProducts and BuildMetrics
   rows.  A no-op if the build is already finished (in memory or in
   the database). */
void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
    const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime)
{
    if (build->finishedInDB) return;

    /* Another process may have finished this build concurrently. */
    if (txn.exec_params("select 1 from Builds where id = $1 and finished = 0", build->id).empty()) return;

    txn.exec_params0
        ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8, notificationPendingSince = $4 where id = $1",
        build->id,
        (int) (res.failed ? bsFailedWithOutput : bsSuccess),
        startTime,
        stopTime,
        res.size,
        res.closureSize,
        res.releaseName != "" ? std::make_optional(res.releaseName) : std::nullopt,
        isCachedBuild ? 1 : 0);

    /* Replace any products from an earlier evaluation of this build. */
    txn.exec_params0("delete from BuildProducts where build = $1", build->id);

    unsigned int productNr = 1;
    for (auto & product : res.products) {
        txn.exec_params0
            ("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            build->id,
            productNr++,
            product.type,
            product.subtype,
            product.fileSize ? std::make_optional(*product.fileSize) : std::nullopt,
            product.sha256hash ? std::make_optional(product.sha256hash->to_string(Base16, false)) : std::nullopt,
            product.path,
            product.name,
            product.defaultPath);
    }

    /* Likewise replace the metrics. */
    txn.exec_params0("delete from BuildMetrics where build = $1", build->id);

    for (auto & metric : res.metrics) {
        txn.exec_params0
            ("insert into BuildMetrics (build, name, unit, value, project, jobset, job, timestamp) values ($1, $2, $3, $4, $5, $6, $7, $8)",
            build->id,
            metric.second.name,
            metric.second.unit != "" ? std::make_optional(metric.second.unit) : std::nullopt,
            metric.second.value,
            build->projectName,
            build->jobsetName,
            build->jobName,
            build->timestamp);
    }

    nrBuildsDone++;
}
2015-06-15 15:31:42 +02:00
bool State : : checkCachedFailure ( Step : : ptr step , Connection & conn )
{
pqxx : : work txn ( conn ) ;
2020-09-26 23:37:39 +02:00
for ( auto & i : step - > drv - > outputsAndOptPaths ( * localStore ) )
if ( i . second . second )
if ( ! txn . exec_params ( " select 1 from FailedPaths where path = $1 " , localStore - > printStorePath ( * i . second . second ) ) . empty ( ) )
return true ;
2015-06-15 15:31:42 +02:00
return false ;
}
2019-08-09 19:11:38 +02:00
void State : : notifyBuildStarted ( pqxx : : work & txn , BuildID buildId )
2015-06-23 00:14:49 +02:00
{
2019-08-09 19:11:38 +02:00
txn . exec ( fmt ( " notify build_started, '%s' " , buildId ) ) ;
}
2017-07-26 15:17:51 +02:00
2017-07-24 16:26:44 +02:00
2019-08-09 19:11:38 +02:00
void State : : notifyBuildFinished ( pqxx : : work & txn , BuildID buildId ,
const std : : vector < BuildID > & dependentIds )
{
2020-04-08 12:05:25 +02:00
auto payload = fmt ( " %d " , buildId ) ;
2019-08-09 19:11:38 +02:00
for ( auto & d : dependentIds )
2020-04-08 12:05:25 +02:00
payload + = fmt ( " \t %d " , d ) ;
2019-08-09 19:11:38 +02:00
// FIXME: apparently parameterized() doesn't support NOTIFY.
txn . exec ( fmt ( " notify build_finished, '%s' " , payload ) ) ;
2015-06-23 00:14:49 +02:00
}
2015-06-22 14:24:03 +02:00
std : : shared_ptr < PathLocks > State : : acquireGlobalLock ( )
{
2015-07-02 01:01:44 +02:00
Path lockPath = hydraData + " /queue-runner/lock " ;
createDirs ( dirOf ( lockPath ) ) ;
2015-06-22 14:24:03 +02:00
auto lock = std : : make_shared < PathLocks > ( ) ;
if ( ! lock - > lockPaths ( PathSet ( { lockPath } ) , " " , false ) ) return 0 ;
return lock ;
}
2020-03-26 15:30:37 +01:00
/* Serialize the queue runner's runtime statistics to JSON and store
   the result in the SystemStatus table (row 'queue-runner'), then
   notify listeners via `status_dumped'.  The JSON field names are
   part of the status API consumed by the Hydra web UI. */
void State::dumpStatus(Connection & conn)
{
    std::ostringstream out;

    {
        JSONObject root(out);
        time_t now = time(0);
        root.attr("status", "up");
        root.attr("time", time(0));
        root.attr("uptime", now - startedAt);
        root.attr("pid", getpid());
        {
            auto builds_(builds.lock());
            root.attr("nrQueuedBuilds", builds_->size());
        }
        {
            /* Prune expired weak pointers while counting. */
            auto steps_(steps.lock());
            for (auto i = steps_->begin(); i != steps_->end(); )
                if (i->second.lock()) ++i; else i = steps_->erase(i);
            root.attr("nrUnfinishedSteps", steps_->size());
        }
        {
            /* Likewise for the runnable set. */
            auto runnable_(runnable.lock());
            for (auto i = runnable_->begin(); i != runnable_->end(); )
                if (i->lock()) ++i; else i = runnable_->erase(i);
            root.attr("nrRunnableSteps", runnable_->size());
        }
        root.attr("nrActiveSteps", activeSteps_.lock()->size());
        root.attr("nrStepsBuilding", nrStepsBuilding);
        root.attr("nrStepsCopyingTo", nrStepsCopyingTo);
        root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom);
        root.attr("nrStepsWaiting", nrStepsWaiting);
        root.attr("nrUnsupportedSteps", nrUnsupportedSteps);
        root.attr("bytesSent", bytesSent);
        root.attr("bytesReceived", bytesReceived);
        root.attr("nrBuildsRead", nrBuildsRead);
        root.attr("buildReadTimeMs", buildReadTimeMs);
        root.attr("buildReadTimeAvgMs", nrBuildsRead == 0 ? 0.0 : (float) buildReadTimeMs / nrBuildsRead);
        root.attr("nrBuildsDone", nrBuildsDone);
        root.attr("nrStepsStarted", nrStepsStarted);
        root.attr("nrStepsDone", nrStepsDone);
        root.attr("nrRetries", nrRetries);
        root.attr("maxNrRetries", maxNrRetries);
        if (nrStepsDone) {
            root.attr("totalStepTime", totalStepTime);
            root.attr("totalStepBuildTime", totalStepBuildTime);
            root.attr("avgStepTime", (float) totalStepTime / nrStepsDone);
            root.attr("avgStepBuildTime", (float) totalStepBuildTime / nrStepsDone);
        }
        root.attr("nrQueueWakeups", nrQueueWakeups);
        root.attr("nrDispatcherWakeups", nrDispatcherWakeups);
        root.attr("dispatchTimeMs", dispatchTimeMs);
        root.attr("dispatchTimeAvgMs", nrDispatcherWakeups == 0 ? 0.0 : (float) dispatchTimeMs / nrDispatcherWakeups);
        root.attr("nrDbConnections", dbPool.count());
        root.attr("nrActiveDbUpdates", nrActiveDbUpdates);

        /* Per-machine statistics. */
        {
            auto nested = root.object("machines");
            auto machines_(machines.lock());
            for (auto & i : *machines_) {
                auto & m(i.second);
                auto & s(m->state);
                auto nested2 = nested.object(m->sshName);
                nested2.attr("enabled", m->enabled);
                {
                    auto list = nested2.list("systemTypes");
                    for (auto & s : m->systemTypes)
                        list.elem(s);
                }
                {
                    auto list = nested2.list("supportedFeatures");
                    for (auto & s : m->supportedFeatures)
                        list.elem(s);
                }
                {
                    auto list = nested2.list("mandatoryFeatures");
                    for (auto & s : m->mandatoryFeatures)
                        list.elem(s);
                }
                nested2.attr("currentJobs", s->currentJobs);
                if (s->currentJobs == 0)
                    nested2.attr("idleSince", s->idleSince);
                nested2.attr("nrStepsDone", s->nrStepsDone);
                if (m->state->nrStepsDone) {
                    nested2.attr("totalStepTime", s->totalStepTime);
                    nested2.attr("totalStepBuildTime", s->totalStepBuildTime);
                    nested2.attr("avgStepTime", (float) s->totalStepTime / s->nrStepsDone);
                    nested2.attr("avgStepBuildTime", (float) s->totalStepBuildTime / s->nrStepsDone);
                }

                auto info(m->state->connectInfo.lock());
                nested2.attr("disabledUntil", std::chrono::system_clock::to_time_t(info->disabledUntil));
                nested2.attr("lastFailure", std::chrono::system_clock::to_time_t(info->lastFailure));
                nested2.attr("consecutiveFailures", info->consecutiveFailures);

            }
        }

        /* Per-jobset scheduling statistics, keyed "project:jobset". */
        {
            auto nested = root.object("jobsets");
            auto jobsets_(jobsets.lock());
            for (auto & jobset : *jobsets_) {
                auto nested2 = nested.object(jobset.first.first + ":" + jobset.first.second);
                nested2.attr("shareUsed", jobset.second->shareUsed());
                nested2.attr("seconds", jobset.second->getSeconds());
            }
        }

        /* Per-machine-type queue statistics. */
        {
            auto nested = root.object("machineTypes");
            auto machineTypes_(machineTypes.lock());
            for (auto & i : *machineTypes_) {
                auto nested2 = nested.object(i.first);
                nested2.attr("runnable", i.second.runnable);
                nested2.attr("running", i.second.running);
                /* Accrued wait time plus the wait since the last
                   dispatcher check, per runnable step. */
                if (i.second.runnable > 0)
                    nested2.attr("waitTime", i.second.waitTime.count() +
                        i.second.runnable * (time(0) - lastDispatcherCheck));
                if (i.second.running == 0)
                    nested2.attr("lastActive", std::chrono::system_clock::to_time_t(i.second.lastActive));
            }
        }

        /* Destination store (binary cache) statistics. */
        auto store = getDestStore();

        auto nested = root.object("store");

        auto & stats = store->getStats();
        nested.attr("narInfoRead", stats.narInfoRead);
        nested.attr("narInfoReadAverted", stats.narInfoReadAverted);
        nested.attr("narInfoMissing", stats.narInfoMissing);
        nested.attr("narInfoWrite", stats.narInfoWrite);
        nested.attr("narInfoCacheSize", stats.pathInfoCacheSize);
        nested.attr("narRead", stats.narRead);
        nested.attr("narReadBytes", stats.narReadBytes);
        nested.attr("narReadCompressedBytes", stats.narReadCompressedBytes);
        nested.attr("narWrite", stats.narWrite);
        nested.attr("narWriteAverted", stats.narWriteAverted);
        nested.attr("narWriteBytes", stats.narWriteBytes);
        nested.attr("narWriteCompressedBytes", stats.narWriteCompressedBytes);
        nested.attr("narWriteCompressionTimeMs", stats.narWriteCompressionTimeMs);
        nested.attr("narCompressionSavings",
            stats.narWriteBytes
            ? 1.0 - (double) stats.narWriteCompressedBytes / stats.narWriteBytes
            : 0.0);
        nested.attr("narCompressionSpeed", // MiB/s
            stats.narWriteCompressionTimeMs
            ? (double) stats.narWriteBytes / stats.narWriteCompressionTimeMs * 1000.0 / (1024.0 * 1024.0)
            : 0.0);

        /* S3-specific statistics, when the destination is S3. */
        auto s3Store = dynamic_cast<S3BinaryCacheStore *>(&*store);
        if (s3Store) {
            auto nested2 = nested.object("s3");
            auto & s3Stats = s3Store->getS3Stats();
            nested2.attr("put", s3Stats.put);
            nested2.attr("putBytes", s3Stats.putBytes);
            nested2.attr("putTimeMs", s3Stats.putTimeMs);
            nested2.attr("putSpeed",
                s3Stats.putTimeMs
                ? (double) s3Stats.putBytes / s3Stats.putTimeMs * 1000.0 / (1024.0 * 1024.0)
                : 0.0);
            nested2.attr("get", s3Stats.get);
            nested2.attr("getBytes", s3Stats.getBytes);
            nested2.attr("getTimeMs", s3Stats.getTimeMs);
            nested2.attr("getSpeed",
                s3Stats.getTimeMs
                ? (double) s3Stats.getBytes / s3Stats.getTimeMs * 1000.0 / (1024.0 * 1024.0)
                : 0.0);
            nested2.attr("head", s3Stats.head);
            /* Rough request+transfer cost estimate in dollars. */
            nested2.attr("costDollarApprox",
                (s3Stats.get + s3Stats.head) / 10000.0 * 0.004
                + s3Stats.put / 1000.0 * 0.005 +
                + s3Stats.getBytes / (1024.0 * 1024.0 * 1024.0) * 0.09);
        }
    }

    /* Store the JSON blob in the database and signal listeners. */
    {
        auto mc = startDbUpdate();
        pqxx::work txn(conn);
        // FIXME: use PostgreSQL 9.5 upsert.
        txn.exec("delete from SystemStatus where what = 'queue-runner'");
        txn.exec_params0("insert into SystemStatus values ('queue-runner', $1)", out.str());
        txn.exec("notify status_dumped");
        txn.commit();
    }
}
void State : : showStatus ( )
{
auto conn ( dbPool . get ( ) ) ;
receiver statusDumped ( * conn , " status_dumped " ) ;
string status ;
bool barf = false ;
/* Get the last JSON status dump from the database. */
2015-06-15 18:20:14 +02:00
{
2015-06-22 14:06:44 +02:00
pqxx : : work txn ( * conn ) ;
auto res = txn . exec ( " select status from SystemStatus where what = 'queue-runner' " ) ;
if ( res . size ( ) ) status = res [ 0 ] [ 0 ] . as < string > ( ) ;
2015-06-15 18:20:14 +02:00
}
2015-06-22 14:06:44 +02:00
if ( status ! = " " ) {
/* If the status is not empty, then the queue runner is
running . Ask it to update the status dump . */
{
pqxx : : work txn ( * conn ) ;
txn . exec ( " notify dump_status " ) ;
txn . commit ( ) ;
}
/* Wait until it has done so. */
barf = conn - > await_notification ( 5 , 0 ) = = 0 ;
/* Get the new status. */
{
pqxx : : work txn ( * conn ) ;
auto res = txn . exec ( " select status from SystemStatus where what = 'queue-runner' " ) ;
if ( res . size ( ) ) status = res [ 0 ] [ 0 ] . as < string > ( ) ;
2015-06-15 18:20:14 +02:00
}
2015-06-22 14:06:44 +02:00
}
if ( status = = " " ) status = R " ({ " status " : " down " }) " ;
std : : cout < < status < < " \n " ;
if ( barf )
throw Error ( " queue runner did not respond; status information may be wrong " ) ;
}
void State : : unlock ( )
{
2015-06-22 14:24:03 +02:00
auto lock = acquireGlobalLock ( ) ;
if ( ! lock )
throw Error ( " hydra-queue-runner is currently running " ) ;
2015-06-22 14:06:44 +02:00
auto conn ( dbPool . get ( ) ) ;
clearBusy ( * conn , 0 ) ;
{
pqxx : : work txn ( * conn ) ;
txn . exec ( " delete from SystemStatus where what = 'queue-runner' " ) ;
txn . commit ( ) ;
2015-06-15 18:20:14 +02:00
}
}
2022-03-29 10:55:28 -07:00
/* Main entry point of the queue runner.

   Acquires the global lock, starts the Prometheus exporter, opens the
   local and destination Nix stores, spawns the worker threads
   (machines-file monitor, queue monitor, dispatcher, plus two
   housekeeping threads), and then loops forever serving
   'dump_status' requests from the database.  This function never
   returns; termination happens via the interrupt callback below.

   'buildOne': if non-zero, restrict operation to that single build
   (stored in this->buildOne; consumed elsewhere — not visible here). */
void State::run(BuildID buildOne)
{
    /* Can't be bothered to shut down cleanly. Goodbye! */
    auto callback = createInterruptCallback([&]() { std::_Exit(0); });

    startedAt = time(0);
    this->buildOne = buildOne;

    /* Only one queue runner may be active at a time. */
    auto lock = acquireGlobalLock();
    if (!lock)
        throw Error("hydra-queue-runner is already running");

    std::cout << "Starting the Prometheus exporter on " << metricsAddr << std::endl;

    /* Set up simple exporter, to show that we're still alive.  The
       exposer lives on the stack for the (infinite) lifetime of this
       function. */
    prometheus::Exposer promExposer{metricsAddr};
    auto exposerPort = promExposer.GetListeningPorts().front();

    promExposer.RegisterCollectable(registry);

    std::cout << "Started the Prometheus exporter, listening on "
              << metricsAddr << "/metrics (port " << exposerPort << ")"
              << std::endl;

    /* Open the local store (NIX_REMOTE or default) with a bounded,
       age-limited connection pool. */
    Store::Params localParams;
    localParams["max-connections"] = "16";
    localParams["max-connection-age"] = "600";
    localStore = openStore(getEnv("NIX_REMOTE").value_or(""), localParams);

    /* The destination store (e.g. a binary cache) defaults to the
       local store when 'store_uri' is unset. */
    auto storeUri = config->getStrOption("store_uri");
    _destStore = storeUri == "" ? localStore : openStore(storeUri);

    useSubstitutes = config->getBoolOption("use-substitutes", false);

    // FIXME: hacky mechanism for configuring determinism checks.
    /* Format: whitespace-separated "project:jobset:repeats" triples. */
    for (auto & s : tokenizeString<Strings>(config->getStrOption("xxx-jobset-repeats"))) {
        auto s2 = tokenizeString<std::vector<std::string>>(s, ":");
        if (s2.size() != 3) throw Error("bad value in xxx-jobset-repeats");
        jobsetRepeats.emplace(std::make_pair(s2[0], s2[1]), std::stoi(s2[2]));
    }

    /* Reset any stale busy state and publish an initial status dump. */
    {
        auto conn(dbPool.get());
        clearBusy(*conn, 0);
        dumpStatus(*conn);
    }

    /* Lock taken here, presumably released by monitorMachinesFile once
       the machines list is loaded — TODO confirm against state.hh. */
    machinesReadyLock.lock();
    std::thread(&State::monitorMachinesFile, this).detach();

    std::thread(&State::queueMonitor, this).detach();

    std::thread(&State::dispatcher, this).detach();

    /* Periodically clean up orphaned busy steps in the database. */
    std::thread([&]() {
        while (true) {
            sleep(180);

            /* Take a snapshot of the orphaned set under the lock, then
               work on it without holding the lock. */
            std::set<std::pair<BuildID, int>> steps;
            {
                auto orphanedSteps_(orphanedSteps.lock());
                if (orphanedSteps_->empty()) continue;
                steps = *orphanedSteps_;
                orphanedSteps_->clear();
            }

            try {
                auto conn(dbPool.get());
                pqxx::work txn(*conn);
                for (auto & step : steps) {
                    printMsg(lvlError, "cleaning orphaned step %d of build %d", step.second, step.first);
                    txn.exec_params0
                        ("update BuildSteps set busy = 0, status = $1 where build = $2 and stepnr = $3 and busy != 0",
                         (int) bsAborted,
                         step.first,
                         step.second);
                }
                txn.commit();
            } catch (std::exception & e) {
                printMsg(lvlError, "cleanup thread: %s", e.what());
                /* On failure, put the steps back so the next round
                   retries them. */
                auto orphanedSteps_(orphanedSteps.lock());
                orphanedSteps_->insert(steps.begin(), steps.end());
            }
        }
    }).detach();

    /* Make sure that old daemon connections are closed even when
       we're not doing much. */
    std::thread([&]() {
        while (true) {
            sleep(10);
            try {
                if (auto remoteStore = getDestStore().dynamic_pointer_cast<RemoteStore>())
                    remoteStore->flushBadConnections();
            } catch (std::exception & e) {
                printMsg(lvlError, "connection flush thread: %s", e.what());
            }
        }
    }).detach();

    /* Monitor the database for status dump requests (e.g. from
       ‘hydra-queue-runner --status’). */
    while (true) {
        try {
            auto conn(dbPool.get());
            receiver dumpStatus_(*conn, "dump_status");
            while (true) {
                conn->await_notification();
                dumpStatus(*conn);
            }
        } catch (std::exception & e) {
            printMsg(lvlError, "main thread: %s", e.what());
            sleep(10); // probably a DB problem, so don't retry right away
        }
    }
}
2015-05-28 17:39:29 +02:00
int main ( int argc , char * * argv )
{
return handleExceptions ( argv [ 0 ] , [ & ] ( ) {
initNix ( ) ;
2015-06-10 15:55:46 +02:00
signal ( SIGINT , SIG_DFL ) ;
signal ( SIGTERM , SIG_DFL ) ;
signal ( SIGHUP , SIG_DFL ) ;
2015-05-29 01:31:12 +02:00
2019-09-25 17:25:07 +02:00
// FIXME: do this in the child environment in openConnection().
unsetenv ( " IN_SYSTEMD " ) ;
2015-06-17 21:35:20 +02:00
bool unlock = false ;
2015-06-22 14:06:44 +02:00
bool status = false ;
2015-06-25 15:29:22 +02:00
BuildID buildOne = 0 ;
2022-04-06 10:58:57 -07:00
std : : optional < std : : string > metricsAddrOpt = std : : nullopt ;
2015-06-17 21:35:20 +02:00
parseCmdLine ( argc , argv , [ & ] ( Strings : : iterator & arg , const Strings : : iterator & end ) {
if ( * arg = = " --unlock " )
unlock = true ;
2015-06-22 14:06:44 +02:00
else if ( * arg = = " --status " )
status = true ;
2015-06-25 15:29:22 +02:00
else if ( * arg = = " --build-one " ) {
2021-02-22 15:10:24 +01:00
if ( auto b = string2Int < BuildID > ( getArg ( * arg , arg , end ) ) )
buildOne = * b ;
else
2015-06-25 15:29:22 +02:00
throw Error ( " ‘ --build-one’ requires a build ID" ) ;
2022-04-06 10:58:57 -07:00
} else if ( * arg = = " --prometheus-address " ) {
metricsAddrOpt = getArg ( * arg , arg , end ) ;
2015-06-25 15:29:22 +02:00
} else
2015-06-17 21:35:20 +02:00
return false ;
return true ;
} ) ;
2016-05-11 18:34:50 +02:00
settings . verboseBuild = true ;
2015-06-17 11:48:38 +02:00
settings . lockCPU = false ;
2015-05-28 17:39:29 +02:00
2022-04-06 10:58:57 -07:00
State state { metricsAddrOpt } ;
2015-06-22 14:06:44 +02:00
if ( status )
state . showStatus ( ) ;
else if ( unlock )
state . unlock ( ) ;
2015-06-17 21:35:20 +02:00
else
2015-06-25 15:29:22 +02:00
state . run ( buildOne ) ;
2015-05-28 17:39:29 +02:00
} ) ;
}