diff --git a/tests/test_decoder.py b/tests/test_decoder.py index a57ab3e..f870f8a 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -261,20 +261,20 @@ def test_get_workflow_inputs_for_step_with_name_step1(): # Arrange # Act - inputs = decoder.get_workflow_input_names_for_step( + inputs = decoder.get_workflow_job_input_names_for_step( _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1" ) # Assert assert len(inputs) == 1 - assert "candidateMolecules" in inputs + assert "inputFile" in inputs def test_get_workflow_inputs_for_step_with_name_step2(): # Arrange # Act - inputs = decoder.get_workflow_input_names_for_step( + inputs = decoder.get_workflow_job_input_names_for_step( _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2" ) @@ -286,7 +286,7 @@ def test_get_workflow_inputs_for_step_with_unkown_step_name(): # Arrange # Act - inputs = decoder.get_workflow_input_names_for_step( + inputs = decoder.get_workflow_job_input_names_for_step( _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown" ) diff --git a/workflow/decoder.py b/workflow/decoder.py index 5aeb8c6..9e75133 100644 --- a/workflow/decoder.py +++ b/workflow/decoder.py @@ -82,15 +82,12 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]: return wf_variable_names -def get_workflow_input_names_for_step( +def get_workflow_job_input_names_for_step( definition: dict[str, Any], name: str ) -> list[str]: - """Given a Workflow definition and a step name we return a list of workflow - input variable names the step expects. To do this we iterate through the step's - inputs to find those that are declared 'from->workflow-input'. - - To get the input (a filename) the caller simply looks these names up - in the variable map.""" + """Given a Workflow definition and a step name we return a list of step Job input + variable names the step expects. To do this we iterate through the step's + inputs to find those that are declared 'from->workflow-input'.""" inputs: list[str] = [] for step in definition.get("steps", {}): if step["name"] == name and "inputs" in step: @@ -98,7 +95,7 @@ def get_workflow_input_names_for_step( # This gives us the name of the workflow input variable # and the name of the step input (Job) variable. inputs.extend( - step_input["from"]["workflow-input"] + step_input["input"] for step_input in step["inputs"] if "from" in step_input and "workflow-input" in step_input["from"] ) diff --git a/workflow/workflow_abc.py b/workflow/workflow_abc.py index 2e7c263..12e3251 100644 --- a/workflow/workflow_abc.py +++ b/workflow/workflow_abc.py @@ -49,14 +49,14 @@ class LaunchParameters: # # ["r-workflow-step-a04d", "r-workflow-step-d904"] running_workflow_step_prior_steps: list[str] | None = None - # Workflow inputs (for this step Instance). These Workflow Inputs (files) are - # expected to be present in the Project directory. It is simply a list of files - # with the ability to rename them. For example, if the step requires "a.sdf" - # from the Project directory (and renamed as 'input.sdf" in the step's - # instance directory) the engine would provide the following list: - + # Workflow step Job inputs (for this step Instance). These Workflow Inputs (files) + # are a list of Job input variable names for file variables where the + # file is expected to be present in the Project directory. It is simply a list of + # Job variable names. The launcher is expected to find the 'value' of these + # variables and then move the file to the instance directory. # - # [("a.sdf", "input.sdf")] - running_workflow_step_inputs: list[tuple[str, str]] | None = None + # ["inputFile"] + running_workflow_step_inputs: list[str] | None = None # The application ID (a custom resource name) # used to identify the 'type' of Instance to create. # For DM Jobs this will be 'datamanagerjobs.squonk.it' diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 8d0e35a..85f57b4 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -39,7 +39,7 @@ ) from .decoder import ( - get_workflow_input_names_for_step, + get_workflow_job_input_names_for_step, get_workflow_output_values_for_step, set_step_variables, ) @@ -571,18 +571,15 @@ def _launch( # We must also identify workflow inputs that are required by the step we are # about to launch and pass those using a launch parameter. The launcher # will ensure these are copied into out instance directory before we are run. + # We cannot provide the variable values (even though we have them) because + # the DM passes input through 'InputHandlers', which may translate the value. + # So we have to pass the name and let the DM move the files after + # the InputHandler has run. # # 'running_workflow_step_inputs' - # A list of string pairs (input/Project filename and output/Instance filename) - # (with relative paths if appropriate. - inputs: list[tuple[str, str]] = [] - for wf_input_name in get_workflow_input_names_for_step(wf, step_name): - # The variable must be known. - # It should have been checked by the time we get here! - assert wf_input_name in variables - # No name change of inputs in this version - inputs.append((variables[wf_input_name], variables[wf_input_name])) - + # A list of Job input variable names + inputs: list[str] = [] + inputs.extend(iter(get_workflow_job_input_names_for_step(wf, step_name))) lp: LaunchParameters = LaunchParameters( project_id=project_id, name=step_name,