Taskiq pipelines is `fire-and-forget` taken to its limit.
Imagine you have some really tough functions and you want
to call them sequentially, one after another, but you don't want to wait for them
to complete. taskiq-pipelines solves this for you.

## Installation

You can install it from PyPI:

```bash
pip install taskiq-pipelines
```

After you install it, you need to add our super clever middleware
to your broker.

This middleware decides what to do next after the current step
is completed.

```python
from taskiq_pipelines.middleware import PipelineMiddleware

my_super_broker = ...


my_super_broker.add_middlewares(
    [
        PipelineMiddleware(),
    ]
)
```

Note that your broker MUST use a result backend that
can be read by all your workers. Pipelines also work with `InMemoryBroker`,
so feel free to use it in local development.

### Example

For this example I'm going to use a single script file.

```python
import asyncio
from typing import Any, List

from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq_pipelines import PipelineMiddleware, Pipeline

broker = InMemoryBroker()
broker.add_middlewares([PipelineMiddleware()])


@broker.task
def add_one(value: int) -> int:
    return value + 1


@broker.task
def repeat(value: Any, reps: int) -> List[Any]:
    return [value] * reps


@broker.task
def check(value: int) -> bool:
    return value >= 0


async def main():
    pipe = (
        Pipeline(
            broker,
            add_one,  # First of all, we call the add_one function.
        )
        # 2
        .call_next(repeat, reps=4)  # Here we repeat our value 4 times.
        # [2, 2, 2, 2]
        .map(add_one)  # Here we execute the given function for each value.
        # [3, 3, 3, 3]
        .filter(check)  # Here we filter some values.
        # But since our filter only removes numbers less than zero,
        # our value won't change.
        # [3, 3, 3, 3]
    )
    task = await pipe.kiq(1)
    result = await task.wait_result()
    print("Calculated value:", result.return_value)


if __name__ == "__main__":
    asyncio.run(main())
```

95
+
96
+
If you run this example, it prints this:
97
+
```bash
98
+
$ python script.py
99
+
Calculated value: [3, 3, 3, 3]
100
+
```
101
+
102
+
Let's talk about this example.
Two notable things here:
1. We must add the PipelineMiddleware to the list of our middlewares.
2. We can only use tasks as functions to execute in the pipeline.
   If you want to execute an ordinary Python function, you must wrap it in a task.

The pipeline itself is just a convenient wrapper over a list of steps.
A constructed pipeline has the same semantics as an ordinary task, and you can add steps
manually. But all steps of the pipeline must implement the `taskiq_pipelines.abc.AbstractStep` class.

Pipelines can be serialized to strings with the `dumps` method, and you can load them back with the `Pipeline.loads` method. This means you can share pipelines you want to execute as simple strings.

The pipeline assigns a `task_id` to each task when you call `kiq`, and executes every step with its pre-calculated `task_id`,
so you know all task ids right after calling the `kiq` method.

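To make the serialize-and-share idea concrete, here is a plain-Python sketch. This is NOT taskiq-pipelines' actual wire format (the real methods are `Pipeline.dumps` and `Pipeline.loads`); it only illustrates why a pipeline with pre-assigned task ids can travel as a simple string:

```python
import json
import uuid

# Hypothetical illustration of the concept: a pipeline is a list of steps,
# each with a task id assigned up front, serializable to a plain string.
steps = [
    {"task_name": "add_one", "task_id": str(uuid.uuid4())},
    {"task_name": "repeat", "task_id": str(uuid.uuid4()), "kwargs": {"reps": 4}},
]

serialized = json.dumps(steps)     # share this string anywhere
restored = json.loads(serialized)  # load it back on another machine

# Every task_id is known before anything runs.
print([step["task_id"] for step in restored])
```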
## How does it work?

After you call the `kiq` method of the pipeline, it pre-calculates
all task_ids, serializes itself, and adds the serialized string to
the labels of the first task in the chain.

All the magic happens in the middleware.
After a task is executed and its result is saved, the middleware can easily deserialize the pipeline
back and calculate the pipeline's next move. And that's the trick.
You can get more information from the source code of each pipeline step.

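A toy model of that loop may help. All names below are illustrative, not taskiq-pipelines' actual API: the serialized pipeline rides along in the task's labels, and a hook that fires after each result is saved decides what to enqueue next:

```python
# Toy model of the mechanism described above (hypothetical names).
pipeline_steps = ["add_one", "repeat", "check"]

def after_result_saved(labels: dict, result):
    """Stand-in for the middleware hook that runs once a result is stored."""
    steps = labels["pipeline"]
    index = labels["step_index"]
    if index + 1 < len(steps):
        # Advance the pipeline: enqueue the next step with updated labels.
        next_labels = {"pipeline": steps, "step_index": index + 1}
        return steps[index + 1], next_labels
    return None, None  # pipeline finished

labels = {"pipeline": pipeline_steps, "step_index": 0}
next_task, labels = after_result_saved(labels, result=2)
print(next_task)  # the middleware would now enqueue "repeat"
```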
## Available steps

We have a few steps available for chaining calls:
1. Sequential
2. Mapper
3. Filter

### Sequential steps

This type of step is just an ordinary call of the function.
If you haven't specified the `param_name` argument, the result
of the previous step is passed as the first argument of the function.
If you did specify the `param_name` argument, the result of the previous
step can be found in the keyword arguments under the parameter name you specified.

You can add sequential steps with the `.call_next` method of the pipeline.

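The two dispatch modes can be sketched in plain Python. Only `param_name` is the real argument here; the `dispatch` helper is hypothetical and exists just to show the rule:

```python
def dispatch(func, previous_result, param_name=None, **kwargs):
    # Mimics the sequential-step rule described above:
    # no param_name  -> previous result becomes the first positional argument;
    # param_name set -> previous result is passed as that keyword argument.
    if param_name is None:
        return func(previous_result, **kwargs)
    kwargs[param_name] = previous_result
    return func(**kwargs)

def repeat(value, reps):
    return [value] * reps

print(dispatch(repeat, 2, reps=3))                      # [2, 2, 2]
print(dispatch(repeat, 2, param_name="value", reps=3))  # [2, 2, 2]
```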
### Mapper step

This step runs the specified task for each item of the previous task's result, spawning
multiple tasks.
Note that the result of the previous task must be iterable;
otherwise the pipeline is marked as failed.

After the execution you'll have a mapped list.
You can add mappers by calling the `.map` method of the pipeline.

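Semantically, the gathered result is equivalent to a plain list comprehension, except that each call actually runs as its own task:

```python
def add_one(value: int) -> int:
    return value + 1

previous_result = [2, 2, 2, 2]
# .map(add_one) spawns one task per item; the combined result matches:
mapped = [add_one(item) for item in previous_result]
print(mapped)  # [3, 3, 3, 3]
```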
### Filter step

This step runs the specified task for each item of the previous task's result.
Note that the result of the previous task must be iterable;
otherwise the pipeline is marked as failed.

If the called task returned `True` for an element, that element is added to the final list.

After the execution you'll get a list with the filtered results.
You can add filters by calling the `.filter` method of the pipeline.
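
Again in plain-Python terms, the filter's result is equivalent to a comprehension, with each predicate call running as a separate task:

```python
def check(value: int) -> bool:
    return value >= 0

previous_result = [3, -1, 3, -2]
# .filter(check) keeps only the elements for which the task returned True:
filtered = [item for item in previous_result if check(item)]
print(filtered)  # [3, 3]
```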