Skip to content

Commit cc5d8cc

Browse files
committed
fix: it's a mess
1 parent da12869 commit cc5d8cc

File tree

10 files changed

+344
-52
lines changed

10 files changed

+344
-52
lines changed

tests/job-definitions/job-definitions.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,7 @@ jobs:
132132
concatenate:
133133
command: >-
134134
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
135+
136+
splitsmiles:
137+
command: >-
138+
copyf.py {{ inputFile }}

tests/jobs/copyf.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import shutil
import sys
from pathlib import Path


def main(argv=None):
    """Duplicate a file as 'chunk_1.smi' and 'chunk_2.smi' beside it.

    Test helper for the 'splitsmiles' job: it simulates a fan-out step by
    copying the single input file into two chunk files in the input file's
    own directory.

    :param argv: Argument vector including the program name (defaults to
        ``sys.argv`` so existing ``main()`` callers are unaffected).
        Exactly one filename argument is expected.
    :raises SystemExit: with status 1 on bad usage or a missing input file.
    """
    print("copyf job running")
    args = sys.argv if argv is None else argv
    if len(args) != 2:
        print("Usage: python copyf.py <filename>")
        sys.exit(1)

    original_path = Path(args[1])

    # is_file() is False for non-existent paths too, covering both checks.
    if not original_path.is_file():
        print(f"Error: '{original_path}' does not exist or is not a file.")
        sys.exit(1)

    # Write both chunks next to the input file (not the current directory).
    for index in (1, 2):
        chunk_path = original_path.with_name(f"chunk_{index}.smi")
        shutil.copyfile(original_path, chunk_path)


if __name__ == "__main__":
    main()

tests/jobs/copyf.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
# Duplicate the input file as chunk_1.smi and chunk_2.smi in the current
# working directory. Test fan-out helper (shell twin of copyf.py — note
# that the Python variant writes next to the input file, this one writes
# to the CWD).
set -euo pipefail

if [[ $# -ne 1 ]]; then
    echo "Usage: $0 <input_file>" >&2
    exit 1
fi

cp "$1" chunk_1.smi
cp "$1" chunk_2.smi

tests/jobs/split-smi.sh

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#!/bin/bash
2+
set -euo pipefail
3+
4+
if [[ $# -lt 3 || $# -gt 4 ]]; then
5+
echo "Usage: $0 <input_file(.smi or .smi.gz)> <lines_per_file> <output_basename> [has_header: yes]"
6+
exit 1
7+
fi
8+
9+
input_file="$1"
10+
lines_per_file="$2"
11+
base_name="$3"
12+
has_header="${4:-no}"
13+
14+
# Determine how to read the file (plain text or gzipped)
15+
if [[ "$input_file" == *.gz ]]; then
16+
reader="zcat"
17+
else
18+
reader="cat"
19+
fi
20+
21+
if ! [[ -f "$input_file" ]]; then
22+
echo "Error: File '$input_file' not found"
23+
exit 1
24+
fi
25+
26+
# Extract header if present
27+
if [[ "$has_header" == "yes" ]]; then
28+
header="$($reader "$input_file" | head -n1)"
29+
data_start=2
30+
else
31+
header=""
32+
data_start=1
33+
fi
34+
35+
# Count number of data lines (excluding header if present)
36+
data_lines="$($reader "$input_file" | tail -n +"$data_start" | wc -l)"
37+
if [[ "$data_lines" -eq 0 ]]; then
38+
echo "No data lines to process."
39+
exit 0
40+
fi
41+
42+
# Calculate number of output files and required zero padding
43+
num_files=$(( (data_lines + lines_per_file - 1) / lines_per_file ))
44+
pad_width=0
45+
if [[ "$num_files" -gt 1 ]]; then
46+
pad_width=${#num_files}
47+
fi
48+
49+
# Split logic
50+
$reader "$input_file" | tail -n +"$data_start" | awk -v header="$header" -v lines="$lines_per_file" -v base="$base_name" -v pad="$pad_width" '
51+
function new_file() {
52+
suffix = (pad > 0) ? sprintf("%0*d", pad, file_index) : file_index
53+
file = base "_" suffix ".smi"
54+
if (header != "") {
55+
print header > file
56+
}
57+
file_index++
58+
line_count = 0
59+
}
60+
{
61+
if (line_count == 0) {
62+
new_file()
63+
}
64+
print >> file
65+
line_count++
66+
if (line_count == lines) {
67+
close(file)
68+
print file " created"
69+
line_count = 0
70+
}
71+
}
72+
' file_index=1

tests/test_workflow_engine_examples.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def start_workflow(
8383
variables=variables,
8484
level=ValidationLevel.RUN,
8585
)
86+
print("vr_result", vr_result)
8687
assert vr_result.error_num == 0
8788
# 3.
8889
response = da.create_running_workflow(
@@ -401,22 +402,44 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
401402
assert project_file_exists(output_file_2)
402403

403404

404-
def test_workflow_engine_simple_python_parallel(basic_engine):
405+
def test_workflow_engine_simple_python_fanout(basic_engine):
405406
# Arrange
406407
md, da = basic_engine
408+
409+
da.mock_get_running_workflow_step_output_values_for_output(
410+
step_name="first-step",
411+
output_variable="outputFile",
412+
output=["chunk_1.smi", "chunk_2.smi"],
413+
)
414+
415+
# da.mock_get_running_workflow_step_output_values_for_output(
416+
# step_name="parallel-step",
417+
# output_variable="outputFile",
418+
# output=["chunk_1_proc.smi", "chunk_2_proc.smi"]
419+
# )
420+
421+
# da.mock_get_running_workflow_step_output_values_for_output(
422+
# step_name="final-step",
423+
# output_variable="outputFile",
424+
# output=["final-step.out.smi"],
425+
# )
426+
407427
# Make sure files that should be generated by the test
408428
# do not exist before we run the test.
409-
output_file_first = "first-step.out.smi"
429+
output_file_first = "chunk_1.smi"
430+
output_file_second = "chunk_2.smi"
410431
assert not project_file_exists(output_file_first)
411-
output_file_pa = "parallel-step-a.out.smi"
412-
assert not project_file_exists(output_file_pa)
413-
output_file_pb = "parallel-step-b.out.smi"
414-
assert not project_file_exists(output_file_pb)
415-
output_file_final = "final-step.out.smi"
416-
assert not project_file_exists(output_file_final)
432+
assert not project_file_exists(output_file_second)
433+
output_file_p_first = "chunk_1_proc.smi"
434+
output_file_p_second = "chunk_2_proc.smi"
435+
assert not project_file_exists(output_file_p_first)
436+
assert not project_file_exists(output_file_p_second)
437+
# output_file_final = "final-step.out.smi"
438+
# assert not project_file_exists(output_file_final)
417439
# And create the test's input file.
418440
input_file_1 = "input1.smi"
419-
input_file_1_content = "O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1"
441+
input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1
442+
COCN1C(=O)NC(C)(C)C1=O"""
420443
with open(
421444
f"{EXECUTION_DIRECTORY}/{input_file_1}", mode="wt", encoding="utf8"
422445
) as input_file:
@@ -426,7 +449,7 @@ def test_workflow_engine_simple_python_parallel(basic_engine):
426449
r_wfid = start_workflow(
427450
md,
428451
da,
429-
"simple-python-parallel",
452+
"simple-python-fanout",
430453
{"candidateMolecules": input_file_1},
431454
)
432455

@@ -435,16 +458,17 @@ def test_workflow_engine_simple_python_parallel(basic_engine):
435458
# Additional, detailed checks...
436459
# Check we only have one RunningWorkflowStep, and it succeeded
437460
response = da.get_running_workflow_steps(running_workflow_id=r_wfid)
461+
print("response", response)
438462

439-
assert response["count"] == 4
463+
assert response["count"] == 2
440464
assert response["running_workflow_steps"][0]["done"]
441465
assert response["running_workflow_steps"][0]["success"]
442466
assert response["running_workflow_steps"][1]["done"]
443467
assert response["running_workflow_steps"][1]["success"]
444-
assert response["running_workflow_steps"][2]["done"]
445-
assert response["running_workflow_steps"][2]["success"]
446-
assert response["running_workflow_steps"][3]["done"]
447-
assert response["running_workflow_steps"][3]["success"]
468+
# assert response["running_workflow_steps"][2]["done"]
469+
# assert response["running_workflow_steps"][2]["success"]
470+
# assert response["running_workflow_steps"][3]["done"]
471+
# assert response["running_workflow_steps"][3]["success"]
448472
# This test should generate a file in the simulated project directory
449-
assert project_file_exists(output_file_first)
450-
assert project_file_exists(output_file_final)
473+
# assert project_file_exists(output_file_first)
474+
# assert project_file_exists(output_file_final)

tests/wapi_adapter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ def get_running_workflow_step_by_name(
202202
for rwfs_id, record in running_workflow_step.items():
203203
if record["running_workflow"]["id"] != running_workflow_id:
204204
continue
205+
print("running wf step by name, record:", record)
205206
if record["name"] == name and record["replica"] == replica:
206207
response = record
207208
response["id"] = rwfs_id
@@ -413,6 +414,11 @@ def get_running_workflow_step_output_values_for_output(
413414
mock_output = Unpickler(pickle_file).load()
414415
UnitTestWorkflowAPIAdapter.lock.release()
415416

417+
print("mock output", mock_output)
418+
print("step", step)
419+
print("step_name", step_name)
420+
# mock output {'first-step': {'output_variable': 'results', 'output': ['chunk_1.smi', 'chunk_2.smi']}}
421+
416422
if step_name not in mock_output:
417423
return {"output": []}, 0
418424
# The record's output variable must match (there's only one record per step atm)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
---
2+
kind: DataManagerWorkflow
3+
kind-version: "2025.2"
4+
name: python-workflow
5+
description: >-
6+
A simple parallel workflow. Input is split into N chunks and N processes of the same job is started
7+
variable-mapping:
8+
inputs:
9+
- name: candidateMolecules
10+
outputs:
11+
- name: clusteredMolecules
12+
from:
13+
step: final-step
14+
output: outputFile
15+
16+
17+
steps:
18+
19+
- name: first-step
20+
description: Create inputs
21+
specification:
22+
collection: workflow-engine-unit-test-jobs
23+
job: splitsmiles
24+
version: "1.0.0"
25+
variables:
26+
name: "count"
27+
value: "1"
28+
inputs:
29+
- input: inputFile
30+
from:
31+
workflow-input: candidateMolecules
32+
outputs:
33+
- output: outputFile
34+
# as: chunk_*.smi
35+
36+
- name: parallel-step
37+
description: Add some params
38+
specification:
39+
collection: workflow-engine-unit-test-jobs
40+
job: append-col
41+
version: "1.0.0"
42+
variables:
43+
name: "desc1"
44+
value: "777"
45+
replicate:
46+
using:
47+
input: inputFile
48+
inputs:
49+
- input: inputFile
50+
from:
51+
step: first-step
52+
output: outputFile
53+
outputs:
54+
- output: outputFile
55+
# as: parallel-step.out.smi
56+
57+
# - name: final-step
58+
# description: Collate results
59+
# specification:
60+
# collection: workflow-engine-unit-test-jobs
61+
# job: concatenate
62+
# version: "1.0.0"
63+
# inputs:
64+
# - input: inputFile
65+
# from:
66+
# step: parallel-step
67+
# output: outputFile
68+
# outputs:
69+
# - output: outputFile
70+
# # as: final-step.out.smi

workflow/decoder.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def set_step_variables(
213213
workflow: dict[str, Any],
214214
inputs: list[dict[str, Any]],
215215
outputs: list[dict[str, Any]],
216+
step_outputs: dict[str, Any],
216217
previous_step_outputs: list[dict[str, Any]],
217218
workflow_variables: dict[str, Any],
218219
step_name: str,
@@ -224,6 +225,13 @@ def set_step_variables(
224225
"""
225226
result = {}
226227

228+
print("ssv: wf vars", workflow_variables)
229+
print("ssv: inputs", inputs)
230+
print("ssv: outputs", outputs)
231+
print("ssv: step_outputs", step_outputs)
232+
print("ssv: prev step outputs", previous_step_outputs)
233+
print("ssv: step_name", step_name)
234+
227235
for item in inputs:
228236
p_key = item["input"]
229237
p_val = ""
@@ -234,7 +242,16 @@ def set_step_variables(
234242
elif "step" in val.keys():
235243
for out in previous_step_outputs:
236244
if out["output"] == val["output"]:
237-
p_val = out["as"]
245+
# p_val = out["as"]
246+
if step_outputs["output"]:
247+
p_val = step_outputs["output"]
248+
print("\n!!!!!!!!!!!!!if clause!!!!!!!!!!!!!!!!!!!!!\n")
249+
print(p_val)
250+
else:
251+
# what do I need to do here??
252+
print("\n!!!!!!!!!!!!!else clause!!!!!!!!!!!!!!!!!!!!!\n")
253+
print(out)
254+
print(val)
238255

239256
# this bit handles multiple inputs: if a step
240257
# requires input from multiple steps, add them to
@@ -250,7 +267,9 @@ def set_step_variables(
250267

251268
for item in outputs:
252269
p_key = item["output"]
253-
p_val = item["as"]
270+
# p_val = item["as"]
271+
# p_val = step_outputs["output"]
272+
p_val = "somefile.smi"
254273
result[p_key] = p_val
255274

256275
options = set_variables_from_options_for_step(

workflow/workflow-schema.yaml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,17 @@ definitions:
206206
- from
207207

208208
# A Step output (with an 'as' - a declared value)
209-
step-output-as:
210-
type: object
211-
additionalProperties: false
212-
properties:
213-
output:
214-
$ref: '#/definitions/template-variable-name'
215-
as:
216-
$ref: '#/definitions/file-name'
217-
required:
218-
- output
219-
- as
209+
# step-output-as:
210+
# type: object
211+
# additionalProperties: false
212+
# properties:
213+
# output:
214+
# $ref: '#/definitions/template-variable-name'
215+
# as:
216+
# $ref: '#/definitions/file-name'
217+
# required:
218+
# - output
219+
# - as
220220

221221

222222
# A step specification variable
@@ -282,9 +282,9 @@ definitions:
282282
- $ref: "#/definitions/step-input-from-workflow"
283283
outputs:
284284
type: array
285-
items:
286-
anyOf:
287-
- $ref: "#/definitions/step-output-as"
285+
# items:
286+
# anyOf:
287+
# - $ref: "#/definitions/step-output-as"
288288
required:
289289
- name
290290
- specification

0 commit comments

Comments
 (0)