From 3523e7bc65956def605438e36dffa307b792605d Mon Sep 17 00:00:00 2001
From: Kalev Takkis
Date: Fri, 23 May 2025 09:39:04 +0100
Subject: [PATCH] feat: run multiple jobs in parallel

---
 tests/job-definitions/job-definitions.yaml |  8 ++
 tests/jobs/addcol2.py                      | 18 ++++
 tests/jobs/concatenate.py                  | 14 +++
 tests/test_workflow_engine_examples.py     | 49 ++++++++++
 .../shortcut-example-1.yaml                |  2 +-
 .../simple-python-parallel.yaml            | 90 +++++++++++++++++++
 workflow/decoder.py                        | 16 +++-
 workflow/workflow_engine.py                | 11 ++-
 8 files changed, 203 insertions(+), 5 deletions(-)
 create mode 100644 tests/jobs/addcol2.py
 create mode 100644 tests/jobs/concatenate.py
 create mode 100644 tests/workflow-definitions/simple-python-parallel.yaml

diff --git a/tests/job-definitions/job-definitions.yaml b/tests/job-definitions/job-definitions.yaml
index 85de046..66afcd3 100644
--- a/tests/job-definitions/job-definitions.yaml
+++ b/tests/job-definitions/job-definitions.yaml
@@ -124,3 +124,11 @@ jobs:
   cluster-butina:
     command: >-
       addcol.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}
+
+  append-col:
+    command: >-
+      addcol2.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }}
+
+  concatenate:
+    command: >-
+      concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
diff --git a/tests/jobs/addcol2.py b/tests/jobs/addcol2.py
new file mode 100644
index 0000000..6ab0e97
--- /dev/null
+++ b/tests/jobs/addcol2.py
@@ -0,0 +1,18 @@
+import argparse
+
+parser = argparse.ArgumentParser(
+    prog="addcol2",
+    description="Reads a file and appends a tab-separated value column to every line",
+)
+parser.add_argument("-i", "--inputFile", required=True)
+parser.add_argument("-o", "--outputFile", required=True)
+parser.add_argument("-n", "--name", required=True)
+parser.add_argument("-v", "--value", required=True)
+args = parser.parse_args()
+
+with (
+    open(args.inputFile, "rt", encoding="utf8") as ifile,
+    open(args.outputFile, "wt", encoding="utf8") as ofile,
+):
+    for line in ifile.read().splitlines():
+        ofile.write(f"{line}\t{args.value}\n")
diff --git a/tests/jobs/concatenate.py b/tests/jobs/concatenate.py
new file mode 100644
index 0000000..2f6b22b
--- /dev/null
+++ b/tests/jobs/concatenate.py
@@ -0,0 +1,14 @@
+import argparse
+
+parser = argparse.ArgumentParser(
+    prog="concatenate",
+    description="Takes a list of files and writes them into a single output file",
+)
+parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
+parser.add_argument("-o", "--outputFile", required=True)
+args = parser.parse_args()
+
+
+with open(args.outputFile, "wt", encoding="utf8") as ofile:
+    for f in args.inputFile:
+        ofile.write(f.read())
diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py
index 980f446..51b1053 100644
--- a/tests/test_workflow_engine_examples.py
+++ b/tests/test_workflow_engine_examples.py
@@ -399,3 +399,52 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
     # This test should generate a file in the simulated project directory
     assert project_file_exists(output_file_1)
     assert project_file_exists(output_file_2)
+
+
+def test_workflow_engine_simple_python_parallel(basic_engine):
+    # Arrange
+    da, md = basic_engine
+    # Make sure files that should be generated by the test
+    # do not exist before we run the test.
+    output_file_first = "first-step.out.smi"
+    assert not project_file_exists(output_file_first)
+    output_file_pa = "parallel-step-a.out.smi"
+    assert not project_file_exists(output_file_pa)
+    output_file_pb = "parallel-step-b.out.smi"
+    assert not project_file_exists(output_file_pb)
+    output_file_final = "final-step.out.smi"
+    assert not project_file_exists(output_file_final)
+    # And create the test's input file.
+    input_file_1 = "input1.smi"
+    input_file_1_content = "O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1"
+    with open(
+        f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
+    ) as input_file:
+        input_file.writelines(input_file_1_content)
+
+    # Act
+    r_wfid = start_workflow(
+        md,
+        da,
+        "simple-python-parallel",
+        {"candidateMolecules": input_file_1},
+    )
+
+    # Assert
+    wait_for_workflow(da, r_wfid)
+    # Additional, detailed checks...
+    # Check we have four RunningWorkflowSteps and that they all succeeded
+    response = da.get_running_workflow_steps(running_workflow_id=r_wfid)
+
+    assert response["count"] == 4
+    assert response["running_workflow_steps"][0]["done"]
+    assert response["running_workflow_steps"][0]["success"]
+    assert response["running_workflow_steps"][1]["done"]
+    assert response["running_workflow_steps"][1]["success"]
+    assert response["running_workflow_steps"][2]["done"]
+    assert response["running_workflow_steps"][2]["success"]
+    assert response["running_workflow_steps"][3]["done"]
+    assert response["running_workflow_steps"][3]["success"]
+    # This test should generate files in the simulated project directory
+    assert project_file_exists(output_file_first)
+    assert project_file_exists(output_file_final)
diff --git a/tests/workflow-definitions/shortcut-example-1.yaml b/tests/workflow-definitions/shortcut-example-1.yaml
index 494af9b..24a443a 100644
--- a/tests/workflow-definitions/shortcut-example-1.yaml
+++ b/tests/workflow-definitions/shortcut-example-1.yaml
@@ -25,7 +25,7 @@ steps:
   inputs:
   - input: 'inputFile'
     from:
-      step: step-1
+      step: example-1-step-1
      output: 'outputFile'
   outputs:
   - output: 'outputFile'
diff --git a/tests/workflow-definitions/simple-python-parallel.yaml b/tests/workflow-definitions/simple-python-parallel.yaml
new file mode 100644
index 0000000..68ee767
--- /dev/null
+++ b/tests/workflow-definitions/simple-python-parallel.yaml
@@ -0,0 +1,90 @@
+---
+kind: DataManagerWorkflow
+kind-version: "2024.1"
+name: python-workflow
+description: A simple branching workflow
+variables:
+  inputs:
+  - name: candidateMolecules
+    type: squonk/x-smiles
+  outputs:
+  - name: clusteredMolecules
+    from:
+      step: final-step
+      output: outputFile
+    as: clustered-molecules.smi
+
+
+steps:
+
+- name: first-step
+  description: Create inputs
+  specification:
+    collection: workflow-engine-unit-test-jobs
+    job: rdkit-molprops
+    version: "1.0.0"
+    variables:
+      name: "unnecessary"
+      value: "0"
+  inputs:
+  - input: inputFile
+    from:
+      workflow-input: candidateMolecules
+  outputs:
+  - output: outputFile
+    as: first-step.out.smi
+
+- name: parallel-step-a
+  description: Add some params
+  specification:
+    collection: workflow-engine-unit-test-jobs
+    job: append-col
+    version: "1.0.0"
+    variables:
+      name: "desc1"
+      value: "777"
+  inputs:
+  - input: inputFile
+    from:
+      step: first-step
+      output: outputFile
+  outputs:
+  - output: outputFile
+    as: parallel-step-a.out.smi
+
+- name: parallel-step-b
+  description: Add some other params
+  specification:
+    collection: workflow-engine-unit-test-jobs
+    job: append-col
+    version: "1.0.0"
+    variables:
+      name: "desc2"
+      value: "999"
+  inputs:
+  - input: inputFile
+    from:
+      step: first-step
+      output: outputFile
+  outputs:
+  - output: outputFile
+    as: parallel-step-b.out.smi
+
+- name: final-step
+  description: Collate results
+  specification:
+    collection: workflow-engine-unit-test-jobs
+    job: concatenate
+    version: "1.0.0"
+  inputs:
+  - input: inputFile
+    from:
+      step: parallel-step-a
+      output: outputFile
+  - input: inputFile
+    from:
+      step: parallel-step-b
+      output: outputFile
+  outputs:
+  - output: outputFile
+    as: final-step.out.smi
diff --git a/workflow/decoder.py b/workflow/decoder.py
index 4267027..a96e603 100644
--- a/workflow/decoder.py
+++ b/workflow/decoder.py
@@ -170,13 +170,23 @@ def set_step_variables(
         val = item["from"]
         if "workflow-input" in val.keys():
             p_val = workflow_variables[val["workflow-input"]]
+            result[p_key] = p_val
         elif "step" in val.keys():
             for out in previous_step_outputs:
                 if out["output"] == val["output"]:
                     p_val = out["as"]
-                    break
-        result[p_key] = p_val
+                    # this bit handles multiple inputs: if a step
+                    # requires input from multiple steps, add them to
+                    # the list in result dict. this is the reason for
+                    # mypy ignore statements, mypy doesn't understand
+                    # redefinition
+                    if p_key in result:
+                        if not isinstance(result[p_key], set):
+                            result[p_key] = {result[p_key]}  # type: ignore [assignment]
+                        result[p_key].add(p_val)  # type: ignore [attr-defined]
+                    else:
+                        result[p_key] = p_val

     for item in outputs:
         p_key = item["output"]

diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py
index 1772fd6..3be7a42 100644
--- a/workflow/workflow_engine.py
+++ b/workflow/workflow_engine.py
@@ -369,8 +369,15 @@
     )

     if our_index > 0:
-        previous_step = wf_step_data["steps"][our_index - 1]
-        previous_step_outputs = previous_step.get("outputs", [])
+        # resolve all previous steps
+        previous_step_names = set()
+        for inp in inputs:
+            if step_name := inp["from"].get("step", None):
+                previous_step_names.add(step_name)
+
+        for step in wf_step_data["steps"]:
+            if step["name"] in previous_step_names:
+                previous_step_outputs.extend(step.get("outputs", []))

     _LOGGER.debug(
         "Index %s (%s) workflow_variables=%s",