Skip to content

Commit ac23732

Browse files
author
Alan Christie
committed
feat: Engine now attempts to prefix inputs (from prior steps)
1 parent 3fe5969 commit ac23732

File tree

3 files changed

+50
-14
lines changed

3 files changed

+50
-14
lines changed

workflow/decoder.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,17 @@ def get_step(definition: dict[str, Any], name: str) -> dict[str, Any]:
8686
return {}
8787

8888

89+
def get_step_specification(definition: dict[str, Any], name: str) -> dict[str, Any]:
90+
"""Given a Workflow definition this function returns a named step's specification block
91+
(if it exists)."""
92+
spec: dict[str, Any] = {}
93+
steps: list[dict[str, Any]] = get_steps(definition)
94+
for step in steps:
95+
if step["name"] == name:
96+
spec = step.get("specification", {})
97+
return spec
98+
99+
89100
def get_name(definition: dict[str, Any]) -> str:
90101
"""Given a Workflow definition this function returns its name."""
91102
return str(definition.get("name", ""))

workflow/workflow_abc.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -353,13 +353,6 @@ def get_running_workflow_step_by_name(
353353
# that is the directory within the Project that's the step's working directory.
354354
#
355355
# "instance_directory": ".instance-00000000-0000-0000-0000-00000000000a",
356-
#
357-
# For steps that are not the first in a workflow the following field
358-
# can be expected in the response: -
359-
#
360-
# "prior_running_workflow_step": {
361-
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
362-
# },
363356

364357
@abstractmethod
365358
def set_running_workflow_step_done(

workflow/workflow_engine.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
get_step,
9292
get_step_predefined_variable_connections,
9393
get_step_prior_step_connections,
94+
get_step_specification,
9495
get_step_workflow_variable_connections,
9596
is_workflow_input_variable,
9697
is_workflow_output_variable,
@@ -590,10 +591,16 @@ def _prepare_step(
590591
assert connector.in_ in self._predefined_variables
591592
prime_variables[connector.out] = self._predefined_variables[connector.in_]
592593

593-
# Using the "plumbing" again add any that relate to values used in prior steps.
594+
# Using the "plumbing" again so that we can add any variables
595+
# that relate to values used in prior steps.
594596
#
595-
# The decoder gives us a map indexed by prior step name that's a list of
596-
# "in"/"out" connectors as before.
597+
# The decoder gives us a set of "in"/"out" connectors as above
598+
# indexed by the prior step name.
599+
#
600+
# 'inputs' here are not copied to our step's instance directory,
601+
# instead we need to prefix any 'input' with the instance directory for the
602+
# step the input belongs to. e.g. "file.txt" will become
603+
# ".instance-0000/file.txt".
597604
prior_step_plumbing: dict[str, list[Connector]] = (
598605
get_step_prior_step_connections(step_definition=step_definition)
599606
)
@@ -604,19 +611,44 @@ def _prepare_step(
604611
# For a combiner step we only need to inspect the first instance of
605612
# the prior step (the default replica value is '0').
606613
# We assume all the combiner's prior (parallel) instances
607-
# have the same variables and values.
614+
# have the same variables and values. Combiners handle inputs from
615+
# prior steps differently - i.e. they must use a directory 'glob'
616+
# due to the uncontrolled number of prior steps.
608617
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
609618
name=prior_step_name,
610619
running_workflow_id=rwf_id,
611620
)
612621
assert prior_step
613-
assert "variables" in prior_step
622+
assert "instance_directory" in prior_step
623+
p_instance_dir: str = prior_step["instance_directory"]
624+
# Get prior step Job (tro look for inputs)
625+
# (if we're not a combiner)
626+
p_job_inputs: dict[str, Any] = {}
627+
if not we_are_a_combiner:
628+
p_step_spec: dict[str, Any] = get_step_specification(
629+
wf, prior_step_name
630+
)
631+
_LOGGER.info("get_step_specification() got %s\n", str(p_step_spec))
632+
p_job, _ = self._wapi_adapter.get_job(
633+
collection=p_step_spec["collection"],
634+
job=p_step_spec["job"],
635+
version=p_step_spec["version"],
636+
)
637+
_LOGGER.info("API.get_job() got %s\n", str(p_job))
638+
assert p_job
639+
p_job_inputs = job_definition_decoder.get_inputs(p_job)
614640
# Copy "in" value to "out"...
641+
# (prefixing inputs with instance directory if required)
642+
assert "variables" in prior_step
615643
for connector in connections:
616644
assert connector.in_ in prior_step["variables"]
617-
prime_variables[connector.out] = prior_step["variables"][connector.in_]
645+
value: str = prior_step["variables"][connector.in_]
646+
if not we_are_a_combiner and connector.in_ in p_job_inputs:
647+
# Prefix with prior-step instance directory
648+
value = f"{p_instance_dir}/{value}"
649+
prime_variables[connector.out] = value
618650

619-
# The step's prime variables are now set.
651+
# Our step's prime variables are now set.
620652

621653
# Before we return these to the caller do we have enough
622654
# to satisfy the step Job's command? It's a simple check -

0 commit comments

Comments
 (0)