diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index 8fd8778..4f1993d 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -313,7 +313,7 @@ def test_workflow_engine_simple_python_molprops(basic_engine): assert project_file_exists(output_file_2) -@pytest.mark.skip(reason="The engine does not currently create the required variables") +# @pytest.mark.skip(reason="The engine does not currently create the required variables") def test_workflow_engine_simple_python_molprops_with_options(basic_engine): # Arrange da, md = basic_engine @@ -380,7 +380,11 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine): md, da, "simple-python-molprops-with-options", - {"candidateMolecules": input_file_1}, + { + "candidateMolecules": input_file_1, + "rdkitPropertyName": "prop", + "rdkitPropertyValue": 1.2, + }, ) # Assert diff --git a/workflow/decoder.py b/workflow/decoder.py index db0f4f1..c51286b 100644 --- a/workflow/decoder.py +++ b/workflow/decoder.py @@ -84,7 +84,7 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]: def set_variables_from_options_for_step( definition: dict[str, Any], variables: dict[str, Any], step_name: str -) -> tuple[dict[str, Any], str | None]: +) -> dict[str, Any]: """Given a Workflow definition, an existing map of variables and values, and a step name this function returns a new set of variables by adding variables and values that are required for the step that have been defined in the @@ -109,13 +109,26 @@ def set_variables_from_options_for_step( """ assert isinstance(definition, dict) - assert isinstance(variables, dict) assert step_name - new_variables: dict[str, Any] = variables.copy() + print("workflow", definition) + print("workflow_variables", variables) + + result = {} + options = definition.get("variables", {}).get("options", []) + print("options", options) + print("variables", variables) + + for opt in options: + for step_alias in opt["as"]: + if step_alias["step"] == step_name: + result[step_alias["option"]] = variables[opt["name"]] + # can break the loop because a variable can be a step + # variable only once + break # Success... - return new_variables, None + return result def get_required_variable_names(definition: dict[str, Any]) -> list[str]: @@ -138,3 +151,49 @@ def get_required_variable_names(definition: dict[str, Any]) -> list[str]: if "default" not in option_variable ) return required_variables + + +def set_step_variables( + *, + workflow: dict[str, Any], + inputs: list[dict[str, Any]], + outputs: list[dict[str, Any]], + previous_step_outputs: list[dict[str, Any]], + workflow_variables: dict[str, Any], + step_name: str, +) -> dict[str, Any]: + """Prepare input- and output variables for the following step. + + Inputs are defined in step definition but their values may + come from previous step outputs. + """ + result = {} + + for item in inputs: + p_key = item["input"] + p_val = "" + val = item["from"] + if "workflow-input" in val.keys(): + p_val = workflow_variables[val["workflow-input"]] + elif "step" in val.keys(): + for out in previous_step_outputs: + if out["output"] == val["output"]: + p_val = out["as"] + break + + result[p_key] = p_val + + for item in outputs: + p_key = item["output"] + p_val = item["as"] + result[p_key] = p_val + + options = set_variables_from_options_for_step( + definition=workflow, + variables=workflow_variables, + step_name=step_name, + ) + + result |= options + + return result diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 8d76bdd..1772fd6 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -38,6 +38,8 @@ WorkflowAPIAdapter, ) +from .decoder import set_step_variables + _LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER.setLevel(logging.INFO) _LOGGER.addHandler(logging.StreamHandler(sys.stdout)) @@ -388,11 +390,25 @@ def _validate_step_command( running_workflow_step_id, previous_step_outputs, ) - step_vars = self._set_step_variables( + + # there should probably be an easier way to access this + running_wf_step, _ = self._wapi_adapter.get_running_workflow_step( + running_workflow_step_id=running_workflow_step_id + ) + running_wf_id = running_wf_step["running_workflow"]["id"] + running_wf, _ = self._wapi_adapter.get_running_workflow( + running_workflow_id=running_wf_id + ) + workflow_id = running_wf["workflow"]["id"] + workflow, _ = self._wapi_adapter.get_workflow(workflow_id=workflow_id) + + step_vars = set_step_variables( + workflow=workflow, workflow_variables=all_variables, inputs=inputs, outputs=outputs, previous_step_outputs=previous_step_outputs, + step_name=running_wf_step["name"], ) all_variables |= step_vars _LOGGER.debug( @@ -506,39 +522,3 @@ def _set_step_error( error_num=error_num, error_msg=r_wf_error, ) - - def _set_step_variables( - self, - *, - inputs: list[dict[str, Any]], - outputs: list[dict[str, Any]], - previous_step_outputs: list[dict[str, Any]], - workflow_variables: dict[str, Any], - ) -> dict[str, Any]: - """Prepare input- and output variables for the following step. - - Inputs are defined in step definition but their values may - come from previous step outputs. - """ - result = {} - - for item in inputs: - p_key = item["input"] - p_val = "" - val = item["from"] - if "workflow-input" in val.keys(): - p_val = workflow_variables[val["workflow-input"]] - elif "step" in val.keys(): - for out in previous_step_outputs: - if out["output"] == val["output"]: - p_val = out["as"] - break - - result[p_key] = p_val - - for item in outputs: - p_key = item["output"] - p_val = item["as"] - result[p_key] = p_val - - return result