Skip to content
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ dist/
**/__pycache__/
**/*.pickle
tests/project-root/project-*/

# temp files
*~
\#*#
\#*
24 changes: 16 additions & 8 deletions tests/instance_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
from workflow.workflow_abc import InstanceLauncher, LaunchParameters, LaunchResult

# Relative path to the execution (project) directory
EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID)

# Full path to the 'jobs' directory
_JOB_PATH: str = os.path.join(os.path.dirname(__file__), "jobs")
# Relative path to the execution (project) directory
_EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID)


def project_file_exists(file_name: str) -> bool:
"""A convenient test function to verify a file exists
in the execution (project) directory."""
return os.path.isfile(os.path.join(_EXECUTION_DIRECTORY, file_name))
return os.path.isfile(os.path.join(EXECUTION_DIRECTORY, file_name))


class UnitTestInstanceLauncher(InstanceLauncher):
Expand All @@ -57,17 +58,23 @@ def __init__(
self._msg_dispatcher = msg_dispatcher

# Every launcher starts with an empty execution directory...
print(f"Removing execution directory ({_EXECUTION_DIRECTORY})")
assert _EXECUTION_DIRECTORY.startswith("tests/project-root")
shutil.rmtree(_EXECUTION_DIRECTORY, ignore_errors=True)
print(f"Removing execution directory ({EXECUTION_DIRECTORY})")
assert EXECUTION_DIRECTORY.startswith("tests/project-root")
if os.path.isdir(EXECUTION_DIRECTORY):
for filename in os.listdir(EXECUTION_DIRECTORY):
file_path = os.path.join(EXECUTION_DIRECTORY, filename)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)

def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
assert launch_parameters
assert launch_parameters.project_id == TEST_PROJECT_ID
assert launch_parameters.specification
assert isinstance(launch_parameters.specification, dict)

os.makedirs(_EXECUTION_DIRECTORY, exist_ok=True)
os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)

# We're passed a RunningWorkflowStep ID but a record is expected to have been
# created bt the caller, we simply create instance records.
Expand Down Expand Up @@ -108,8 +115,9 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
assert os.path.isfile(module)
subprocess_cmd: List[str] = ["python"] + command_list
print(f"Subprocess command: {subprocess_cmd}")
print(f"Execution directory: {EXECUTION_DIRECTORY}")
completed_process: CompletedProcess = subprocess.run(
subprocess_cmd, check=False, cwd=_EXECUTION_DIRECTORY
subprocess_cmd, check=False, cwd=EXECUTION_DIRECTORY
)

# Simulate a PodMessage (that will contain the instance ID),
Expand Down
74 changes: 64 additions & 10 deletions tests/test_workflow_engine_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage

from tests.config import TEST_PROJECT_ID
from tests.instance_launcher import UnitTestInstanceLauncher, project_file_exists
from tests.instance_launcher import (
EXECUTION_DIRECTORY,
UnitTestInstanceLauncher,
project_file_exists,
)
from tests.message_dispatcher import UnitTestMessageDispatcher
from tests.message_queue import UnitTestMessageQueue
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
Expand Down Expand Up @@ -200,7 +204,6 @@ def test_workflow_engine_example_smiles_to_file(basic_engine):
assert project_file_exists(output_file)


@pytest.mark.skip(reason="The engine does not currently create the required variables")
def test_workflow_engine_shortcut_example_1(basic_engine):
# Arrange
da, md = basic_engine
Expand Down Expand Up @@ -229,20 +232,71 @@ def test_workflow_engine_shortcut_example_1(basic_engine):
assert project_file_exists(output_file_b)


@pytest.mark.skip(reason="The engine does not currently create the required variables")
# @pytest.mark.skip(reason="The engine does not currently create the required variables")
def test_workflow_engine_simple_python_molprops(basic_engine):
# Arrange
da, md = basic_engine
# Make sure files that should be generated by the test
# do not exist before we run the test.
output_file_a = "a.sdf"
assert not project_file_exists(output_file_a)
output_file_b = "b.sdf"
assert not project_file_exists(output_file_b)
output_file_1 = "step1.out.smi"
assert not project_file_exists(output_file_1)
output_file_2 = "step2.out.smi"
assert not project_file_exists(output_file_2)
# And create the test's input file.
input_file_1 = "input1.smi"
input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1
RDKit 3D

18 19 0 0 0 0 0 0 0 0999 V2000
8.7102 -1.3539 24.2760 O 0 0 0 0 0 0 0 0 0 0 0 0
9.4334 -2.1203 23.6716 C 0 0 0 0 0 0 0 0 0 0 0 0
10.3260 -1.7920 22.4941 C 0 0 0 0 0 0 0 0 0 0 0 0
9.5607 -0.5667 21.3699 S 0 0 0 0 0 0 0 0 0 0 0 0
7.9641 -1.3976 21.0216 C 0 0 0 0 0 0 0 0 0 0 0 0
7.1007 -0.5241 20.1671 C 0 0 0 0 0 0 0 0 0 0 0 0
5.7930 -0.1276 20.3932 C 0 0 0 0 0 0 0 0 0 0 0 0
5.2841 0.6934 19.3422 C 0 0 0 0 0 0 0 0 0 0 0 0
6.2234 0.8796 18.3624 C 0 0 0 0 0 0 0 0 0 0 0 0
6.0491 1.8209 16.9402 Cl 0 0 0 0 0 0 0 0 0 0 0 0
7.6812 0.0795 18.6678 S 0 0 0 0 0 0 0 0 0 0 0 0
9.5928 -3.4405 24.2306 N 0 0 0 0 0 0 0 0 0 0 0 0
10.8197 -3.4856 25.0609 C 0 0 0 0 0 0 0 0 0 0 0 0
11.0016 -4.9279 25.4571 C 0 0 0 0 0 0 0 0 0 0 0 0
9.9315 -5.2800 26.4615 C 0 0 0 0 0 0 0 0 0 0 0 0
10.3887 -4.7677 27.7090 O 0 0 0 0 0 0 0 0 0 0 0 0
8.5793 -4.6419 26.1747 C 0 0 0 0 0 0 0 0 0 0 0 0
8.3826 -4.0949 24.7695 C 0 0 0 0 0 0 0 0 0 0 0 0
1 2 2 0
2 3 1 0
2 12 1 0
3 4 1 0
4 5 1 0
5 6 1 0
6 7 2 0
7 8 1 0
8 9 2 0
9 10 1 0
9 11 1 0
11 6 1 0
12 13 1 0
13 14 1 0
14 15 1 0
15 16 1 0
15 17 1 0
17 18 1 0
18 12 1 0
M END

$$$$
"""
with open(
f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
) as input_file:
input_file.writelines(input_file_1_content)

# Act
r_wfid = start_workflow(
md, da, "simple-python-molprops", {"candidateMolecules": "C"}
md, da, "simple-python-molprops", {"candidateMolecules": input_file_1}
)

# Assert
Expand All @@ -256,5 +310,5 @@ def test_workflow_engine_simple_python_molprops(basic_engine):
assert response["running_workflow_steps"][1]["done"]
assert response["running_workflow_steps"][1]["success"]
# This test should generate a file in the simulated project directory
assert project_file_exists(output_file_a)
assert project_file_exists(output_file_b)
assert project_file_exists(output_file_1)
assert project_file_exists(output_file_2)
4 changes: 2 additions & 2 deletions tests/workflow-definitions/simple-python-molprops.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ steps:
workflow-input: candidateMolecules
outputs:
- output: outputFile
as: __step1__out.smi
as: step1.out.smi

- name: step2
description: Add column 2
Expand All @@ -54,4 +54,4 @@ steps:
output: outputFile
outputs:
- output: outputFile
as: __step2__out.smi
as: step2.out.smi
66 changes: 66 additions & 0 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
def _validate_step_command(
self,
*,
running_workflow_step_id: str,
step: dict[str, Any],
running_workflow_variables: dict[str, Any] | None = None,
) -> str | dict[str, Any]:
Expand Down Expand Up @@ -352,6 +353,35 @@ def _validate_step_command(
#
# TBD

wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
running_workflow_step_id=running_workflow_step_id,
)

wf_steps = wf_step_data.get("steps", [])
try:
previous_step = wf_steps[wf_step_data["caller_step_index"] - 1]
except IndexError:
previous_step = {}

inputs = step.get("inputs", [])
outputs = step.get("outputs", [])
previous_step_outputs = previous_step.get("outputs", [])

step_vars = self._set_step_variables(
inputs=inputs,
outputs=outputs,
previous_step_outputs=previous_step_outputs,
workflow_variables=all_variables,
)

all_variables |= step_vars
print("all_variables", all_variables)

self._wapi_adapter.set_running_workflow_step_variables(
running_workflow_step_id=running_workflow_step_id,
variables=all_variables,
)

message, success = decode(
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
)
Expand All @@ -377,6 +407,7 @@ def _launch(
# running workflow record)
running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
error_or_variables: str | dict[str, Any] = self._validate_step_command(
running_workflow_step_id=rwfs_id,
step=step,
running_workflow_variables=running_workflow_variables,
)
Expand Down Expand Up @@ -448,3 +479,38 @@ def _set_step_error(
error_num=error,
error_msg=error_msg,
)

def _set_step_variables(
self,
inputs: list[dict[str, Any]],
outputs: list[dict[str, Any]],
previous_step_outputs: list[dict[str, Any]],
workflow_variables: dict[str, Any],
) -> dict[str, Any]:
"""Prepare input- and output variables for the following step.

Inputs are defined in step definition but their values may
come from previous step outputs.
"""
result = {}

for item in inputs:
p_key = item["input"]
p_val = ""
val = item["from"]
if "workflow-input" in val.keys():
p_val = workflow_variables[val["workflow-input"]]
elif "step" in val.keys():
for out in previous_step_outputs:
if out["output"] == val["output"]:
p_val = out["as"]
break

result[p_key] = p_val

for item in outputs:
p_key = item["output"]
p_val = item["as"]
result[p_key] = p_val

return result