From 0703aaccd13b8cc8eb8035b528aaf233602b4a81 Mon Sep 17 00:00:00 2001 From: Kalev Takkis Date: Fri, 21 Mar 2025 16:30:02 +0000 Subject: [PATCH 1/9] chore: debugging --- .gitignore | 5 +++ tests/jobs/addcol.py | 8 ++--- tests/test_workflow_engine_examples.py | 2 +- workflow/workflow_engine.py | 44 ++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index fe257a8..6e25b42 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,8 @@ dist/ **/__pycache__/ **/*.pickle tests/project-root/project-*/ + +# temp files +*~ +\#*# +\#* diff --git a/tests/jobs/addcol.py b/tests/jobs/addcol.py index 33831a3..a60120b 100644 --- a/tests/jobs/addcol.py +++ b/tests/jobs/addcol.py @@ -10,7 +10,7 @@ parser.add_argument("-v", "--value", required=True) args = parser.parse_args() -with open(args.inputFile, "rt", encoding="utf8") as input_file: - content = input_file.read() - with open(args.outputFile, "wt", encoding="utf8") as output_file: - output_file.write(content) +# with open(args.inputFile, "rt", encoding="utf8") as input_file: +# content = input_file.read() +# with open(args.outputFile, "wt", encoding="utf8") as output_file: +# output_file.write(content) diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index 2a6a535..dff0dad 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -229,7 +229,7 @@ def test_workflow_engine_shortcut_example_1(basic_engine): assert project_file_exists(output_file_b) -@pytest.mark.skip(reason="The engine does not currently create the required variables") +# @pytest.mark.skip(reason="The engine does not currently create the required variables") def test_workflow_engine_simple_python_molprops(basic_engine): # Arrange da, md = basic_engine diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index bc7c285..4beb750 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ 
-352,6 +352,50 @@ def _validate_step_command( # # TBD + # print('job_collection', job_collection) + # print('job_job', job_job) + # print('job', job) + # print('step', step) + # print('running_workflow_variables', running_workflow_variables) + # print('all_variables', all_variables) + + # print('step inputs', step['inputs']) + # print('step inputs', step['outputs']) + + # # this is the structure i need to process + # # [{'input': 'inputFile', 'from': {'workflow-input': 'candidateMolecules'}}] + # # [{'output': 'outputFile', 'as': '__step1__out.smi'}] + + for item in step.get("inputs", []): + p_key = item["input"] + p_val = "" + if "from" in item.keys(): + val = item["from"] + if "workflow-input" in val.keys(): + p_val = val["workflow-input"] + # don't know what to do with else.. + all_variables[p_key] = p_val + self._wapi_adapter.set_running_workflow_step_variables( + running_workflow_step_id=step["name"], + variables={p_key: p_val}, + ) + + for item in step.get("outputs", []): + p_key = item["output"] + p_val = "" + if "as" in item.keys(): + p_val = item["as"] + + # don't know what to do with else.. 
+ all_variables[p_key] = p_val + self._wapi_adapter.set_running_workflow_step_variables( + running_workflow_step_id=step["name"], + variables={p_key: p_val}, + ) + + # all_variables['inputFile'] = running_workflow_variables['candidateMolecules'] + # all_variables['outputFile'] = '__step1__out.smi' + message, success = decode( job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 ) From 1ac50dd58f7d26fc2b5cb80cc1a21015ae0d0cc0 Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Fri, 21 Mar 2025 17:41:07 +0100 Subject: [PATCH 2/9] fix: Fix setting step variables --- workflow/workflow_engine.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 4beb750..5f60227 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -284,6 +284,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None: def _validate_step_command( self, *, + running_workflow_step_id: str, step: dict[str, Any], running_workflow_variables: dict[str, Any] | None = None, ) -> str | dict[str, Any]: @@ -375,10 +376,6 @@ def _validate_step_command( p_val = val["workflow-input"] # don't know what to do with else.. all_variables[p_key] = p_val - self._wapi_adapter.set_running_workflow_step_variables( - running_workflow_step_id=step["name"], - variables={p_key: p_val}, - ) for item in step.get("outputs", []): p_key = item["output"] @@ -388,10 +385,11 @@ def _validate_step_command( # don't know what to do with else.. 
all_variables[p_key] = p_val - self._wapi_adapter.set_running_workflow_step_variables( - running_workflow_step_id=step["name"], - variables={p_key: p_val}, - ) + + self._wapi_adapter.set_running_workflow_step_variables( + running_workflow_step_id=running_workflow_step_id, + variables=all_variables, + ) # all_variables['inputFile'] = running_workflow_variables['candidateMolecules'] # all_variables['outputFile'] = '__step1__out.smi' @@ -421,6 +419,7 @@ def _launch( # running workflow record) running_workflow_variables: dict[str, Any] | None = rwf.get("variables") error_or_variables: str | dict[str, Any] = self._validate_step_command( + running_workflow_step_id=rwfs_id, step=step, running_workflow_variables=running_workflow_variables, ) From f37ad52c5c3579303437ef098efd8bce6f4d492e Mon Sep 17 00:00:00 2001 From: Kalev Takkis Date: Mon, 7 Apr 2025 11:32:43 +0100 Subject: [PATCH 3/9] feat: function to prepare parameters for the next step --- tests/jobs/addcol.py | 8 ++-- workflow/workflow_engine.py | 86 +++++++++++++++++++++++-------------- 2 files changed, 57 insertions(+), 37 deletions(-) diff --git a/tests/jobs/addcol.py b/tests/jobs/addcol.py index a60120b..33831a3 100644 --- a/tests/jobs/addcol.py +++ b/tests/jobs/addcol.py @@ -10,7 +10,7 @@ parser.add_argument("-v", "--value", required=True) args = parser.parse_args() -# with open(args.inputFile, "rt", encoding="utf8") as input_file: -# content = input_file.read() -# with open(args.outputFile, "wt", encoding="utf8") as output_file: -# output_file.write(content) +with open(args.inputFile, "rt", encoding="utf8") as input_file: + content = input_file.read() + with open(args.outputFile, "wt", encoding="utf8") as output_file: + output_file.write(content) diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index e5752e3..93790e1 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -353,47 +353,33 @@ def _validate_step_command( # # TBD - # print('job_collection', 
job_collection) - # print('job_job', job_job) - # print('job', job) - # print('step', step) - # print('running_workflow_variables', running_workflow_variables) - # print('all_variables', all_variables) - - # print('step inputs', step['inputs']) - # print('step inputs', step['outputs']) - - # # this is the structure i need to process - # # [{'input': 'inputFile', 'from': {'workflow-input': 'candidateMolecules'}}] - # # [{'output': 'outputFile', 'as': '__step1__out.smi'}] + wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step( + running_workflow_step_id=running_workflow_step_id, + ) - for item in step.get("inputs", []): - p_key = item["input"] - p_val = "" - if "from" in item.keys(): - val = item["from"] - if "workflow-input" in val.keys(): - p_val = val["workflow-input"] - # don't know what to do with else.. - all_variables[p_key] = p_val - - for item in step.get("outputs", []): - p_key = item["output"] - p_val = "" - if "as" in item.keys(): - p_val = item["as"] + wf_steps = wf_step_data.get("steps", []) + try: + previous_step = wf_steps[wf_step_data["caller_step_index"] - 1] + except IndexError: + previous_step = {} + + inputs = step.get("inputs", []) + outputs = step.get("outputs", []) + previous_step_outputs = previous_step.get("outputs", []) + step_vars = self._set_step_variables( + inputs=inputs, + outputs=outputs, + previous_step_outputs=previous_step_outputs, + ) - # don't know what to do with else.. 
- all_variables[p_key] = p_val + all_variables |= step_vars + print("all_variables", all_variables) self._wapi_adapter.set_running_workflow_step_variables( running_workflow_step_id=running_workflow_step_id, variables=all_variables, ) - # all_variables['inputFile'] = running_workflow_variables['candidateMolecules'] - # all_variables['outputFile'] = '__step1__out.smi' - message, success = decode( job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 ) @@ -491,3 +477,37 @@ def _set_step_error( error_num=error, error_msg=error_msg, ) + + def _set_step_variables( + self, + inputs: list[dict[str, Any]], + outputs: list[dict[str, Any]], + previous_step_outputs: list[dict[str, Any]], + ) -> dict[str, Any]: + """Prepare input- and output variables for the following step. + + Inputs are defined in step definition but their values may + come from previous step outputs. + """ + result = {} + + for item in inputs: + p_key = item["input"] + p_val = "" + val = item["from"] + if "workflow-input" in val.keys(): + p_val = val["workflow-input"] + elif "step" in val.keys(): + for out in previous_step_outputs: + if out["output"] == val["output"]: + p_val = out["as"] + break + + result[p_key] = p_val + + for item in outputs: + p_key = item["output"] + p_val = item["as"] + result[p_key] = p_val + + return result From b5356cb8b50bfbe357a93fd1cec90241a7a6e088 Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Mon, 7 Apr 2025 12:16:02 +0100 Subject: [PATCH 4/9] feat: Test instance launcher now exposes execution directory --- tests/instance_launcher.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/instance_launcher.py b/tests/instance_launcher.py index edb14e6..39de2a5 100644 --- a/tests/instance_launcher.py +++ b/tests/instance_launcher.py @@ -31,16 +31,17 @@ from tests.wapi_adapter import UnitTestWorkflowAPIAdapter from workflow.workflow_abc import InstanceLauncher, LaunchParameters, LaunchResult +# Relative path to the execution 
(project) directory +EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID) + # Full path to the 'jobs' directory _JOB_PATH: str = os.path.join(os.path.dirname(__file__), "jobs") -# Relative path to the execution (project) directory -_EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID) def project_file_exists(file_name: str) -> bool: """A convenient test function to verify a file exists in the execution (project) directory.""" - return os.path.isfile(os.path.join(_EXECUTION_DIRECTORY, file_name)) + return os.path.isfile(os.path.join(EXECUTION_DIRECTORY, file_name)) class UnitTestInstanceLauncher(InstanceLauncher): @@ -57,9 +58,9 @@ def __init__( self._msg_dispatcher = msg_dispatcher # Every launcher starts with an empty execution directory... - print(f"Removing execution directory ({_EXECUTION_DIRECTORY})") - assert _EXECUTION_DIRECTORY.startswith("tests/project-root") - shutil.rmtree(_EXECUTION_DIRECTORY, ignore_errors=True) + print(f"Removing execution directory ({EXECUTION_DIRECTORY})") + assert EXECUTION_DIRECTORY.startswith("tests/project-root") + shutil.rmtree(EXECUTION_DIRECTORY, ignore_errors=True) def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters @@ -67,7 +68,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters.specification assert isinstance(launch_parameters.specification, dict) - os.makedirs(_EXECUTION_DIRECTORY, exist_ok=True) + os.makedirs(EXECUTION_DIRECTORY, exist_ok=True) # We're passed a RunningWorkflowStep ID but a record is expected to have been # created bt the caller, we simply create instance records. 
@@ -108,8 +109,9 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert os.path.isfile(module) subprocess_cmd: List[str] = ["python"] + command_list print(f"Subprocess command: {subprocess_cmd}") + print(f"Execution directory: {EXECUTION_DIRECTORY}") completed_process: CompletedProcess = subprocess.run( - subprocess_cmd, check=False, cwd=_EXECUTION_DIRECTORY + subprocess_cmd, check=False, cwd=EXECUTION_DIRECTORY ) # Simulate a PodMessage (that will contain the instance ID), From 393e3f9ca9f6e8541848da937ac2d2e2b3973b86 Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Mon, 7 Apr 2025 12:16:56 +0100 Subject: [PATCH 5/9] fix: Adjusted default output filenames --- tests/test_workflow_engine_examples.py | 67 +++++++++++++++++-- .../simple-python-molprops.yaml | 4 +- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index dff0dad..ea7a624 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -11,7 +11,11 @@ from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage from tests.config import TEST_PROJECT_ID -from tests.instance_launcher import UnitTestInstanceLauncher, project_file_exists +from tests.instance_launcher import ( + EXECUTION_DIRECTORY, + UnitTestInstanceLauncher, + project_file_exists, +) from tests.message_dispatcher import UnitTestMessageDispatcher from tests.message_queue import UnitTestMessageQueue from tests.wapi_adapter import UnitTestWorkflowAPIAdapter @@ -235,14 +239,65 @@ def test_workflow_engine_simple_python_molprops(basic_engine): da, md = basic_engine # Make sure files that should be generated by the test # do not exist before we run the test. 
- output_file_a = "a.sdf" - assert not project_file_exists(output_file_a) - output_file_b = "b.sdf" - assert not project_file_exists(output_file_b) + output_file_1 = "step1.out.smi" + assert not project_file_exists(output_file_1) + output_file_2 = "step2.out.smi" + assert not project_file_exists(output_file_2) + # And create the test's input file. + input_file_1 = "input1.smi" + input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1 + RDKit 3D + + 18 19 0 0 0 0 0 0 0 0999 V2000 + 8.7102 -1.3539 24.2760 O 0 0 0 0 0 0 0 0 0 0 0 0 + 9.4334 -2.1203 23.6716 C 0 0 0 0 0 0 0 0 0 0 0 0 + 10.3260 -1.7920 22.4941 C 0 0 0 0 0 0 0 0 0 0 0 0 + 9.5607 -0.5667 21.3699 S 0 0 0 0 0 0 0 0 0 0 0 0 + 7.9641 -1.3976 21.0216 C 0 0 0 0 0 0 0 0 0 0 0 0 + 7.1007 -0.5241 20.1671 C 0 0 0 0 0 0 0 0 0 0 0 0 + 5.7930 -0.1276 20.3932 C 0 0 0 0 0 0 0 0 0 0 0 0 + 5.2841 0.6934 19.3422 C 0 0 0 0 0 0 0 0 0 0 0 0 + 6.2234 0.8796 18.3624 C 0 0 0 0 0 0 0 0 0 0 0 0 + 6.0491 1.8209 16.9402 Cl 0 0 0 0 0 0 0 0 0 0 0 0 + 7.6812 0.0795 18.6678 S 0 0 0 0 0 0 0 0 0 0 0 0 + 9.5928 -3.4405 24.2306 N 0 0 0 0 0 0 0 0 0 0 0 0 + 10.8197 -3.4856 25.0609 C 0 0 0 0 0 0 0 0 0 0 0 0 + 11.0016 -4.9279 25.4571 C 0 0 0 0 0 0 0 0 0 0 0 0 + 9.9315 -5.2800 26.4615 C 0 0 0 0 0 0 0 0 0 0 0 0 + 10.3887 -4.7677 27.7090 O 0 0 0 0 0 0 0 0 0 0 0 0 + 8.5793 -4.6419 26.1747 C 0 0 0 0 0 0 0 0 0 0 0 0 + 8.3826 -4.0949 24.7695 C 0 0 0 0 0 0 0 0 0 0 0 0 + 1 2 2 0 + 2 3 1 0 + 2 12 1 0 + 3 4 1 0 + 4 5 1 0 + 5 6 1 0 + 6 7 2 0 + 7 8 1 0 + 8 9 2 0 + 9 10 1 0 + 9 11 1 0 + 11 6 1 0 + 12 13 1 0 + 13 14 1 0 + 14 15 1 0 + 15 16 1 0 + 15 17 1 0 + 17 18 1 0 + 18 12 1 0 + M END + + $$$$ + """ + with open( + f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8" + ) as input_file: + input_file.writelines(input_file_1_content) # Act r_wfid = start_workflow( - md, da, "simple-python-molprops", {"candidateMolecules": "C"} + md, da, "simple-python-molprops", {"candidateMolecules": input_file_1} ) # Assert diff --git 
a/tests/workflow-definitions/simple-python-molprops.yaml b/tests/workflow-definitions/simple-python-molprops.yaml index 7f92176..049a010 100644 --- a/tests/workflow-definitions/simple-python-molprops.yaml +++ b/tests/workflow-definitions/simple-python-molprops.yaml @@ -33,7 +33,7 @@ steps: workflow-input: candidateMolecules outputs: - output: outputFile - as: __step1__out.smi + as: step1.out.smi - name: step2 description: Add column 2 @@ -54,4 +54,4 @@ steps: output: outputFile outputs: - output: outputFile - as: __step2__out.smi + as: step2.out.smi From bb07782e5a6e46b6b3b6e9a281dbb11d3ee1d73c Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Mon, 7 Apr 2025 12:28:03 +0100 Subject: [PATCH 6/9] fix: Better execution directory cleanup --- tests/instance_launcher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/instance_launcher.py b/tests/instance_launcher.py index 39de2a5..7259898 100644 --- a/tests/instance_launcher.py +++ b/tests/instance_launcher.py @@ -60,7 +60,12 @@ def __init__( # Every launcher starts with an empty execution directory... 
print(f"Removing execution directory ({EXECUTION_DIRECTORY})") assert EXECUTION_DIRECTORY.startswith("tests/project-root") - shutil.rmtree(EXECUTION_DIRECTORY, ignore_errors=True) + for filename in os.listdir(EXECUTION_DIRECTORY): + file_path = os.path.join(EXECUTION_DIRECTORY, filename) + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters From cac36e16591dc2cd1233404f43c8ebb07ba4edca Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Mon, 7 Apr 2025 12:41:29 +0100 Subject: [PATCH 7/9] fix: Attempt to fix execution directory clearing --- tests/instance_launcher.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/instance_launcher.py b/tests/instance_launcher.py index 7259898..ab7cab2 100644 --- a/tests/instance_launcher.py +++ b/tests/instance_launcher.py @@ -60,12 +60,13 @@ def __init__( # Every launcher starts with an empty execution directory... 
print(f"Removing execution directory ({EXECUTION_DIRECTORY})") assert EXECUTION_DIRECTORY.startswith("tests/project-root") - for filename in os.listdir(EXECUTION_DIRECTORY): - file_path = os.path.join(EXECUTION_DIRECTORY, filename) - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - shutil.rmtree(file_path) + if os.path.isdir(EXECUTION_DIRECTORY): + for filename in os.listdir(EXECUTION_DIRECTORY): + file_path = os.path.join(EXECUTION_DIRECTORY, filename) + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) def launch(self, launch_parameters: LaunchParameters) -> LaunchResult: assert launch_parameters From b5327b94587da85dec8d3ba9bbf21437363d1786 Mon Sep 17 00:00:00 2001 From: Kalev Takkis Date: Mon, 7 Apr 2025 13:09:25 +0100 Subject: [PATCH 8/9] fix: pull input parameter values from correct place --- tests/test_workflow_engine_examples.py | 4 ++-- workflow/workflow_engine.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index ea7a624..fb7b7b2 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -311,5 +311,5 @@ def test_workflow_engine_simple_python_molprops(basic_engine): assert response["running_workflow_steps"][1]["done"] assert response["running_workflow_steps"][1]["success"] # This test should generate a file in the simulated project directory - assert project_file_exists(output_file_a) - assert project_file_exists(output_file_b) + assert project_file_exists(output_file_1) + assert project_file_exists(output_file_2) diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 93790e1..c966434 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -366,10 +366,12 @@ def _validate_step_command( inputs = 
step.get("inputs", []) outputs = step.get("outputs", []) previous_step_outputs = previous_step.get("outputs", []) + step_vars = self._set_step_variables( inputs=inputs, outputs=outputs, previous_step_outputs=previous_step_outputs, + workflow_variables=all_variables, ) all_variables |= step_vars @@ -483,6 +485,7 @@ def _set_step_variables( inputs: list[dict[str, Any]], outputs: list[dict[str, Any]], previous_step_outputs: list[dict[str, Any]], + workflow_variables: dict[str, Any], ) -> dict[str, Any]: """Prepare input- and output variables for the following step. @@ -496,7 +499,7 @@ def _set_step_variables( p_val = "" val = item["from"] if "workflow-input" in val.keys(): - p_val = val["workflow-input"] + p_val = workflow_variables[val["workflow-input"]] elif "step" in val.keys(): for out in previous_step_outputs: if out["output"] == val["output"]: From d7269ad5120a9de764251977a1bdb6b6a25980c1 Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Mon, 7 Apr 2025 13:13:17 +0100 Subject: [PATCH 9/9] test: Re-enabled skipped test --- tests/test_workflow_engine_examples.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index fb7b7b2..e2c6304 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -204,7 +204,6 @@ def test_workflow_engine_example_smiles_to_file(basic_engine): assert project_file_exists(output_file) -@pytest.mark.skip(reason="The engine does not currently create the required variables") def test_workflow_engine_shortcut_example_1(basic_engine): # Arrange da, md = basic_engine