Skip to content

Commit 445e670

Browse files
authored
Merge pull request #12 from taskiq-python/feature/tests
2 parents 44a71ce + 73daebc commit 445e670

File tree

9 files changed

+70
-29
lines changed

9 files changed

+70
-29
lines changed

.flake8

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ per-file-ignores =
106106
DAR101,
107107
; Found too many arguments
108108
WPS211,
109+
; Found nested function
110+
WPS430,
111+
; Found too short name
112+
WPS111,
109113

110114
; all init files
111115
__init__.py:

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ repos:
4141
language: system
4242
pass_filenames: false
4343
types: [python]
44-
args: [--count, taskiq_pipelines]
44+
args: [--count, taskiq_pipelines, tests]
4545

4646
- id: mypy
4747
name: Validate types with MyPy

taskiq_pipelines/abc.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
class AbstractStep(ABC):
1010
"""Abstract pipeline step."""
1111

12-
step_name: str
13-
known_steps: "Dict[str, Type[AbstractStep]]" = {}
12+
_step_name: str
13+
_known_steps: "Dict[str, Type[AbstractStep]]" = {}
1414

1515
def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
1616
super().__init_subclass__(**kwargs)
1717
# Sets step name to the step.
18-
cls.step_name = step_name
18+
cls._step_name = step_name
1919
# Registers new subclass in the dict of
2020
# known steps.
21-
cls.known_steps[step_name] = cls
21+
cls._known_steps[step_name] = cls
2222

2323
@abstractmethod
2424
def dumps(self) -> str:

taskiq_pipelines/pipeliner.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def call_next(
111111
"""
112112
self.steps.append(
113113
DumpedStep(
114-
step_type=SequentialStep.step_name,
114+
step_type=SequentialStep._step_name, # noqa: WPS437
115115
step_data=SequentialStep.from_task(
116116
task=task,
117117
param_name=param_name,
@@ -167,7 +167,7 @@ def call_after(
167167
"""
168168
self.steps.append(
169169
DumpedStep(
170-
step_type=SequentialStep.step_name,
170+
step_type=SequentialStep._step_name, # noqa: WPS437
171171
step_data=SequentialStep.from_task(
172172
task=task,
173173
param_name=EMPTY_PARAM_NAME,
@@ -236,7 +236,7 @@ def map(
236236
"""
237237
self.steps.append(
238238
DumpedStep(
239-
step_type=MapperStep.step_name,
239+
step_type=MapperStep._step_name, # noqa: WPS437
240240
step_data=MapperStep.from_task(
241241
task=task,
242242
param_name=param_name,
@@ -308,7 +308,7 @@ def filter(
308308
"""
309309
self.steps.append(
310310
DumpedStep(
311-
step_type=FilterStep.step_name,
311+
step_type=FilterStep._step_name, # noqa: WPS437
312312
step_data=FilterStep.from_task(
313313
task=task,
314314
param_name=param_name,

taskiq_pipelines/steps/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
def parse_step(step_type: str, step_data: str) -> AbstractStep:
13-
step_cls = AbstractStep.known_steps.get(step_type)
13+
step_cls = AbstractStep._known_steps.get(step_type) # noqa: WPS437
1414
if step_cls is None:
1515
logger.warning(f"Unknown step type: {step_type}")
1616
raise ValueError("Unknown step type.")

taskiq_pipelines/steps/sequential.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,6 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
2424
param_name: Union[Optional[int], str]
2525
additional_kwargs: Dict[str, Any]
2626

27-
@pydantic.validator("param_name")
28-
def validate_param_name(
29-
self,
30-
value: Union[Optional[str], int],
31-
) -> Union[Optional[str], int]:
32-
"""
33-
Validate param_name.
34-
35-
:param value: value to validate.
36-
:raises ValueError: if value is not str, None or -1 (EMPTY_PARAM_NAME).
37-
:return: param value.
38-
"""
39-
if isinstance(value, int) and value != EMPTY_PARAM_NAME:
40-
raise ValueError("must be str, None or -1 (EMPTY_PARAM_NAME)")
41-
return value
42-
4327
def dumps(self) -> str:
4428
"""
4529
Dumps step as string.

taskiq_pipelines/tests/test_stub.py

Lines changed: 0 additions & 3 deletions
This file was deleted.

tests/conftest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import pytest
2+
3+
4+
@pytest.fixture(scope="session")
5+
def anyio_backend() -> str:
6+
"""
7+
Anyio backend.
8+
9+
Backend for anyio pytest plugin.
10+
:return: backend name.
11+
"""
12+
return "asyncio"

tests/test_steps.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import List
2+
3+
import pytest
4+
from taskiq import InMemoryBroker
5+
6+
from taskiq_pipelines import Pipeline, PipelineMiddleware
7+
8+
9+
@pytest.mark.anyio
10+
async def test_success() -> None:
11+
"""Tests that sequential step works as expected."""
12+
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())
13+
14+
@broker.task
15+
def add(i: int) -> int:
16+
return i + 1
17+
18+
@broker.task
19+
def double(i: int) -> int:
20+
return i * 2
21+
22+
pipe = Pipeline(broker, add).call_next(double)
23+
sent = await pipe.kiq(1)
24+
res = await sent.wait_result()
25+
assert res.return_value == 4
26+
27+
28+
@pytest.mark.anyio
29+
async def test_mapping_success() -> None:
30+
"""Test that map step works as expected."""
31+
broker = InMemoryBroker().with_middlewares(PipelineMiddleware())
32+
33+
@broker.task
34+
def ranger(i: int) -> List[int]:
35+
return list(range(i))
36+
37+
@broker.task
38+
def double(i: int) -> int:
39+
return i * 2
40+
41+
pipe = Pipeline(broker, ranger).map(double)
42+
sent = await pipe.kiq(4)
43+
res = await sent.wait_result()
44+
assert res.return_value == list(map(double, ranger(4)))

0 commit comments

Comments
 (0)