8 changes: 8 additions & 0 deletions tests/job-definitions/job-definitions.yaml
@@ -124,3 +124,11 @@ jobs:
cluster-butina:
command: >-
addcol.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}

append-col:
command: >-
addcol2.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}

concatenate:
command: >-
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
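
A minimal sketch (not part of this pull request) of how the new concatenate command template expands when inputFile carries more than one file. It assumes the jinja2 package is available and uses illustrative file names taken from the new workflow; the actual rendering is performed by the workflow engine.

import jinja2

# The command template from the concatenate job definition above.
command = (
    "concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %}"
    " --outputFile {{ outputFile }}"
)
rendered = jinja2.Template(command).render(
    inputFile=["parallel-step-a.out.smi", "parallel-step-b.out.smi"],
    outputFile="final-step.out.smi",
)
# yields: concatenate.py parallel-step-a.out.smi parallel-step-b.out.smi --outputFile final-step.out.smi
# (any extra whitespace between arguments is harmless)
print(rendered)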
18 changes: 18 additions & 0 deletions tests/jobs/addcol2.py
@@ -0,0 +1,18 @@
import argparse

parser = argparse.ArgumentParser(
prog="addcol",
description="Takes a SMILES string and writes it to a file",
)
parser.add_argument("-i", "--inputFile", required=True)
parser.add_argument("-o", "--outputFile", required=True)
parser.add_argument("-n", "--name", required=True)
parser.add_argument("-v", "--value", required=True)
args = parser.parse_args()

with (
open(args.inputFile, "rt", encoding="utf8") as ifile,
open(args.outputFile, "wt", encoding="utf8") as ofile,
):
for line in ifile.read().splitlines():
ofile.write(f"{line}\t{args.value}\n")
14 changes: 14 additions & 0 deletions tests/jobs/concatenate.py
@@ -0,0 +1,14 @@
import argparse

parser = argparse.ArgumentParser(
prog="addcol",
description="Takes a list of files and writes them into single outputfile",
)
parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
parser.add_argument("-o", "--outputFile", required=True)
args = parser.parse_args()


with open(args.outputFile, "wt", encoding="utf8") as ofile:
for f in args.inputFile:
ofile.write(f.read())
49 changes: 49 additions & 0 deletions tests/test_workflow_engine_examples.py
@@ -399,3 +399,52 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
# This test should generate a file in the simulated project directory
assert project_file_exists(output_file_1)
assert project_file_exists(output_file_2)


def test_workflow_engine_simple_python_parallel(basic_engine):
# Arrange
da, md = basic_engine
# Make sure files that should be generated by the test
# do not exist before we run the test.
output_file_first = "first-step.out.smi"
assert not project_file_exists(output_file_first)
output_file_pa = "parallel-step-a.out.smi"
assert not project_file_exists(output_file_pa)
output_file_pb = "parallel-step-b.out.smi"
assert not project_file_exists(output_file_pb)
output_file_final = "final-step.out.smi"
assert not project_file_exists(output_file_final)
# And create the test's input file.
input_file_1 = "input1.smi"
input_file_1_content = "O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1"
with open(
f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
) as input_file:
input_file.writelines(input_file_1_content)

# Act
r_wfid = start_workflow(
md,
da,
"simple-python-parallel",
{"candidateMolecules": input_file_1},
)

# Assert
wait_for_workflow(da, r_wfid)
# Additional, detailed checks...
# Check there are four RunningWorkflowSteps and that each succeeded
response = da.get_running_workflow_steps(running_workflow_id=r_wfid)

assert response["count"] == 4
assert response["running_workflow_steps"][0]["done"]
assert response["running_workflow_steps"][0]["success"]
assert response["running_workflow_steps"][1]["done"]
assert response["running_workflow_steps"][1]["success"]
assert response["running_workflow_steps"][2]["done"]
assert response["running_workflow_steps"][2]["success"]
assert response["running_workflow_steps"][3]["done"]
assert response["running_workflow_steps"][3]["success"]
# This test should generate files in the simulated project directory
assert project_file_exists(output_file_first)
assert project_file_exists(output_file_final)
2 changes: 1 addition & 1 deletion tests/workflow-definitions/shortcut-example-1.yaml
@@ -25,7 +25,7 @@ steps:
inputs:
- input: 'inputFile'
from:
step: step-1
step: example-1-step-1
output: 'outputFile'
outputs:
- output: 'outputFile'
90 changes: 90 additions & 0 deletions tests/workflow-definitions/simple-python-parallel.yaml
@@ -0,0 +1,90 @@
---
kind: DataManagerWorkflow
kind-version: "2024.1"
name: python-workflow
description: A simple branching workflow
variables:
inputs:
- name: candidateMolecules
type: squonk/x-smiles
outputs:
- name: clusteredMolecules
from:
step: final-step
output: outputFile
as: clustered-molecules.smi


steps:

- name: first-step
description: Create inputs
specification:
collection: workflow-engine-unit-test-jobs
job: rdkit-molprops
version: "1.0.0"
variables:
name: "unnecessary"
value: "0"
inputs:
- input: inputFile
from:
workflow-input: candidateMolecules
outputs:
- output: outputFile
as: first-step.out.smi

- name: parallel-step-a
description: Add some params
specification:
collection: workflow-engine-unit-test-jobs
job: append-col
version: "1.0.0"
variables:
name: "desc1"
value: "777"
inputs:
- input: inputFile
from:
step: first-step
output: outputFile
outputs:
- output: outputFile
as: parallel-step-a.out.smi

- name: parallel-step-b
description: Add some other params
specification:
collection: workflow-engine-unit-test-jobs
job: append-col
version: "1.0.0"
variables:
name: "desc2"
value: "999"
inputs:
- input: inputFile
from:
step: first-step
output: outputFile
outputs:
- output: outputFile
as: parallel-step-b.out.smi

- name: final-step
description: Collate results
specification:
collection: workflow-engine-unit-test-jobs
job: concatenate
version: "1.0.0"
inputs:
- input: inputFile
from:
step: parallel-step-a
output: outputFile
- input: inputFile
from:
step: parallel-step-b
output: outputFile
outputs:
- output: outputFile
as: final-step.out.smi
16 changes: 14 additions & 2 deletions workflow/decoder.py
@@ -170,13 +170,23 @@ def set_step_variables(
val = item["from"]
if "workflow-input" in val.keys():
p_val = workflow_variables[val["workflow-input"]]
result[p_key] = p_val
elif "step" in val.keys():
for out in previous_step_outputs:
if out["output"] == val["output"]:
p_val = out["as"]
break

result[p_key] = p_val
# Handle multiple inputs: if a step takes the same input
# from more than one previous step, collect the values in a
# set in the result dict. The mypy ignore comments are needed
# because mypy does not follow the redefinition from a single
# value to a set.
if p_key in result:
if not isinstance(result[p_key], set):
result[p_key] = {result[p_key]} # type: ignore [assignment]
result[p_key].add(p_val) # type: ignore [attr-defined]
else:
result[p_key] = p_val

for item in outputs:
p_key = item["output"]
@@ -191,4 +201,6 @@ def set_step_variables(

result |= options

print("final step vars", result)

return result
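
A minimal sketch (not part of this pull request) of the merging behaviour added to set_step_variables(): when the same input key is supplied by more than one previous step, the values are collected into a set instead of overwriting one another. merge_input is a hypothetical helper used only for illustration.

def merge_input(result: dict, key: str, value: str) -> None:
    # Promote an existing scalar to a set, then add the new value.
    if key in result:
        if not isinstance(result[key], set):
            result[key] = {result[key]}
        result[key].add(value)
    else:
        result[key] = value

step_vars: dict = {}
merge_input(step_vars, "inputFile", "parallel-step-a.out.smi")
merge_input(step_vars, "inputFile", "parallel-step-b.out.smi")
# step_vars["inputFile"] is now {"parallel-step-a.out.smi", "parallel-step-b.out.smi"}
print(step_vars)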
11 changes: 9 additions & 2 deletions workflow/workflow_engine.py
@@ -369,8 +369,15 @@ def _validate_step_command(
)

if our_index > 0:
previous_step = wf_step_data["steps"][our_index - 1]
previous_step_outputs = previous_step.get("outputs", [])
# Resolve every previous step referenced by this step's inputs
previous_step_names = set()
for inp in inputs:
if step_name := inp["from"].get("step", None):
previous_step_names.add(step_name)

for step in wf_step_data["steps"]:
if step["name"] in previous_step_names:
previous_step_outputs.extend(step.get("outputs", []))

_LOGGER.debug(
"Index %s (%s) workflow_variables=%s",
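
A minimal sketch (not part of this pull request) of the new previous-step resolution in _validate_step_command(): every from.step reference in the current step's inputs is collected, and the outputs of each referenced step are gathered, so a fan-in step such as final-step sees the outputs of both parallel branches. The step and input structures below mirror simple-python-parallel.yaml.

# Inputs of the fan-in step (final-step) and the steps it may reference.
inputs = [
    {"input": "inputFile", "from": {"step": "parallel-step-a", "output": "outputFile"}},
    {"input": "inputFile", "from": {"step": "parallel-step-b", "output": "outputFile"}},
]
steps = [
    {"name": "first-step", "outputs": [{"output": "outputFile", "as": "first-step.out.smi"}]},
    {"name": "parallel-step-a", "outputs": [{"output": "outputFile", "as": "parallel-step-a.out.smi"}]},
    {"name": "parallel-step-b", "outputs": [{"output": "outputFile", "as": "parallel-step-b.out.smi"}]},
]

# Collect every step named by an input, then gather those steps' outputs.
previous_step_names = {i["from"]["step"] for i in inputs if "step" in i["from"]}
previous_step_outputs = []
for step in steps:
    if step["name"] in previous_step_names:
        previous_step_outputs.extend(step.get("outputs", []))
print(previous_step_outputs)
# [{'output': 'outputFile', 'as': 'parallel-step-a.out.smi'},
#  {'output': 'outputFile', 'as': 'parallel-step-b.out.smi'}]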