@@ -417,6 +417,9 @@ def _get_step_job(self, *, step: dict[str, Any]) -> dict[str, Any]:
417
417
# the keys "collection", "job", and "version". Here we assume that
418
418
# the workflow definition has passed the RUN-level validation
419
419
# which means we can get these values.
420
+ #
421
+ # The validator should have verified the Job exists, but it might not
422
+ # when we need it - so this method might return '{}'.
420
423
assert "specification" in step
421
424
step_spec : dict [str , Any ] = step ["specification" ]
422
425
job_collection : str = step_spec ["collection" ]
@@ -464,6 +467,12 @@ def _prepare_step(
464
467
# whose origin is of type 'files'.
465
468
466
469
our_job_definition : dict [str , Any ] = self ._get_step_job (step = step_definition )
470
+ if not our_job_definition :
471
+ return StepPreparationResponse (
472
+ replicas = 0 ,
473
+ error_num = 1 ,
474
+ error_msg = f"The Job for step '{ step_name } ' is not present" ,
475
+ )
467
476
our_inputs : dict [str , Any ] = job_definition_decoder .get_inputs (
468
477
our_job_definition
469
478
)
@@ -540,7 +549,7 @@ def _prepare_step(
540
549
)
541
550
return StepPreparationResponse (
542
551
replicas = 0 ,
543
- error_num = 1 ,
552
+ error_num = 2 ,
544
553
error_msg = f"Prior instance of step '{ step_name_being_combined } ' has failed" ,
545
554
)
546
555
@@ -661,14 +670,16 @@ def _prepare_step(
661
670
# we give the step's Job command and our prime variables
662
671
# to the Job decoder - it wil tell us if an important
663
672
# variable is missing....
664
- job : dict [str , Any ] = self ._get_step_job (step = step_definition )
665
673
message , success = job_definition_decoder .decode (
666
- job ["command" ], prime_variables , "command" , TextEncoding .JINJA2_3_0
674
+ our_job_definition ["command" ],
675
+ prime_variables ,
676
+ "command" ,
677
+ TextEncoding .JINJA2_3_0 ,
667
678
)
668
679
if not success :
669
680
msg = f"Failed command validation for step { step_name } error_msg={ message } "
670
681
_LOGGER .warning (msg )
671
- return StepPreparationResponse (replicas = 0 , error_num = 2 , error_msg = msg )
682
+ return StepPreparationResponse (replicas = 0 , error_num = 3 , error_msg = msg )
672
683
673
684
# Do we replicate this step (run it more than once in parallel)?
674
685
#
@@ -699,6 +710,12 @@ def _prepare_step(
699
710
wf_step : dict [str , Any ] = get_step (wf , p_step_name )
700
711
assert wf_step
701
712
job_definition : dict [str , Any ] = self ._get_step_job (step = wf_step )
713
+ if not job_definition :
714
+ return StepPreparationResponse (
715
+ replicas = 0 ,
716
+ error_num = 4 ,
717
+ error_msg = f"The Job for step '{ p_step_name } ' is not present" ,
718
+ )
702
719
jd_outputs : dict [str , Any ] = job_definition_decoder .get_outputs (
703
720
job_definition
704
721
)
0 commit comments