73
73
from dataclasses import dataclass
74
74
from typing import Any , Optional
75
75
76
- import decoder .decoder as job_defintion_decoder
76
+ import decoder .decoder as job_definition_decoder
77
77
from decoder .decoder import TextEncoding
78
78
from google .protobuf .message import Message
79
79
from informaticsmatters .protobuf .datamanager .pod_message_pb2 import PodMessage
@@ -134,7 +134,7 @@ def __init__(
134
134
instance_launcher : InstanceLauncher ,
135
135
instance_link_glob : str = ".instance-*" ,
136
136
):
137
- """Initialiser, given a Workflow API adapter, Instance laucnher ,
137
+ """Initialiser, given a Workflow API adapter, Instance launcher ,
138
138
and a step (directory) link 'glob' (a convenient directory glob to
139
139
locate the DM hard-link directories of prior instances inserted into a
140
140
step's instance directory, typically '.instance-*')"""
@@ -326,8 +326,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
326
326
self ._set_step_error (step_name , r_wfid , r_wfsid , exit_code , "Job failed" )
327
327
return
328
328
329
- # If we get here the prior step completed successfullyso we
330
- # mark the Step as DONE (successfully).
329
+ # If we get here the prior step completed successfully
330
+ # so we mark the Step as DONE (successfully).
331
331
wfid = rwf_response ["workflow" ]["id" ]
332
332
assert wfid
333
333
wf_response , _ = self ._wapi_adapter .get_workflow (workflow_id = wfid )
@@ -358,9 +358,9 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
358
358
# For this simple logic it is the next step.
359
359
next_step = wf_response ["steps" ][step_index + 1 ]
360
360
361
- # A mojor piece of work to accomplish is to get ourselves into a position
361
+ # A major piece of work to accomplish is to get ourselves into a position
362
362
# that allows us to check the step command can be executed.
363
- # We do this by compiling a map of variables we belive the step needs.
363
+ # We do this by compiling a map of variables we believe the step needs.
364
364
365
365
# If the step about to be launched is based on a prior step
366
366
# that generates multiple outputs (files) then we have to
@@ -459,7 +459,7 @@ def _prepare_step(
459
459
# whose origin is of type 'files'.
460
460
461
461
our_job_definition : dict [str , Any ] = self ._get_step_job (step = step_definition )
462
- our_inputs : dict [str , Any ] = job_defintion_decoder .get_inputs (
462
+ our_inputs : dict [str , Any ] = job_definition_decoder .get_inputs (
463
463
our_job_definition
464
464
)
465
465
# get all our step connections that relate to prior steps.
@@ -470,10 +470,10 @@ def _prepare_step(
470
470
471
471
we_are_a_combiner : bool = False
472
472
473
- # What step might we be comnining ?
474
- # It'll remain None afdter the next block if we're not combining.
473
+ # What step might we be combining ?
474
+ # It'll remain None after the next block if we're not combining.
475
475
step_name_being_combined : str | None = None
476
- # If we are a combiner, what is the varaible (identifying a set of files)
476
+ # If we are a combiner, what is the variable (identifying a set of files)
477
477
# that is bing combined? There can only be one.
478
478
combiner_input_variable : str | None = None
479
479
for p_step_name , connections in plumbing_of_prior_steps .items ():
@@ -502,7 +502,7 @@ def _prepare_step(
502
502
assert num_step_recplicas_being_combined > 0
503
503
assert "status" in response
504
504
505
- # Assume all the dependent prior step instnaces are done
505
+ # Assume all the dependent prior step instances are done
506
506
# and undo our assumption if not...
507
507
all_step_instances_done : bool = True
508
508
@@ -539,7 +539,7 @@ def _prepare_step(
539
539
error_msg = f"Prior instance of step '{ step_name_being_combined } ' has failed" ,
540
540
)
541
541
542
- # We're not a cmbiner or we are
542
+ # We're not a combiner or we are
543
543
# (and all the dependent instances have completed successfully).
544
544
# We can now compile a set of variables for it.
545
545
@@ -563,9 +563,9 @@ def _prepare_step(
563
563
#
564
564
# The decoder gives us a list of 'Connectors' that are a par of variable
565
565
# names representing "in" (workflow) and "out" (step) variable names.
566
- # "in" variables are worklfow variables, and "out" variables
567
- # are expected Stewp (Job) variables. We use these connections to
568
- # take workflow variables and puth them in our variables map.
566
+ # "in" variables are workflow variables, and "out" variables
567
+ # are expected Step (Job) variables. We use these connections to
568
+ # take workflow variables and put them in our variables map.
569
569
for connector in get_step_workflow_variable_connections (
570
570
step_definition = step_definition
571
571
):
@@ -607,13 +607,13 @@ def _prepare_step(
607
607
608
608
# The step's prime variables are now set.
609
609
610
- # Before we return these to the claler do we have enough
610
+ # Before we return these to the caller do we have enough
611
611
# to satisfy the step Job's command? It's a simple check -
612
612
# we give the step's Job command and our prime variables
613
- # to the Job decoder - it wil tell us if an inportnat
613
+ # to the Job decoder - it wil tell us if an important
614
614
# variable is missing....
615
615
job : dict [str , Any ] = self ._get_step_job (step = step_definition )
616
- message , success = job_defintion_decoder .decode (
616
+ message , success = job_definition_decoder .decode (
617
617
job ["command" ], prime_variables , "command" , TextEncoding .JINJA2_3_0
618
618
)
619
619
if not success :
@@ -628,7 +628,7 @@ def _prepare_step(
628
628
# We need to set the number of step replicas to run.
629
629
#
630
630
# If we're not a combiner and a variable in our "plumbing" refers to a variable
631
- # of type "files" in a prior step then we are expected to run multipe times
631
+ # of type "files" in a prior step then we are expected to run multiple times
632
632
# (even if just once). The number of times we're expected to run is dictated
633
633
# by the number of values (files) in the "files" variable.
634
634
#
@@ -650,7 +650,7 @@ def _prepare_step(
650
650
wf_step : dict [str , Any ] = get_step (wf , p_step_name )
651
651
assert wf_step
652
652
job_definition : dict [str , Any ] = self ._get_step_job (step = wf_step )
653
- jd_outputs : dict [str , Any ] = job_defintion_decoder .get_outputs (
653
+ jd_outputs : dict [str , Any ] = job_definition_decoder .get_outputs (
654
654
job_definition
655
655
)
656
656
for connector in connections :
@@ -712,7 +712,7 @@ def _launch(
712
712
step_definition : dict [str , Any ],
713
713
step_preparation_response : StepPreparationResponse ,
714
714
) -> None :
715
- """Given a runningWorkflow record, a step defitnion (from the Workflow),
715
+ """Given a runningWorkflow record, a step definition (from the Workflow),
716
716
and the step's variables (in a preparation object) this method launches
717
717
one or more instances of the given step."""
718
718
step_name : str = step_definition ["name" ]
0 commit comments