Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions tests/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,20 +261,20 @@ def test_get_workflow_inputs_for_step_with_name_step1():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
inputs = decoder.get_workflow_job_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1"
)

# Assert
assert len(inputs) == 1
assert "candidateMolecules" in inputs
assert "inputFile" in inputs


def test_get_workflow_inputs_for_step_with_name_step2():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
inputs = decoder.get_workflow_job_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2"
)

Expand All @@ -286,7 +286,7 @@ def test_get_workflow_inputs_for_step_with_unkown_step_name():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
inputs = decoder.get_workflow_job_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown"
)

Expand Down
13 changes: 5 additions & 8 deletions workflow/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,20 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]:
return wf_variable_names


def get_workflow_input_names_for_step(
def get_workflow_job_input_names_for_step(
definition: dict[str, Any], name: str
) -> list[str]:
"""Given a Workflow definition and a step name we return a list of workflow
input variable names the step expects. To do this we iterate through the step's
inputs to find those that are declared 'from->workflow-input'.

To get the input (a filename) the caller simply looks these names up
in the variable map."""
"""Given a Workflow definition and a step name we return a list of step Job input
variable names the step expects. To do this we iterate through the step's
inputs to find those that are declared 'from->workflow-input'."""
inputs: list[str] = []
for step in definition.get("steps", {}):
if step["name"] == name and "inputs" in step:
# Find all the workflow inputs.
# This gives us the name of the workflow input variable
# and the name of the step input (Job) variable.
inputs.extend(
step_input["from"]["workflow-input"]
step_input["input"]
for step_input in step["inputs"]
if "from" in step_input and "workflow-input" in step_input["from"]
)
Expand Down
14 changes: 7 additions & 7 deletions workflow/workflow_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ class LaunchParameters:
#
# ["r-workflow-step-a04d", "r-workflow-step-d904"]
running_workflow_step_prior_steps: list[str] | None = None
# Workflow inputs (for this step Instance). These Workflow Inputs (files) are
# expected to be present in the Project directory. It is simply a list of files
# with the ability to rename them. For example, if the step requires "a.sdf"
# from the Project directory (and renamed as 'input.sdf" in the step's
# instance directory) the engine would provide the following list: -
# Workflow step Job inputs (for this step Instance). These Workflow Inputs (files)
# are a list of Job input variable names for file variables where the
# file is expected to be present in the Project directory. It is simply a list of
# Job variable names. The launcher is expected to find the 'value' of these
# variables and then move the file to the instance directory.
#
# [("a.sdf", "input.sdf")]
running_workflow_step_inputs: list[tuple[str, str]] | None = None
# ["inputFile"]
running_workflow_step_inputs: list[str] | None = None
# The application ID (a custom resource name)
# used to identify the 'type' of Instance to create.
# For DM Jobs this will be 'datamanagerjobs.squonk.it'
Expand Down
19 changes: 8 additions & 11 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
)

from .decoder import (
get_workflow_input_names_for_step,
get_workflow_job_input_names_for_step,
get_workflow_output_values_for_step,
set_step_variables,
)
Expand Down Expand Up @@ -571,18 +571,15 @@ def _launch(
# We must also identify workflow inputs that are required by the step we are
# about to launch and pass those using a launch parameter. The launcher
# will ensure these are copied into out instance directory before we are run.
# We cannot provide the variable values (even though we have them) because
# the DM passes input through 'InputHandlers', which may translate the value.
# So we have to pass the name and let the DM move the files after
# the InputHandler has run.
#
# 'running_workflow_step_inputs'
# A list of string pairs (input/Project filename and output/Instance filename)
# (with relative paths if appropriate.
inputs: list[tuple[str, str]] = []
for wf_input_name in get_workflow_input_names_for_step(wf, step_name):
# The variable must be known.
# It should have been checked by the time we get here!
assert wf_input_name in variables
# No name change of inputs in this version
inputs.append((variables[wf_input_name], variables[wf_input_name]))

# A list of Job input variable names
inputs: list[str] = []
inputs.extend(iter(get_workflow_job_input_names_for_step(wf, step_name)))
lp: LaunchParameters = LaunchParameters(
project_id=project_id,
name=step_name,
Expand Down