Skip to content

Commit aeb9b73

Browse files
committed
Add call_after function
1 parent 38dc7b2 commit aeb9b73

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

taskiq_pipelines/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
"""Pipelines for taskiq tasks."""
2-
from taskiq_pipelines.constants import EMPTY_PARAM_NAME
32
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
43
from taskiq_pipelines.middleware import PipelineMiddleware
54
from taskiq_pipelines.pipeliner import Pipeline
@@ -9,5 +8,4 @@
98
"PipelineError",
109
"AbortPipeline",
1110
"PipelineMiddleware",
12-
"EMPTY_PARAM_NAME",
1311
]

taskiq_pipelines/pipeliner.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from taskiq.kicker import AsyncKicker
1818
from typing_extensions import ParamSpec
1919

20-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
20+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
2121
from taskiq_pipelines.steps import FilterStep, MapperStep, SequentialStep, parse_step
2222

2323
_ReturnType = TypeVar("_ReturnType")
@@ -122,6 +122,62 @@ def call_next(
122122
)
123123
return self
124124

125+
@overload
126+
def call_after(
127+
self: "Pipeline[_FuncParams, _ReturnType]",
128+
task: Union[
129+
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
130+
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
131+
],
132+
**additional_kwargs: Any,
133+
) -> "Pipeline[_FuncParams, _T2]":
134+
...
135+
136+
@overload
137+
def call_after(
138+
self: "Pipeline[_FuncParams, _ReturnType]",
139+
task: Union[
140+
AsyncKicker[Any, _T2],
141+
AsyncTaskiqDecoratedTask[Any, _T2],
142+
],
143+
**additional_kwargs: Any,
144+
) -> "Pipeline[_FuncParams, _T2]":
145+
...
146+
147+
def call_after(
148+
self,
149+
task: Union[
150+
AsyncKicker[Any, Any],
151+
AsyncTaskiqDecoratedTask[Any, Any],
152+
],
153+
**additional_kwargs: Any,
154+
) -> Any:
155+
"""
156+
Adds sequential step.
157+
158+
This task will be executed right after
159+
the previous and result of the previous task
160+
is not passed to the next task.
161+
162+
This is equivalent to call_next(task, param_name=-1).
163+
164+
:param task: task to execute.
165+
:param additional_kwargs: additional kwargs to task.
166+
:return: updated pipeline.
167+
"""
168+
self.steps.append(
169+
DumpedStep(
170+
step_type=SequentialStep.step_name,
171+
step_data=SequentialStep.from_task(
172+
task=task,
173+
param_name=EMPTY_PARAM_NAME,
174+
**additional_kwargs,
175+
).dumps(),
176+
task_id="",
177+
),
178+
)
179+
return self
180+
125181
@overload
126182
def map(
127183
self: "Pipeline[_FuncParams, _ReturnType]",

0 commit comments

Comments
 (0)