Skip to content

Commit 3883412

Browse files
author
Alan Christie
committed
refactor: Switch away from workflow replicate property
1 parent c53b245 commit 3883412

10 files changed

+13
-197
lines changed

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ packages = [
1414
[tool.poetry.dependencies]
1515
python = "^3.12"
1616
im-protobuf = "^8.2.0"
17-
im-data-manager-job-decoder = "^2.4.0"
17+
im-data-manager-job-decoder = "^2.5.0"
1818
jsonschema = "^4.21.1"
1919
pyyaml = ">= 5.3.1, < 7.0"
2020

tests/test_workflow_engine_examples.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
398398
assert project_file_exists(output_file_2)
399399

400400

401+
@pytest.mark.skip(reason="WIP")
401402
def test_workflow_engine_simple_python_fanout(basic_engine):
402403
# Arrange
403404
md, da = basic_engine

tests/test_workflow_validator_for_run_level.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -217,30 +217,6 @@ def test_validate_simple_python_molprops_with_missing_input():
217217
]
218218

219219

220-
def test_validate_replicate_using_undeclared_input():
221-
# Arrange
222-
workflow_filename: str = os.path.join(
223-
os.path.dirname(__file__),
224-
"workflow-definitions",
225-
"replicate-using-undeclared-input.yaml",
226-
)
227-
with open(workflow_filename, "r", encoding="utf8") as workflow_file:
228-
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
229-
assert workflow
230-
231-
# Act
232-
error = WorkflowValidator.validate(
233-
level=ValidationLevel.TAG,
234-
workflow_definition=workflow,
235-
)
236-
237-
# Assert
238-
assert error.error_num == 7
239-
assert error.error_msg == [
240-
"Replicate input variable is not declared: y (step=step-2)"
241-
]
242-
243-
244220
def test_validate_duplicate_step_output_variable_names():
245221
# Arrange
246222
workflow_filename: str = os.path.join(

tests/test_workflow_validator_for_tag_level.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -151,30 +151,6 @@ def test_validate_simple_python_molprops_with_options():
151151
assert error.error_msg is None
152152

153153

154-
def test_validate_replicate_using_undeclared_input():
155-
# Arrange
156-
workflow_filename: str = os.path.join(
157-
os.path.dirname(__file__),
158-
"workflow-definitions",
159-
"replicate-using-undeclared-input.yaml",
160-
)
161-
with open(workflow_filename, "r", encoding="utf8") as workflow_file:
162-
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
163-
assert workflow
164-
165-
# Act
166-
error = WorkflowValidator.validate(
167-
level=ValidationLevel.TAG,
168-
workflow_definition=workflow,
169-
)
170-
171-
# Assert
172-
assert error.error_num == 7
173-
assert error.error_msg == [
174-
"Replicate input variable is not declared: y (step=step-2)"
175-
]
176-
177-
178154
def test_validate_duplicate_step_output_variable_names():
179155
# Arrange
180156
workflow_filename: str = os.path.join(

tests/workflow-definitions/simple-python-fanout.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ steps:
3131
name: desc1
3232
value: "777"
3333
outputFile: results.smi
34-
replicate:
35-
using:
36-
variable: inputFile
3734
variable-mapping:
3835
- variable: inputFile
3936
from-step:

workflow/decoder.py

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import os
77
from dataclasses import dataclass
8-
from enum import Enum
98
from typing import Any
109

1110
import jsonschema
@@ -33,26 +32,6 @@ class Translation:
3332
out: str
3433

3534

36-
class ReplicationOrigin(Enum):
37-
"""Oirgin of a replication variable."""
38-
39-
STEP_VARIABLE = 1
40-
WORKFLOW_VARIABLE = 2
41-
42-
43-
@dataclass
44-
class ReplicationDriver:
45-
"""A step's replication driver.
46-
The 'variable' is the variable for the step-to-be-executed
47-
whose value is 'driven' by the values of the 'source_variable'.
48-
The source variable is either from a step (or a workflow)."""
49-
50-
origin: ReplicationOrigin
51-
variable: str
52-
source_variable: str
53-
source_step_name: str | None = None
54-
55-
5635
def validate_schema(workflow: dict[str, Any]) -> str | None:
5736
"""Checks the Workflow Definition against the built-in schema.
5837
If there's an error the error text is returned, otherwise None.
@@ -173,38 +152,3 @@ def get_step_prior_step_variable_mapping(
173152
Translation(in_=step_variable, out=v_map["variable"])
174153
]
175154
return variable_mapping
176-
177-
178-
def get_step_replication_driver(*, step: dict[str, Any]) -> ReplicationDriver | None:
179-
"""If the step is expected to replicate we return its replication driver,
180-
which consists of a (prior) step name and an (output) variable name.
181-
Otherwise it returns nothing."""
182-
if replicator := step.get("replicate"):
183-
# We need the variable we replicate against,
184-
# and the step that owns the variable.
185-
#
186-
# 'using' is a dict but there can be only single value for now
187-
variable: str = replicator["using"]["variable"]
188-
source_variable: str | None = None
189-
# Is the variable from a prior step?
190-
step_name: str | None = None
191-
step_v_map = get_step_prior_step_variable_mapping(step=step)
192-
for step_name_candidate, mappings in step_v_map.items():
193-
for mapping in mappings:
194-
if mapping.out == variable:
195-
step_name = step_name_candidate
196-
source_variable = mapping.in_
197-
break
198-
if step_name:
199-
break
200-
assert step_name
201-
assert source_variable
202-
203-
return ReplicationDriver(
204-
origin=ReplicationOrigin.STEP_VARIABLE,
205-
variable=variable,
206-
source_step_name=step_name,
207-
source_variable=source_variable,
208-
)
209-
210-
return None

workflow/workflow-schema.yaml

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,6 @@ definitions:
5959
type: string
6060
pattern: ^[a-zA-Z_][a-zA-Z0-9_]*$
6161

62-
# A step replication control variable
63-
# that is based on a step variable
64-
replicate-using-variable:
65-
type: object
66-
additionalProperties: false
67-
properties:
68-
variable:
69-
$ref: '#/definitions/variable-name'
70-
required:
71-
- variable
72-
7362
# A Step variable
7463
# (whose value is derived from a variable used in a prior step)
7564
step-variable-from-step:
@@ -167,14 +156,6 @@ definitions:
167156
# The format of this is essentially idenical to the specification
168157
# used when a Job is launched via the DM API.
169158
$ref: '#/definitions/step-specification'
170-
replicate:
171-
# Used to indicate one input variable that is used to replicate/spawn
172-
# step instances based on the number of values generated for the variable.
173-
type: object
174-
additionalProperties: false
175-
properties:
176-
using:
177-
$ref: '#/definitions/replicate-using-variable'
178159
variable-mapping:
179160
# The map of the source of the step's variables.
180161
# all variables the step needs (that aren;t already in the specification)

workflow/workflow_engine.py

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,8 @@
3939
)
4040

4141
from .decoder import (
42-
ReplicationDriver,
43-
ReplicationOrigin,
4442
Translation,
4543
get_step_prior_step_variable_mapping,
46-
get_step_replication_driver,
4744
get_step_workflow_variable_mapping,
4845
)
4946

@@ -380,41 +377,20 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
380377

381378
variables: dict[str, Any] = error_or_variables
382379

383-
# A replication number,
384-
# use only for steps expected to replicate (even if just once)
380+
# A step replication number,
381+
# used only for steps expected to run in parallel (even if just once)
385382
step_replication_number: int = 0
386-
# Does this step have a replicating driver?
387-
r_driver: ReplicationDriver | None = get_step_replication_driver(step=step)
388383
replication_values: list[str] = []
389-
if r_driver:
390-
if r_driver.origin == ReplicationOrigin.STEP_VARIABLE:
391-
# We need to get the variable values from a prior step
392-
# We need the prior steps running-workflow-step-id
393-
assert r_driver.source_step_name
394-
response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
395-
name=r_driver.source_step_name,
396-
running_workflow_id=rwf_id,
397-
)
398-
assert "id" in response
399-
o_rwfs_id: str = response["id"]
400-
response, _ = (
401-
self._wapi_adapter.get_running_workflow_step_output_values_for_output(
402-
running_workflow_step_id=o_rwfs_id,
403-
output_variable=r_driver.source_variable,
404-
)
405-
)
406-
assert "output" in response
407-
replication_values = response["output"]
408-
else:
409-
assert False, "Unsupported origin"
384+
source_is_splitter: bool = False
385+
iter_variable: str | None = None
410386

411387
num_step_instances: int = max(1, len(replication_values))
412388
for iteration in range(num_step_instances):
413389

414390
# If we are replicating this step then we must replace the step's variable
415391
# with a value expected for this iteration.
416-
if r_driver:
417-
iter_variable: str = r_driver.variable
392+
if source_is_splitter:
393+
assert iter_variable
418394
iter_value: str = replication_values[iteration]
419395
_LOGGER.info(
420396
"Replicating step: %s iteration=%s variable=%s value=%s",
@@ -424,7 +400,7 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
424400
iter_value,
425401
)
426402
# Over-write the replicating variable
427-
# and set the replication numebr to a unique +ve non-zero value...
403+
# and set the replication number to a unique +ve non-zero value...
428404
variables[iter_variable] = iter_value
429405
step_replication_number = iteration + 1
430406

workflow/workflow_validator.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
from .decoder import (
88
get_step_output_variable_names,
9-
get_step_prior_step_variable_mapping,
10-
get_step_workflow_variable_mapping,
119
get_steps,
1210
get_workflow_variable_names,
1311
validate_schema,
@@ -113,39 +111,6 @@ def _validate_tag_level(
113111
error_num=2,
114112
error_msg=[f"Duplicate step names found: {', '.join(duplicate_names)}"],
115113
)
116-
# For each 'replicating' step the replicating variable
117-
# must be declared in the step - which is either a workflow variable
118-
# or a prior step variable.
119-
for step in get_steps(workflow_definition):
120-
if (
121-
replicate_using_input := step.get("replicate", {})
122-
.get("using", {})
123-
.get("variable")
124-
):
125-
found: bool = False
126-
for translation in get_step_workflow_variable_mapping(step=step):
127-
if replicate_using_input == translation.out:
128-
found = True
129-
break
130-
if not found:
131-
for (
132-
step_name,
133-
translations,
134-
) in get_step_prior_step_variable_mapping(step=step).items():
135-
for translation in translations:
136-
if replicate_using_input == translation.out:
137-
found = True
138-
break
139-
if found:
140-
break
141-
if not found:
142-
return ValidationResult(
143-
error_num=7,
144-
error_msg=[
145-
"Replicate input variable is not declared:"
146-
f" {replicate_using_input} (step={step["name"]})"
147-
],
148-
)
149114

150115
return _VALIDATION_SUCCESS
151116

0 commit comments

Comments
 (0)