Dataflows Components: Task and Workflow
========================================

A *Task* is the basic runnable component of *Pydra* and is described by the
class ``TaskBase``. A *Task* has named inputs and outputs, which allows the
construction of dataflows. It can be hashed, and it executes in a specific working
directory. Any *Pydra* *Task* can be used as a function in a script, enabling
dual use in *Pydra* *Workflows* and in standalone scripts. Several
classes inherit from ``TaskBase``, and each has a different application:


Function Tasks
--------------

* ``FunctionTask`` is a *Task* that executes Python functions. Most Python functions
  declared in an existing library, package, or interactively in a terminal can
  be converted to a ``FunctionTask`` by using *Pydra*'s decorator, ``mark.task``.

  .. code-block:: python

      import numpy as np
      from pydra import mark

      # annotate the input and output types of numpy's fft
      fft = mark.annotate({'a': np.ndarray,
                           'return': float})(np.fft.fft)
      # convert the annotated function into a Pydra Task and run it
      fft_task = mark.task(fft)()
      result = fft_task(a=np.random.rand(512))


  ``fft_task`` is now a *Pydra* *Task*, and ``result`` will contain a *Pydra* ``Result`` object.
  In addition, the user can use Python's function annotation or another *Pydra*
  decorator, ``mark.annotate``, in order to specify the output. In the
  following example, we decorate an arbitrary Python function to create named
  outputs:

  .. code-block:: python

      @mark.task
      @mark.annotate(
          {"return": {"mean": float, "std": float}}
      )
      def mean_dev(my_data):
          import statistics as st
          return st.mean(my_data), st.stdev(my_data)

      result = mean_dev(my_data=[...])()

  When the *Task* is executed, ``result.output`` will contain two attributes: ``mean``
  and ``std``. Named attributes facilitate passing different outputs to
  different downstream nodes in a dataflow.

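  For example (a minimal sketch with made-up input data), the named outputs of
  the ``mean_dev`` task defined above can be accessed directly on ``result.output``:

  .. code-block:: python

      result = mean_dev(my_data=[2.0, 4.0, 6.0])()
      print(result.output.mean)  # 4.0
      print(result.output.std)   # 2.0
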
.. _shell_command_task:

Shell Command Tasks
-------------------

* ``ShellCommandTask`` is a *Task* used to run shell commands and executables.
  It can be used with a simple command without any arguments, or with a specific
  set of arguments and flags, e.g.:

  .. code-block:: python

      from pydra import ShellCommandTask

      ShellCommandTask(executable="pwd")

      ShellCommandTask(executable="ls", args="my_dir")

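  Constructing the *Task* does not run the command; calling the created instance
  executes it and returns a ``Result``. A minimal sketch (``stdout``, ``stderr``,
  and ``return_code`` are part of the default output specification of
  ``ShellCommandTask``):

  .. code-block:: python

      task = ShellCommandTask(name="pwd_task", executable="pwd")
      result = task()
      # the captured standard output of the command
      print(result.output.stdout)
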
  The *Task* can accommodate more complex shell commands by allowing the user to
  customize the inputs and outputs of the commands.
  One can generate an input
  specification to specify names of inputs, positions in the command, types of
  the inputs, and other metadata.
  As a specific example, FSL's BET command (Brain
  Extraction Tool) can be called on the command line as:

  .. code-block:: bash

      bet input_file output_file -m

  Each of the command arguments can be treated as a named input to the
  ``ShellCommandTask``, and can be included in the input specification.
  As shown next, even an output is specified by constructing
  the *out_file* field from a template:

  .. code-block:: python

      # imports needed for the specification below
      from pydra import ShellCommandTask
      from pydra.engine.specs import File, ShellSpec, SpecInfo

      bet_input_spec = SpecInfo(
          name="Input",
          fields=[
              ("in_file", File,
               {"help_string": "input file ...",
                "position": 1,
                "mandatory": True}),
              ("out_file", str,
               {"help_string": "name of output ...",
                "position": 2,
                "output_file_template": "{in_file}_br"}),
              ("mask", bool,
               {"help_string": "create binary mask",
                "argstr": "-m"})],
          bases=(ShellSpec,))

      ShellCommandTask(executable="bet",
                       input_spec=bet_input_spec)

  More details are in the :ref:`Input Specification section`.

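  Before running the task, the rendered command can be inspected through the
  task's ``cmdline`` property. A minimal sketch, assuming a local file
  ``sub-01_T1w.nii.gz`` exists (the file name here is purely illustrative):

  .. code-block:: python

      task = ShellCommandTask(executable="bet",
                              input_spec=bet_input_spec,
                              in_file="sub-01_T1w.nii.gz",
                              mask=True)
      # the out_file argument is generated from the
      # "{in_file}_br" template defined in the spec
      print(task.cmdline)
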
Container Tasks
---------------

* The ``ContainerTask`` class is a child class of ``ShellCommandTask`` and serves as
  a parent class for ``DockerTask`` and ``SingularityTask``. Both *Container Tasks*
  run shell commands or executables within containers with specific user-defined
  environments, using Docker_ and Singularity_ software respectively.
  This might be extremely useful for users and projects that require environment
  encapsulation and sharing.
  Using container technologies helps improve the reproducibility of scientific
  workflows, one of the key concepts behind *Pydra*.

  These *Container Tasks* can be defined by using the
  ``DockerTask`` and ``SingularityTask`` classes directly, or can be created
  automatically from a ``ShellCommandTask`` by passing the optional argument
  ``container_info`` when creating a *Shell Task*. The following two
  types of syntax are equivalent:

  .. code-block:: python

      DockerTask(executable="pwd", image="busybox")

      ShellCommandTask(executable="pwd",
                       container_info=("docker", "busybox"))

Workflows
---------

* ``Workflow`` is a subclass of *Task* that provides support for creating *Pydra*
  dataflows. As a subclass, a *Workflow* acts like a *Task*: it has inputs and outputs,
  is hashable, and is treated as a single unit. Unlike *Tasks*, workflows embed
  a directed acyclic graph. Each node of the graph contains a *Task* of any type,
  including another *Workflow*, and can be added to the *Workflow* simply by calling
  the ``add`` method. The connections between *Tasks* are defined by using so-called
  *Lazy Inputs* or *Lazy Outputs*. These are special attributes that allow
  assignment of values when a *Workflow* is executed rather than at the point of
  assignment. The following example creates a *Workflow* from two *Pydra* *Tasks*.

  .. code-block:: python

      # "mult" and "add2" are assumed to be function tasks
      # created with Pydra's mark.task decorator

      # creating a workflow with two input fields
      wf = Workflow(name="wf", input_spec=["x", "y"])
      # adding a task and connecting the task's input
      # to the workflow input
      wf.add(mult(name="mlt",
                  x=wf.lzin.x, y=wf.lzin.y))
      # adding another task and connecting the
      # task's input to the "mult" task's output
      wf.add(add2(name="add", x=wf.mlt.lzout.out))
      # setting the workflow output
      wf.set_output([("out", wf.add.lzout.out)])

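  Once constructed, a *Workflow* is executed like any other *Task*. A minimal
  sketch of one common pattern, using *Pydra*'s ``Submitter`` with the
  concurrent-futures plugin (``"cf"``) and example input values:

  .. code-block:: python

      from pydra import Submitter

      # setting the workflow inputs
      wf.inputs.x = 2
      wf.inputs.y = 3
      with Submitter(plugin="cf") as sub:
          sub(wf)
      # result.output.out holds the value connected via set_output
      result = wf.result()

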
Task's State
------------

All *Tasks*, including *Workflows*, can have an optional attribute representing an instance of the ``State`` class.
This attribute controls the execution of a *Task* over different input parameter sets.
This class is at the heart of *Pydra*'s powerful Map-Reduce semantics over arbitrary inputs of nested dataflows.
The ``State`` class formalizes how users can specify arbitrary combinations of input parameters.
Its functionality is used to create and track different combinations of input parameters,
and optionally allows limited or complete recombination of the results.
In order to specify how the inputs should be split into parameter sets, and optionally combined after
the *Task* execution, the user can set the ``splitter`` and ``combiner`` attributes of the ``State`` class.

.. code-block:: python

    task_with_state = add2(x=[1, 5]).split("x").combine("x")

In this example, the ``State`` class is responsible for creating a list of two
separate inputs, *[{x: 1}, {x: 5}]*; each run of the *Task* gets one
element from the list.
The results are grouped back together when returning the result from the *Task*.
While this example
illustrates mapping and grouping of results over a single parameter, *Pydra*
extends this to arbitrary combinations of input fields and downstream grouping
over nested dataflows. Details of how splitters and combiners power *Pydra*'s
scalable dataflows are described in the next section.

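As a brief preview of the next section (assuming ``mult`` is a function task
with inputs ``x`` and ``y``, as in the *Workflow* example above), providing a
list of field names to ``split`` requests an outer product of the input values:

.. code-block:: python

    # four parameter sets: (x=1, y=10), (x=1, y=100),
    # (x=2, y=10), (x=2, y=100)
    task = mult(x=[1, 2], y=[10, 100]).split(["x", "y"])
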
.. _Docker: https://www.docker.com/
.. _Singularity: https://www.singularity.lbl.gov/