Skip to content

Commit 0f220c0

Browse files
Merge pull request #22 from InformaticsMatters/parallel-jobs
feat: run multiple jobs in parallel
2 parents 6b16607 + 3523e7b commit 0f220c0

File tree

8 files changed

+203
-5
lines changed

8 files changed

+203
-5
lines changed

tests/job-definitions/job-definitions.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,11 @@ jobs:
124124
cluster-butina:
125125
command: >-
126126
addcol.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}
127+
128+
append-col:
129+
command: >-
130+
addcol2.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}
131+
132+
concatenate:
133+
command: >-
134+
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}

tests/jobs/addcol2.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""Test job: append a constant value as a new column to every line of a file."""
import argparse


def append_column(input_path, output_path, value):
    """Write every line of ``input_path`` to ``output_path`` with ``value`` appended.

    Each input line is written followed by a tab, ``value`` and a newline.
    An empty input file produces an empty output file.

    The input is read completely before the output is opened, so the two
    paths may refer to the same file.
    """
    with open(input_path, "rt", encoding="utf8") as ifile:
        lines = ifile.read().splitlines()

    with open(output_path, "wt", encoding="utf8") as ofile:
        ofile.writelines(f"{line}\t{value}\n" for line in lines)


def main(argv=None):
    """Parse the command line (``sys.argv`` when ``argv`` is None) and run the job."""
    parser = argparse.ArgumentParser(
        prog="addcol2",
        description=(
            "Appends a value as a new tab-separated column"
            " to every line of a file"
        ),
    )
    parser.add_argument("-i", "--inputFile", required=True)
    parser.add_argument("-o", "--outputFile", required=True)
    # --name is required by the command line but the value is not written
    # to the output; it is kept so existing invocations keep working.
    parser.add_argument("-n", "--name", required=True)
    parser.add_argument("-v", "--value", required=True)
    args = parser.parse_args(argv)

    append_column(args.inputFile, args.outputFile, args.value)


if __name__ == "__main__":
    main()

tests/jobs/concatenate.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"""Test job: concatenate one or more input files into a single output file."""
import argparse
from contextlib import ExitStack


def concatenate(input_paths, output_path):
    """Write the contents of ``input_paths``, in order, to ``output_path``.

    All input files are opened before the output file is created, so a
    missing or unreadable input raises ``OSError`` without leaving a
    truncated output behind. Every file is read and written as UTF-8 and is
    closed when the function returns, including on error.
    """
    with ExitStack() as stack:
        sources = [
            stack.enter_context(open(path, "rt", encoding="utf8"))
            for path in input_paths
        ]
        ofile = stack.enter_context(open(output_path, "wt", encoding="utf8"))
        for source in sources:
            ofile.write(source.read())


def main(argv=None):
    """Parse the command line (``sys.argv`` when ``argv`` is None) and run the job."""
    parser = argparse.ArgumentParser(
        prog="concatenate",
        description="Takes a list of files and writes them into a single output file",
    )
    # Plain paths rather than argparse.FileType: FileType opens the files at
    # parse time with the locale encoding and never closes them.
    parser.add_argument("inputFile", nargs="+")
    parser.add_argument("-o", "--outputFile", required=True)
    args = parser.parse_args(argv)

    concatenate(args.inputFile, args.outputFile)


if __name__ == "__main__":
    main()

tests/test_workflow_engine_examples.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,3 +399,52 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
399399
# This test should generate a file in the simulated project directory
400400
assert project_file_exists(output_file_1)
401401
assert project_file_exists(output_file_2)
402+
403+
404+
def test_workflow_engine_simple_python_parallel(basic_engine):
    """Run the 'simple-python-parallel' workflow and check all its steps.

    The workflow is started with a single SMILES input file. The test then
    expects four RunningWorkflowSteps, all done and successful, and the
    output file of every step to exist in the simulated project directory.
    """
    # Arrange
    da, md = basic_engine
    # Make sure files that should be generated by the test
    # do not exist before we run the test.
    expected_output_files = [
        "first-step.out.smi",
        "parallel-step-a.out.smi",
        "parallel-step-b.out.smi",
        "final-step.out.smi",
    ]
    for output_file in expected_output_files:
        assert not project_file_exists(output_file)
    # And create the test's input file.
    input_file_1 = "input1.smi"
    input_file_1_content = "O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1"
    with open(
        f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
    ) as input_file:
        # write(), not writelines(): the content is one string, not a
        # sequence of lines.
        input_file.write(input_file_1_content)

    # Act
    r_wfid = start_workflow(
        md,
        da,
        "simple-python-parallel",
        {"candidateMolecules": input_file_1},
    )

    # Assert
    wait_for_workflow(da, r_wfid)
    # Additional, detailed checks...
    # Check we have four RunningWorkflowSteps, and that each one succeeded
    response = da.get_running_workflow_steps(running_workflow_id=r_wfid)

    assert response["count"] == 4
    for step in response["running_workflow_steps"]:
        assert step["done"]
        assert step["success"]
    # Every step should have generated its file in the simulated project
    # directory, including the two parallel steps.
    for output_file in expected_output_files:
        assert project_file_exists(output_file)

tests/workflow-definitions/shortcut-example-1.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ steps:
2525
inputs:
2626
- input: 'inputFile'
2727
from:
28-
step: step-1
28+
step: example-1-step-1
2929
output: 'outputFile'
3030
outputs:
3131
- output: 'outputFile'
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
---
2+
kind: DataManagerWorkflow
3+
kind-version: "2024.1"
4+
name: python-workflow
5+
description: A simple branching workflow
6+
variables:
7+
inputs:
8+
- name: candidateMolecules
9+
type: squonk/x-smiles
10+
outputs:
11+
- name: clusteredMolecules
12+
from:
13+
step: final-step
14+
output: outputFile
15+
as: clustered-molecules.smi
16+
17+
18+
steps:
19+
20+
- name: first-step
21+
description: Create inputs
22+
specification:
23+
collection: workflow-engine-unit-test-jobs
24+
job: rdkit-molprops
25+
version: "1.0.0"
26+
variables:
27+
name: "unnecessary"
28+
value: "0"
29+
inputs:
30+
- input: inputFile
31+
from:
32+
workflow-input: candidateMolecules
33+
outputs:
34+
- output: outputFile
35+
as: first-step.out.smi
36+
37+
- name: parallel-step-a
38+
description: Add some params
39+
specification:
40+
collection: workflow-engine-unit-test-jobs
41+
job: append-col
42+
version: "1.0.0"
43+
variables:
44+
name: "desc1"
45+
value: "777"
46+
inputs:
47+
- input: inputFile
48+
from:
49+
step: first-step
50+
output: outputFile
51+
outputs:
52+
- output: outputFile
53+
as: parallel-step-a.out.smi
54+
55+
- name: parallel-step-b
56+
description: Add some other params
57+
specification:
58+
collection: workflow-engine-unit-test-jobs
59+
job: append-col
60+
version: "1.0.0"
61+
variables:
62+
name: "desc2"
63+
value: "999"
64+
inputs:
65+
- input: inputFile
66+
from:
67+
step: first-step
68+
output: outputFile
69+
outputs:
70+
- output: outputFile
71+
as: parallel-step-b.out.smi
72+
73+
- name: final-step
74+
description: Collate results
75+
specification:
76+
collection: workflow-engine-unit-test-jobs
77+
job: concatenate
78+
version: "1.0.0"
79+
inputs:
80+
- input: inputFile
81+
from:
82+
step: parallel-step-a
83+
output: outputFile
84+
- input: inputFile
85+
from:
86+
step: parallel-step-b
87+
output: outputFile
88+
outputs:
89+
- output: outputFile
90+
as: final-step.out.smi

workflow/decoder.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,23 @@ def set_step_variables(
170170
val = item["from"]
171171
if "workflow-input" in val.keys():
172172
p_val = workflow_variables[val["workflow-input"]]
173+
result[p_key] = p_val
173174
elif "step" in val.keys():
174175
for out in previous_step_outputs:
175176
if out["output"] == val["output"]:
176177
p_val = out["as"]
177-
break
178178

179-
result[p_key] = p_val
179+
# this bit handles multiple inputs: if a step
180+
# requires input from multiple steps, add them to
181+
# the list in result dict. this is the reason for
182+
# mypy ignore statements, mypy doesn't understand
183+
# redefinition
184+
if p_key in result:
185+
if not isinstance(result[p_key], set):
186+
result[p_key] = {result[p_key]} # type: ignore [assignment]
187+
result[p_key].add(p_val) # type: ignore [attr-defined]
188+
else:
189+
result[p_key] = p_val
180190

181191
for item in outputs:
182192
p_key = item["output"]
@@ -191,4 +201,6 @@ def set_step_variables(
191201

192202
result |= options
193203

204+
print("final step vars", result)
205+
194206
return result

workflow/workflow_engine.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,15 @@ def _validate_step_command(
369369
)
370370

371371
if our_index > 0:
372-
previous_step = wf_step_data["steps"][our_index - 1]
373-
previous_step_outputs = previous_step.get("outputs", [])
372+
# resolve all previous steps
373+
previous_step_names = set()
374+
for inp in inputs:
375+
if step_name := inp["from"].get("step", None):
376+
previous_step_names.add(step_name)
377+
378+
for step in wf_step_data["steps"]:
379+
if step["name"] in previous_step_names:
380+
previous_step_outputs.extend(step.get("outputs", []))
374381

375382
_LOGGER.debug(
376383
"Index %s (%s) workflow_variables=%s",

0 commit comments

Comments
 (0)