diff --git a/.gitignore b/.gitignore index fe257a8..6e25b42 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,8 @@ dist/ **/__pycache__/ **/*.pickle tests/project-root/project-*/ + +# temp files +*~ +\#*# +\#* diff --git a/tests/instance_launcher.py b/tests/instance_launcher.py index edb14e6..ab7cab2 100644 --- a/tests/instance_launcher.py +++ b/tests/instance_launcher.py @@ -31,16 +31,17 @@ from tests.wapi_adapter import UnitTestWorkflowAPIAdapter from workflow.workflow_abc import InstanceLauncher, LaunchParameters, LaunchResult +# Relative path to the execution (project) directory +EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID) + # Full path to the 'jobs' directory _JOB_PATH: str = os.path.join(os.path.dirname(__file__), "jobs") -# Relative path to the execution (project) directory -_EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID) def project_file_exists(file_name: str) -> bool: """A convenient test function to verify a file exists in the execution (project) directory.""" - return os.path.isfile(os.path.join(_EXECUTION_DIRECTORY, file_name)) + return os.path.isfile(os.path.join(EXECUTION_DIRECTORY, file_name)) class UnitTestInstanceLauncher(InstanceLauncher): @@ -57,9 +58,15 @@ def __init__( self._msg_dispatcher = msg_dispatcher # Every launcher starts with an empty execution directory... - print(f"Removing execution directory ({_EXECUTION_DIRECTORY})") - assert _EXECUTION_DIRECTORY.startswith("tests/project-root") - shutil.rmtree(_EXECUTION_DIRECTORY, ignore_errors=True) + print(f"Removing execution directory ({EXECUTION_DIRECTORY})") + assert EXECUTION_DIRECTORY.startswith("tests/project-root") + if os.path.isdir(EXECUTION_DIRECTORY): + for filename in os.listdir(EXECUTION_DIRECTORY): + file_path = os.path.join(EXECUTION_DIRECTORY, filename) + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters @@ -67,7 +74,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters.specification assert isinstance(launch_parameters.specification, dict) - os.makedirs(_EXECUTION_DIRECTORY, exist_ok=True) + os.makedirs(EXECUTION_DIRECTORY, exist_ok=True) # We're passed a RunningWorkflowStep ID but a record is expected to have been # created bt the caller, we simply create instance records. @@ -108,8 +115,9 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert os.path.isfile(module) subprocess_cmd: List[str] = ["python"] + command_list print(f"Subprocess command: {subprocess_cmd}") + print(f"Execution directory: {EXECUTION_DIRECTORY}") completed_process: CompletedProcess = subprocess.run( - subprocess_cmd, check=False, cwd=_EXECUTION_DIRECTORY + subprocess_cmd, check=False, cwd=EXECUTION_DIRECTORY ) # Simulate a PodMessage (that will contain the instance ID), diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index 2a6a535..e2c6304 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -11,7 +11,11 @@ from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage from tests.config import TEST_PROJECT_ID -from tests.instance_launcher import UnitTestInstanceLauncher, project_file_exists +from tests.instance_launcher import ( + EXECUTION_DIRECTORY, + UnitTestInstanceLauncher, + project_file_exists, +) from tests.message_dispatcher import UnitTestMessageDispatcher from tests.message_queue import UnitTestMessageQueue from tests.wapi_adapter import UnitTestWorkflowAPIAdapter @@ -200,7 +204,6 @@ def test_workflow_engine_example_smiles_to_file(basic_engine): assert project_file_exists(output_file) -@pytest.mark.skip(reason="The engine does not currently create the required variables") def test_workflow_engine_shortcut_example_1(basic_engine): # Arrange da, md = basic_engine @@ -229,20 +232,71 @@ def test_workflow_engine_shortcut_example_1(basic_engine): assert project_file_exists(output_file_b) -@pytest.mark.skip(reason="The engine does not currently create the required variables") +# @pytest.mark.skip(reason="The engine does not currently create the required variables") def test_workflow_engine_simple_python_molprops(basic_engine): # Arrange da, md = basic_engine # Make sure files that should be generated by the test # do not exist before we run the test. - output_file_a = "a.sdf" - assert not project_file_exists(output_file_a) - output_file_b = "b.sdf" - assert not project_file_exists(output_file_b) + output_file_1 = "step1.out.smi" + assert not project_file_exists(output_file_1) + output_file_2 = "step2.out.smi" + assert not project_file_exists(output_file_2) + # And create the test's input file. + input_file_1 = "input1.smi" + input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1 + RDKit 3D + + 18 19 0 0 0 0 0 0 0 0999 V2000 + 8.7102 -1.3539 24.2760 O 0 0 0 0 0 0 0 0 0 0 0 0 + 9.4334 -2.1203 23.6716 C 0 0 0 0 0 0 0 0 0 0 0 0 + 10.3260 -1.7920 22.4941 C 0 0 0 0 0 0 0 0 0 0 0 0 + 9.5607 -0.5667 21.3699 S 0 0 0 0 0 0 0 0 0 0 0 0 + 7.9641 -1.3976 21.0216 C 0 0 0 0 0 0 0 0 0 0 0 0 + 7.1007 -0.5241 20.1671 C 0 0 0 0 0 0 0 0 0 0 0 0 + 5.7930 -0.1276 20.3932 C 0 0 0 0 0 0 0 0 0 0 0 0 + 5.2841 0.6934 19.3422 C 0 0 0 0 0 0 0 0 0 0 0 0 + 6.2234 0.8796 18.3624 C 0 0 0 0 0 0 0 0 0 0 0 0 + 6.0491 1.8209 16.9402 Cl 0 0 0 0 0 0 0 0 0 0 0 0 + 7.6812 0.0795 18.6678 S 0 0 0 0 0 0 0 0 0 0 0 0 + 9.5928 -3.4405 24.2306 N 0 0 0 0 0 0 0 0 0 0 0 0 + 10.8197 -3.4856 25.0609 C 0 0 0 0 0 0 0 0 0 0 0 0 + 11.0016 -4.9279 25.4571 C 0 0 0 0 0 0 0 0 0 0 0 0 + 9.9315 -5.2800 26.4615 C 0 0 0 0 0 0 0 0 0 0 0 0 + 10.3887 -4.7677 27.7090 O 0 0 0 0 0 0 0 0 0 0 0 0 + 8.5793 -4.6419 26.1747 C 0 0 0 0 0 0 0 0 0 0 0 0 + 8.3826 -4.0949 24.7695 C 0 0 0 0 0 0 0 0 0 0 0 0 + 1 2 2 0 + 2 3 1 0 + 2 12 1 0 + 3 4 1 0 + 4 5 1 0 + 5 6 1 0 + 6 7 2 0 + 7 8 1 0 + 8 9 2 0 + 9 10 1 0 + 9 11 1 0 + 11 6 1 0 + 12 13 1 0 + 13 14 1 0 + 14 15 1 0 + 15 16 1 0 + 15 17 1 0 + 17 18 1 0 + 18 12 1 0 + M END + + $$$$ + """ + with open( + f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8" + ) as input_file: + input_file.writelines(input_file_1_content) # Act r_wfid = start_workflow( - md, da, "simple-python-molprops", {"candidateMolecules": "C"} + md, da, "simple-python-molprops", {"candidateMolecules": input_file_1} ) # Assert @@ -256,5 +310,5 @@ def test_workflow_engine_simple_python_molprops(basic_engine): assert response["running_workflow_steps"][1]["done"] assert response["running_workflow_steps"][1]["success"] # This test should generate a file in the simulated project directory - assert project_file_exists(output_file_a) - assert project_file_exists(output_file_b) + assert project_file_exists(output_file_1) + assert project_file_exists(output_file_2) diff --git a/tests/workflow-definitions/simple-python-molprops.yaml b/tests/workflow-definitions/simple-python-molprops.yaml index 7f92176..049a010 100644 --- a/tests/workflow-definitions/simple-python-molprops.yaml +++ b/tests/workflow-definitions/simple-python-molprops.yaml @@ -33,7 +33,7 @@ steps: workflow-input: candidateMolecules outputs: - output: outputFile - as: __step1__out.smi + as: step1.out.smi - name: step2 description: Add column 2 @@ -54,4 +54,4 @@ steps: output: outputFile outputs: - output: outputFile - as: __step2__out.smi + as: step2.out.smi diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 535776a..c966434 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -284,6 +284,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None: def _validate_step_command( self, *, + running_workflow_step_id: str, step: dict[str, Any], running_workflow_variables: dict[str, Any] | None = None, ) -> str | dict[str, Any]: @@ -352,6 +353,35 @@ def _validate_step_command( # # TBD + wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step( + running_workflow_step_id=running_workflow_step_id, + ) + + wf_steps = wf_step_data.get("steps", []) + try: + previous_step = wf_steps[wf_step_data["caller_step_index"] - 1] + except IndexError: + previous_step = {} + + inputs = step.get("inputs", []) + outputs = step.get("outputs", []) + previous_step_outputs = previous_step.get("outputs", []) + + step_vars = self._set_step_variables( + inputs=inputs, + outputs=outputs, + previous_step_outputs=previous_step_outputs, + workflow_variables=all_variables, + ) + + all_variables |= step_vars + print("all_variables", all_variables) + + self._wapi_adapter.set_running_workflow_step_variables( + running_workflow_step_id=running_workflow_step_id, + variables=all_variables, + ) + message, success = decode( job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 ) @@ -377,6 +407,7 @@ def _launch( # running workflow record) running_workflow_variables: dict[str, Any] | None = rwf.get("variables") error_or_variables: str | dict[str, Any] = self._validate_step_command( + running_workflow_step_id=rwfs_id, step=step, running_workflow_variables=running_workflow_variables, ) @@ -448,3 +479,38 @@ def _set_step_error( error_num=error, error_msg=error_msg, ) + + def _set_step_variables( + self, + inputs: list[dict[str, Any]], + outputs: list[dict[str, Any]], + previous_step_outputs: list[dict[str, Any]], + workflow_variables: dict[str, Any], + ) -> dict[str, Any]: + """Prepare input- and output variables for the following step. + + Inputs are defined in step definition but their values may + come from previous step outputs. + """ + result = {} + + for item in inputs: + p_key = item["input"] + p_val = "" + val = item["from"] + if "workflow-input" in val.keys(): + p_val = workflow_variables[val["workflow-input"]] + elif "step" in val.keys(): + for out in previous_step_outputs: + if out["output"] == val["output"]: + p_val = out["as"] + break + + result[p_key] = p_val + + for item in outputs: + p_key = item["output"] + p_val = item["as"] + result[p_key] = p_val + + return result