Skip to content

Commit 367efa4

Browse files
authored
Merge pull request #10 from cofob/feature-empty-param
2 parents 5f35b93 + d986520 commit 367efa4

File tree

5 files changed

+107
-11
lines changed

5 files changed

+107
-11
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ ignore =
8888
; Consider possible security implications associated with pickle module
8989
; Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
9090
S403, S301
91+
; Found too many imported names from a module
92+
WPS235
9193

9294
per-file-ignores =
9395
; all tests

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,14 @@ We have a few steps available for chaining calls:
138138
This type of step is just an ordinary call of the function.
139139
If you haven't specified `param_name` argument, then the result
140140
of the previous step will be passed as the first argument of the function.
141-
Uf you did specify the `param_name` argument, then the result of the previous
141+
If you did specify the `param_name` argument, then the result of the previous
142142
step can be found in key word arguments with the param name you specified.
143143

144144
You can add sequential steps with `.call_next` method of the pipeline.
145145

146+
If you don't want to pass the result of the previous step to the next one,
147+
you can use `.call_after` method of the pipeline.
148+
146149
### Mapper step
147150

148151
This step runs specified task for each item of the previous task's result spawning

taskiq_pipelines/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1+
from typing import Literal
2+
13
CURRENT_STEP = "_pipe_current_step"
24
PIPELINE_DATA = "_pipe_data"
5+
6+
EMPTY_PARAM_NAME: Literal[-1] = -1

taskiq_pipelines/pipeliner.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
import json
2-
from typing import Any, Coroutine, Generic, List, Optional, TypeVar, Union, overload
2+
from typing import (
3+
Any,
4+
Coroutine,
5+
Generic,
6+
List,
7+
Literal,
8+
Optional,
9+
TypeVar,
10+
Union,
11+
overload,
12+
)
313

414
import pydantic
515
from taskiq import AsyncBroker, AsyncTaskiqTask
616
from taskiq.decor import AsyncTaskiqDecoratedTask
717
from taskiq.kicker import AsyncKicker
818
from typing_extensions import ParamSpec
919

10-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
20+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
1121
from taskiq_pipelines.steps import FilterStep, MapperStep, SequentialStep, parse_step
1222

1323
_ReturnType = TypeVar("_ReturnType")
@@ -58,7 +68,7 @@ def call_next(
5868
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
5969
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
6070
],
61-
param_name: Optional[str] = None,
71+
param_name: Union[Optional[str], Literal[-1]] = None,
6272
**additional_kwargs: Any,
6373
) -> "Pipeline[_FuncParams, _T2]":
6474
...
@@ -70,7 +80,7 @@ def call_next(
7080
AsyncKicker[Any, _T2],
7181
AsyncTaskiqDecoratedTask[Any, _T2],
7282
],
73-
param_name: Optional[str] = None,
83+
param_name: Union[Optional[str], Literal[-1]] = None,
7484
**additional_kwargs: Any,
7585
) -> "Pipeline[_FuncParams, _T2]":
7686
...
@@ -81,7 +91,7 @@ def call_next(
8191
AsyncKicker[Any, Any],
8292
AsyncTaskiqDecoratedTask[Any, Any],
8393
],
84-
param_name: Optional[str] = None,
94+
param_name: Union[Optional[str], Literal[-1]] = None,
8595
**additional_kwargs: Any,
8696
) -> Any:
8797
"""
@@ -94,7 +104,8 @@ def call_next(
94104
if param_name is specified.
95105
96106
:param task: task to execute.
97-
:param param_name: kwarg param name, defaults to None
107+
:param param_name: kwarg param name, defaults to None.
108+
If set to -1 (EMPTY_PARAM_NAME), result is not passed.
98109
:param additional_kwargs: additional kwargs to task.
99110
:return: updated pipeline.
100111
"""
@@ -111,6 +122,62 @@ def call_next(
111122
)
112123
return self
113124

125+
@overload
126+
def call_after(
127+
self: "Pipeline[_FuncParams, _ReturnType]",
128+
task: Union[
129+
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
130+
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
131+
],
132+
**additional_kwargs: Any,
133+
) -> "Pipeline[_FuncParams, _T2]":
134+
...
135+
136+
@overload
137+
def call_after(
138+
self: "Pipeline[_FuncParams, _ReturnType]",
139+
task: Union[
140+
AsyncKicker[Any, _T2],
141+
AsyncTaskiqDecoratedTask[Any, _T2],
142+
],
143+
**additional_kwargs: Any,
144+
) -> "Pipeline[_FuncParams, _T2]":
145+
...
146+
147+
def call_after(
148+
self,
149+
task: Union[
150+
AsyncKicker[Any, Any],
151+
AsyncTaskiqDecoratedTask[Any, Any],
152+
],
153+
**additional_kwargs: Any,
154+
) -> Any:
155+
"""
156+
Adds sequential step.
157+
158+
This task will be executed right after
159+
the previous and result of the previous task
160+
is not passed to the next task.
161+
162+
This is equivalent to call_next(task, param_name=-1).
163+
164+
:param task: task to execute.
165+
:param additional_kwargs: additional kwargs to task.
166+
:return: updated pipeline.
167+
"""
168+
self.steps.append(
169+
DumpedStep(
170+
step_type=SequentialStep.step_name,
171+
step_data=SequentialStep.from_task(
172+
task=task,
173+
param_name=EMPTY_PARAM_NAME,
174+
**additional_kwargs,
175+
).dumps(),
176+
task_id="",
177+
),
178+
)
179+
return self
180+
114181
@overload
115182
def map(
116183
self: "Pipeline[_FuncParams, _ReturnType]",

taskiq_pipelines/steps/sequential.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from taskiq.kicker import AsyncKicker
66

77
from taskiq_pipelines.abc import AbstractStep
8-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
8+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
99

1010

1111
class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
@@ -19,9 +19,27 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
1919

2020
task_name: str
2121
labels: Dict[str, str]
22-
param_name: Optional[str]
22+
# order is important here, otherwise pydantic will always choose str.
23+
# we use int instead of Literal[-1] because pydantic thinks that -1 is always str.
24+
param_name: Union[Optional[int], str]
2325
additional_kwargs: Dict[str, Any]
2426

27+
@pydantic.validator("param_name")
28+
def validate_param_name(
29+
self,
30+
value: Union[Optional[str], int],
31+
) -> Union[Optional[str], int]:
32+
"""
33+
Validate param_name.
34+
35+
:param value: value to validate.
36+
:raises ValueError: if value is not str, None or -1 (EMPTY_PARAM_NAME).
37+
:return: param value.
38+
"""
39+
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
40+
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
41+
return value
42+
2543
def dumps(self) -> str:
2644
"""
2745
Dumps step as string.
@@ -78,9 +96,11 @@ async def act(
7896
**{PIPELINE_DATA: pipe_data, CURRENT_STEP: step_number}, # type: ignore
7997
)
8098
)
81-
if self.param_name:
99+
if isinstance(self.param_name, str):
82100
self.additional_kwargs[self.param_name] = result.return_value
83101
await kicker.kiq(**self.additional_kwargs)
102+
elif self.param_name == EMPTY_PARAM_NAME:
103+
await kicker.kiq(**self.additional_kwargs)
84104
else:
85105
await kicker.kiq(result.return_value, **self.additional_kwargs)
86106

@@ -91,7 +111,7 @@ def from_task(
91111
AsyncKicker[Any, Any],
92112
AsyncTaskiqDecoratedTask[Any, Any],
93113
],
94-
param_name: Optional[str],
114+
param_name: Union[Optional[str], int],
95115
**additional_kwargs: Any,
96116
) -> "SequentialStep":
97117
"""

0 commit comments

Comments
 (0)