| 
1 | 1 | from operator import attrgetter  | 
2 | 2 | import pytest  | 
3 | 3 | import attrs  | 
4 |  | -from pydra.engine.workflow import Workflow  | 
 | 4 | +from pydra.engine.workflow import Workflow, WORKFLOW_LZIN  | 
5 | 5 | from pydra.engine.specs import LazyField  | 
6 | 6 | import typing as ty  | 
7 | 7 | from pydra.design import shell, python, workflow, list_fields, TaskSpec  | 
@@ -115,6 +115,7 @@ def MyTestShellWorkflow(  | 
115 | 115 | 
 
  | 
116 | 116 | 
 
  | 
117 | 117 | def test_workflow_canonical():  | 
 | 118 | +    """Test class-based workflow definition"""  | 
118 | 119 | 
 
  | 
119 | 120 |     # NB: We use PascalCase (i.e. class names) as it is translated into a class  | 
120 | 121 | 
 
  | 
@@ -177,6 +178,57 @@ class Outputs:  | 
177 | 178 |     assert list(wf.node_names) == ["Add", "Mul"]  | 
178 | 179 | 
 
  | 
179 | 180 | 
 
  | 
 | 181 | +def test_workflow_lazy():  | 
 | 182 | + | 
 | 183 | +    @workflow.define(lazy=["input_video", "watermark"])  | 
 | 184 | +    def MyTestShellWorkflow(  | 
 | 185 | +        input_video: video.Mp4,  | 
 | 186 | +        watermark: image.Png,  | 
 | 187 | +        watermark_dims: tuple[int, int] = (10, 10),  | 
 | 188 | +    ) -> video.Mp4:  | 
 | 189 | + | 
 | 190 | +        add_watermark = workflow.add(  | 
 | 191 | +            shell.define(  | 
 | 192 | +                "ffmpeg -i <in_video> -i <watermark:image/png> "  | 
 | 193 | +                "-filter_complex <filter> <out|out_video>"  | 
 | 194 | +            )(  | 
 | 195 | +                in_video=input_video,  | 
 | 196 | +                watermark=watermark,  | 
 | 197 | +                filter="overlay={}:{}".format(*watermark_dims),  | 
 | 198 | +            ),  | 
 | 199 | +            name="add_watermark",  | 
 | 200 | +        )  | 
 | 201 | +        output_video = workflow.add(  | 
 | 202 | +            shell.define(  | 
 | 203 | +                "HandBrakeCLI -i <in_video> -o <out|out_video> "  | 
 | 204 | +                "--width <width:int> --height <height:int>",  | 
 | 205 | +                # By default any input/output specified with a flag (e.g. -i <in_video>)  | 
 | 206 | +                # is considered optional, i.e. of type `FsObject | None`, and therefore  | 
 | 207 | +                # won't be used by default. By overriding this with non-optional types,  | 
 | 208 | +                # the fields are specified as being required.  | 
 | 209 | +                inputs={"in_video": video.Mp4},  | 
 | 210 | +                outputs={"out_video": video.Mp4},  | 
 | 211 | +            )(in_video=add_watermark.out_video, width=1280, height=720),  | 
 | 212 | +            name="resize",  | 
 | 213 | +        ).out_video  | 
 | 214 | + | 
 | 215 | +        return output_video  # test implicit detection of output name  | 
 | 216 | + | 
 | 217 | +    input_video = video.Mp4.mock("input.mp4")  | 
 | 218 | +    watermark = image.Png.mock("watermark.png")  | 
 | 219 | +    workflow_spec = MyTestShellWorkflow(  | 
 | 220 | +        input_video=input_video,  | 
 | 221 | +        watermark=watermark,  | 
 | 222 | +    )  | 
 | 223 | +    wf = Workflow.construct(workflow_spec)  | 
 | 224 | +    assert wf["add_watermark"].inputs.in_video == LazyField(  | 
 | 225 | +        name=WORKFLOW_LZIN, field="input_video", type=video.Mp4, type_checked=True  | 
 | 226 | +    )  | 
 | 227 | +    assert wf["add_watermark"].inputs.watermark == LazyField(  | 
 | 228 | +        name=WORKFLOW_LZIN, field="watermark", type=image.Png, type_checked=True  | 
 | 229 | +    )  | 
 | 230 | + | 
 | 231 | + | 
180 | 232 | def test_direct_access_of_workflow_object():  | 
181 | 233 | 
 
  | 
182 | 234 |     @python.define(inputs={"x": float}, outputs={"z": float})  | 
 | 
0 commit comments