#include "state.hh"
#include "build-result.hh"
#include "globals.hh"

using namespace nix;

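
/* Entry point of the queue monitor thread. Runs queueMonitorLoop()
   forever; if it throws (e.g. because the database connection was
   lost), wait a bit and start over. */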
void State::queueMonitor()
{
    while (true) {
        try {
            queueMonitorLoop();
        } catch (std::exception & e) {
            printMsg(lvlError, format("queue monitor: %1%") % e.what());
            sleep(10); // probably a DB problem, so don't retry right away
        }
    }
}

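
/* One incarnation of the queue monitor: subscribe to the PostgreSQL
   notification channels that signal queue changes, then repeatedly
   fetch new builds and handle notifications about restarted,
   cancelled, deleted or bumped builds and changed jobset shares. */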
void State::queueMonitorLoop()
{
    auto conn(dbPool.get());

    receiver buildsAdded(*conn, "builds_added");
    receiver buildsRestarted(*conn, "builds_restarted");
    receiver buildsCancelled(*conn, "builds_cancelled");
    receiver buildsDeleted(*conn, "builds_deleted");
    receiver buildsBumped(*conn, "builds_bumped");
    receiver jobsetSharesChanged(*conn, "jobset_shares_changed");

    auto store = openStore(); // FIXME: pool

    unsigned int lastBuildId = 0;

    while (true) {
        bool done = getQueuedBuilds(*conn, store, lastBuildId);

        /* Sleep until we get notification from the database about an
           event. */
        if (done) {
            conn->await_notification();
            nrQueueWakeups++;
        } else
            conn->get_notifs();

        if (buildsAdded.get())
            printMsg(lvlTalkative, "got notification: new builds added to the queue");

        if (buildsRestarted.get()) {
            printMsg(lvlTalkative, "got notification: builds restarted");
            lastBuildId = 0; // check all builds
        }

        if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
            printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
            processQueueChange(*conn);
        }

        if (jobsetSharesChanged.get()) {
            printMsg(lvlTalkative, "got notification: jobset shares changed");
            processJobsetSharesChange(*conn);
        }
    }
}

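
/* Fetch builds newer than ‘lastBuildId’ from the database and turn
   them into Build and Step objects. Returns true if the queue has
   been fully processed, false if we stopped early and should be
   called again without waiting for a notification. */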
bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int & lastBuildId)
{
    printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);

    /* Grab the queued builds from the database, but don't process
       them yet (since we don't want a long-running transaction). */
    std::vector<BuildID> newIDs;
    std::map<BuildID, Build::ptr> newBuildsByID;
    std::multimap<Path, BuildID> newBuildsByPath;

    unsigned int newLastBuildId = lastBuildId;

    {
        pqxx::work txn(conn);

        auto res = txn.parameterized
            ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority, priority from Builds "
             "where id > $1 and finished = 0 order by globalPriority desc, id")
            (lastBuildId).exec();

        for (auto const & row : res) {
            auto builds_(builds.lock());
            BuildID id = row["id"].as<BuildID>();
            if (buildOne && id != buildOne) continue;
            if (id > newLastBuildId) newLastBuildId = id;
            if (builds_->count(id)) continue;

            auto build = std::make_shared<Build>();
            build->id = id;
            build->drvPath = row["drvPath"].as<string>();
            build->projectName = row["project"].as<string>();
            build->jobsetName = row["jobset"].as<string>();
            build->jobName = row["job"].as<string>();
            build->maxSilentTime = row["maxsilent"].as<int>();
            build->buildTimeout = row["timeout"].as<int>();
            build->timestamp = row["timestamp"].as<time_t>();
            build->globalPriority = row["globalPriority"].as<int>();
            build->localPriority = row["priority"].as<int>();
            build->jobset = createJobset(txn, build->projectName, build->jobsetName);

            newIDs.push_back(id);
            newBuildsByID[id] = build;
            newBuildsByPath.emplace(std::make_pair(build->drvPath, id));
        }
    }

    std::set<Step::ptr> newRunnable;
    unsigned int nrAdded;
    std::function<void(Build::ptr)> createBuild;
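
    /* Recursively add a queued build and its build steps. May recurse
       into other new builds whose top-level step turns out to be a
       dependency of this one. */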
    createBuild = [&](Build::ptr build) {
        printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName());
        nrAdded++;
        newBuildsByID.erase(build->id);

        if (!store->isValidPath(build->drvPath)) {
            /* Derivation has been GC'ed prematurely. */
            printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
            if (!build->finishedInDB) {
                pqxx::work txn(conn);
                txn.parameterized
                    ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
                    (build->id)
                    ((int) bsAborted)
                    (time(0))
                    ("derivation was garbage-collected prior to build").exec();
                txn.commit();
                build->finishedInDB = true;
                nrBuildsDone++;
            }
            return;
        }

        std::set<Step::ptr> newSteps;
        std::set<Path> finishedDrvs; // FIXME: re-use?
        Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);

        /* Some of the new steps may be the top level of builds that
           we haven't processed yet. So do them now. This ensures that
           if build A depends on build B with top-level step X, then X
           will be "accounted" to B in doBuildStep(). */
        for (auto & r : newSteps) {
            auto i = newBuildsByPath.find(r->drvPath);
            if (i == newBuildsByPath.end()) continue;
            auto j = newBuildsByID.find(i->second);
            if (j == newBuildsByID.end()) continue;
            createBuild(j->second);
        }

        /* If we didn't get a step, it means the step's outputs are
           all valid. So we mark this as a finished, cached build. */
        if (!step) {
            Derivation drv = readDerivation(build->drvPath);
            BuildOutput res = getBuildOutput(store, drv);

            pqxx::work txn(conn);
            time_t now = time(0);
            markSucceededBuild(txn, build, res, true, now, now);
            txn.commit();

            build->finishedInDB = true;

            return;
        }

        /* If any step has a previously failed output path, then fail
           the build right away. */
        bool badStep = false;
        for (auto & r : newSteps)
            if (checkCachedFailure(r, conn)) {
                printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
                if (!build->finishedInDB) {
                    pqxx::work txn(conn);

                    /* Find the previous build step record, first by
                       derivation path, then by output path. */
                    BuildID propagatedFrom = 0;

                    auto res = txn.parameterized
                        ("select max(build) from BuildSteps where drvPath = $1 and startTime != 0 and stopTime != 0 and status = 1")
                        (r->drvPath).exec();
                    if (!res[0][0].is_null()) propagatedFrom = res[0][0].as<BuildID>();

                    if (!propagatedFrom) {
                        for (auto & output : r->drv.outputs) {
                            auto res = txn.parameterized
                                ("select max(s.build) from BuildSteps s join BuildStepOutputs o on s.build = o.build where path = $1 and startTime != 0 and stopTime != 0 and status = 1")
                                (output.second.path).exec();
                            if (!res[0][0].is_null()) {
                                propagatedFrom = res[0][0].as<BuildID>();
                                break;
                            }
                        }
                    }

                    createBuildStep(txn, 0, build, r, "", bssCachedFailure, "", propagatedFrom);
                    txn.parameterized
                        ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = 1 where id = $1 and finished = 0")
                        (build->id)
                        ((int) (step == r ? bsFailed : bsDepFailed))
                        (time(0)).exec();
                    txn.commit();
                    build->finishedInDB = true;
                    nrBuildsDone++;
                }

                badStep = true;
                break;
            }

        if (badStep) return;

        /* Note: if we exit this scope prior to this, the build and
           all newly created steps are destroyed. */
        {
            auto builds_(builds.lock());
            if (!build->finishedInDB) // FIXME: can this happen?
                (*builds_)[build->id] = build;
            build->toplevel = step;
        }

        build->propagatePriorities();

        printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
            % build->id % step->drvPath % newSteps.size());
    };

    /* Now instantiate build steps for each new build. The builder
       threads can start building the runnable build steps right away,
       even while we're still processing other new builds. */
    system_time start = std::chrono::system_clock::now();

    for (auto id : newIDs) {
        auto i = newBuildsByID.find(id);
        if (i == newBuildsByID.end()) continue;
        auto build = i->second;

        newRunnable.clear();
        nrAdded = 0;
        try {
            createBuild(build);
        } catch (Error & e) {
            e.addPrefix(format("while loading build %1%: ") % build->id);
            throw;
        }

        /* Add the new runnable build steps to ‘runnable’ and wake up
           the builder threads. */
        printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
        for (auto & r : newRunnable)
            makeRunnable(r);

        nrBuildsRead += nrAdded;

        /* Stop after a certain time to allow priority bumps to be
           processed. */
        if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) break;
    }
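
    /* If we stopped early because of the time limit, resume just
       before the lowest unprocessed build ID next time; otherwise
       remember the highest ID seen and report the queue as fully
       processed. */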
    lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
    return newBuildsByID.empty();
}


void Build::propagatePriorities()
{
    /* Update the highest global priority and lowest build ID fields
       of each dependency. This is used by the dispatcher to start
       steps in order of descending global priority and ascending
       build ID. */
    visitDependencies([&](const Step::ptr & step) {
        auto step_(step->state.lock());
        step_->highestGlobalPriority = std::max(step_->highestGlobalPriority, globalPriority);
        step_->highestLocalPriority = std::max(step_->highestLocalPriority, localPriority);
        step_->lowestBuildID = std::min(step_->lowestBuildID, id);
        step_->jobsets.insert(jobset);
    }, toplevel);
}

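
/* Handle builds being cancelled, deleted or bumped: drop in-memory
   builds that are no longer queued, and re-propagate priorities for
   builds whose global priority was increased. */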
void State::processQueueChange(Connection & conn)
{
    /* Get the current set of queued builds. */
    std::map<BuildID, int> currentIds;
    {
        pqxx::work txn(conn);
        auto res = txn.exec("select id, globalPriority from Builds where finished = 0");
        for (auto const & row : res)
            currentIds[row["id"].as<BuildID>()] = row["globalPriority"].as<BuildID>();
    }

    auto builds_(builds.lock());

    for (auto i = builds_->begin(); i != builds_->end(); ) {
        auto b = currentIds.find(i->first);
        if (b == currentIds.end()) {
            printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
            i = builds_->erase(i);
            // FIXME: ideally we would interrupt active build steps here.
            continue;
        }
        if (i->second->globalPriority < b->second) {
            printMsg(lvlInfo, format("priority of build %1% increased") % i->first);
            i->second->globalPriority = b->second;
            i->second->propagatePriorities();
        }
        ++i;
    }
}

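
/* Get or create the Step for ‘drvPath’ and make it reachable from
   the referring build or step. Steps are created recursively for the
   derivation's inputs. Returns a null pointer if all outputs of the
   derivation are already valid or substitutable, i.e. nothing needs
   to be built. */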
Step::ptr State::createStep(ref<Store> store,
    Connection & conn, Build::ptr build, const Path & drvPath,
    Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
    std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
    if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;

    /* Check if the requested step already exists. If not, create a
       new step. In any case, make the step reachable from
       referringBuild or referringStep. This is done atomically (with
       ‘steps’ locked), to ensure that this step can never become
       reachable from a new build after doBuildStep has removed it
       from ‘steps’. */
    Step::ptr step;
    bool isNew = false;
    {
        auto steps_(steps.lock());

        /* See if the step already exists in ‘steps’ and is not
           stale. */
        auto prev = steps_->find(drvPath);
        if (prev != steps_->end()) {
            step = prev->second.lock();
            /* Since ‘step’ is a strong pointer, the referred Step
               object won't be deleted after this. */
            if (!step) steps_->erase(drvPath); // remove stale entry
        }

        /* If it doesn't exist, create it. */
        if (!step) {
            step = std::make_shared<Step>();
            step->drvPath = drvPath;
            isNew = true;
        }

        auto step_(step->state.lock());

        assert(step_->created != isNew);

        if (referringBuild)
            step_->builds.push_back(referringBuild);

        if (referringStep)
            step_->rdeps.push_back(referringStep);

        (*steps_)[drvPath] = step;
    }

    if (!isNew) return step;

    printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);

    /* Initialize the step. Note that the step may be visible in
       ‘steps’ before this point, but that doesn't matter because
       it's not runnable yet, and other threads won't make it
       runnable while step->created == false. */
    step->drv = readDerivation(drvPath);
    step->preferLocalBuild = step->drv.willBuildLocally();
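
    /* The step's system type is the derivation's platform, optionally
       extended with its required system features (plus "local" if the
       derivation prefers to be built locally). */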
    step->systemType = step->drv.platform;
    {
        auto i = step->drv.env.find("requiredSystemFeatures");
        StringSet features;
        if (i != step->drv.env.end())
            features = step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
        if (step->preferLocalBuild)
            features.insert("local");
        if (!features.empty()) {
            step->systemType += ":";
            step->systemType += concatStringsSep(",", features);
        }
    }

    /* Are all outputs valid? */
    bool valid = true;
    PathSet outputs = step->drv.outputPaths();
    DerivationOutputs missing;
    PathSet missingPaths;
    for (auto & i : step->drv.outputs)
        if (!store->isValidPath(i.second.path)) {
            valid = false;
            missing[i.first] = i.second;
            missingPaths.insert(i.second.path);
        }

    /* Try to substitute the missing paths. Note: can't use the more
       efficient querySubstitutablePaths() here because upstream Hydra
       servers don't allow it (they have "WantMassQuery: 0"). */
    assert(missing.size() == missingPaths.size());
    if (!missing.empty() && settings.useSubstitutes) {
        SubstitutablePathInfos infos;
        store->querySubstitutablePathInfos(missingPaths, infos);
        if (infos.size() == missingPaths.size()) {
            valid = true;
            for (auto & i : missing) {
                try {
                    printMsg(lvlInfo, format("substituting output ‘%1%’ of ‘%2%’") % i.second.path % drvPath);
                    time_t startTime = time(0);
                    store->ensurePath(i.second.path);
                    time_t stopTime = time(0);
                    {
                        pqxx::work txn(conn);
                        createSubstitutionStep(txn, startTime, stopTime, build, drvPath, "out", i.second.path);
                        txn.commit();
                    }
                } catch (Error & e) {
                    valid = false;
                    break;
                }
            }
        }
    }

    // FIXME: check whether all outputs are in the binary cache.
    if (valid) {
        finishedDrvs.insert(drvPath);
        return 0;
    }

    /* No, we need to build. */
    printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
    newSteps.insert(step);

    /* Create steps for the dependencies. */
    for (auto & i : step->drv.inputDrvs) {
        auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
        if (dep) {
            auto step_(step->state.lock());
            step_->deps.insert(dep);
        }
    }

    /* If the step has no (remaining) dependencies, make it
       runnable. */
    {
        auto step_(step->state.lock());
        assert(!step_->created);
        step_->created = true;
        if (step_->deps.empty())
            newRunnable.insert(step);
    }

    return step;
}

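
/* Return the Jobset object for the given project/jobset pair,
   creating it from the database if we haven't seen it before. Its
   recent build step history is loaded so that its scheduling shares
   can be accounted for. */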
Jobset::ptr State::createJobset(pqxx::work & txn,
    const std::string & projectName, const std::string & jobsetName)
{
    auto p = std::make_pair(projectName, jobsetName);

    {
        auto jobsets_(jobsets.lock());
        auto i = jobsets_->find(p);
        if (i != jobsets_->end()) return i->second;
    }

    auto res = txn.parameterized
        ("select schedulingShares from Jobsets where project = $1 and name = $2")
        (projectName)(jobsetName).exec();
    if (res.empty()) throw Error("missing jobset - can't happen");

    auto shares = res[0]["schedulingShares"].as<unsigned int>();

    auto jobset = std::make_shared<Jobset>();
    jobset->setShares(shares);

    /* Load the build steps from the last 24 hours. */
    res = txn.parameterized
        ("select s.startTime, s.stopTime from BuildSteps s join Builds b on build = id "
         "where s.startTime is not null and s.stopTime > $1 and project = $2 and jobset = $3")
        (time(0) - Jobset::schedulingWindow * 10)(projectName)(jobsetName).exec();
    for (auto const & row : res) {
        time_t startTime = row["startTime"].as<time_t>();
        time_t stopTime = row["stopTime"].as<time_t>();
        jobset->addStep(startTime, stopTime - startTime);
    }

    auto jobsets_(jobsets.lock());
    // Can't happen because only this thread adds to "jobsets".
    assert(jobsets_->find(p) == jobsets_->end());
    (*jobsets_)[p] = jobset;
    return jobset;
}

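
/* Update the scheduling shares of the in-memory jobsets from the
   database. */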
void State::processJobsetSharesChange(Connection & conn)
{
    /* Get the current set of jobsets. */
    pqxx::work txn(conn);
    auto res = txn.exec("select project, name, schedulingShares from Jobsets");
    for (auto const & row : res) {
        auto jobsets_(jobsets.lock());
        auto i = jobsets_->find(std::make_pair(row["project"].as<string>(), row["name"].as<string>()));
        if (i == jobsets_->end()) continue;
        i->second->setShares(row["schedulingShares"].as<unsigned int>());
    }
}