Skip to content

Commit 37d4496

Browse files
Merge pull request #17 from InformaticsMatters/add-all-params-2
Initial logic to support simple linear job executions
2 parents cdba024 + d7269ad commit 37d4496

File tree

5 files changed

+153
-20
lines changed

5 files changed

+153
-20
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,8 @@ dist/
33
**/__pycache__/
44
**/*.pickle
55
tests/project-root/project-*/
6+
7+
# temp files
8+
*~
9+
\#*#
10+
\#*

tests/instance_launcher.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@
3131
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
3232
from workflow.workflow_abc import InstanceLauncher, LaunchParameters, LaunchResult
3333

34+
# Relative path to the execution (project) directory
35+
EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID)
36+
3437
# Full path to the 'jobs' directory
3538
_JOB_PATH: str = os.path.join(os.path.dirname(__file__), "jobs")
36-
# Relative path to the execution (project) directory
37-
_EXECUTION_DIRECTORY: str = os.path.join("tests", "project-root", TEST_PROJECT_ID)
3839

3940

4041
def project_file_exists(file_name: str) -> bool:
4142
"""A convenient test function to verify a file exists
4243
in the execution (project) directory."""
43-
return os.path.isfile(os.path.join(_EXECUTION_DIRECTORY, file_name))
44+
return os.path.isfile(os.path.join(EXECUTION_DIRECTORY, file_name))
4445

4546

4647
class UnitTestInstanceLauncher(InstanceLauncher):
@@ -57,17 +58,23 @@ def __init__(
5758
self._msg_dispatcher = msg_dispatcher
5859

5960
# Every launcher starts with an empty execution directory...
60-
print(f"Removing execution directory ({_EXECUTION_DIRECTORY})")
61-
assert _EXECUTION_DIRECTORY.startswith("tests/project-root")
62-
shutil.rmtree(_EXECUTION_DIRECTORY, ignore_errors=True)
61+
print(f"Removing execution directory ({EXECUTION_DIRECTORY})")
62+
assert EXECUTION_DIRECTORY.startswith("tests/project-root")
63+
if os.path.isdir(EXECUTION_DIRECTORY):
64+
for filename in os.listdir(EXECUTION_DIRECTORY):
65+
file_path = os.path.join(EXECUTION_DIRECTORY, filename)
66+
if os.path.isfile(file_path) or os.path.islink(file_path):
67+
os.unlink(file_path)
68+
elif os.path.isdir(file_path):
69+
shutil.rmtree(file_path)
6370

6471
def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
6572
assert launch_parameters
6673
assert launch_parameters.project_id == TEST_PROJECT_ID
6774
assert launch_parameters.specification
6875
assert isinstance(launch_parameters.specification, dict)
6976

70-
os.makedirs(_EXECUTION_DIRECTORY, exist_ok=True)
77+
os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)
7178

7279
# We're passed a RunningWorkflowStep ID but a record is expected to have been
7380
# created bt the caller, we simply create instance records.
@@ -108,8 +115,9 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
108115
assert os.path.isfile(module)
109116
subprocess_cmd: List[str] = ["python"] + command_list
110117
print(f"Subprocess command: {subprocess_cmd}")
118+
print(f"Execution directory: {EXECUTION_DIRECTORY}")
111119
completed_process: CompletedProcess = subprocess.run(
112-
subprocess_cmd, check=False, cwd=_EXECUTION_DIRECTORY
120+
subprocess_cmd, check=False, cwd=EXECUTION_DIRECTORY
113121
)
114122

115123
# Simulate a PodMessage (that will contain the instance ID),

tests/test_workflow_engine_examples.py

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
1212

1313
from tests.config import TEST_PROJECT_ID
14-
from tests.instance_launcher import UnitTestInstanceLauncher, project_file_exists
14+
from tests.instance_launcher import (
15+
EXECUTION_DIRECTORY,
16+
UnitTestInstanceLauncher,
17+
project_file_exists,
18+
)
1519
from tests.message_dispatcher import UnitTestMessageDispatcher
1620
from tests.message_queue import UnitTestMessageQueue
1721
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
@@ -200,7 +204,6 @@ def test_workflow_engine_example_smiles_to_file(basic_engine):
200204
assert project_file_exists(output_file)
201205

202206

203-
@pytest.mark.skip(reason="The engine does not currently create the required variables")
204207
def test_workflow_engine_shortcut_example_1(basic_engine):
205208
# Arrange
206209
da, md = basic_engine
@@ -229,20 +232,71 @@ def test_workflow_engine_shortcut_example_1(basic_engine):
229232
assert project_file_exists(output_file_b)
230233

231234

232-
@pytest.mark.skip(reason="The engine does not currently create the required variables")
235+
# @pytest.mark.skip(reason="The engine does not currently create the required variables")
233236
def test_workflow_engine_simple_python_molprops(basic_engine):
234237
# Arrange
235238
da, md = basic_engine
236239
# Make sure files that should be generated by the test
237240
# do not exist before we run the test.
238-
output_file_a = "a.sdf"
239-
assert not project_file_exists(output_file_a)
240-
output_file_b = "b.sdf"
241-
assert not project_file_exists(output_file_b)
241+
output_file_1 = "step1.out.smi"
242+
assert not project_file_exists(output_file_1)
243+
output_file_2 = "step2.out.smi"
244+
assert not project_file_exists(output_file_2)
245+
# And create the test's input file.
246+
input_file_1 = "input1.smi"
247+
input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1
248+
RDKit 3D
249+
250+
18 19 0 0 0 0 0 0 0 0999 V2000
251+
8.7102 -1.3539 24.2760 O 0 0 0 0 0 0 0 0 0 0 0 0
252+
9.4334 -2.1203 23.6716 C 0 0 0 0 0 0 0 0 0 0 0 0
253+
10.3260 -1.7920 22.4941 C 0 0 0 0 0 0 0 0 0 0 0 0
254+
9.5607 -0.5667 21.3699 S 0 0 0 0 0 0 0 0 0 0 0 0
255+
7.9641 -1.3976 21.0216 C 0 0 0 0 0 0 0 0 0 0 0 0
256+
7.1007 -0.5241 20.1671 C 0 0 0 0 0 0 0 0 0 0 0 0
257+
5.7930 -0.1276 20.3932 C 0 0 0 0 0 0 0 0 0 0 0 0
258+
5.2841 0.6934 19.3422 C 0 0 0 0 0 0 0 0 0 0 0 0
259+
6.2234 0.8796 18.3624 C 0 0 0 0 0 0 0 0 0 0 0 0
260+
6.0491 1.8209 16.9402 Cl 0 0 0 0 0 0 0 0 0 0 0 0
261+
7.6812 0.0795 18.6678 S 0 0 0 0 0 0 0 0 0 0 0 0
262+
9.5928 -3.4405 24.2306 N 0 0 0 0 0 0 0 0 0 0 0 0
263+
10.8197 -3.4856 25.0609 C 0 0 0 0 0 0 0 0 0 0 0 0
264+
11.0016 -4.9279 25.4571 C 0 0 0 0 0 0 0 0 0 0 0 0
265+
9.9315 -5.2800 26.4615 C 0 0 0 0 0 0 0 0 0 0 0 0
266+
10.3887 -4.7677 27.7090 O 0 0 0 0 0 0 0 0 0 0 0 0
267+
8.5793 -4.6419 26.1747 C 0 0 0 0 0 0 0 0 0 0 0 0
268+
8.3826 -4.0949 24.7695 C 0 0 0 0 0 0 0 0 0 0 0 0
269+
1 2 2 0
270+
2 3 1 0
271+
2 12 1 0
272+
3 4 1 0
273+
4 5 1 0
274+
5 6 1 0
275+
6 7 2 0
276+
7 8 1 0
277+
8 9 2 0
278+
9 10 1 0
279+
9 11 1 0
280+
11 6 1 0
281+
12 13 1 0
282+
13 14 1 0
283+
14 15 1 0
284+
15 16 1 0
285+
15 17 1 0
286+
17 18 1 0
287+
18 12 1 0
288+
M END
289+
290+
$$$$
291+
"""
292+
with open(
293+
f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
294+
) as input_file:
295+
input_file.writelines(input_file_1_content)
242296

243297
# Act
244298
r_wfid = start_workflow(
245-
md, da, "simple-python-molprops", {"candidateMolecules": "C"}
299+
md, da, "simple-python-molprops", {"candidateMolecules": input_file_1}
246300
)
247301

248302
# Assert
@@ -256,5 +310,5 @@ def test_workflow_engine_simple_python_molprops(basic_engine):
256310
assert response["running_workflow_steps"][1]["done"]
257311
assert response["running_workflow_steps"][1]["success"]
258312
# This test should generate a file in the simulated project directory
259-
assert project_file_exists(output_file_a)
260-
assert project_file_exists(output_file_b)
313+
assert project_file_exists(output_file_1)
314+
assert project_file_exists(output_file_2)

tests/workflow-definitions/simple-python-molprops.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ steps:
3333
workflow-input: candidateMolecules
3434
outputs:
3535
- output: outputFile
36-
as: __step1__out.smi
36+
as: step1.out.smi
3737

3838
- name: step2
3939
description: Add column 2
@@ -54,4 +54,4 @@ steps:
5454
output: outputFile
5555
outputs:
5656
- output: outputFile
57-
as: __step2__out.smi
57+
as: step2.out.smi

workflow/workflow_engine.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
284284
def _validate_step_command(
285285
self,
286286
*,
287+
running_workflow_step_id: str,
287288
step: dict[str, Any],
288289
running_workflow_variables: dict[str, Any] | None = None,
289290
) -> str | dict[str, Any]:
@@ -352,6 +353,35 @@ def _validate_step_command(
352353
#
353354
# TBD
354355

356+
wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
357+
running_workflow_step_id=running_workflow_step_id,
358+
)
359+
360+
wf_steps = wf_step_data.get("steps", [])
361+
try:
362+
previous_step = wf_steps[wf_step_data["caller_step_index"] - 1]
363+
except IndexError:
364+
previous_step = {}
365+
366+
inputs = step.get("inputs", [])
367+
outputs = step.get("outputs", [])
368+
previous_step_outputs = previous_step.get("outputs", [])
369+
370+
step_vars = self._set_step_variables(
371+
inputs=inputs,
372+
outputs=outputs,
373+
previous_step_outputs=previous_step_outputs,
374+
workflow_variables=all_variables,
375+
)
376+
377+
all_variables |= step_vars
378+
print("all_variables", all_variables)
379+
380+
self._wapi_adapter.set_running_workflow_step_variables(
381+
running_workflow_step_id=running_workflow_step_id,
382+
variables=all_variables,
383+
)
384+
355385
message, success = decode(
356386
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
357387
)
@@ -377,6 +407,7 @@ def _launch(
377407
# running workflow record)
378408
running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
379409
error_or_variables: str | dict[str, Any] = self._validate_step_command(
410+
running_workflow_step_id=rwfs_id,
380411
step=step,
381412
running_workflow_variables=running_workflow_variables,
382413
)
@@ -448,3 +479,38 @@ def _set_step_error(
448479
error_num=error,
449480
error_msg=error_msg,
450481
)
482+
483+
def _set_step_variables(
484+
self,
485+
inputs: list[dict[str, Any]],
486+
outputs: list[dict[str, Any]],
487+
previous_step_outputs: list[dict[str, Any]],
488+
workflow_variables: dict[str, Any],
489+
) -> dict[str, Any]:
490+
"""Prepare input- and output variables for the following step.
491+
492+
Inputs are defined in step definition but their values may
493+
come from previous step outputs.
494+
"""
495+
result = {}
496+
497+
for item in inputs:
498+
p_key = item["input"]
499+
p_val = ""
500+
val = item["from"]
501+
if "workflow-input" in val.keys():
502+
p_val = workflow_variables[val["workflow-input"]]
503+
elif "step" in val.keys():
504+
for out in previous_step_outputs:
505+
if out["output"] == val["output"]:
506+
p_val = out["as"]
507+
break
508+
509+
result[p_key] = p_val
510+
511+
for item in outputs:
512+
p_key = item["output"]
513+
p_val = item["as"]
514+
result[p_key] = p_val
515+
516+
return result

0 commit comments

Comments
 (0)