6
6
import html
7
7
import json
8
8
from dataclasses import dataclass
9
- from typing import TYPE_CHECKING , Any
9
+ from typing import TYPE_CHECKING , Any , cast
10
10
11
11
from buildbot .interfaces import WorkerSetupError
12
12
from buildbot .plugins import steps , util
18
18
from .build_trigger import BuildTrigger , JobsConfig , TriggerConfig
19
19
from .errors import BuildbotNixError
20
20
from .models import (
21
+ CacheStatus ,
21
22
NixEvalJob ,
22
23
NixEvalJobError ,
23
24
NixEvalJobModel ,
28
29
from .repo_config import BranchConfig
29
30
30
31
if TYPE_CHECKING :
32
+ from collections .abc import AsyncGenerator , Sequence
33
+
31
34
from buildbot .config .builder import BuilderConfig
32
35
from buildbot .locks import MasterLock
33
36
from buildbot .process .log import StreamLog
@@ -172,6 +175,49 @@ async def run(self) -> int:
172
175
173
176
branch_config : BranchConfig = await BranchConfig .extract_during_step (self )
174
177
178
+ # Check if this is a rebuild and try to reuse evaluation from original build
179
+ if not self .build or not self .build .requests :
180
+ log .info ("No build requests available, skipping rebuild check" )
181
+ else :
182
+ buildset_id = self .build .requests [0 ].bsid
183
+ if buildset_id is not None :
184
+ buildset = await self .master .data .get (("buildsets" , str (buildset_id )))
185
+ if (
186
+ buildset
187
+ and (rebuilt_buildid := buildset .get ("rebuilt_buildid" )) is not None
188
+ ):
189
+ # This is a rebuild - try to reuse evaluation from original build
190
+ jobs = await self ._reconstruct_jobs_from_rebuild (rebuilt_buildid )
191
+ if jobs is not None :
192
+ # Successfully reconstructed jobs, process them
193
+ await self ._process_jobs_and_trigger_builds (jobs , branch_config )
194
+ result = util .SUCCESS
195
+ if self .build :
196
+ await CombinedBuildEvent .produce_event_for_build (
197
+ self .master ,
198
+ CombinedBuildEvent .FINISHED_NIX_EVAL ,
199
+ self .build ,
200
+ result ,
201
+ warnings_count = self .warnings_count ,
202
+ )
203
+ return result
204
+
205
+ # Either not a rebuild or reconstruction failed - run full evaluation
206
+ result = await self ._run_nix_eval_jobs (branch_config )
207
+
208
+ if self .build :
209
+ await CombinedBuildEvent .produce_event_for_build (
210
+ self .master ,
211
+ CombinedBuildEvent .FINISHED_NIX_EVAL ,
212
+ self .build ,
213
+ result ,
214
+ warnings_count = self .warnings_count ,
215
+ )
216
+
217
+ return result
218
+
219
+ async def _run_nix_eval_jobs (self , branch_config : BranchConfig ) -> int :
220
+ """Run nix-eval-jobs and process the results."""
175
221
# run nix-eval-jobs --flake .#checks to generate the dict of stages
176
222
# !! Careful, the command attribute has to be specified here as the call
177
223
# !! to `makeRemoteShellCommand` inside `BranchConfig.extract_during_step`
@@ -214,16 +260,8 @@ async def run(self) -> int:
214
260
# Process warnings if any
215
261
result = await self ._process_warnings (result , branch_config = branch_config )
216
262
217
- if self .build :
218
- await CombinedBuildEvent .produce_event_for_build (
219
- self .master ,
220
- CombinedBuildEvent .FINISHED_NIX_EVAL ,
221
- self .build ,
222
- result ,
223
- warnings_count = self .warnings_count ,
224
- )
225
263
if result in (util .SUCCESS , util .WARNINGS ):
226
- # create a ShellCommand for each stage and add them to the build
264
+ # Parse the nix-eval-jobs output
227
265
jobs : list [NixEvalJob ] = []
228
266
229
267
for line in self .observer .getStdout ().split ("\n " ):
@@ -235,48 +273,184 @@ async def run(self) -> int:
235
273
raise BuildbotNixError (msg ) from e
236
274
jobs .append (NixEvalJobModel .validate_python (job ))
237
275
238
- failed_jobs : list [ NixEvalJobError ] = []
239
- successful_jobs : list [ NixEvalJobSuccess ] = []
276
+ # Process jobs and trigger builds
277
+ await self . _process_jobs_and_trigger_builds ( jobs , branch_config )
240
278
241
- for job in jobs :
242
- # report unbuildable jobs
243
- if isinstance (job , NixEvalJobError ):
244
- failed_jobs .append (job )
245
- elif (
246
- job .system in self .nix_eval_config .supported_systems
247
- and isinstance (job , NixEvalJobSuccess )
248
- ):
249
- successful_jobs .append (job )
279
+ return result
250
280
251
- self .number_of_jobs = len (successful_jobs )
281
+ async def _process_jobs_and_trigger_builds (
282
+ self , jobs : list [NixEvalJob ], branch_config : BranchConfig
283
+ ) -> None :
284
+ """Process jobs and trigger builds. Used by both normal eval and rebuild paths."""
285
+ failed_jobs : list [NixEvalJobError ] = []
286
+ successful_jobs : list [NixEvalJobSuccess ] = []
252
287
253
- if self .build :
254
- trigger_config = TriggerConfig (
255
- builds_scheduler = f"{ self .project .project_id } -nix-build" ,
256
- failed_eval_scheduler = f"{ self .project .project_id } -nix-failed-eval" ,
257
- dependency_failed_scheduler = f"{ self .project .project_id } -nix-dependency-failed" ,
258
- cached_failure_scheduler = f"{ self .project .project_id } -nix-cached-failure" ,
259
- )
288
+ for job in jobs :
289
+ # report unbuildable jobs
290
+ if isinstance (job , NixEvalJobError ):
291
+ failed_jobs .append (job )
292
+ elif job .system in self .nix_eval_config .supported_systems and isinstance (
293
+ job , NixEvalJobSuccess
294
+ ):
295
+ successful_jobs .append (job )
260
296
261
- jobs_config = JobsConfig (
262
- successful_jobs = successful_jobs ,
263
- failed_jobs = failed_jobs ,
264
- cache_failed_builds = self .nix_eval_config .cache_failed_builds ,
265
- failed_build_report_limit = self .nix_eval_config .failed_build_report_limit ,
266
- )
267
- self .build .addStepsAfterCurrentStep (
268
- [
269
- BuildTrigger (
270
- project = self .project ,
271
- trigger_config = trigger_config ,
272
- jobs_config = jobs_config ,
273
- nix_attr_prefix = branch_config .attribute ,
274
- name = "build flake" ,
275
- ),
276
- ]
297
+ self .number_of_jobs = len (successful_jobs )
298
+
299
+ if self .build :
300
+ trigger_config = TriggerConfig (
301
+ builds_scheduler = f"{ self .project .project_id } -nix-build" ,
302
+ failed_eval_scheduler = f"{ self .project .project_id } -nix-failed-eval" ,
303
+ dependency_failed_scheduler = f"{ self .project .project_id } -nix-dependency-failed" ,
304
+ cached_failure_scheduler = f"{ self .project .project_id } -nix-cached-failure" ,
305
+ )
306
+
307
+ jobs_config = JobsConfig (
308
+ successful_jobs = successful_jobs ,
309
+ failed_jobs = failed_jobs ,
310
+ cache_failed_builds = self .nix_eval_config .cache_failed_builds ,
311
+ failed_build_report_limit = self .nix_eval_config .failed_build_report_limit ,
312
+ )
313
+ self .build .addStepsAfterCurrentStep (
314
+ [
315
+ BuildTrigger (
316
+ project = self .project ,
317
+ trigger_config = trigger_config ,
318
+ jobs_config = jobs_config ,
319
+ nix_attr_prefix = branch_config .attribute ,
320
+ name = "build flake" ,
321
+ ),
322
+ ]
323
+ )
324
+
325
+ async def _check_store_paths_batch (
326
+ self , paths : list [str ], batch_size : int = 1000
327
+ ) -> AsyncGenerator [bool , None ]:
328
+ """Check validity of store paths in batches. Yields validity status for each path."""
329
+ for i in range (0 , len (paths ), batch_size ):
330
+ batch_paths = paths [i : i + batch_size ]
331
+ cmd = await self .makeRemoteShellCommand (
332
+ command = ["nix-store" , "--check-validity" , * batch_paths ],
333
+ collectStdout = True ,
334
+ collectStderr = False ,
335
+ )
336
+ await self .runCommand (cmd )
337
+
338
+ if cmd .results () == util .SUCCESS :
339
+ # All paths in batch are valid
340
+ for _ in batch_paths :
341
+ yield True
342
+ else :
343
+ # Check individually to find which are valid
344
+ for path in batch_paths :
345
+ cmd = await self .makeRemoteShellCommand (
346
+ command = ["nix-store" , "--check-validity" , path ],
347
+ collectStdout = False ,
348
+ collectStderr = False ,
349
+ )
350
+ await self .runCommand (cmd )
351
+ yield cmd .results () == util .SUCCESS
352
+
353
+ async def _reconstruct_job_from_build (
354
+ self , build_id : int , original_build_id : int
355
+ ) -> tuple [NixEvalJobSuccess , str ] | None :
356
+ """Validate and reconstruct a NixEvalJob from build properties.
357
+
358
+ Returns tuple of (job, out_path) or None if validation fails.
359
+ """
360
+ props = await self .master .db .builds .getBuildProperties (build_id )
361
+ required_props = ["attr" , "drv_path" , "out_path" , "system" ]
362
+
363
+ # Validate required properties
364
+ for prop in required_props :
365
+ if prop not in props or props [prop ][0 ] is None :
366
+ log .info (
367
+ f"Cannot reconstruct job from build { original_build_id } : missing required property '{ prop } '"
277
368
)
369
+ return None
370
+
371
+ # Extract properties
372
+ attr = props ["attr" ][0 ]
373
+ drv_path = props ["drv_path" ][0 ]
374
+ out_path = props ["out_path" ][0 ]
375
+ system = props ["system" ][0 ]
376
+
377
+ job = NixEvalJobSuccess (
378
+ attr = attr ,
379
+ attrPath = attr .split ("." ),
380
+ drvPath = drv_path ,
381
+ outputs = {"out" : out_path },
382
+ system = system ,
383
+ name = attr ,
384
+ cacheStatus = CacheStatus .notBuilt ,
385
+ neededBuilds = [],
386
+ neededSubstitutes = [],
387
+ )
278
388
279
- return result
389
+ return job , out_path
390
+
391
+ async def _reconstruct_jobs_from_rebuild (
392
+ self , original_build_id : int
393
+ ) -> list [NixEvalJob ] | None :
394
+ """Reconstruct job list from the original build's triggered builds."""
395
+ # Get all builds triggered by the original eval
396
+ triggered_builds = await self .master .db .builds .get_triggered_builds (
397
+ original_build_id
398
+ )
399
+
400
+ if not triggered_builds :
401
+ return None
402
+
403
+ # Reconstruct all jobs
404
+ jobs = []
405
+ outputs_to_check = []
406
+
407
+ for build in triggered_builds :
408
+ result = await self ._reconstruct_job_from_build (build .id , original_build_id )
409
+ if result is None :
410
+ # Missing required properties, can't reconstruct
411
+ return None
412
+
413
+ job , out_path = result
414
+ jobs .append (job )
415
+
416
+ # Collect outputs that need checking
417
+ if build .results == util .SUCCESS and out_path :
418
+ outputs_to_check .append ((job , out_path ))
419
+
420
+ # Batch check which outputs still exist
421
+ if outputs_to_check :
422
+ output_paths = [path for _ , path in outputs_to_check ]
423
+
424
+ # Process validity results as they come from the generator
425
+ validity_iter = self ._check_store_paths_batch (output_paths )
426
+ i = 0
427
+ async for is_valid in validity_iter :
428
+ if is_valid :
429
+ outputs_to_check [i ][0 ].cacheStatus = CacheStatus .local
430
+ i += 1
431
+
432
+ # Verify derivations exist for jobs that need rebuilding
433
+ jobs_to_rebuild = [job for job in jobs if job .cacheStatus != CacheStatus .local ]
434
+
435
+ if jobs_to_rebuild and not await self ._verify_derivations_exist (
436
+ jobs_to_rebuild
437
+ ):
438
+ return None
439
+
440
+ self .descriptionDone = [f"reused eval from build { original_build_id } " ]
441
+ return cast ("list[NixEvalJobError | NixEvalJobSuccess]" , jobs )
442
+
443
+ async def _verify_derivations_exist (
444
+ self , jobs : Sequence [NixEvalJobError | NixEvalJobSuccess ]
445
+ ) -> bool :
446
+ """Verify all derivations exist for the given jobs."""
447
+ drv_paths = [job .drvPath for job in jobs if isinstance (job , NixEvalJobSuccess )]
448
+
449
+ # Check all derivations - if any is invalid, return False
450
+ async for is_valid in self ._check_store_paths_batch (drv_paths ):
451
+ if not is_valid :
452
+ return False
453
+ return True
280
454
281
455
async def _process_warnings (self , result : int , branch_config : BranchConfig ) -> int :
282
456
"""Process stderr output for warnings and update build status."""
0 commit comments