|
4 | 4 | "cell_type": "markdown", |
5 | 5 | "metadata": {}, |
6 | 6 | "source": [ |
7 | | - "# Workflow design" |
| 7 | + "# Workflow design\n", |
| 8 | + "\n", |
| 9 | + "In Pydra, workflows are DAG of component tasks to be executed on specified inputs.\n", |
| 10 | + "Workflow specifications are dataclasses, which interchangeable with Python and shell tasks\n", |
| 11 | + "specifications and executed in the same way." |
8 | 12 | ] |
9 | 13 | }, |
10 | 14 | { |
11 | 15 | "cell_type": "markdown", |
12 | 16 | "metadata": {}, |
13 | 17 | "source": [ |
14 | | - "Given two task specifications, `Add` and `Mul`" |
| 18 | + "## Constructor functions\n", |
| 19 | + "\n", |
| 20 | + "Workflows are typically defined using the `pydra.design.workflow.define` decorator on \n", |
| 21 | + "a \"constructor\" function that generates the workflow. For example, given two task\n", |
| 22 | + "specifications, `Add` and `Mul`." |
15 | 23 | ] |
16 | 24 | }, |
17 | 25 | { |
|
47 | 55 | "outputs": [], |
48 | 56 | "source": [ |
49 | 57 | "@workflow.define\n", |
50 | | - "def MyTestWorkflow(a, b):\n", |
| 58 | + "def BasicWorkflow(a, b):\n", |
51 | 59 | " add = workflow.add(Add(a=a, b=b))\n", |
52 | 60 | " mul = workflow.add(Mul(a=add.out, b=b))\n", |
53 | 61 | " return mul.out" |
|
72 | 80 | "from fileformats import image, video\n", |
73 | 81 | "\n", |
74 | 82 | "@workflow.define\n", |
75 | | - "def MyTestShellWorkflow(\n", |
| 83 | + "def ShellWorkflow(\n", |
76 | 84 | " input_video: video.Mp4,\n", |
77 | 85 | " watermark: image.Png,\n", |
78 | 86 | " watermark_dims: tuple[int, int] = (10, 10),\n", |
|
209 | 217 | " return float(value)\n", |
210 | 218 | "\n", |
211 | 219 | "@workflow.define\n", |
212 | | - "class MyLibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n", |
| 220 | + "class LibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n", |
213 | 221 | "\n", |
214 | 222 | " a: int\n", |
215 | 223 | " b: float = workflow.arg(\n", |
|
248 | 256 | " return sum(x)\n", |
249 | 257 | "\n", |
250 | 258 | "@workflow.define\n", |
251 | | - "def MySplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n", |
| 259 | + "def SplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n", |
252 | 260 | " # Multiply over all combinations of the elements of a and b, then combine the results\n", |
253 | 261 | " # for each a element into a list over each b element\n", |
254 | 262 | " mul = workflow.add(Mul()).split(x=a, y=b).combine(\"x\")\n", |
|
271 | 279 | "outputs": [], |
272 | 280 | "source": [ |
273 | 281 | "@workflow.define\n", |
274 | | - "def MySplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n", |
| 282 | + "def SplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n", |
275 | 283 | " mul = workflow.add(Mul()).split(x=a, y=b)\n", |
276 | 284 | " add = workflow.add(Add(x=mul.out, y=c)).combine(\"Mul.x\")\n", |
277 | 285 | " sum = workflow.add(Sum(x=add.out))\n", |
|
301 | 309 | "outputs": [], |
302 | 310 | "source": [ |
303 | 311 | "@workflow.define\n", |
304 | | - "def MyConditionalWorkflow(\n", |
| 312 | + "def ConditionalWorkflow(\n", |
305 | 313 | " input_video: video.Mp4,\n", |
306 | 314 | " watermark: image.Png,\n", |
307 | 315 | " watermark_dims: tuple[int, int] | None = None,\n", |
|
371 | 379 | "placeholders see [Conditional construction](../explanation/conditional-lazy.html)" |
372 | 380 | ] |
373 | 381 | }, |
| 382 | + { |
| 383 | + "cell_type": "markdown", |
| 384 | + "metadata": {}, |
| 385 | + "source": [ |
| 386 | + "## Typing\n", |
| 387 | + "\n", |
| 388 | + "Pydra utilizes Python type annotations to implement strong type-checking, which is performed\n", |
| 389 | + "when values or upstream outputs are assigned to task specification inputs.\n", |
| 390 | + "\n", |
| 391 | + "Task input and output fields do not need to be assigned types, since they will default to `typing.Any`.\n", |
| 392 | + "However, if they are assigned a type and a value or output from an upstream node conflicts\n", |
| 393 | + "with the type, a `TypeError` will be raised at construction time.\n", |
| 394 | + "\n", |
| 395 | + "Note that the type-checking \"assumes the best\", and will pass if the upstream field is typed\n", |
| 396 | + "by `Any` or a super-class of the field being assigned to. For example, an input of\n", |
| 397 | + "`fileformats.generic.File` passed to a field expecting a `fileformats.image.Png` file type,\n", |
| 398 | + "because `Png` is a subtype of `File`, where as `fileformats.image.Jpeg` input would fail\n", |
| 399 | + "since it is clearly not the intended type.\n" |
| 400 | + ] |
| 401 | + }, |
| 402 | + { |
| 403 | + "cell_type": "code", |
| 404 | + "execution_count": 10, |
| 405 | + "metadata": {}, |
| 406 | + "outputs": [], |
| 407 | + "source": [ |
| 408 | + "from fileformats import generic\n", |
| 409 | + "\n", |
| 410 | + "Mp4Handbrake = shell.define(\n", |
| 411 | + " \"HandBrakeCLI -i <in_video:video/mp4> -o <out|out_video:video/mp4> \"\n", |
| 412 | + " \"--width <width:int> --height <height:int>\",\n", |
| 413 | + ")\n", |
| 414 | + "\n", |
| 415 | + "\n", |
| 416 | + "QuicktimeHandbrake = shell.define(\n", |
| 417 | + " \"HandBrakeCLI -i <in_video:video/quicktime> -o <out|out_video:video/quicktime> \"\n", |
| 418 | + " \"--width <width:int> --height <height:int>\",\n", |
| 419 | + ")\n", |
| 420 | + "\n", |
| 421 | + "@workflow.define\n", |
| 422 | + "def TypeErrorWorkflow(\n", |
| 423 | + " input_video: video.Mp4,\n", |
| 424 | + " watermark: generic.File,\n", |
| 425 | + " watermark_dims: tuple[int, int] = (10, 10),\n", |
| 426 | + ") -> video.Mp4:\n", |
| 427 | + "\n", |
| 428 | + " add_watermark = workflow.add(\n", |
| 429 | + " shell.define(\n", |
| 430 | + " \"ffmpeg -i <in_video> -i <watermark:image/png> \"\n", |
| 431 | + " \"-filter_complex <filter> <out|out_video:video/mp4>\"\n", |
| 432 | + " )(\n", |
| 433 | + " in_video=input_video,\n", |
| 434 | + " watermark=watermark, # Type is OK because generic.File is superclass of image.Png\n", |
| 435 | + " filter=\"overlay={}:{}\".format(*watermark_dims),\n", |
| 436 | + " ),\n", |
| 437 | + " name=\"add_watermark\",\n", |
| 438 | + " )\n", |
| 439 | + "\n", |
| 440 | + " try:\n", |
| 441 | + " handbrake = workflow.add(\n", |
| 442 | + " QuicktimeHandbrake(in_video=add_watermark.out_video, width=1280, height=720),\n", |
| 443 | + " ) # This will raise a TypeError because the input video is an Mp4\n", |
| 444 | + " except TypeError:\n", |
| 445 | + " handbrake = workflow.add(\n", |
| 446 | + " Mp4Handbrake(in_video=add_watermark.out_video, width=1280, height=720),\n", |
| 447 | + " ) # The type of the input video is now correct\n", |
| 448 | + "\n", |
| 449 | + " return handbrake.output_video" |
| 450 | + ] |
| 451 | + }, |
374 | 452 | { |
375 | 453 | "cell_type": "markdown", |
376 | 454 | "metadata": {}, |
|
0 commit comments