Skip to content

Commit a9b8556

Browse files
committed
Fixed pipeline typehints.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 53fe726 commit a9b8556

File tree

2 files changed

+60
-4
lines changed

2 files changed

+60
-4
lines changed

.flake8

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ max-arguments=8
99
max-line-complexity=20
1010

1111
ignore =
12+
; Found statement that has no effect
13+
WPS428,
14+
; Found overly complex annotation
15+
WPS234,
1216
; Found multi-line function type annotation
1317
WPS320,
1418
; Found too deep nesting

taskiq_pipelines/pipeliner.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from typing import Any, Generic, List, Optional, TypeVar, Union
2+
from typing import Any, Coroutine, Generic, List, Optional, TypeVar, Union, overload
33

44
import pydantic
55
from taskiq import AsyncBroker, AsyncTaskiqTask
@@ -36,7 +36,7 @@ class Pipeline(Generic[_FuncParams, _ReturnType]):
3636
but it's nice to have.
3737
"""
3838

39-
def __init__( # noqa: WPS234
39+
def __init__(
4040
self,
4141
broker: AsyncBroker,
4242
task: Optional[
@@ -51,6 +51,19 @@ def __init__( # noqa: WPS234
5151
if task:
5252
self.call_next(task)
5353

54+
@overload
55+
def call_next(
56+
self: "Pipeline[_FuncParams, _ReturnType]",
57+
task: Union[
58+
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
59+
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
60+
],
61+
param_name: Optional[str] = None,
62+
**additional_kwargs: Any,
63+
) -> "Pipeline[_FuncParams, _T2]":
64+
...
65+
66+
@overload
5467
def call_next(
5568
self: "Pipeline[_FuncParams, _ReturnType]",
5669
task: Union[
@@ -60,6 +73,17 @@ def call_next(
6073
param_name: Optional[str] = None,
6174
**additional_kwargs: Any,
6275
) -> "Pipeline[_FuncParams, _T2]":
76+
...
77+
78+
def call_next(
79+
self,
80+
task: Union[
81+
AsyncKicker[Any, Any],
82+
AsyncTaskiqDecoratedTask[Any, Any],
83+
],
84+
param_name: Optional[str] = None,
85+
**additional_kwargs: Any,
86+
) -> Any:
6387
"""
6488
Adds sequential step.
6589
@@ -85,8 +109,23 @@ def call_next(
85109
task_id="",
86110
),
87111
)
88-
return self # type: ignore
112+
return self
113+
114+
@overload
115+
def map(
116+
self: "Pipeline[_FuncParams, _ReturnType]",
117+
task: Union[
118+
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
119+
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
120+
],
121+
param_name: Optional[str] = None,
122+
skip_errors: bool = False,
123+
check_interval: float = 0.5,
124+
**additional_kwargs: Any,
125+
) -> "Pipeline[_FuncParams, List[_T2]]":
126+
...
89127

128+
@overload
90129
def map(
91130
self: "Pipeline[_FuncParams, _ReturnType]",
92131
task: Union[
@@ -98,6 +137,19 @@ def map(
98137
check_interval: float = 0.5,
99138
**additional_kwargs: Any,
100139
) -> "Pipeline[_FuncParams, List[_T2]]":
140+
...
141+
142+
def map(
143+
self,
144+
task: Union[
145+
AsyncKicker[Any, Any],
146+
AsyncTaskiqDecoratedTask[Any, Any],
147+
],
148+
param_name: Optional[str] = None,
149+
skip_errors: bool = False,
150+
check_interval: float = 0.5,
151+
**additional_kwargs: Any,
152+
) -> Any:
101153
"""
102154
Create new map task.
103155
@@ -128,7 +180,7 @@ def map(
128180
task_id="",
129181
),
130182
)
131-
return self # type: ignore
183+
return self
132184

133185
def dumps(self) -> str:
134186
"""

0 commit comments

Comments
 (0)