Skip to content

Commit 8de6a41

Browse files
committed
Added tests.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 44a71ce commit 8de6a41

File tree

7 files changed

+67
-28
lines changed

7 files changed

+67
-28
lines changed

taskiq_pipelines/abc.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
class AbstractStep(ABC):
1010
"""Abstract pipeline step."""
1111

12-
step_name: str
13-
known_steps: "Dict[str, Type[AbstractStep]]" = {}
12+
_step_name: str
13+
_known_steps: "Dict[str, Type[AbstractStep]]" = {}
1414

1515
def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
1616
super().__init_subclass__(**kwargs)
1717
# Sets step name to the step.
18-
cls.step_name = step_name
18+
cls._step_name = step_name
1919
# Registers new subclass in the dict of
2020
# known steps.
21-
cls.known_steps[step_name] = cls
21+
cls._known_steps[step_name] = cls
2222

2323
@abstractmethod
2424
def dumps(self) -> str:

taskiq_pipelines/pipeliner.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def call_next(
111111
"""
112112
self.steps.append(
113113
DumpedStep(
114-
step_type=SequentialStep.step_name,
114+
step_type=SequentialStep._step_name,
115115
step_data=SequentialStep.from_task(
116116
task=task,
117117
param_name=param_name,
@@ -167,7 +167,7 @@ def call_after(
167167
"""
168168
self.steps.append(
169169
DumpedStep(
170-
step_type=SequentialStep.step_name,
170+
step_type=SequentialStep._step_name,
171171
step_data=SequentialStep.from_task(
172172
task=task,
173173
param_name=EMPTY_PARAM_NAME,
@@ -236,7 +236,7 @@ def map(
236236
"""
237237
self.steps.append(
238238
DumpedStep(
239-
step_type=MapperStep.step_name,
239+
step_type=MapperStep._step_name,
240240
step_data=MapperStep.from_task(
241241
task=task,
242242
param_name=param_name,
@@ -308,7 +308,7 @@ def filter(
308308
"""
309309
self.steps.append(
310310
DumpedStep(
311-
step_type=FilterStep.step_name,
311+
step_type=FilterStep._step_name,
312312
step_data=FilterStep.from_task(
313313
task=task,
314314
param_name=param_name,

taskiq_pipelines/steps/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
def parse_step(step_type: str, step_data: str) -> AbstractStep:
13-
step_cls = AbstractStep.known_steps.get(step_type)
13+
step_cls = AbstractStep._known_steps.get(step_type)
1414
if step_cls is None:
1515
logger.warning(f"Unknown step type: {step_type}")
1616
raise ValueError("Unknown step type.")

taskiq_pipelines/steps/sequential.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,6 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
2424
param_name: Union[Optional[int], str]
2525
additional_kwargs: Dict[str, Any]
2626

27-
@pydantic.validator("param_name")
28-
def validate_param_name(
29-
self,
30-
value: Union[Optional[str], int],
31-
) -> Union[Optional[str], int]:
32-
"""
33-
Validate param_name.
34-
35-
:param value: value to validate.
36-
:raises ValueError: if value is not str, None or -1 (EMPTY_PARAM_NAME).
37-
:return: param value.
38-
"""
39-
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
40-
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
41-
return value
42-
4327
def dumps(self) -> str:
4428
"""
4529
Dumps step as string.

taskiq_pipelines/tests/test_stub.py

Lines changed: 0 additions & 3 deletions
This file was deleted.

tests/conftest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import pytest
2+
3+
4+
@pytest.fixture(scope="session")
5+
def anyio_backend() -> str:
6+
"""
7+
Anyio backend.
8+
9+
Backend for anyio pytest plugin.
10+
:return: backend name.
11+
"""
12+
return "asyncio"

tests/test_steps.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import List
2+
3+
import pytest
4+
from taskiq import InMemoryBroker
5+
6+
from taskiq_pipelines import Pipeline, PipelineMiddleware
7+
8+
9+
@pytest.mark.anyio
10+
async def test_success() -> None:
11+
"""Test stub for CI."""
12+
13+
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())
14+
15+
@broker.task
16+
def add(i: int) -> int:
17+
return i + 1
18+
19+
@broker.task
20+
def double(i: int) -> int:
21+
return i * 2
22+
23+
pipe = Pipeline(broker, add).call_next(double)
24+
sent = await pipe.kiq(1)
25+
res = await sent.wait_result()
26+
assert res.return_value == 4
27+
28+
29+
@pytest.mark.anyio
30+
async def test_mapping_success() -> None:
31+
"""Test stub for CI."""
32+
33+
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())
34+
35+
@broker.task
36+
def ranger(i: int) -> List[int]:
37+
return list(range(i))
38+
39+
@broker.task
40+
def double(i: int) -> int:
41+
return i * 2
42+
43+
pipe = Pipeline(broker, ranger).map(double)
44+
sent = await pipe.kiq(4)
45+
res = await sent.wait_result()
46+
assert res.return_value == list(map(double, ranger(4)))

0 commit comments

Comments
 (0)