---
title: task
parent: API Reference
layout: default
nav_order: 1
permalink: /docs/ApiReference/task
---

# pyper.task
{: .no_toc }

* TOC
{:toc}

> For convenience, we will use the following terminology on this page:
> * **Producer**: The _first_ task within a pipeline
> * **Producer-consumer**: Any task after the first task within a pipeline

## task

```python
def __new__(
        cls,
        func: Optional[Callable] = None,
        /,
        *,
        branch: bool = False,
        join: bool = False,
        workers: int = 1,
        throttle: int = 0,
        multiprocess: bool = False,
        bind: Optional[Tuple[Tuple[Any], Dict[str, Any]]] = None):
```

Used to initialize a [Pipeline](Pipeline) object, consisting of one 'task' (one functional operation).

Pipelines created this way can be [composed](../UserGuide/ComposingPipelines) into new pipelines that contain multiple tasks.

---

{: .text-green-200 .text-gamma}
**Parameters**

{: .text-beta}
### `func`

* **type:** `Optional[Callable]`
* **default:** `None`

The function or callable object defining the logic of the task. Does not need to be passed explicitly if using `@task` as a decorator.

```python
from pyper import task

@task
def add_one(x: int):
    return x + 1
```

{: .text-beta}
### `branch`

* **type:** `bool`
* **default:** `False`

When `branch` is `False`, the output of the task is the value it returns.
Setting `branch` to `True` allows a task to generate multiple outputs. This requires the task to return an `Iterable` (or `AsyncIterable`).

```python
from pyper import task

def create_data(x: int):
    return [x + 1, x + 2, x + 3]

if __name__ == "__main__":
    pipeline1 = task(create_data)
    for output in pipeline1(0):
        print(output)
    # Prints:
    # [1, 2, 3]

    pipeline2 = task(create_data, branch=True)
    for output in pipeline2(0):
        print(output)
    # Prints:
    # 1
    # 2
    # 3
```

This can be applied to generator functions (or async generator functions) to submit outputs lazily:

```python
from pyper import task

def create_data(x: int):
    yield 1
    yield 2
    yield 3

if __name__ == "__main__":
    pipeline = task(create_data, branch=True)
    for output in pipeline(0):
        print(output)
    # Prints:
    # 1
    # 2
    # 3
```

{: .text-beta}
### `join`

* **type:** `bool`
* **default:** `False`

When `join` is `False`, a producer-consumer takes each individual output from the previous task as input. When `True`, a producer-consumer takes a stream of inputs from the previous task.

```python
from typing import Iterable
from pyper import task

@task(branch=True)
def create_data(x: int):
    return [x + 1, x + 2, x + 3]

@task(branch=True, join=True)
def running_total(data: Iterable[int]):
    total = 0
    for item in data:
        total += item
        yield total

if __name__ == "__main__":
    pipeline = create_data | running_total
    for output in pipeline(0):
        print(output)
    # Prints:
    # 1
    # 3
    # 6
```

{: .warning}
A producer _cannot_ have `join` set as `True`

A task with `join=True` can also be run with multiple workers, which will pull from the previous task in a thread-safe/process-safe way.
Note, however, that the order of outputs cannot be maintained consistently when a joined task is run with more than one worker.
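
The thread-safe pulling described above can be pictured with a plain stdlib sketch (an illustrative analogy, not pyper's actual implementation): multiple workers draining a shared queue never process the same item twice, but the order in which their results arrive is unpredictable.

```python
import queue
import threading

# Illustrative analogy only (not pyper's internals): queue.Queue.get() is
# thread-safe, so each item is handed to exactly one worker.
upstream = queue.Queue()
for item in [1, 2, 3, 4, 5]:
    upstream.put(item)

results = []
results_lock = threading.Lock()

def worker():
    while True:
        try:
            item = upstream.get_nowait()
        except queue.Empty:
            return
        with results_lock:
            # Append order depends on thread scheduling, hence the
            # ordering caveat for joined tasks with multiple workers.
            results.append(item * 10)

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # every item processed exactly once: [10, 20, 30, 40, 50]
```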

{: .text-beta}
### `workers`

* **type:** `int`
* **default:** `1`

The `workers` parameter takes an `int` value, which determines the number of workers executing the task concurrently or in parallel.

```python
import time
from pyper import task

def slow_func(data: int):
    time.sleep(2)
    return data

if __name__ == "__main__":
    pipeline = task(range, branch=True) | task(slow_func, workers=20)
    # Runs in ~2 seconds
    for output in pipeline(20):
        print(output)
```

{: .warning}
A producer _cannot_ have `workers` set greater than `1`

{: .text-beta}
### `throttle`

* **type:** `int`
* **default:** `0`

The parameter `throttle` determines the maximum size of a task's output queue. The purpose of this parameter is to give finer control over memory in situations where:

* A producer/producer-consumer generates data very quickly
* A producer-consumer/consumer processes that data very slowly

```python
import time
from pyper import task

@task(branch=True, throttle=5000)
def fast_producer():
    for i in range(1_000_000):
        yield i

@task
def slow_consumer(data: int):
    time.sleep(10)
    return data
```

In the example above, workers on `fast_producer` are paused after `5000` values have been generated, until workers for `slow_consumer` are ready to start processing again.
If no throttle were specified, workers for `fast_producer` would quickly flood its output queue with up to `1_000_000` values, which all have to be allocated in memory.
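
Throttling behaves like a bounded queue from the standard library (a conceptual sketch, not pyper's internals): once `maxsize` items are waiting, a fast producer blocks on `put()` until the consumer drains an item, capping the memory held in flight.

```python
import threading
import queue

# Conceptual sketch of throttling (not pyper's internals): a bounded queue
# forces a fast producer to pause once `maxsize` items are queued.
q = queue.Queue(maxsize=5)

def producer():
    for i in range(50):
        q.put(i)  # blocks whenever 5 items are already waiting

t = threading.Thread(target=producer)
t.start()

consumed = []
backlog_high_water = 0
while len(consumed) < 50:
    backlog_high_water = max(backlog_high_water, q.qsize())
    consumed.append(q.get())  # consumer drains one item at a time

t.join()
print(backlog_high_water)  # the backlog never exceeds the maxsize of 5
```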

{: .text-beta}
### `multiprocess`

* **type:** `bool`
* **default:** `False`

By default, synchronous tasks are run in `threading.Thread` workers and asynchronous tasks are run in `asyncio.Task` workers.
The `multiprocess` parameter allows synchronous tasks to be run in `multiprocessing.Process` workers instead, which benefits heavily CPU-bound tasks.

```python
from pyper import task

def slow_func(data: int):
    for i in range(1, 10_000_000):
        i *= i
    return data

if __name__ == "__main__":
    pipeline = (
        task(range, branch=True)
        | task(slow_func, workers=20, multiprocess=True)
    )
    for output in pipeline(20):
        print(output)
```

{: .warning}
An asynchronous task _cannot_ have `multiprocess` set as `True`

See some [considerations](../UserGuide/AdvancedConcepts#cpu-bound-work) for when to set this parameter.

{: .text-beta}
### `bind`

* **type:** `Optional[Tuple[Tuple[Any], Dict[str, Any]]]`
* **default:** `None`

The parameter `bind` allows additional `args` and `kwargs` to be bound to a task when creating a pipeline.

```python
from pyper import task

def apply_multiplier(data: int, multiplier: int):
    return data * multiplier

if __name__ == "__main__":
    pipeline = (
        task(range, branch=True)
        | task(apply_multiplier, bind=task.bind(multiplier=10))
    )
    for output in pipeline(1, 4):
        print(output)
    # Prints:
    # 10
    # 20
    # 30
```

Since each producer-consumer expects a single input argument, the purpose of the `bind` parameter is to let functions be defined flexibly in terms of the inputs they take, and to let tasks access external state, such as contexts.

## task.bind

```python
@staticmethod
def bind(*args, **kwargs):
```

`task.bind` is a utility method for supplying arguments to the `bind` parameter; it uses `functools.partial` under the hood.

The method accepts any valid `*args` and `**kwargs`.
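
Conceptually, the binding works the same way as applying `functools.partial` yourself (a plain-Python sketch of the idea, reusing the `apply_multiplier` example from above):

```python
import functools

def apply_multiplier(data: int, multiplier: int):
    return data * multiplier

# task.bind(multiplier=10) captures kwargs much like functools.partial,
# leaving a callable that expects only the single pipeline input.
bound = functools.partial(apply_multiplier, multiplier=10)

print(bound(3))  # 30
```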