@@ -406,39 +406,51 @@ def _prepare_step(
# Before we move on, are we a combiner?
#
- # We are if a variable in our step's plumbing refers to an input of ours
- # that is of type 'files'. If we are a combiner then we use the name of the
+ # Why?
+ #
+ # A combiner's execution is based on the possible concurrent execution
+ # of one (or more) prior steps. If we are a combiner then we use the name of the
# step we are combining (there can only be one) so that we can ensure
- # all its step instances have finished (successfully). We cannot
- # move on until all the files we depend on are ready.
+ # all its step instances have finished (successfully) before continuing.
+ #
+ # We are a combiner if a variable in our step's plumbing refers to an input
+ # whose origin is of type 'files'.

our_job_definition: dict[str, Any] = self._get_step_job(step=step_definition)
our_inputs: dict[str, Any] = job_defintion_decoder.get_inputs(
    our_job_definition
)
+ # Get all our step connections that relate to prior steps.
+ # If we're a combiner we will have variables based on prior steps.
plumbing_of_prior_steps: dict[str, list[Connector]] = (
    get_step_prior_step_connections(step_definition=step_definition)
)
- step_is_combiner: bool = False
+
+ we_are_a_combiner: bool = False
+
+ # What step might we be combining?
+ # It'll remain None after the next block if we're not combining.
step_name_being_combined: str | None = None
+ # If we are a combiner, what is the variable (identifying a set of files)
+ # that is being combined? There can only be one.
combiner_input_variable: str | None = None
- num_step_recplicas_being_combined: int = 0
for p_step_name, connections in plumbing_of_prior_steps.items():
    for connector in connections:
        if our_inputs.get(connector.out, {}).get("type") == "files":
            step_name_being_combined = p_step_name
            combiner_input_variable = connector.out
-             step_is_combiner = True
+             we_are_a_combiner = True
            break
    if step_name_being_combined:
        break

- if step_is_combiner:
+ # If we are a combiner
+ # we must make sure that all the step instances we're combining are done.
+ # If not, we must leave.
+ if we_are_a_combiner:
    assert step_name_being_combined
    assert combiner_input_variable

-     # Are all the step instances we're combining done?
-
    response, _ = self._wapi_adapter.get_status_of_all_step_instances_by_name(
        name=step_name_being_combined,
        running_workflow_id=rwf_id,
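As an aside, the combiner test above reduces to a small, self-contained check. The sketch below illustrates the idea only; the Connector shape, the step names and the input records are assumptions for illustration, not the engine's real objects.

# A minimal sketch of the combiner test, assuming a Connector that pairs a
# prior-step variable ("in_") with one of our Job's variables ("out"), and a
# dict of our Job's input definitions. Names and data are illustrative only.
from dataclasses import dataclass
from typing import Any


@dataclass
class Connector:
    in_: str  # variable name in the prior step
    out: str  # variable name expected by our step's Job


def find_combined_step(
    plumbing_of_prior_steps: dict[str, list[Connector]],
    our_inputs: dict[str, dict[str, Any]],
) -> tuple[str | None, str | None]:
    """Return (prior step name, our 'files' input variable), or (None, None)."""
    for p_step_name, connections in plumbing_of_prior_steps.items():
        for connector in connections:
            if our_inputs.get(connector.out, {}).get("type") == "files":
                return p_step_name, connector.out
    return None, None


# Hypothetical plumbing: our "inputFiles" input (type 'files') is fed by "dock".
plumbing = {"dock": [Connector(in_="outputFiles", out="inputFiles")]}
inputs = {"inputFiles": {"type": "files"}}
assert find_combined_step(plumbing, inputs) == ("dock", "inputFiles")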
@@ -448,9 +460,12 @@ def _prepare_step(
    assert num_step_recplicas_being_combined > 0
    assert "status" in response

-     # Assume they're all done
+     # Assume all the dependent prior step instances are done
    # and undo our assumption if not...
    all_step_instances_done: bool = True
+
+     # If anything's still running we must leave.
+     # If anything's failed we must 'fail' the running workflow.
    all_step_instances_successful: bool = True
    for status in response["status"]:
        if not status["done"]:
@@ -460,7 +475,7 @@ def _prepare_step(
            all_step_instances_successful = False
            break
    if not all_step_instances_done:
-         # Can't move on - other steps need to finish.
+         # Can't move on - instances still need to finish.
        _LOGGER.debug(
            "Assessing start of combiner step (%s)"
            " but not all steps (%s) to be combined are done",
@@ -469,8 +484,7 @@ def _prepare_step(
        )
        return StepPreparationResponse(replicas=0)
    elif not all_step_instances_successful:
-         # Can't move on - all prior steps are done,
-         # but at least one was not successful.
+         # Can't move on - at least one instance was not successful.
        _LOGGER.warning(
            "Assessing start of combiner step (%s)"
            " but at least one step (%s) to be combined failed",
@@ -483,51 +497,52 @@ def _prepare_step(
            error_msg=f"Prior instance of step '{step_name_being_combined}' has failed",
        )

- # I think we can start this step,
- # so compile a set of variables for it.
+ # Either we're not a combiner, or we are
+ # (and all the dependent instances have completed successfully).
+ # Either way, we can now compile a set of variables for the step.

- # Outputs - a list of step files that are outputs,
- # and also designated as workflow outputs.
- # Any step can write files to the Projetc directory
- # but only job outputs that are also workflow outputs
- # are put in this list.
+ # Outputs - a list of step files that are workflow outputs.
+ # Any step can write files to the Project directory
+ # but this only consists of job outputs that are also workflow outputs.
outputs: set[str] = set()

- # Start with any variables provided in the step's specification.
- # A map that we will add to (and maybe even over-write)...
- variables: dict[str, Any] = step_definition["specification"].get(
+ # Our initial set of variables begins with the variables provided in the step's
+ # specification. It is a map that we will add to and then (eventually)
+ # pass to the instance launcher. Here we refer to them as 'prime_variables'.
+ prime_variables: dict[str, Any] = step_definition["specification"].get(
    "variables", {}
)
- # ...and the running workflow variables
+ # The variables provided by the user when running the workflow
+ # (the running workflow variables)...
rwf_variables: dict[str, Any] = rwf.get("variables", {})

- # Process the step's "plumbing" relating to workflow variables.
+ # Adjust our prime variables by adding any values
+ # from workflow variables that are mentioned in the step's "plumbing".
#
- # This will be a list of Connectors of "in" and "out" variable names.
+ # The decoder gives us a list of 'Connectors' that are a pair of variable
+ # names representing "in" (workflow) and "out" (step) variable names.
# "in" variables are workflow variables, and "out" variables
- # are expected Job variables. We use this to add variables
- # to the variables map.
+ # are expected Step (Job) variables. We use these connections to
+ # take workflow variables and put them in our variables map.
for connector in get_step_workflow_variable_connections(
    step_definition=step_definition
):
    assert connector.in_ in rwf_variables
-     variables[connector.out] = rwf_variables[connector.in_]
+     prime_variables[connector.out] = rwf_variables[connector.in_]
    if is_workflow_output_variable(wf, connector.in_):
        outputs.add(rwf_variables[connector.in_])

- # Process the step's "plumbing" relating to pre-defined variables.
+ # Add any pre-defined variables used in the step's "plumbing".
for connector in get_step_predefined_variable_connections(
    step_definition=step_definition
):
    assert connector.in_ in self._predefined_variables
-     variables[connector.out] = self._predefined_variables[connector.in_]
+     prime_variables[connector.out] = self._predefined_variables[connector.in_]

- # Now process variables (in the "plumbing" block)
- # that relate to values used in prior steps.
+ # Using the "plumbing" again, add any that relate to values used in prior steps.
#
# The decoder gives us a map indexed by prior step name that's a list of
- # "in"/"out" connectors as above. If this is a combiner step remember
- # that the combiner_input_variable is a used as a list.
+ # "in"/"out" connectors as before.
prior_step_plumbing: dict[str, list[Connector]] = (
    get_step_prior_step_connections(step_definition=step_definition)
)
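In essence the plumbing above just copies values across (in, out) name pairs. A hedged sketch, using plain tuples in place of the engine's Connector objects and invented variable names:

# Sketch of workflow-variable plumbing: each (in_name, out_name) pair copies a
# running-workflow variable into the step's prime variables. Plain tuples stand
# in for Connector objects here; all names are illustrative assumptions.
from typing import Any


def apply_workflow_plumbing(
    connections: list[tuple[str, str]],
    rwf_variables: dict[str, Any],
    prime_variables: dict[str, Any],
) -> None:
    for in_name, out_name in connections:
        assert in_name in rwf_variables, f"Missing workflow variable '{in_name}'"
        prime_variables[out_name] = rwf_variables[in_name]


prime_variables: dict[str, Any] = {"count": 1}
apply_workflow_plumbing(
    connections=[("candidateMolecules", "inputFile")],
    rwf_variables={"candidateMolecules": "molecules.sdf"},
    prime_variables=prime_variables,
)
assert prime_variables == {"count": 1, "inputFile": "molecules.sdf"}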
@@ -546,35 +561,50 @@ def _prepare_step(
        # Copy "in" value to "out"...
        for connector in connections:
            assert connector.in_ in prior_step["variables"]
-             variables[connector.out] = prior_step["variables"][connector.in_]
+             prime_variables[connector.out] = prior_step["variables"][connector.in_]

- # All variables are set ...
- # is this enough to satisfy the setp's Job command?
+ # The step's prime variables are now set.

+ # Before we return these to the caller do we have enough
+ # to satisfy the step Job's command? It's a simple check -
+ # we give the step's Job command and our prime variables
+ # to the Job decoder - it will tell us if an important
+ # variable is missing...
job: dict[str, Any] = self._get_step_job(step=step_definition)
message, success = job_defintion_decoder.decode(
-     job["command"], variables, "command", TextEncoding.JINJA2_3_0
+     job["command"], prime_variables, "command", TextEncoding.JINJA2_3_0
)
if not success:
    msg = f"Failed command validation for step {step_name} error_msg={message}"
    _LOGGER.warning(msg)
    return StepPreparationResponse(replicas=0, error_num=2, error_msg=msg)

- # Do we replicate this step (run it more than once)?
+ # Do we replicate this step (run it more than once in parallel)?
+ #
+ # Why?
+ #
+ # We need to set the number of step replicas to run.
#
- # We do if this is not a combiner step and a variable in this step's plumbing
- # refers to an output of a prior step whose type is 'files'.
- # If the prior step is a 'splitter' we populate the 'replication_values' array
- # with the list of files the prior step genrated for its output.
+ # If we're not a combiner and a variable in our "plumbing" refers to a variable
+ # of type "files" in a prior step then we are expected to run multiple times
+ # (even if just once). The number of times we're expected to run is dictated
+ # by the number of values (files) in the "files" variable.
#
- # In this engine we onlhy act on the _first_ match, i.e. there CANNOT
- # be more than one prior step variable that is 'files'!
+ # In this engine we only act on the _first_ variable match, i.e. we do not
+ # expect and will not act on more than one prior step variable that is of type
+ # "files".
+ #
+ # If we do run more than once we'll set 'iter_variable' to the name of our
+ # variable (that is to be given multiple values) and 'iter_values' will
+ # be the list of files produced by the dependent step forming our inputs.
+ # If the dependent step produces file1, file2, and file3 we'll run our step
+ # 3 times, with each being given a different file as its input.
iter_values: list[str] = []
iter_variable: str | None = None
- if not step_is_combiner:
+ if not we_are_a_combiner:
    for p_step_name, connections in plumbing_of_prior_steps.items():
        # We need to get the Job definition for each step
-         # and then check whether the (output) variable is of type 'files'...
+         # and then check whether the (output) variable is of type "files"...
        wf_step: dict[str, Any] = get_step(wf, p_step_name)
        assert wf_step
        job_definition: dict[str, Any] = self._get_step_job(step=wf_step)
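The replication decision described above amounts to: take the first connection whose prior-step variable is of type "files" and use that step's list of files as the replica values. A minimal sketch follows; the data shapes and names are assumptions for illustration.

# Sketch of choosing the replication variable and values: the first prior-step
# output of type "files" decides both. All shapes and names here are assumed.
from typing import Any


def choose_replication(
    connections: list[tuple[str, str]],        # (prior variable, our variable)
    prior_outputs: dict[str, dict[str, Any]],  # the prior Job's output definitions
    prior_files: dict[str, list[str]],         # files produced per output variable
) -> tuple[str | None, list[str]]:
    for in_name, out_name in connections:
        if prior_outputs.get(in_name, {}).get("type") == "files":
            return out_name, prior_files.get(in_name, [])
    return None, []


# A hypothetical splitter produced three chunks, so we would run three replicas,
# each replica being given one file via our (assumed) "inputFile" variable.
variable, values = choose_replication(
    connections=[("chunks", "inputFile")],
    prior_outputs={"chunks": {"type": "files"}},
    prior_files={"chunks": ["chunk-1.sdf", "chunk-2.sdf", "chunk-3.sdf"]},
)
assert variable == "inputFile"
assert len(values) == 3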
@@ -606,19 +636,26 @@ def _prepare_step(
            break

# Get the list of instances we depend upon.
+ #
+ # We need to do this so that the launcher can hard-link
+ # their instance directories into ours.
dependent_instances: set[str] = set()
for p_step_name in plumbing_of_prior_steps:
-     # Assume any step can have multiple instances
+     # Any prior step may have multiple instances
    response, _ = self._wapi_adapter.get_status_of_all_step_instances_by_name(
        name=p_step_name,
        running_workflow_id=rwf_id,
    )
    for step in response["status"]:
        dependent_instances.add(step["instance_id"])

+ # We're done.
+ # We have a set of prime variables,
+ # a list of dependent step instances,
+ # and we know how many step replicas to run.
num_step_instances: int = max(1, len(iter_values))
return StepPreparationResponse(
-     variables=variables,
+     variables=prime_variables,
    replicas=num_step_instances,
    replica_variable=iter_variable,
    replica_values=iter_values,
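Finally, judging from the call sites above, the StepPreparationResponse carries the prime variables, the replica count and the replication variable/values back to the caller. A sketch of that assembly, with the dataclass fields inferred rather than taken from the real class:

# Sketch of the final assembly. The StepPreparationResponse fields below are
# inferred from the call sites above (variables, replicas, replica_variable,
# replica_values, error_num, error_msg); the real class may well differ.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepPreparationResponse:
    replicas: int
    variables: dict[str, Any] = field(default_factory=dict)
    replica_variable: str | None = None
    replica_values: list[str] = field(default_factory=list)
    error_num: int = 0
    error_msg: str | None = None


iter_values = ["chunk-1.sdf", "chunk-2.sdf", "chunk-3.sdf"]
response = StepPreparationResponse(
    variables={"inputFile": "chunk-1.sdf"},
    replicas=max(1, len(iter_values)),  # at least one instance always runs
    replica_variable="inputFile",
    replica_values=iter_values,
)
assert response.replicas == 3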