#include <atomic>
#include <condition_variable>
#include <iostream>
#include <map>
#include <queue>
#include <memory>
#include <thread>
#include <cmath>
#include <chrono>
#include <algorithm>

#include <pqxx/pqxx>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "build-result.hh"
#include "build-remote.hh"
#include "sync.hh"
#include "pool.hh"
#include "store-api.hh"
#include "derivations.hh"
#include "shared.hh"
#include "globals.hh"

using namespace nix;


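/* Retry policy for transiently failing steps: a step is retried up to
   ‘maxTries’ times, waiting retryInterval * retryBackoff^(tries-1)
   seconds between attempts. */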
const int maxTries = 5;
const int retryInterval = 60; // seconds
const float retryBackoff = 3.0;


typedef std::chrono::time_point<std::chrono::system_clock> system_time;


template <class C, class V>
bool has(const C & c, const V & v)
{
    return c.find(v) != c.end();
}


typedef std::atomic<unsigned int> counter;


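/* RAII helper: increment the given counter on construction and
   decrement it on destruction (e.g. to track nrActiveSteps for the
   lifetime of a builder thread). */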
struct MaintainCount
{
    counter & c;
    MaintainCount(counter & c) : c(c) { c++; }
    ~MaintainCount() { c--; }
};


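/* Build and build step status codes as stored in the database
   (Builds.buildStatus and BuildSteps.status). */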
typedef enum {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    bsUnsupported = 9,
} BuildStatus;


typedef enum {
    bssSuccess = 0,
    bssFailed = 1,
    bssAborted = 4,
    bssTimedOut = 7,
    bssUnsupported = 9,
    bssBusy = 100, // not stored
} BuildStepStatus;


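/* A PostgreSQL connection whose parameters are derived from the
   $HYDRA_DBI environment variable, a Perl DBI-style connection string
   with the prefix "dbi:Pg:". */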
struct Connection : pqxx::connection
{
    Connection() : pqxx::connection(getFlags()) { };

    string getFlags()
    {
        string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
        string prefix = "dbi:Pg:";
        if (string(s, 0, prefix.size()) != prefix)
            throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
        return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
    }
};


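/* A notification receiver that latches whether a NOTIFY has been
   received on the given channel since the last call to get(). */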
struct receiver : public pqxx::notification_receiver
{
    bool status = false;

    receiver(pqxx::connection_base & c, const std::string & channel)
        : pqxx::notification_receiver(c, channel) { }

    void operator () (const string & payload, int pid) override
    {
        status = true;
    };

    bool get() {
        bool b = status;
        status = false;
        return b;
    }
};


typedef unsigned int BuildID;


struct Step;


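/* A build currently in the queue, i.e. an unfinished row of the
   Builds table that the queue runner is tracking in memory. */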
struct Build
{
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;

    BuildID id;
    Path drvPath;
    std::map<string, Path> outputs;
    std::string fullJobName;
    unsigned int maxSilentTime, buildTimeout;

    std::shared_ptr<Step> toplevel;

    std::atomic_bool finishedInDB{false};

    ~Build()
    {
        printMsg(lvlDebug, format("destroying build %1%") % id);
    }
};


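/* A build step, i.e. a single derivation that needs to be built,
   shared between all queued builds that depend on it. */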
struct Step
{
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;

    Path drvPath;
    Derivation drv;
    std::set<std::string> requiredSystemFeatures;

    struct State
    {
        /* The build steps on which this step depends. */
        std::set<Step::ptr> deps;

        /* The build steps that depend on this step. */
        std::vector<Step::wptr> rdeps;

        /* Builds that have this step as the top-level derivation. */
        std::vector<Build::wptr> builds;

        /* Number of times we've tried this step. */
        unsigned int tries = 0;

        /* Point in time after which the step can be retried. */
        system_time after;
    };

    std::atomic_bool created{false}; // debugging
    std::atomic_bool finished{false}; // debugging

    Sync<State> state;

    ~Step()
    {
        //printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
};


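/* A build machine, as read from $NIX_REMOTE_SYSTEMS by loadMachines()
   (or the implicit localhost machine). */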
struct Machine
{
    typedef std::shared_ptr<Machine> ptr;

    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;
    std::atomic<unsigned int> currentJobs{0};

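    /* Can this machine run the given step? It must support the
       step's platform and all its required system features, and the
       step must require all of the machine's mandatory features. */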
    bool supportsStep(Step::ptr step)
    {
        if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
        for (auto & f : mandatoryFeatures)
            if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()) return false;
        for (auto & f : step->requiredSystemFeatures)
            if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
        return true;
    }
};


/* A RAII helper that manages the currentJobs field of Machine
   objects. */
struct MachineReservation
{
    typedef std::shared_ptr<MachineReservation> ptr;
    Machine::ptr machine;

    MachineReservation(Machine::ptr machine) : machine(machine)
    {
        machine->currentJobs++;
    }

    ~MachineReservation()
    {
        machine->currentJobs--;
    }
};


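/* The state of the queue runner, shared between its threads (queue
   monitor, dispatcher, builders and log compressor). */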
class State
{
private:

    Path hydraData, logDir;

    /* The queued builds. */
    typedef std::map<BuildID, Build::ptr> Builds;
    Sync<Builds> builds;

    /* All active or pending build steps (i.e. dependencies of the
       queued builds). Note that these are weak pointers. Steps are
       kept alive by being reachable from Builds or by being in
       progress. */
    typedef std::map<Path, Step::wptr> Steps;
    Sync<Steps> steps;

    /* Build steps that have no unbuilt dependencies. */
    typedef std::list<Step::wptr> Runnable;
    Sync<Runnable> runnable;

    /* CV for waking up the dispatcher. */
    std::condition_variable dispatcherWakeup;
    std::mutex dispatcherMutex;

    /* PostgreSQL connection pool. */
    Pool<Connection> dbPool;

    /* The build machines. */
    typedef std::list<Machine::ptr> Machines;
    Sync<Machines> machines;

    /* Various stats. */
    counter nrBuildsRead{0};
    counter nrBuildsDone{0};
    counter nrStepsDone{0};
    counter nrActiveSteps{0};
    counter nrRetries{0};
    counter maxNrRetries{0};
    counter nrQueueWakeups{0};
    counter nrDispatcherWakeups{0};

    /* Log compressor work queue. */
    Sync<std::queue<Path>> logCompressorQueue;
    std::condition_variable_any logCompressorWakeup;

public:
    State();
    ~State();

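    /* Clear the "busy" flag on unfinished builds and build steps in
       the database, marking interrupted steps as aborted. */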
    void clearBusy(time_t stopTime);

private:

    void loadMachines();

    int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
        const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
        BuildID propagatedFrom = 0);

    void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
        const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
        BuildID propagatedFrom = 0);

    void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);

    void queueMonitor();

    void queueMonitorLoop();

    void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);

    void removeCancelledBuilds(Connection & conn);

    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
        Build::ptr referringBuild, Step::ptr referringStep,
        std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);

    void makeRunnable(Step::ptr step);

    /* The thread that selects and starts runnable builds. */
    void dispatcher();

    void wakeDispatcher();

    void builder(Step::ptr step, MachineReservation::ptr reservation);

    /* Perform the given build step. Return true if the step is to be
       retried. */
    bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
        Machine::ptr machine);

    void markSucceededBuild(pqxx::work & txn, Build::ptr build,
        const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);

    bool checkCachedFailure(Step::ptr step, Connection & conn);

    /* Thread that asynchronously bzips logs of finished steps. */
    void logCompressor();

public:

    void dumpStatus();

    void run();
};


State::State()
{
    hydraData = getEnv("HYDRA_DATA");
    if (hydraData == "") throw Error("$HYDRA_DATA must be set");
    logDir = canonPath(hydraData + "/build-logs");
}


State::~State()
{
    try {
        printMsg(lvlInfo, "clearing active builds / build steps...");
        clearBusy(time(0));
    } catch (...) {
        ignoreException();
    }
}


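/* Load the set of build machines from $NIX_REMOTE_SYSTEMS (default
   /etc/nix/machines), falling back to a single "localhost" machine if
   that file does not exist. */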
void State::loadMachines()
{
    Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines");

    Machines newMachines;

    if (pathExists(machinesFile)) {

        for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {
            line = trim(string(line, 0, line.find('#')));
            auto tokens = tokenizeString<std::vector<std::string>>(line);
            if (tokens.size() < 3) continue;
            tokens.resize(7);

            auto machine = std::make_shared<Machine>();
            machine->sshName = tokens[0];
            machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
            machine->sshKey = tokens[2];
            if (tokens[3] != "")
                string2Int(tokens[3], machine->maxJobs);
            else
                machine->maxJobs = 1;
            machine->speedFactor = atof(tokens[4].c_str());
            machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
            machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");

            for (auto & f : machine->mandatoryFeatures)
                machine->supportedFeatures.insert(f);

            newMachines.push_back(machine);
        }

    } else {
        auto machine = std::make_shared<Machine>();
        machine->sshName = "localhost";
        machine->systemTypes = StringSet({settings.thisSystem});
        if (settings.thisSystem == "x86_64-linux")
            machine->systemTypes.insert("i686-linux");
        machine->maxJobs = settings.maxBuildJobs;
        newMachines.push_back(machine);
    }

    auto machines_(machines.lock());
    *machines_ = newMachines;
}


void State::clearBusy(time_t stopTime)
{
    auto conn(dbPool.get());
    pqxx::work txn(*conn);
    txn.parameterized
        ("update BuildSteps set busy = 0, status = $1, stopTime = $2 where busy = 1")
        ((int) bssAborted)
        (stopTime, stopTime != 0).exec();
    txn.exec("update Builds set busy = 0 where finished = 0 and busy = 1");
    txn.commit();
}


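/* Insert a row into BuildSteps for ‘step’ as part of ‘build’, record
   its outputs, and return the new step number. */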
int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
    const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
    /* Acquire an exclusive lock on BuildSteps to ensure that we don't
       race with other threads creating a step of the same build. */
    txn.exec("lock table BuildSteps in exclusive mode");

    auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec();
    int stepNr = res[0][0].is_null() ? 1 : res[0][0].as<int>() + 1;

    txn.parameterized
        ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
        (build->id)(stepNr)(0)(step->drvPath)(status == bssBusy ? 1 : 0)
        (startTime, startTime != 0)
        (step->drv.platform)
        ((int) status, status != bssBusy)
        (propagatedFrom, propagatedFrom != 0)
        (errorMsg, errorMsg != "")
        (startTime, startTime != 0 && status != bssBusy)
        (machine).exec();

    for (auto & output : step->drv.outputs)
        txn.parameterized
            ("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
            (build->id)(stepNr)(output.first)(output.second.path).exec();

    return stepNr;
}


void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
    const std::string & machine, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
    assert(startTime);
    assert(stopTime);
    txn.parameterized
        ("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7, machine = $8 where build = $2 and stepnr = $3")
        ((int) status)(buildId)(stepNr)
        (propagatedFrom, propagatedFrom != 0)
        (errorMsg, errorMsg != "")
        (startTime)(stopTime)
        (machine, machine != "").exec();
}


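/* Thread that watches the queue: it loads queued builds from the
   database and reacts to the builds_added / builds_restarted /
   builds_cancelled notifications. */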
void State::queueMonitor()
{
    while (true) {
        try {
            queueMonitorLoop();
        } catch (std::exception & e) {
            printMsg(lvlError, format("queue monitor: %1%") % e.what());
            sleep(10); // probably a DB problem, so don't retry right away
        }
    }
}


void State::queueMonitorLoop()
{
    auto conn(dbPool.get());

    receiver buildsAdded(*conn, "builds_added");
    receiver buildsRestarted(*conn, "builds_restarted");
    receiver buildsCancelled(*conn, "builds_cancelled");

    auto store = openStore(); // FIXME: pool

    unsigned int lastBuildId = 0;

    while (true) {
        getQueuedBuilds(*conn, store, lastBuildId);

        /* Sleep until we get notification from the database about an
           event. */
        conn->await_notification();
        nrQueueWakeups++;

        if (buildsAdded.get())
            printMsg(lvlTalkative, "got notification: new builds added to the queue");
        if (buildsRestarted.get()) {
            printMsg(lvlTalkative, "got notification: builds restarted");
            lastBuildId = 0; // check all builds
        }
        if (buildsCancelled.get()) {
            printMsg(lvlTalkative, "got notification: builds cancelled");
            removeCancelledBuilds(*conn);
        }
    }
}


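/* Fetch unfinished builds with an id greater than ‘lastBuildId’ from
   the database and create the corresponding Build and Step objects. */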
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{
    printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);

    /* Grab the queued builds from the database, but don't process
       them yet (since we don't want a long-running transaction). */
    std::multimap<Path, Build::ptr> newBuilds;

    {
        pqxx::work txn(conn);

        auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();

        for (auto const & row : res) {
            auto builds_(builds.lock());

            BuildID id = row["id"].as<BuildID>();
            if (id > lastBuildId) lastBuildId = id;
            if (has(*builds_, id)) continue;

            auto build = std::make_shared<Build>();
            build->id = id;
            build->drvPath = row["drvPath"].as<string>();
            build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
            build->maxSilentTime = row["maxsilent"].as<int>();
            build->buildTimeout = row["timeout"].as<int>();

            newBuilds.emplace(std::make_pair(build->drvPath, build));
        }
    }

    std::set<Step::ptr> newRunnable;
    unsigned int nrAdded;
    std::function<void(Build::ptr)> createBuild;

    createBuild = [&](Build::ptr build) {
        printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
        nrAdded++;

        if (!store->isValidPath(build->drvPath)) {
            /* Derivation has been GC'ed prematurely. */
            printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
            if (!build->finishedInDB) {
                pqxx::work txn(conn);
                txn.parameterized
                    ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
                    (build->id)
                    ((int) bsAborted)
                    (time(0))
                    ("derivation was garbage-collected prior to build").exec();
                txn.commit();
                build->finishedInDB = true;
                nrBuildsDone++;
            }
            return;
        }

        std::set<Step::ptr> newSteps;
        Step::ptr step = createStep(store, build->drvPath, build, 0, newSteps, newRunnable);

        /* Some of the new steps may be the top level of builds that
           we haven't processed yet. So do them now. This ensures that
           if build A depends on build B with top-level step X, then X
           will be "accounted" to B in doBuildStep(). */
        for (auto & r : newSteps) {
            while (true) {
                auto i = newBuilds.find(r->drvPath);
                if (i == newBuilds.end()) break;
                Build::ptr b = i->second;
                newBuilds.erase(i);
                createBuild(b);
            }
        }

        /* If we didn't get a step, it means the step's outputs are
           all valid. So we mark this as a finished, cached build. */
        if (!step) {
            Derivation drv = readDerivation(build->drvPath);
            BuildResult res = getBuildResult(store, drv);

            pqxx::work txn(conn);
            time_t now = time(0);
            markSucceededBuild(txn, build, res, true, now, now);
            txn.commit();

            build->finishedInDB = true;

            return;
        }

        /* If any step has an unsupported system type or has a
           previously failed output path, then fail the build right
           away. */
        bool badStep = false;
        for (auto & r : newSteps) {
            BuildStatus buildStatus = bsSuccess;
            BuildStepStatus buildStepStatus = bssFailed;

            if (checkCachedFailure(r, conn)) {
                printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
                buildStatus = step == r ? bsFailed : bsDepFailed;
                buildStepStatus = bssFailed;
            }

            if (buildStatus == bsSuccess) {
                bool supported = false;
                {
                    auto machines_(machines.lock()); // FIXME: use shared_mutex
                    for (auto & m : *machines_)
                        if (m->supportsStep(r)) { supported = true; break; }
                }

                if (!supported) {
                    printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
                    buildStatus = bsUnsupported;
                    buildStepStatus = bssUnsupported;
                }
            }

            if (buildStatus != bsSuccess) {
                time_t now = time(0);
                if (!build->finishedInDB) {
                    pqxx::work txn(conn);
                    createBuildStep(txn, 0, build, r, "", buildStepStatus);
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
                        (build->id)
                        ((int) buildStatus)
                        (now)
                        (buildStatus != bsUnsupported ? 1 : 0).exec();
                    txn.commit();
                    build->finishedInDB = true;
                    nrBuildsDone++;
                }
                badStep = true;
                break;
            }
        }

        if (badStep) return;

        /* Note: if we exit this scope prior to this, the build and
           all newly created steps are destroyed. */
        {
            auto builds_(builds.lock());
            if (!build->finishedInDB) // FIXME: can this happen?
                (*builds_)[build->id] = build;
            build->toplevel = step;
        }

        printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
            % build->id % step->drvPath % newSteps.size());
    };

    /* Now instantiate build steps for each new build. The builder
       threads can start building the runnable build steps right away,
       even while we're still processing other new builds. */
    while (!newBuilds.empty()) {
        auto build = newBuilds.begin()->second;
        newBuilds.erase(newBuilds.begin());

        newRunnable.clear();
        nrAdded = 0;
        createBuild(build);

        /* Add the new runnable build steps to ‘runnable’ and wake up
           the builder threads. */
        printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
        for (auto & r : newRunnable)
            makeRunnable(r);

        nrBuildsRead += nrAdded;
    }
}


void State::removeCancelledBuilds(Connection & conn)
{
    /* Get the current set of queued builds. */
    std::set<BuildID> currentIds;
    {
        pqxx::work txn(conn);
        auto res = txn.exec("select id from Builds where finished = 0");
        for (auto const & row : res)
            currentIds.insert(row["id"].as<BuildID>());
    }

    auto builds_(builds.lock());

    for (auto i = builds_->begin(); i != builds_->end(); ) {
        if (currentIds.find(i->first) == currentIds.end()) {
            printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
            i = builds_->erase(i);
            // FIXME: ideally we would interrupt active build steps here.
        } else
            ++i;
    }
}


Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    Build::ptr referringBuild, Step::ptr referringStep,
    std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
    /* Check if the requested step already exists. If not, create a
       new step. In any case, make the step reachable from
       referringBuild or referringStep. This is done atomically (with
       ‘steps’ locked), to ensure that this step can never become
       reachable from a new build after doBuildStep has removed it
       from ‘steps’. */
    Step::ptr step;
    bool isNew = false;
    {
        auto steps_(steps.lock());

        /* See if the step already exists in ‘steps’ and is not
           stale. */
        auto prev = steps_->find(drvPath);
        if (prev != steps_->end()) {
            step = prev->second.lock();
            /* Since ‘step’ is a strong pointer, the referred Step
               object won't be deleted after this. */
            if (!step) steps_->erase(drvPath); // remove stale entry
        }

        /* If it doesn't exist, create it. */
        if (!step) {
            step = std::make_shared<Step>();
            step->drvPath = drvPath;
            isNew = true;
        }

        auto step_(step->state.lock());

        if (referringBuild)
            step_->builds.push_back(referringBuild);

        if (referringStep)
            step_->rdeps.push_back(referringStep);

        (*steps_)[drvPath] = step;
    }

    if (!isNew) {
        assert(step->created);
        return step;
    }

    printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);

    /* Initialize the step. Note that the step may be visible in
       ‘steps’ before this point, but that doesn't matter because
       it's not runnable yet, and other threads won't make it
       runnable while step->created == false. */
    step->drv = readDerivation(drvPath);

    {
        auto i = step->drv.env.find("requiredSystemFeatures");
        if (i != step->drv.env.end())
            step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
    }

    /* Are all outputs valid? */
    bool valid = true;
    for (auto & i : step->drv.outputs) {
        if (!store->isValidPath(i.second.path)) {
            valid = false;
            break;
        }
    }

    // FIXME: check whether all outputs are in the binary cache.
    if (valid) return 0;

    /* No, we need to build. */
    printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
    newSteps.insert(step);

    /* Create steps for the dependencies. */
    for (auto & i : step->drv.inputDrvs) {
        auto dep = createStep(store, i.first, 0, step, newSteps, newRunnable);
        if (dep) {
            auto step_(step->state.lock());
            step_->deps.insert(dep);
        }
    }

    /* If the step has no (remaining) dependencies, make it
       runnable. */
    {
        auto step_(step->state.lock());
        assert(!step->created);
        step->created = true;
        if (step_->deps.empty())
            newRunnable.insert(step);
    }

    return step;
}


/* Get the steps and unfinished builds that depend on the given step. */
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps)
{
    std::function<void(Step::ptr)> visit;

    visit = [&](Step::ptr step) {
        if (has(steps, step)) return;
        steps.insert(step);

        std::vector<Step::wptr> rdeps;

        {
            auto step_(step->state.lock());

            for (auto & build : step_->builds) {
                auto build_ = build.lock();
                if (build_ && !build_->finishedInDB) builds.insert(build_);
            }

            /* Make a copy of rdeps so that we don't hold the lock for
               very long. */
            rdeps = step_->rdeps;
        }

        for (auto & rdep : rdeps) {
            auto rdep_ = rdep.lock();
            if (rdep_) visit(rdep_);
        }
    };

    visit(step);
}


void State::makeRunnable(Step::ptr step)
{
    printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);

    {
        auto step_(step->state.lock());
        assert(step->created);
        assert(!step->finished);
        assert(step_->deps.empty());
    }

    {
        auto runnable_(runnable.lock());
        runnable_->push_back(step);
    }

    wakeDispatcher();
}


void State::dispatcher()
{
    while (true) {
        printMsg(lvlDebug, "dispatcher woken up");

        auto sleepUntil = system_time::max();

        bool keepGoing;

        do {
            /* Bail out when there are no slots left. */
            std::vector<Machine::ptr> machinesSorted;
            {
                auto machines_(machines.lock());
                machinesSorted.insert(machinesSorted.end(),
                    machines_->begin(), machines_->end());
            }

            /* Sort the machines by a combination of speed factor and
               available slots. Prioritise the available machines as
               follows:

               - First by load divided by speed factor, rounded to the
                 nearest integer. This causes fast machines to be
                 preferred over slow machines with similar loads.

               - Then by speed factor.

               - Finally by load. */
            sort(machinesSorted.begin(), machinesSorted.end(),
                [](const Machine::ptr & a, const Machine::ptr & b) -> bool
                {
                    float ta = roundf(a->currentJobs / a->speedFactor);
                    float tb = roundf(b->currentJobs / b->speedFactor);
                    return
                        ta != tb ? ta > tb :
                        a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor :
                        a->maxJobs > b->maxJobs;
                });

            /* Find a machine with a free slot and find a step to run
               on it. Once we find such a pair, we restart the outer
               loop because the machine sorting will have changed. */
            keepGoing = false;
            system_time now = std::chrono::system_clock::now();

            for (auto & machine : machinesSorted) {
                // FIXME: can we lose a wakeup if a builder exits concurrently?
                if (machine->currentJobs >= machine->maxJobs) continue;

                auto runnable_(runnable.lock());
                //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());

                /* FIXME: we're holding the runnable lock too long
                   here. This could be more efficient. */
                for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                    auto step = i->lock();

                    /* Delete dead steps. */
                    if (!step) {
                        i = runnable_->erase(i);
                        continue;
                    }

                    /* Can this machine do this step? */
                    if (!machine->supportsStep(step)) {
                        ++i;
                        continue;
                    }

                    /* Skip previously failed steps that aren't ready
                       to be retried. */
                    {
                        auto step_(step->state.lock());
                        if (step_->tries > 0 && step_->after > now) {
                            if (step_->after < sleepUntil)
                                sleepUntil = step_->after;
                            ++i;
                            continue;
                        }
                    }

                    /* Make a slot reservation and start a thread to
                       do the build. */
                    auto reservation = std::make_shared<MachineReservation>(machine);
                    i = runnable_->erase(i);

                    auto builderThread = std::thread(&State::builder, this, step, reservation);
                    builderThread.detach(); // FIXME?

                    keepGoing = true;
                    break;
                }

                if (keepGoing) break;
            }

        } while (keepGoing);

        /* Sleep until we're woken up (either because a runnable build
           is added, or because a build finishes). */
        {
            std::unique_lock<std::mutex> lock(dispatcherMutex);
            printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
                std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
            dispatcherWakeup.wait_until(lock, sleepUntil);
            nrDispatcherWakeups++;
        }
    }

    printMsg(lvlError, "dispatcher exits");
}


void State::wakeDispatcher()
{
    { std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
    dispatcherWakeup.notify_one();
}


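/* Builder thread: perform a single build step on the reserved
   machine, release the reservation, and on a transient failure
   reschedule the step with exponential backoff. */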
void State::builder(Step::ptr step, MachineReservation::ptr reservation)
{
    bool retry = true;

    MaintainCount mc(nrActiveSteps);

    try {
        auto store = openStore(); // FIXME: pool
        retry = doBuildStep(store, step, reservation->machine);
    } catch (std::exception & e) {
        printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
            % step->drvPath % reservation->machine->sshName % e.what());
    }

    /* Release the machine and wake up the dispatcher. */
    assert(reservation.unique());
    reservation = 0;
    wakeDispatcher();

    /* If there was a temporary failure, retry the step after an
       exponentially increasing interval. */
    if (retry) {
        {
            auto step_(step->state.lock());
            step_->tries++;
            nrRetries++;
            if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
            int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
            printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
            step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
        }

        makeRunnable(step);
    }
}


bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
    Machine::ptr machine)
{
    {
        auto step_(step->state.lock());
        assert(step->created);
        assert(!step->finished);
    }

    /* There can be any number of builds in the database that depend
       on this derivation. Arbitrarily pick one (though preferring a
       build of which this is the top-level derivation) for the
       purpose of creating build steps. We could create a build step
       record for every build, but that could be very expensive
       (e.g. a stdenv derivation can be a dependency of tens of
       thousands of builds), so we don't. */
    Build::ptr build;

    {
        std::set<Build::ptr> dependents;
        std::set<Step::ptr> steps;
        getDependents(step, dependents, steps);

        if (dependents.empty()) {
            /* Apparently all builds that depend on this derivation
               are gone (e.g. cancelled). So don't bother. This is
               very unlikely to happen, because normally Steps are
               only kept alive by being reachable from a
               Build. However, it's possible that a new Build just
               created a reference to this step. So to handle that
               possibility, we retry this step (putting it back in
               the runnable queue). If there are really no strong
               pointers to the step, it will be deleted. */
            printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);
            return true;
        }

        for (auto build2 : dependents)
            if (build2->drvPath == step->drvPath) { build = build2; break; }

        if (!build) build = *dependents.begin();

        printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
            % step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
    }

    auto conn(dbPool.get());

    RemoteResult result;
    BuildResult res;
    int stepNr = 0;

    result.startTime = time(0);

    /* If any of the outputs have previously failed, then don't bother
       building again. */
    bool cachedFailure = checkCachedFailure(step, *conn);

    if (cachedFailure)
        result.status = RemoteResult::rrPermanentFailure;
    else {

        /* Create a build step record indicating that we started
           building. Also, mark the selected build as busy. */
        {
            pqxx::work txn(*conn);
            stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
            txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
            txn.commit();
        }

        /* Do the build. */
        try {
            /* FIXME: referring builds may have conflicting timeouts. */
            buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv,
                logDir, build->maxSilentTime, build->buildTimeout, result);
        } catch (Error & e) {
            result.status = RemoteResult::rrMiscFailure;
            result.errorMsg = e.msg();
        }

        if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv);
    }

    if (!result.stopTime) result.stopTime = time(0);

    /* Asynchronously compress the log. */
    if (result.logFile != "") {
        {
            auto logCompressorQueue_(logCompressorQueue.lock());
            logCompressorQueue_->push(result.logFile);
        }
        logCompressorWakeup.notify_one();
    }

    /* The step had a hopefully temporary failure (e.g. network
       issue). Retry a number of times. */
    if (result.status == RemoteResult::rrMiscFailure) {
        printMsg(lvlError, format("irregular failure building ‘%1%’ on ‘%2%’: %3%")
            % step->drvPath % machine->sshName % result.errorMsg);

        bool retry;
        {
            auto step_(step->state.lock());
            retry = step_->tries + 1 < maxTries;
        }

        if (retry) {
            pqxx::work txn(*conn);
            finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                stepNr, machine->sshName, bssAborted, result.errorMsg);
            txn.commit();
            return true;
        }
    }

    if (result.status == RemoteResult::rrSuccess) {

        /* Register success in the database for all Build objects that
           have this step as the top-level step. Since the queue
           monitor thread may be creating new referring Builds
           concurrently, and updating the database may fail, we do
           this in a loop, marking all known builds, repeating until
           there are no unmarked builds. */
        while (true) {

            /* Get the builds that have this one as the top-level. */
            std::vector<Build::ptr> direct;
            {
                auto steps_(steps.lock());
                auto step_(step->state.lock());

                for (auto & b_ : step_->builds) {
                    auto b = b_.lock();
                    if (b && !b->finishedInDB) direct.push_back(b);
                }

                /* If there are no builds left to update in the DB,
                   then we're done (except for calling
                   finishBuildStep()). Delete the step from
                   ‘steps’. Since we've been holding the ‘steps’ lock,
                   no new referrers can have been added in the
                   meantime or be added afterwards. */
                if (direct.empty()) {
                    printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
                    nrStepsDone++;
                    steps_->erase(step->drvPath);
                }
            }

            /* Update the database. */
            {
                pqxx::work txn(*conn);

                finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);

                for (auto & b : direct)
                    markSucceededBuild(txn, b, res, build != b,
                        result.startTime, result.stopTime);

                txn.commit();
            }

            if (direct.empty()) break;

            /* Remove the direct dependencies from ‘builds’. This will
               cause them to be destroyed. */
            for (auto & b : direct) {
                auto builds_(builds.lock());
                b->finishedInDB = true;
                builds_->erase(b->id);
            }
        }

        /* Wake up any dependent steps that have no other
           dependencies. */
        {
            auto step_(step->state.lock());
            for (auto & rdepWeak : step_->rdeps) {
                auto rdep = rdepWeak.lock();
                if (!rdep) continue;

                bool runnable = false;
                {
                    auto rdep_(rdep->state.lock());
                    rdep_->deps.erase(step);
                    if (rdep_->deps.empty()) runnable = true;
                }

                if (runnable) makeRunnable(rdep);
            }
        }

    } else {

        /* Register failure in the database for all Build objects that
           directly or indirectly depend on this step. */
        while (true) {

            /* Get the builds and steps that depend on this step. */
            std::set<Build::ptr> indirect;
            {
                auto steps_(steps.lock());
                std::set<Step::ptr> steps;
                getDependents(step, indirect, steps);

                /* If there are no builds left, delete all referring
                   steps from ‘steps’. As for the success case, we can
                   be certain no new referrers can be added. */
                if (indirect.empty()) {
                    for (auto & s : steps) {
                        printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
                        nrStepsDone++;
                        steps_->erase(s->drvPath);
                    }
                    break;
                }
            }

            /* Update the database. */
            {
                pqxx::work txn(*conn);

                BuildStatus buildStatus =
                    result.status == RemoteResult::rrPermanentFailure ? bsFailed :
                    result.status == RemoteResult::rrTimedOut ? bsTimedOut :
                    bsAborted;
                BuildStepStatus buildStepStatus =
                    result.status == RemoteResult::rrPermanentFailure ? bssFailed :
                    result.status == RemoteResult::rrTimedOut ? bssTimedOut :
                    bssAborted;

                /* For regular failures, we don't care about the error
                   message. */
                if (buildStatus != bsAborted) result.errorMsg = "";

                /* Create failed build steps for every build that
                   depends on this. For cached failures, only create a
                   step for builds that don't have this step as
                   top-level (otherwise the user won't be able to see
                   what caused the build to fail). */
                for (auto & build2 : indirect) {
                    if ((cachedFailure && build2->drvPath == step->drvPath) ||
                        (!cachedFailure && build == build2) ||
                        build2->finishedInDB)
                        continue;
                    createBuildStep(txn, 0, build2, step, machine->sshName,
                        buildStepStatus, result.errorMsg, build->id);
                }

                if (!cachedFailure)
                    finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                        stepNr, machine->sshName, buildStepStatus, result.errorMsg);

                /* Mark all builds that depend on this derivation as failed. */
                for (auto & build2 : indirect) {
                    if (build2->finishedInDB) continue;
                    printMsg(lvlError, format("marking build %1% as failed") % build2->id);
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
                        (build2->id)
                        ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
                        (result.startTime)
                        (result.stopTime)
                        (cachedFailure ? 1 : 0).exec();
                    nrBuildsDone++;
                }

                /* Remember failed paths in the database so that they
                   won't be built again. */
                if (!cachedFailure && result.status == RemoteResult::rrPermanentFailure)
                    for (auto & path : outputPaths(step->drv))
                        txn.parameterized("insert into FailedPaths values ($1)")(path).exec();

                txn.commit();
            }

            /* Remove the indirect dependencies from ‘builds’. This
               will cause them to be destroyed. */
            for (auto & b : indirect) {
                auto builds_(builds.lock());
                b->finishedInDB = true;
                builds_->erase(b->id);
            }
        }
    }

    return false;
}


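/* Mark a build as succeeded in the database and record its build
   products. */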
void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
    const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime)
{
    printMsg(lvlInfo, format("marking build %1% as succeeded") % build->id);

    if (build->finishedInDB) return;

    txn.parameterized
        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1 and finished = 0")
        (build->id)
        ((int) (res.failed ? bsFailedWithOutput : bsSuccess))
        (startTime)
        (stopTime)
        (res.size)
        (res.closureSize)
        (res.releaseName, res.releaseName != "")
        (isCachedBuild ? 1 : 0).exec();

    unsigned int productNr = 1;
    for (auto & product : res.products) {
        txn.parameterized
            ("insert into BuildProducts (build, productnr, type, subtype, fileSize, sha1hash, sha256hash, path, name, defaultPath) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)")
            (build->id)
            (productNr++)
            (product.type)
            (product.subtype)
            (product.fileSize, product.isRegular)
            (printHash(product.sha1hash), product.isRegular)
            (printHash(product.sha256hash), product.isRegular)
            (product.path)
            (product.name)
            (product.defaultPath).exec();
    }

    nrBuildsDone++;
}


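/* Return true if any output of the step's derivation is recorded in
   the FailedPaths table. */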
bool State::checkCachedFailure(Step::ptr step, Connection & conn)
{
    pqxx::work txn(conn);
    for (auto & path : outputPaths(step->drv))
        if (!txn.parameterized("select 1 from FailedPaths where path = $1")(path).exec().empty())
            return true;
    return false;
}


void State::logCompressor()
{
    while (true) {
        try {

            Path logPath;
            {
                auto logCompressorQueue_(logCompressorQueue.lock());
                while (logCompressorQueue_->empty())
                    logCompressorQueue_.wait(logCompressorWakeup);
                logPath = logCompressorQueue_->front();
                logCompressorQueue_->pop();
            }

            if (!pathExists(logPath)) continue;

            printMsg(lvlChatty, format("compressing log file ‘%1%’") % logPath);

            Path tmpPath = logPath + ".bz2.tmp";

            AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);

            // FIXME: use libbz2
            Pid pid = startProcess([&]() {
                if (dup2(fd, STDOUT_FILENO) == -1)
                    throw SysError("cannot dup output pipe to stdout");
                execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr);
                throw SysError("cannot start bzip2");
            });

            int res = pid.wait(true);

            if (res != 0)
                throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’")
                    % res % logPath);

            if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0)
                throw SysError(format("renaming ‘%1%’") % tmpPath);

            if (unlink(logPath.c_str()) != 0)
                throw SysError(format("unlinking ‘%1%’") % logPath);

        } catch (std::exception & e) {
            printMsg(lvlError, format("log compressor: %1%") % e.what());
            sleep(5);
        }
    }
}


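/* Log some statistics about the internal state, in response to a
   ‘dump_status’ notification. */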
void State::dumpStatus()
{
    {
        auto builds_(builds.lock());
        printMsg(lvlError, format("%1% queued builds") % builds_->size());
    }
    {
        auto steps_(steps.lock());
        for (auto i = steps_->begin(); i != steps_->end(); )
            if (i->second.lock()) ++i; else i = steps_->erase(i);
        printMsg(lvlError, format("%1% pending/active build steps") % steps_->size());
    }
    {
        auto runnable_(runnable.lock());
        for (auto i = runnable_->begin(); i != runnable_->end(); )
            if (i->lock()) ++i; else i = runnable_->erase(i);
        printMsg(lvlError, format("%1% runnable build steps") % runnable_->size());
    }
    printMsg(lvlError, format("%1% active build steps") % nrActiveSteps);
    printMsg(lvlError, format("%1% builds read from queue") % nrBuildsRead);
    printMsg(lvlError, format("%1% builds done") % nrBuildsDone);
    printMsg(lvlError, format("%1% build steps done") % nrStepsDone);
    printMsg(lvlError, format("%1% build step retries") % nrRetries);
    printMsg(lvlError, format("%1% most retries for any build step") % maxNrRetries);
    printMsg(lvlError, format("%1% queue wakeups") % nrQueueWakeups);
    printMsg(lvlError, format("%1% dispatcher wakeups") % nrDispatcherWakeups);
    printMsg(lvlError, format("%1% database connections") % dbPool.count());
    {
        auto machines_(machines.lock());
        for (auto & m : *machines_) {
            printMsg(lvlError, format("machine %1%: %2%/%3% active")
                % m->sshName % m->currentJobs % m->maxJobs);
        }
    }
}


void State::run()
{
    clearBusy(0);

    loadMachines();

    auto queueMonitorThread = std::thread(&State::queueMonitor, this);

    std::thread(&State::dispatcher, this).detach();

    /* Run a log compressor thread. If needed, we could start more
       than one. */
    std::thread(&State::logCompressor, this).detach();

    while (true) {
        try {
            auto conn(dbPool.get());
            receiver dumpStatus(*conn, "dump_status");
            while (true) {
                conn->await_notification();
                if (dumpStatus.get())
                    State::dumpStatus();
            }
        } catch (std::exception & e) {
            printMsg(lvlError, format("main thread: %1%") % e.what());
            sleep(10); // probably a DB problem, so don't retry right away
        }
    }

    // Never reached.
    queueMonitorThread.join();
}


int main(int argc, char * * argv)
{
    return handleExceptions(argv[0], [&]() {
        initNix();

        signal(SIGINT, SIG_DFL);
        signal(SIGTERM, SIG_DFL);
        signal(SIGHUP, SIG_DFL);

        bool unlock = false;

        parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
            if (*arg == "--unlock")
                unlock = true;
            else
                return false;
            return true;
        });

        settings.buildVerbosity = lvlVomit;
        settings.useSubstitutes = false;
        settings.lockCPU = false;

        /* FIXME: need some locking to prevent multiple instances of
           hydra-queue-runner. */
        State state;

        if (unlock)
            state.clearBusy(0);
        else
            state.run();
    });
}