Skip to content

Commit c08ed5c

Browse files
author
Alan Christie
committed
feat: Refactoring
1 parent cdd936e commit c08ed5c

File tree

1 file changed

+65
-62
lines changed

1 file changed

+65
-62
lines changed

workflow/workflow_engine.py

Lines changed: 65 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
141141
# Now find the first step (index 0)...
142142
first_step: dict[str, Any] = wf_response["steps"][0]
143143

144-
sp_resp = self._prepare_step_variables(
144+
sp_resp = self._prepare_step(
145145
wf=wf_response, step_definition=first_step, rwf=rwf_response
146146
)
147147
assert sp_resp.variables is not None
@@ -300,7 +300,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
300300
# Do we need a 'prepare variables' function?
301301
# One that returns a map of variables or nothing
302302
# (e.g. 'nothing' when a step launch cannot be attempted)
303-
sp_resp = self._prepare_step_variables(
303+
sp_resp = self._prepare_step(
304304
wf=wf_response, step_definition=next_step, rwf=rwf_response
305305
)
306306
if sp_resp.iterations == 0:
@@ -353,93 +353,96 @@ def _get_step_job(self, *, step: dict[str, Any]) -> dict[str, Any]:
353353

354354
return job
355355

356-
def _validate_step_command(
356+
def _prepare_step(
357357
self,
358358
*,
359-
running_workflow_id: str,
360-
step: dict[str, Any],
361-
running_workflow_variables: dict[str, Any],
362-
) -> str | dict[str, Any]:
363-
"""Returns an error message if the command isn't valid.
364-
Without a message we return all the variables that were (successfully)
365-
applied to the command."""
359+
wf: dict[str, Any],
360+
step_definition: dict[str, Any],
361+
rwf: dict[str, Any],
362+
) -> StepPreparationResponse:
363+
"""Attempts to prepare a map of step variables. If variables cannot be
364+
presented to the step we return an object with 'iterations' set to zero."""
365+
366+
step_name: str = step_definition["name"]
367+
rwf_id: str = rwf["id"]
368+
369+
# Compile a set of variables for this step.
366370

367371
# Start with any variables provided in the step's specification.
368-
# This will be ou t"all variables" map for this step,
369-
# whcih we will add to (and maybe even over-write)...
370-
all_variables: dict[str, Any] = step["specification"].get("variables", {})
372+
# A map that we will add to (and maybe even over-write)...
373+
variables: dict[str, Any] = step_definition["specification"].get(
374+
"variables", {}
375+
)
371376

372-
# Next, we iterate through the step's "variable mapping" block.
373-
# This tells us all the variables that are set from either the
374-
# 'workflow' or 'a prior step'.
377+
# All the running workflow variables
378+
rwf_variables: dict[str, Any] = rwf.get("variables", {})
375379

376-
# Start with any workflow variables in the step.
377-
# This will be a list of Translations of "in" and "out" variable names.
380+
# Process the step's plumbing realting to workflow variables.
381+
# This will be a list of Connectors of "in" and "out" variable names.
378382
# "in" variables are worklfow variables, and "out" variables
379383
# are expected Job variables. We use this to add variables
380-
# to the "all variables" map.
381-
for connector in get_step_workflow_variable_connections(step_definition=step):
382-
assert connector.in_ in running_workflow_variables
383-
all_variables[connector.out] = running_workflow_variables[connector.in_]
384+
# to the variables map.
385+
for connector in get_step_workflow_variable_connections(
386+
step_definition=step_definition
387+
):
388+
assert connector.in_ in rwf_variables
389+
variables[connector.out] = rwf_variables[connector.in_]
384390

385-
# Now we apply variables from the "variable mapping" block
391+
# Now we apply variables from the "plumbing" block
386392
# related to values used in prior steps. The decoder gives
387393
# us a map indexed by prior step name that's a list of "in" "out"
388394
# tuples as above.
389395
prior_step_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
390-
step_definition=step
396+
step_definition=step_definition
391397
)
392398
for prior_step_name, connections in prior_step_plumbing.items():
393399
# Retrieve the prior "running" step
394400
# in order to get the variables that were set there...
395401
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
396-
name=prior_step_name, running_workflow_id=running_workflow_id
402+
name=prior_step_name, running_workflow_id=rwf_id
397403
)
398404
# Copy "in" value to "out"...
399405
for connector in connections:
400406
assert connector.in_ in prior_step["variables"]
401-
all_variables[connector.out] = prior_step["variables"][connector.in_]
407+
variables[connector.out] = prior_step["variables"][connector.in_]
402408

403409
# Now ... can the command be compiled!?
404-
job: dict[str, Any] = self._get_step_job(step=step)
410+
job: dict[str, Any] = self._get_step_job(step=step_definition)
405411
message, success = job_defintion_decoder.decode(
406-
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
412+
job["command"], variables, "command", TextEncoding.JINJA2_3_0
407413
)
408-
return all_variables if success else message
409-
410-
def _prepare_step_variables(
411-
self,
412-
*,
413-
wf: dict[str, Any],
414-
step_definition: dict[str, Any],
415-
rwf: dict[str, Any],
416-
) -> StepPreparationResponse:
417-
"""Attempts to prepare a map of step variables. If variables cannot be
418-
presented to the step we return an object with 'iterations' set to zero."""
419-
420-
step_name: str = step_definition["name"]
421-
rwf_id: str = rwf["id"]
422-
423-
# We start with all the workflow variables that were provided
424-
# by the user when they "ran" the workflow. We're given a full set of
425-
# variables in response (on success) or an error string (on failure)
426-
rwf_variables: dict[str, Any] = rwf.get("variables", {})
427-
error_or_variables: str | dict[str, Any] = self._validate_step_command(
428-
running_workflow_id=rwf_id,
429-
step=step_definition,
430-
running_workflow_variables=rwf_variables,
431-
)
432-
if isinstance(error_or_variables, str):
433-
error_msg = error_or_variables
434-
msg = f"Failed command validation error_msg={error_msg}"
414+
if not success:
415+
msg = f"Failed command validation error_msg={message}"
435416
_LOGGER.warning(msg)
436417
self._set_step_error(step_name, rwf_id, None, 1, msg)
437418
return StepPreparationResponse(iterations=0)
438419

439-
variables: dict[str, Any] = error_or_variables
420+
# Our inputs
421+
our_job_definition: dict[str, Any] = self._get_step_job(step=step_definition)
422+
our_inputs: dict[str, Any] = job_defintion_decoder.get_inputs(
423+
our_job_definition
424+
)
425+
426+
# Are we a combiner step?
427+
#
428+
# We are if a variable in our step's plumbing refers to an input that is
429+
# of type 'files'. A combiner's input is required to accept a space-separated
430+
# set of files.
431+
we_are_a_combiner: bool = False
432+
our_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
433+
step_definition=step_definition
434+
)
435+
for p_step_name, connections in our_plumbing.items():
436+
for connector in connections:
437+
if our_inputs.get(connector.out, {}).get("type") == "files":
438+
we_are_a_combiner = True
439+
440+
assert not we_are_a_combiner
441+
442+
# We're not a combiner...
440443

441444
# Do we replicate this step (run it more than once)?
442-
# We do if a variable in this step's mapping block
445+
# We do if a variable in this step's plumbing
443446
# refers to an output of a prior step whose type is 'files'.
444447
# If the prior step is a 'splitter' we populate the 'replication_values' array
445448
# with the list of files the prior step genrated for its output.
@@ -448,12 +451,9 @@ def _prepare_step_variables(
448451
# be more than one prior step variable that is 'files'!
449452
iter_values: list[str] = []
450453
iter_variable: str | None = None
451-
plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
452-
step_definition=step_definition
453-
)
454-
for p_step_name, connections in plumbing.items():
454+
for p_step_name, connections in our_plumbing.items():
455455
# We need to get the Job definition for each step
456-
# and then check whether the (ouptu) variable is of type 'files'...
456+
# and then check whether the (output) variable is of type 'files'...
457457
wf_step: dict[str, Any] = get_step(wf, p_step_name)
458458
assert wf_step
459459
job_definition: dict[str, Any] = self._get_step_job(step=wf_step)
@@ -497,6 +497,9 @@ def _launch(
497497
step_definition: dict[str, Any],
498498
step_preparation_response: StepPreparationResponse,
499499
) -> None:
500+
"""Given a runningWorkflow record, a step defitnion (from the Workflow),
501+
and the step's variables (in a preparation object) this method launches
502+
one or more instances of the given step."""
500503
step_name: str = step_definition["name"]
501504
rwf_id: str = rwf["id"]
502505
project_id = rwf["project"]["id"]

0 commit comments

Comments
 (0)