Skip to content

Commit 38dc7b2

Browse files
committed
Add empty param name support
1 parent 5f35b93 commit 38dc7b2

File tree

4 files changed

+37
-10
lines changed

4 files changed

+37
-10
lines changed

taskiq_pipelines/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Pipelines for taskiq tasks."""
2+
from taskiq_pipelines.constants import EMPTY_PARAM_NAME
23
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
34
from taskiq_pipelines.middleware import PipelineMiddleware
45
from taskiq_pipelines.pipeliner import Pipeline
@@ -8,4 +9,5 @@
89
"PipelineError",
910
"AbortPipeline",
1011
"PipelineMiddleware",
12+
"EMPTY_PARAM_NAME",
1113
]

taskiq_pipelines/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1+
from typing import Literal
2+
13
CURRENT_STEP = "_pipe_current_step"
24
PIPELINE_DATA = "_pipe_data"
5+
6+
EMPTY_PARAM_NAME: Literal[-1] = -1

taskiq_pipelines/pipeliner.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
import json
2-
from typing import Any, Coroutine, Generic, List, Optional, TypeVar, Union, overload
2+
from typing import (
3+
Any,
4+
Coroutine,
5+
Generic,
6+
List,
7+
Literal,
8+
Optional,
9+
TypeVar,
10+
Union,
11+
overload,
12+
)
313

414
import pydantic
515
from taskiq import AsyncBroker, AsyncTaskiqTask
@@ -58,7 +68,7 @@ def call_next(
5868
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
5969
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
6070
],
61-
param_name: Optional[str] = None,
71+
param_name: Union[Optional[str], Literal[-1]] = None,
6272
**additional_kwargs: Any,
6373
) -> "Pipeline[_FuncParams, _T2]":
6474
...
@@ -70,7 +80,7 @@ def call_next(
7080
AsyncKicker[Any, _T2],
7181
AsyncTaskiqDecoratedTask[Any, _T2],
7282
],
73-
param_name: Optional[str] = None,
83+
param_name: Union[Optional[str], Literal[-1]] = None,
7484
**additional_kwargs: Any,
7585
) -> "Pipeline[_FuncParams, _T2]":
7686
...
@@ -81,7 +91,7 @@ def call_next(
8191
AsyncKicker[Any, Any],
8292
AsyncTaskiqDecoratedTask[Any, Any],
8393
],
84-
param_name: Optional[str] = None,
94+
param_name: Union[Optional[str], Literal[-1]] = None,
8595
**additional_kwargs: Any,
8696
) -> Any:
8797
"""
@@ -94,7 +104,8 @@ def call_next(
94104
if param_name is specified.
95105
96106
:param task: task to execute.
97-
:param param_name: kwarg param name, defaults to None
107+
:param param_name: kwarg param name, defaults to None.
108+
If set to -1 (EMPTY_PARAM_NAME), result is not passed.
98109
:param additional_kwargs: additional kwargs to task.
99110
:return: updated pipeline.
100111
"""

taskiq_pipelines/steps/sequential.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
from typing import Any, Dict, Optional, Union
1+
from typing import Any, Dict, Literal, Optional, Union
22

33
import pydantic
44
from taskiq import AsyncBroker, AsyncTaskiqDecoratedTask, TaskiqResult
55
from taskiq.kicker import AsyncKicker
66

77
from taskiq_pipelines.abc import AbstractStep
8-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
8+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
99

1010

1111
class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
@@ -19,9 +19,17 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
1919

2020
task_name: str
2121
labels: Dict[str, str]
22-
param_name: Optional[str]
22+
# order is important here, otherwise pydantic will always choose str.
23+
# we use int instead of Literal[-1] because pydantic thinks that -1 is always str.
24+
param_name: Union[Optional[int], str]
2325
additional_kwargs: Dict[str, Any]
2426

27+
@pydantic.validator("param_name")
28+
def validate_param_name(cls, value: Union[Optional[str], int]) -> Union[Optional[str], int]:
29+
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
30+
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
31+
return value
32+
2533
def dumps(self) -> str:
2634
"""
2735
Dumps step as string.
@@ -78,9 +86,11 @@ async def act(
7886
**{PIPELINE_DATA: pipe_data, CURRENT_STEP: step_number}, # type: ignore
7987
)
8088
)
81-
if self.param_name:
89+
if isinstance(self.param_name, str):
8290
self.additional_kwargs[self.param_name] = result.return_value
8391
await kicker.kiq(**self.additional_kwargs)
92+
elif self.param_name == EMPTY_PARAM_NAME:
93+
await kicker.kiq(**self.additional_kwargs)
8494
else:
8595
await kicker.kiq(result.return_value, **self.additional_kwargs)
8696

@@ -91,7 +101,7 @@ def from_task(
91101
AsyncKicker[Any, Any],
92102
AsyncTaskiqDecoratedTask[Any, Any],
93103
],
94-
param_name: Optional[str],
104+
param_name: Union[Optional[str], int],
95105
**additional_kwargs: Any,
96106
) -> "SequentialStep":
97107
"""

0 commit comments

Comments
 (0)