Skip to content

Commit ac08c18

Browse files
committed
added cononical workflow test
1 parent 35c952d commit ac08c18

File tree

8 files changed

+433
-87
lines changed

8 files changed

+433
-87
lines changed

pydra/design/base.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,14 @@ def __call__(
186186
return task(**kwargs)
187187

188188
def _check_for_unset_values(self):
189-
if unset := [k for k, v in attrs.asdict(self).items() if v is attrs.NOTHING]:
189+
if unset := [
190+
k
191+
for k, v in attrs.asdict(self, recurse=False).items()
192+
if v is attrs.NOTHING
193+
]:
190194
raise ValueError(
191-
f"The following values in the {self!r} interface need to be set before it "
192-
f"can be executed: {unset}"
195+
f"The following values {unset} in the {self!r} interface need to be set "
196+
"before the workflow can be constructed"
193197
)
194198

195199

pydra/design/shell.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ def add_arg(name, field_type, kwds, is_option=False):
413413
kwds["type"] = field
414414
field = field_type(name=name, **kwds)
415415
elif not isinstance(field, field_type): # If field type is outarg not out
416-
field = field_type(**attrs.asdict(field))
416+
field = field_type(**attrs.asdict(field, recurse=False))
417417
field.name = name
418418
type_ = kwds.pop("type", field.type)
419419
if field.type is ty.Any:

pydra/design/tests/test_workflow.py

Lines changed: 195 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,37 @@
1+
from operator import attrgetter
2+
import pytest
3+
import attrs
14
from pydra.engine.workflow import Workflow
25
from pydra.engine.specs import LazyField
36
import typing as ty
4-
from pydra.design import shell, python, workflow, list_fields
7+
from pydra.design import shell, python, workflow, list_fields, TaskSpec
58
from fileformats import video, image
69

710

811
def test_workflow():
912

10-
@workflow.define
11-
def MyTestWorkflow(a, b):
13+
# NB: We use PascalCase (i.e. class names) as it is translated into a class
1214

13-
@python.define
14-
def Add(a, b):
15-
return a + b
15+
@python.define
16+
def Add(a, b):
17+
return a + b
1618

17-
@python.define
18-
def Mul(a, b):
19-
return a * b
19+
@python.define
20+
def Mul(a, b):
21+
return a * b
2022

23+
@workflow.define
24+
def MyTestWorkflow(a, b):
2125
add = workflow.add(Add(a=a, b=b))
2226
mul = workflow.add(Mul(a=add.out, b=b))
2327
return mul.out
2428

2529
constructor = MyTestWorkflow().constructor
2630
assert constructor.__name__ == "MyTestWorkflow"
31+
32+
# The constructor function is included as a part of the specification so it is
33+
# included in the hash by default and can be overridden if needed. Not 100% sure
34+
# if this is a good idea or not
2735
assert list_fields(MyTestWorkflow) == [
2836
workflow.arg(name="a"),
2937
workflow.arg(name="b"),
@@ -36,29 +44,42 @@ def Mul(a, b):
3644
wf = Workflow.construct(workflow_spec)
3745
assert wf.inputs.a == 1
3846
assert wf.inputs.b == 2.0
39-
assert wf.outputs.out == LazyField(name="Mul", field="out", type=ty.Any)
47+
assert wf.outputs.out == LazyField(
48+
name="Mul", field="out", type=ty.Any, type_checked=True
49+
)
50+
51+
# Nodes are named after the specs by default
4052
assert list(wf.node_names) == ["Add", "Mul"]
4153

4254

4355
def test_shell_workflow():
4456

4557
@workflow.define
46-
def MyTestShellWorkflow(input_video: video.Mp4, watermark: image.Png) -> video.Mp4:
58+
def MyTestShellWorkflow(
59+
input_video: video.Mp4,
60+
watermark: image.Png,
61+
watermark_dims: tuple[int, int] = (10, 10),
62+
) -> video.Mp4:
4763

4864
add_watermark = workflow.add(
4965
shell.define(
50-
"ffmpeg -i <in_video> -i <watermark:image/png> -filter_complex <filter> <out|out_video>"
51-
)(in_video=input_video, watermark=watermark, filter="overlay=10:10"),
66+
"ffmpeg -i <in_video> -i <watermark:image/png> "
67+
"-filter_complex <filter> <out|out_video>"
68+
)(
69+
in_video=input_video,
70+
watermark=watermark,
71+
filter="overlay={}:{}".format(*watermark_dims),
72+
),
5273
name="add_watermark",
5374
)
5475
output_video = workflow.add(
5576
shell.define(
56-
(
57-
"HandBrakeCLI -i <in_video> -o <out|out_video> "
58-
"--width <width:int> --height <height:int>"
59-
),
60-
# this specifies that this output is required even though it has a flag,
61-
# optional inputs and outputs are of type * | None
77+
"HandBrakeCLI -i <in_video> -o <out|out_video> "
78+
"--width <width:int> --height <height:int>",
79+
# By default any input/output specified with a flag (e.g. -i <in_video>)
80+
# is considered optional, i.e. of type `FsObject | None`, and therefore
81+
# won't be used by default. By overriding this with non-optional types,
82+
# the fields are specified as being required.
6283
inputs={"in_video": video.Mp4},
6384
outputs={"out_video": video.Mp4},
6485
)(in_video=add_watermark.out_video, width=1280, height=720),
@@ -72,6 +93,7 @@ def MyTestShellWorkflow(input_video: video.Mp4, watermark: image.Png) -> video.M
7293
assert list_fields(MyTestShellWorkflow) == [
7394
workflow.arg(name="input_video", type=video.Mp4),
7495
workflow.arg(name="watermark", type=image.Png),
96+
workflow.arg(name="watermark_dims", type=tuple[int, int], default=(10, 10)),
7597
workflow.arg(name="constructor", type=ty.Callable, default=constructor),
7698
]
7799
assert list_fields(MyTestShellWorkflow.Outputs) == [
@@ -87,12 +109,86 @@ def MyTestShellWorkflow(input_video: video.Mp4, watermark: image.Png) -> video.M
87109
assert wf.inputs.input_video == input_video
88110
assert wf.inputs.watermark == watermark
89111
assert wf.outputs.output_video == LazyField(
90-
name="resize", field="out_video", type=video.Mp4
112+
name="resize", field="out_video", type=video.Mp4, type_checked=True
91113
)
92114
assert list(wf.node_names) == ["add_watermark", "resize"]
93115

94116

95-
def test_workflow_alt_syntax():
117+
def test_workflow_canonical():
118+
119+
# NB: We use PascalCase (i.e. class names) as it is translated into a class
120+
121+
@python.define
122+
def Add(a, b):
123+
return a + b
124+
125+
@python.define
126+
def Mul(a, b):
127+
return a * b
128+
129+
def a_converter(value):
130+
if value is attrs.NOTHING:
131+
return value
132+
return float(value)
133+
134+
@workflow.define
135+
class MyTestWorkflow(TaskSpec["MyTestWorkflow.Outputs"]):
136+
137+
a: int
138+
b: float = workflow.arg(
139+
help_string="A float input",
140+
converter=a_converter,
141+
)
142+
143+
@staticmethod
144+
def constructor(a, b):
145+
add = workflow.add(Add(a=a, b=b))
146+
mul = workflow.add(Mul(a=add.out, b=b))
147+
return mul.out
148+
149+
class Outputs:
150+
out: float
151+
152+
constructor = MyTestWorkflow().constructor
153+
assert constructor.__name__ == "constructor"
154+
155+
# The constructor function is included as a part of the specification so it is
156+
# included in the hash by default and can be overridden if needed. Not 100% sure
157+
# if this is a good idea or not
158+
assert sorted(list_fields(MyTestWorkflow), key=attrgetter("name")) == [
159+
workflow.arg(name="a", type=int),
160+
workflow.arg(
161+
name="b", type=float, help_string="A float input", converter=a_converter
162+
),
163+
workflow.arg(name="constructor", type=ty.Callable, default=constructor),
164+
]
165+
assert list_fields(MyTestWorkflow.Outputs) == [
166+
workflow.out(name="out", type=float),
167+
]
168+
workflow_spec = MyTestWorkflow(a=1, b=2.0)
169+
wf = Workflow.construct(workflow_spec)
170+
assert wf.inputs.a == 1
171+
assert wf.inputs.b == 2.0
172+
assert wf.outputs.out == LazyField(
173+
name="Mul", field="out", type=ty.Any, type_checked=True
174+
)
175+
176+
# Nodes are named after the specs by default
177+
assert list(wf.node_names) == ["Add", "Mul"]
178+
179+
180+
def test_direct_access_of_workflow_object():
181+
182+
@python.define(inputs={"x": float}, outputs={"z": float})
183+
def Add(x, y):
184+
return x + y
185+
186+
def Mul(x, y):
187+
return x * y
188+
189+
@python.define(outputs=["divided"])
190+
def Divide(x, y):
191+
return x / y
96192

97193
@workflow.define(outputs=["out1", "out2"])
98194
def MyTestWorkflow(a: int, b: float) -> tuple[float, float]:
@@ -107,22 +203,11 @@ def MyTestWorkflow(a: int, b: float) -> tuple[float, float]:
107203
out2: The second output
108204
"""
109205

110-
@python.define(inputs={"x": float}, outputs={"out": float})
111-
def Add(x, y):
112-
return x + y
113-
114-
def Mul(x, y):
115-
return x * y
116-
117-
@python.define(outputs=["divided"])
118-
def Divide(x, y):
119-
return x / y
120-
121206
wf = workflow.this()
122207

123208
add = wf.add(Add(x=a, y=b), name="addition")
124-
mul = wf.add(python.define(Mul, outputs={"out": float})(x=add.out, y=b))
125-
divide = wf.add(Divide(x=wf["addition"].lzout.out, y=mul.out), name="division")
209+
mul = wf.add(python.define(Mul, outputs={"out": float})(x=add.z, y=b))
210+
divide = wf.add(Divide(x=wf["addition"].lzout.z, y=mul.out), name="division")
126211

127212
# Alter one of the inputs to a node after it has been initialised
128213
wf["Mul"].inputs.y *= 2
@@ -144,33 +229,38 @@ def Divide(x, y):
144229
wf = Workflow.construct(workflow_spec)
145230
assert wf.inputs.a == 1
146231
assert wf.inputs.b == 2.0
147-
assert wf.outputs.out1 == LazyField(name="Mul", field="out", type=float)
148-
assert wf.outputs.out2 == LazyField(name="division", field="divided", type=ty.Any)
232+
assert wf.outputs.out1 == LazyField(
233+
name="Mul", field="out", type=float, type_checked=True
234+
)
235+
assert wf.outputs.out2 == LazyField(
236+
name="division", field="divided", type=ty.Any, type_checked=True
237+
)
149238
assert list(wf.node_names) == ["addition", "Mul", "division"]
150239

151240

152241
def test_workflow_set_outputs_directly():
153242

154-
@workflow.define(outputs={"out1": float, "out2": float})
155-
def MyTestWorkflow(a: int, b: float):
243+
@python.define
244+
def Add(a, b):
245+
return a + b
156246

157-
@python.define
158-
def Add(a, b):
159-
return a + b
247+
@python.define
248+
def Mul(a, b):
249+
return a * b
160250

161-
@python.define
162-
def Mul(a, b):
163-
return a * b
251+
@workflow.define(outputs={"out1": float, "out2": float})
252+
def MyTestWorkflow(a: int, b: float):
164253

165254
wf = workflow.this()
166-
167255
add = wf.add(Add(a=a, b=b))
168256
wf.add(Mul(a=add.out, b=b))
169257

258+
# Set the outputs of the workflow directly instead of returning them them in
259+
# a tuple
170260
wf.outputs.out2 = add.out # Using the returned lzout outputs
171261
wf.outputs.out1 = wf["Mul"].lzout.out # accessing the lzout outputs via getitem
172262

173-
# no return required when the outputs are set directly
263+
# no return is used when the outputs are set directly
174264

175265
assert list_fields(MyTestWorkflow) == [
176266
workflow.arg(name="a", type=int),
@@ -187,6 +277,63 @@ def Mul(a, b):
187277
wf = Workflow.construct(workflow_spec)
188278
assert wf.inputs.a == 1
189279
assert wf.inputs.b == 2.0
190-
assert wf.outputs.out1 == LazyField(name="Mul", field="out", type=ty.Any)
191-
assert wf.outputs.out2 == LazyField(name="Add", field="out", type=ty.Any)
280+
assert wf.outputs.out1 == LazyField(
281+
name="Mul", field="out", type=ty.Any, type_checked=True
282+
)
283+
assert wf.outputs.out2 == LazyField(
284+
name="Add", field="out", type=ty.Any, type_checked=True
285+
)
192286
assert list(wf.node_names) == ["Add", "Mul"]
287+
288+
289+
def test_workflow_split_combine():
290+
291+
@python.define
292+
def Mul(x: float, y: float) -> float:
293+
return x * y
294+
295+
@python.define
296+
def Sum(x: list[float]) -> float:
297+
return sum(x)
298+
299+
@workflow.define
300+
def MyTestWorkflow(a: list[int], b: list[float]) -> list[float]:
301+
302+
wf = workflow.this()
303+
mul = wf.add(Mul())
304+
# We could avoid having to specify the splitter and combiner on a separate
305+
# line by making 'split' and 'combine' reserved keywords for Outputs class attrs
306+
wf["Mul"].split(x=a, y=b).combine("a")
307+
sum = wf.add(Sum(x=mul.out))
308+
return sum.out
309+
310+
wf = Workflow.construct(MyTestWorkflow(a=[1, 2, 3], b=[1.0, 10.0, 100.0]))
311+
assert wf["Mul"]._state.splitter == ["x", "y"]
312+
assert wf["Mul"]._state.combiner == ["x"]
313+
314+
315+
def test_workflow_split_after_access_fail():
316+
"""It isn't possible to split/combine a node after one of its outputs has been type
317+
checked as this changes the type of the outputs and renders the type checking
318+
invalid
319+
"""
320+
321+
@python.define
322+
def Add(x, y):
323+
return x + y
324+
325+
@python.define
326+
def Mul(x, y):
327+
return x * y
328+
329+
@workflow.define
330+
def MyTestWorkflow(a: list[int], b: list[float]) -> list[float]:
331+
332+
wf = workflow.this()
333+
add = wf.add(Add())
334+
mul = wf.add(Mul(x=add.out, y=2.0)) # << Add.out is accessed here
335+
wf["Add"].split(x=a, y=b).combine("x")
336+
return mul.out
337+
338+
with pytest.raises(RuntimeError, match="Outputs .* have already been accessed"):
339+
Workflow.construct(MyTestWorkflow(a=[1, 2, 3], b=[1.0, 10.0, 100.0]))

pydra/design/workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import typing as ty
22
import inspect
33
import attrs
4-
from pydra.engine.task import FunctionTask
4+
from pydra.engine.core import WorkflowTask
55
from pydra.engine.workflow import Workflow
66
from .base import (
77
Arg,
@@ -148,7 +148,7 @@ def make(wrapped: ty.Callable | type) -> TaskSpec:
148148
parsed_inputs[inpt_name].lazy = True
149149

150150
interface = make_task_spec(
151-
FunctionTask,
151+
WorkflowTask,
152152
parsed_inputs,
153153
parsed_outputs,
154154
name=name,

0 commit comments

Comments
 (0)