Skip to content

Commit 7770d7f

Browse files
author
Alan Christie
committed
feat: First successful replicating workflow test
1 parent 8f1c098 commit 7770d7f

File tree

7 files changed

+111
-118
lines changed

7 files changed

+111
-118
lines changed

tests/test_decoder.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,6 @@ def test_validate_schema_for_step_specification_variable_names():
142142
assert error is None
143143

144144

145-
@pytest.mark.skip(reason="We do not support combination atm")
146-
def test_validate_schema_for_simple_python_parallel():
147-
# Arrange
148-
149-
# Act
150-
error = decoder.validate_schema(_SIMPLE_PYTHON_PARALLEL_WORKFLOW)
151-
152-
# Assert
153-
assert error is None
154-
155-
156145
def test_get_workflow_variables_for_smiple_python_molprops():
157146
# Arrange
158147

tests/test_workflow_engine_examples.py

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -398,41 +398,22 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
398398
assert project_file_exists(output_file_2)
399399

400400

401-
@pytest.mark.skip(reason="WIP")
402401
def test_workflow_engine_simple_python_fanout(basic_engine):
403402
# Arrange
404403
md, da = basic_engine
405404

406405
da.mock_get_running_workflow_step_output_values_for_output(
407406
step_name="first-step",
408-
output_variable="outputFile",
407+
output_variable="outputBase",
409408
output=["chunk_1.smi", "chunk_2.smi"],
410409
)
411410

412-
# da.mock_get_running_workflow_step_output_values_for_output(
413-
# step_name="parallel-step",
414-
# output_variable="outputFile",
415-
# output=["chunk_1_proc.smi", "chunk_2_proc.smi"]
416-
# )
417-
418-
# da.mock_get_running_workflow_step_output_values_for_output(
419-
# step_name="final-step",
420-
# output_variable="outputFile",
421-
# output=["final-step.out.smi"],
422-
# )
423-
424411
# Make sure files that should be generated by the test
425412
# do not exist before we run the test.
426413
output_file_first = "chunk_1.smi"
427414
output_file_second = "chunk_2.smi"
428415
assert not project_file_exists(output_file_first)
429416
assert not project_file_exists(output_file_second)
430-
output_file_p_first = "chunk_1_proc.smi"
431-
output_file_p_second = "chunk_2_proc.smi"
432-
assert not project_file_exists(output_file_p_first)
433-
assert not project_file_exists(output_file_p_second)
434-
# output_file_final = "final-step.out.smi"
435-
# assert not project_file_exists(output_file_final)
436417
# And create the test's input file.
437418
input_file_1 = "input1.smi"
438419
input_file_1_content = """O=C(CSCc1ccc(Cl)s1)N1CCC(O)CC1
@@ -458,15 +439,10 @@ def test_workflow_engine_simple_python_fanout(basic_engine):
458439
print("response")
459440
pprint(response)
460441

461-
assert response["count"] == 2
442+
assert response["count"] == 3
462443
assert response["running_workflow_steps"][0]["done"]
463444
assert response["running_workflow_steps"][0]["success"]
464445
assert response["running_workflow_steps"][1]["done"]
465446
assert response["running_workflow_steps"][1]["success"]
466-
# assert response["running_workflow_steps"][2]["done"]
467-
# assert response["running_workflow_steps"][2]["success"]
468-
# assert response["running_workflow_steps"][3]["done"]
469-
# assert response["running_workflow_steps"][3]["success"]
470-
# This test should generate a file in the simulated project directory
471-
# assert project_file_exists(output_file_first)
472-
# assert project_file_exists(output_file_final)
447+
assert response["running_workflow_steps"][2]["done"]
448+
assert response["running_workflow_steps"][2]["success"]

tests/test_workflow_validator_for_run_level.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -217,28 +217,6 @@ def test_validate_simple_python_molprops_with_missing_input():
217217
]
218218

219219

220-
@pytest.mark.skip("Unsupported workflow")
221-
def test_validate_simple_python_parallel():
222-
# Arrange
223-
workflow_filename: str = os.path.join(
224-
os.path.dirname(__file__),
225-
"workflow-definitions",
226-
"simple-python-parallel.yaml",
227-
)
228-
with open(workflow_filename, "r", encoding="utf8") as workflow_file:
229-
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
230-
assert workflow
231-
232-
# Act
233-
error = WorkflowValidator.validate(
234-
level=ValidationLevel.TAG,
235-
workflow_definition=workflow,
236-
)
237-
238-
# Assert
239-
assert error.error_num == 0
240-
241-
242220
def test_validate_replicate_using_undeclared_input():
243221
# Arrange
244222
workflow_filename: str = os.path.join(

tests/test_workflow_validator_for_tag_level.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,28 +109,6 @@ def test_validate_shortcut_example_1():
109109
assert error.error_msg is None
110110

111111

112-
@pytest.mark.skip("Unsupported workflow")
113-
def test_validate_simple_python_parallel():
114-
# Arrange
115-
workflow_filename: str = os.path.join(
116-
os.path.dirname(__file__),
117-
"workflow-definitions",
118-
"simple-python-parallel.yaml",
119-
)
120-
with open(workflow_filename, "r", encoding="utf8") as workflow_file:
121-
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
122-
assert workflow
123-
124-
# Act
125-
error = WorkflowValidator.validate(
126-
level=ValidationLevel.TAG,
127-
workflow_definition=workflow,
128-
)
129-
130-
# Assert
131-
assert error.error_num == 0
132-
133-
134112
def test_validate_simple_python_molprops():
135113
# Arrange
136114
workflow_filename: str = os.path.join(

tests/workflow-definitions/simple-python-fanout.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ description: >-
77
88
steps:
99
- name: first-step
10-
description: Create inputs
10+
description: Split an input file
1111
specification:
1212
collection: workflow-engine-unit-test-jobs
1313
job: splitsmiles
1414
version: "1.0.0"
1515
variables:
1616
name: count
1717
value: "1"
18-
outputFile: results.smi
18+
outputBase: chunk
1919
variable-mapping:
2020
- variable: inputFile
2121
from-workflow:
@@ -38,6 +38,6 @@ steps:
3838
- variable: inputFile
3939
from-step:
4040
name: first-step
41-
variable: outputFile
41+
variable: outputBase
4242
out:
4343
- outputFile

workflow/decoder.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import os
77
from dataclasses import dataclass
8+
from enum import Enum
89
from typing import Any
910

1011
import jsonschema
@@ -32,6 +33,26 @@ class Translation:
3233
out: str
3334

3435

36+
class ReplicationOrigin(Enum):
37+
"""Oirgin of a replication variable."""
38+
39+
STEP_VARIABLE = 1
40+
WORKFLOW_VARIABLE = 2
41+
42+
43+
@dataclass
44+
class ReplicationDriver:
45+
"""A step's replication driver.
46+
The 'variable' is the variable for the step-to-be-executed
47+
whose value is 'driven' by the values of the 'source_variable'.
48+
The source variable is either from a step (or a workflow)."""
49+
50+
origin: ReplicationOrigin
51+
variable: str
52+
source_variable: str
53+
source_step_name: str | None = None
54+
55+
3556
def validate_schema(workflow: dict[str, Any]) -> str | None:
3657
"""Checks the Workflow Definition against the built-in schema.
3758
If there's an error the error text is returned, otherwise None.
@@ -154,11 +175,36 @@ def get_step_prior_step_variable_mapping(
154175
return variable_mapping
155176

156177

157-
def get_step_replicator(*, step: dict[str, Any]) -> str | Any:
158-
"""Return step's replication info"""
159-
replicator = step.get("replicate")
160-
if replicator:
178+
def get_step_replication_driver(*, step: dict[str, Any]) -> ReplicationDriver | None:
179+
"""If the step is expected to replicate we return its replication driver,
180+
which consists of a (prior) step name and an (output) variable name.
181+
Otherwise it returns nothing."""
182+
if replicator := step.get("replicate"):
183+
# We need the variable we replicate against,
184+
# and the step that owns the variable.
185+
#
161186
# 'using' is a dict but there can be only single value for now
162-
replicator = list(replicator["using"].values())[0]
187+
variable: str = replicator["using"]["variable"]
188+
source_variable: str | None = None
189+
# Is the variable from a prior step?
190+
step_name: str | None = None
191+
step_v_map = get_step_prior_step_variable_mapping(step=step)
192+
for step_name_candidate, mappings in step_v_map.items():
193+
for mapping in mappings:
194+
if mapping.out == variable:
195+
step_name = step_name_candidate
196+
source_variable = mapping.in_
197+
break
198+
if step_name:
199+
break
200+
assert step_name
201+
assert source_variable
202+
203+
return ReplicationDriver(
204+
origin=ReplicationOrigin.STEP_VARIABLE,
205+
variable=variable,
206+
source_step_name=step_name,
207+
source_variable=source_variable,
208+
)
163209

164-
return replicator
210+
return None

workflow/workflow_engine.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
)
4040

4141
from .decoder import (
42+
ReplicationDriver,
43+
ReplicationOrigin,
4244
Translation,
4345
get_step_prior_step_variable_mapping,
46+
get_step_replication_driver,
4447
get_step_workflow_variable_mapping,
4548
)
4649

@@ -340,8 +343,6 @@ def _validate_step_command(
340343
name=prior_step_name, running_workflow_id=running_workflow_id
341344
)
342345
# Copy "in" value to "out"...
343-
print(v_map)
344-
print(prior_step["variables"])
345346
for tr in v_map:
346347
assert tr.in_ in prior_step["variables"]
347348
all_variables[tr.out] = prior_step["variables"][tr.in_]
@@ -378,34 +379,63 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
378379
return
379380

380381
variables: dict[str, Any] = error_or_variables
381-
num_replicas: int = 0
382-
# Is this a replicating step?
383-
# The number of 'replicas' is zero if the step is only launched once
384-
# (i.e. there are no replicas).
385-
386-
# replicator = get_step_replicator(step=step)
387-
# if replicator:
388-
# single_step_variables = []
389-
# for replicating_param in variables[replicator]:
390-
# ssv = {**variables}
391-
# ssv[replicator] = replicating_param
392-
# single_step_variables.append(ssv)
393-
# else:
394-
# single_step_variables = [variables]
395-
396-
assert num_replicas >= 0
397-
step_replication_number: int = 1 if num_replicas else 0
398-
for _ in range(1 + num_replicas):
382+
383+
# A replication number,
384+
# use only for steps expected to replicate (even if just once)
385+
step_replication_number: int = 0
386+
# Does this step have a replicating driver?
387+
r_driver: ReplicationDriver | None = get_step_replication_driver(step=step)
388+
replication_values: list[str] = []
389+
if r_driver:
390+
if r_driver.origin == ReplicationOrigin.STEP_VARIABLE:
391+
# We need to get the variable values from a prior step
392+
# We need the prior steps running-workflow-step-id
393+
assert r_driver.source_step_name
394+
response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
395+
name=r_driver.source_step_name,
396+
running_workflow_id=rwf_id,
397+
)
398+
assert "id" in response
399+
o_rwfs_id: str = response["id"]
400+
response, _ = (
401+
self._wapi_adapter.get_running_workflow_step_output_values_for_output(
402+
running_workflow_step_id=o_rwfs_id,
403+
output_variable=r_driver.source_variable,
404+
)
405+
)
406+
assert "output" in response
407+
replication_values = response["output"]
408+
else:
409+
assert False, "Unsupported origin"
410+
411+
num_step_instances: int = max(1, len(replication_values))
412+
for iteration in range(num_step_instances):
413+
414+
# If we are replicating this step then we must replace the step's variable
415+
# with a value expected for this iteration.
416+
if r_driver:
417+
iter_variable: str = r_driver.variable
418+
iter_value: str = replication_values[iteration]
419+
_LOGGER.info(
420+
"Replicating step: %s iteration=%s variable=%s value=%s",
421+
step_name,
422+
iteration,
423+
iter_variable,
424+
iter_value,
425+
)
426+
# Over-write the replicating variable
427+
# and set the replication numebr to a unique +ve non-zero value...
428+
variables[iter_variable] = iter_value
429+
step_replication_number = iteration + 1
399430

400431
_LOGGER.info(
401432
"Launching step: %s RunningWorkflow=%s (name=%s)"
402-
" variables=%s project=%s (step_replication_number=%s)",
433+
" variables=%s project=%s",
403434
step_name,
404435
rwf_id,
405436
rwf["name"],
406437
variables,
407438
project_id,
408-
step_replication_number,
409439
)
410440

411441
lp: LaunchParameters = LaunchParameters(
@@ -436,10 +466,6 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
436466
lr.command,
437467
)
438468

439-
# Do we need to increment the replication number?
440-
if num_replicas:
441-
step_replication_number += 1
442-
443469
def _set_step_error(
444470
self,
445471
step_name: str,

0 commit comments

Comments
 (0)