11import json
2- from typing import Any , Coroutine , Generic , List , Optional , TypeVar , Union , overload
2+ from typing import (
3+ Any ,
4+ Coroutine ,
5+ Generic ,
6+ List ,
7+ Literal ,
8+ Optional ,
9+ TypeVar ,
10+ Union ,
11+ overload ,
12+ )
313
414import pydantic
515from taskiq import AsyncBroker , AsyncTaskiqTask
616from taskiq .decor import AsyncTaskiqDecoratedTask
717from taskiq .kicker import AsyncKicker
818from typing_extensions import ParamSpec
919
10- from taskiq_pipelines .constants import CURRENT_STEP , PIPELINE_DATA
20+ from taskiq_pipelines .constants import CURRENT_STEP , EMPTY_PARAM_NAME , PIPELINE_DATA
1121from taskiq_pipelines .steps import FilterStep , MapperStep , SequentialStep , parse_step
1222
1323_ReturnType = TypeVar ("_ReturnType" )
@@ -58,7 +68,7 @@ def call_next(
5868 AsyncKicker [Any , Coroutine [Any , Any , _T2 ]],
5969 AsyncTaskiqDecoratedTask [Any , Coroutine [Any , Any , _T2 ]],
6070 ],
61- param_name : Optional [str ] = None ,
71+ param_name : Union [ Optional [str ], Literal [ - 1 ] ] = None ,
6272 ** additional_kwargs : Any ,
6373 ) -> "Pipeline[_FuncParams, _T2]" :
6474 ...
@@ -70,7 +80,7 @@ def call_next(
7080 AsyncKicker [Any , _T2 ],
7181 AsyncTaskiqDecoratedTask [Any , _T2 ],
7282 ],
73- param_name : Optional [str ] = None ,
83+ param_name : Union [ Optional [str ], Literal [ - 1 ] ] = None ,
7484 ** additional_kwargs : Any ,
7585 ) -> "Pipeline[_FuncParams, _T2]" :
7686 ...
@@ -81,7 +91,7 @@ def call_next(
8191 AsyncKicker [Any , Any ],
8292 AsyncTaskiqDecoratedTask [Any , Any ],
8393 ],
84- param_name : Optional [str ] = None ,
94+ param_name : Union [ Optional [str ], Literal [ - 1 ] ] = None ,
8595 ** additional_kwargs : Any ,
8696 ) -> Any :
8797 """
@@ -94,13 +104,14 @@ def call_next(
94104 if param_name is specified.
95105
96106 :param task: task to execute.
97- :param param_name: kwarg param name, defaults to None
107+ :param param_name: kwarg param name, defaults to None.
108+ If set to -1 (EMPTY_PARAM_NAME), result is not passed.
98109 :param additional_kwargs: additional kwargs to task.
99110 :return: updated pipeline.
100111 """
101112 self .steps .append (
102113 DumpedStep (
103- step_type = SequentialStep .step_name ,
114+ step_type = SequentialStep ._step_name , # noqa: WPS437
104115 step_data = SequentialStep .from_task (
105116 task = task ,
106117 param_name = param_name ,
@@ -111,6 +122,62 @@ def call_next(
111122 )
112123 return self
113124
125+ @overload
126+ def call_after (
127+ self : "Pipeline[_FuncParams, _ReturnType]" ,
128+ task : Union [
129+ AsyncKicker [Any , Coroutine [Any , Any , _T2 ]],
130+ AsyncTaskiqDecoratedTask [Any , Coroutine [Any , Any , _T2 ]],
131+ ],
132+ ** additional_kwargs : Any ,
133+ ) -> "Pipeline[_FuncParams, _T2]" :
134+ ...
135+
136+ @overload
137+ def call_after (
138+ self : "Pipeline[_FuncParams, _ReturnType]" ,
139+ task : Union [
140+ AsyncKicker [Any , _T2 ],
141+ AsyncTaskiqDecoratedTask [Any , _T2 ],
142+ ],
143+ ** additional_kwargs : Any ,
144+ ) -> "Pipeline[_FuncParams, _T2]" :
145+ ...
146+
147+ def call_after (
148+ self ,
149+ task : Union [
150+ AsyncKicker [Any , Any ],
151+ AsyncTaskiqDecoratedTask [Any , Any ],
152+ ],
153+ ** additional_kwargs : Any ,
154+ ) -> Any :
155+ """
156+ Adds sequential step.
157+
158+ This task will be executed right after
159+ the previous and result of the previous task
160+ is not passed to the next task.
161+
162+ This is equivalent to call_next(task, param_name=-1).
163+
164+ :param task: task to execute.
165+ :param additional_kwargs: additional kwargs to task.
166+ :return: updated pipeline.
167+ """
168+ self .steps .append (
169+ DumpedStep (
170+ step_type = SequentialStep ._step_name , # noqa: WPS437
171+ step_data = SequentialStep .from_task (
172+ task = task ,
173+ param_name = EMPTY_PARAM_NAME ,
174+ ** additional_kwargs ,
175+ ).dumps (),
176+ task_id = "" ,
177+ ),
178+ )
179+ return self
180+
114181 @overload
115182 def map (
116183 self : "Pipeline[_FuncParams, _ReturnType]" ,
@@ -169,7 +236,7 @@ def map(
169236 """
170237 self .steps .append (
171238 DumpedStep (
172- step_type = MapperStep .step_name ,
239+ step_type = MapperStep ._step_name , # noqa: WPS437
173240 step_data = MapperStep .from_task (
174241 task = task ,
175242 param_name = param_name ,
@@ -241,7 +308,7 @@ def filter(
241308 """
242309 self .steps .append (
243310 DumpedStep (
244- step_type = FilterStep .step_name ,
311+ step_type = FilterStep ._step_name , # noqa: WPS437
245312 step_data = FilterStep .from_task (
246313 task = task ,
247314 param_name = param_name ,
@@ -261,7 +328,7 @@ def dumps(self) -> str:
261328 :returns: serialized pipeline.
262329 """
263330 return json .dumps (
264- [step .dict () for step in self .steps ],
331+ [step .model_dump () for step in self .steps ],
265332 )
266333
267334 @classmethod
@@ -277,7 +344,7 @@ def loads(cls, broker: AsyncBroker, pipe_data: str) -> "Pipeline[Any, Any]":
277344 :return: new
278345 """
279346 pipe : "Pipeline[Any, Any]" = Pipeline (broker )
280- pipe .steps = pydantic .parse_raw_as (List [DumpedStep ], pipe_data )
347+ pipe .steps = pydantic .TypeAdapter (List [DumpedStep ]). validate_json ( pipe_data )
281348 return pipe
282349
283350 async def kiq (
0 commit comments