99from dataclasses import dataclass
1010from datetime import UTC , datetime
1111from pathlib import Path
12- from typing import TYPE_CHECKING , Any
12+ from typing import TYPE_CHECKING , Any , Iterable
1313import typing
1414import copy
1515
@@ -86,10 +86,11 @@ class BuildTrigger(buildstep.ShellMixin, steps.BuildStep):
8686 running : bool
8787 wait_for_finish_deferred : defer .Deferred [tuple [list [int ], int ]] | None
8888 brids : list [int ]
89+ consumers : dict [int , Any ]
8990
9091 @dataclass
9192 class ScheduledJob :
92- job : NixEvalJobSuccess
93+ job : NixEvalJob
9394 builder_ids : dict [int , int ]
9495 results : defer .Deferred [list [int ]]
9596
@@ -135,6 +136,7 @@ def __init__(
135136 None
136137 )
137138 self .brids = []
139+ self .consumers = {}
138140 super ().__init__ (** kwargs )
139141
140142 def interrupt (self , reason : str | Failure ) -> None :
@@ -267,6 +269,9 @@ def schedule(
267269 yield self .addURL (f"{ scheduler .name } #{ brid } " , url )
268270 self ._add_results (brid )
269271
272+ if not self .combine_builds :
273+ self .produceEventForBuildRequestsById (brids .values (), "started-nix-build" , None )
274+
270275 return brids , results_deferred
271276
272277 @defer .inlineCallbacks
@@ -351,31 +356,72 @@ def get_failed_dependents(
351356 return removed
352357
353358 @defer .inlineCallbacks
354- def produceEventForBuilds (
359+
360+ @defer .inlineCallbacks
361+ def produceEventForBuildRequestsById (
355362 self ,
356- build_ids : list [int ],
363+ buildrequest_ids : Iterable [int ],
357364 event : str ,
358365 result : None | int ,
359366 ) -> Generator [Any , object , None ]:
360- for build_id in build_ids :
361- build = yield self .master .data .get (('builds' , str (build_id )))
362- buildT = typing .cast (dict [str , Any ], build )
363- if result is not None :
364- buildT ["results" ] = result
365- self .master .mq .produce (("builds" , str (build_id ), event ), copy .deepcopy (buildT ))
367+ buildrequest_ids = list (buildrequest_ids )
368+
369+ def gotBuild (key : int , build : Any ) -> None :
370+ if build ['buildrequestid' ] in buildrequest_ids :
371+ log .info ("got build {key} : {build}" , key = key , build = build )
372+ # we only care about the first started build
373+ buildrequest_ids .remove (int (build ['buildrequestid' ]))
374+ if not buildrequest_ids :
375+ del self .consumers [build ['buildrequestid' ]]
376+ self .produceEventForBuild (event , build , result )
377+
378+ for buildrequest_id in buildrequest_ids :
379+ builds : Any = yield self .master .data .get (('buildrequests' , str (buildrequest_id ), 'builds' ))
380+
381+ if not builds :
382+ # only subscribe to new builds, if there are no existing builds
383+ self .consumers [buildrequest_id ] = yield self .master .mq .startConsuming (
384+ gotBuild , ('builds' , None , None )
385+ )
386+ # send starts for any existing builds
387+ for build in builds :
388+ self .produceEventForBuild (event , build , result )
366389
367390 @defer .inlineCallbacks
368- def produceEvent (
391+ def produceEventForBuildById (
369392 self ,
370393 event : str ,
394+ build_id : int ,
371395 result : None | int ,
372396 ) -> Generator [Any , object , None ]:
373- yield self .produceEventForBuilds ([self .build .buildid ], event , result )
397+ build : Any = yield self .master .data .get (('builds' , str (build_id )))
398+ self .produceEventForBuild (event , build , result )
399+
400+ def produceEventForBuild (
401+ self ,
402+ event : str ,
403+ build : Any ,
404+ result : None | int ,
405+ ) -> None :
406+ if result is not None :
407+ build ["results" ] = result
408+ self .master .mq .produce (("builds" , str (build ['buildid' ]), event ), copy .deepcopy (build ))
374409
375410 @defer .inlineCallbacks
376411 def run (self ) -> Generator [Any , Any , None ]:
412+ """
413+ This function implements a relatively simple scheduling algorithm. At the start we compute the
414+ interdependencies between each of the jobs we want to run and at every iteration we schedule those
415+ whose dependencies have completed successfully. If a job fails, we recursively fail every job which
416+ depends on it.
417+ We also run fake builds for failed evaluations so that they nicely show up in the UI and also Forge
418+ reports.
419+ """
377420 if self .combine_builds :
378- self .produceEvent ("started-nix-build" , None )
421+ self .produceEventForBuildById ("started-nix-build" , self .build .buildid , None )
422+
423+ done : list [BuildTrigger .DoneJob ] = []
424+ scheduled : list [BuildTrigger .ScheduledJob ] = []
379425
380426 self .running = True
381427 build_props = self .build .getProperties ()
@@ -384,15 +430,19 @@ def run(self) -> Generator[Any, Any, None]:
384430
385431 # inject failed buildsteps for any failed eval jobs we got
386432 overall_result = SUCCESS if not self .failed_jobs else util .FAILURE
433+ # inject failed buildsteps for any failed eval jobs we got
387434 if self .failed_jobs :
388435 scheduler_log .addStdout ("The following jobs failed to evaluate:\n " )
389- for failed_job in self .failed_jobs :
390- scheduler_log .addStdout (f"\t - { failed_job .attr } failed eval\n " )
391- brids , _ = yield self .schedule (
392- ss_for_trigger ,
393- * self .schedule_eval_failure (failed_job ),
394- )
395- self .brids .extend (brids )
436+ for failed_job in self .failed_jobs :
437+ scheduler_log .addStdout (f"\t - { failed_job .attr } failed eval\n " )
438+ brids , results_deferred = yield self .schedule (
439+ ss_for_trigger ,
440+ * self .schedule_eval_failure (failed_job ),
441+ )
442+ scheduled .append (
443+ BuildTrigger .ScheduledJob (failed_job , brids , results_deferred )
444+ )
445+ self .brids .extend (brids )
396446
397447 # get all job derivations
398448 job_set = {job .drvPath for job in self .successful_jobs }
@@ -411,8 +461,6 @@ def run(self) -> Generator[Any, Any, None]:
411461 self .successful_jobs , job_closures
412462 )
413463
414- done : list [BuildTrigger .DoneJob ] = []
415- scheduled : list [BuildTrigger .ScheduledJob ] = []
416464 while build_schedule_order or scheduled :
417465 scheduler_log .addStdout ("Scheduling...\n " )
418466
@@ -437,8 +485,6 @@ def run(self) -> Generator[Any, Any, None]:
437485 scheduled .append (
438486 BuildTrigger .ScheduledJob (build , brids , results_deferred )
439487 )
440- if not self .combine_builds :
441- self .produceEventForBuilds (brids .values (), "started-nix-build" , None )
442488 self .brids .extend (brids .values ())
443489 elif failed_build is not None and self .build .reason == "rebuild" :
444490 failed_builds .remove_build (build .drvPath )
@@ -467,8 +513,6 @@ def run(self) -> Generator[Any, Any, None]:
467513 BuildTrigger .ScheduledJob (job , brids , results_deferred )
468514 )
469515
470- if not self .combine_builds :
471- self .produceEventForBuilds (brids .values (), "started-nix-build" , None )
472516 self .brids .extend (brids .values ())
473517
474518 scheduler_log .addStdout ("Waiting...\n " )
@@ -491,46 +535,49 @@ def run(self) -> Generator[Any, Any, None]:
491535 scheduler_log .addStdout (
492536 f"Found finished build { job .attr } , result { util .Results [result ].upper ()} \n "
493537 )
538+ log .info (
539+ f"Found finished build { job .attr } , result { util .Results [result ].upper ()} , brids: { brids } "
540+ )
494541 if not self .combine_builds :
495- self .produceEventForBuilds (brids .values (), "finished-nix-build" , result )
542+ self .produceEventForBuildRequestsById (brids .values (), "finished-nix-build" , result )
496543
497544 # if it failed, remove all dependent jobs, schedule placeholders and add them to the list of scheduled jobs
498- if result != SUCCESS :
499- failed_builds .add_build (job .drvPath , datetime .now (tz = UTC ))
545+ if isinstance (job , NixEvalJobSuccess ):
546+ if result != SUCCESS :
547+ failed_builds .add_build (job .drvPath , datetime .now (tz = UTC ))
500548
501- removed = self .get_failed_dependents (
502- job , build_schedule_order , job_closures
503- )
504- for removed_job in removed :
505- scheduler , props = self .schedule_dependency_failed (
506- removed_job , job
507- )
508- brids , results_deferred = yield self .schedule (
509- ss_for_trigger , scheduler , props
549+ removed = self .get_failed_dependents (
550+ job , build_schedule_order , job_closures
510551 )
511- build_schedule_order .remove (removed_job )
512- scheduled .append (
513- BuildTrigger .ScheduledJob (removed_job , brids , results_deferred )
552+ for removed_job in removed :
553+ scheduler , props = self .schedule_dependency_failed (
554+ removed_job , job
555+ )
556+ brids , results_deferred = yield self .schedule (
557+ ss_for_trigger , scheduler , props
558+ )
559+ build_schedule_order .remove (removed_job )
560+ scheduled .append (
561+ BuildTrigger .ScheduledJob (removed_job , brids , results_deferred )
562+ )
563+ self .brids .extend (brids .values ())
564+ scheduler_log .addStdout (
565+ "\t - removed jobs: "
566+ + ", " .join ([job .drvPath for job in removed ])
567+ + "\n "
514568 )
515- if not self .combine_builds :
516- self .produceEventForBuilds (brids .values (), "started-nix-build" , None )
517- self .brids .extend (brids .values ())
518- scheduler_log .addStdout (
519- "\t - removed jobs: "
520- + ", " .join ([job .drvPath for job in removed ])
521- + "\n "
522- )
569+
570+ for dep in job_closures :
571+ if job .drvPath in job_closures [dep ]:
572+ job_closures [dep ].remove (job .drvPath )
523573
524574 overall_result = worst_status (result , overall_result )
525575 scheduler_log .addStdout (
526576 f"\t - new result: { util .Results [overall_result ].upper ()} \n "
527577 )
528578
529- for dep in job_closures :
530- if job .drvPath in job_closures [dep ]:
531- job_closures [dep ].remove (job .drvPath )
532579 if self .combine_builds :
533- self .produceEvent ("finished-nix-build" , overall_result )
580+ self .produceEventForBuildById ("finished-nix-build" , self . build . buildid , overall_result )
534581 scheduler_log .addStdout ("Done!\n " )
535582 return overall_result
536583
0 commit comments