Skip to content

Commit 445fd81

Browse files
authored
Added tests. (#37)
* Added tests. Signed-off-by: Pavel Kirilin <[email protected]>
1 parent a522920 commit 445fd81

File tree

16 files changed

+705
-305
lines changed

16 files changed

+705
-305
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ per-file-ignores =
9797
S101,
9898
; Found magic number
9999
WPS432,
100+
; Missing parameter(s) in Docstring
101+
DAR101,
100102

101103
exclude =
102104
./.git,

.github/workflows/test.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ jobs:
7676
- name: Upload coverage reports to Codecov with GitHub Action
7777
uses: codecov/codecov-action@v3
7878
if: matrix.os == 'ubuntu-latest' && matrix.py_version == '3.9'
79+
with:
80+
token: ${{ secrets.CODECOV_TOKEN }}
81+
fail_ci_if_error: true
82+
verbose: true

taskiq/abc/broker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
from typing import ( # noqa: WPS235
88
TYPE_CHECKING,
99
Any,
10-
AsyncGenerator,
1110
Callable,
1211
Coroutine,
1312
Dict,
1413
List,
15-
NoReturn,
1614
Optional,
1715
TypeVar,
1816
Union,

taskiq/abc/middleware.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from typing import TYPE_CHECKING, Any, Coroutine, Union
22

3-
if TYPE_CHECKING:
3+
if TYPE_CHECKING: # pragma: no cover
44
from taskiq.abc.broker import AsyncBroker
55
from taskiq.message import TaskiqMessage
66
from taskiq.result import TaskiqResult
77

88

9-
class TaskiqMiddleware:
9+
class TaskiqMiddleware: # pragma: no cover
1010
"""Base class for middlewares."""
1111

1212
def __init__(self) -> None:

taskiq/brokers/inmemory_broker.py

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import inspect
22
from collections import OrderedDict
3-
from concurrent.futures import ThreadPoolExecutor
43
from typing import Any, Callable, Coroutine, Optional, TypeVar
54

65
from taskiq.abc.broker import AsyncBroker
76
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
8-
from taskiq.cli.async_task_runner import run_task
9-
from taskiq.exceptions import ResultSetError, TaskiqError
7+
from taskiq.cli.args import TaskiqArgs
8+
from taskiq.cli.receiver import Receiver
9+
from taskiq.exceptions import TaskiqError
1010
from taskiq.message import BrokerMessage
1111

1212
_ReturnType = TypeVar("_ReturnType")
@@ -100,16 +100,16 @@ def __init__( # noqa: WPS211
100100
result_backend=result_backend,
101101
task_id_generator=task_id_generator,
102102
)
103-
self.executor = ThreadPoolExecutor(max_workers=sync_tasks_pool_size)
104-
self.cast_types = cast_types
105-
if logs_format is None:
106-
logs_format = (
107-
"[%(asctime)s]"
108-
"[%(levelname)-7s]"
109-
"[%(module)s:%(funcName)s:%(lineno)d] "
110-
"%(message)s"
111-
)
112-
self.logs_format = logs_format
103+
self.receiver = Receiver(
104+
self,
105+
TaskiqArgs(
106+
broker="",
107+
modules=[],
108+
max_threadpool_threads=sync_tasks_pool_size,
109+
no_parse=not cast_types,
110+
log_collector_format=logs_format or TaskiqArgs.log_collector_format,
111+
),
112+
)
113113

114114
async def kick(self, message: BrokerMessage) -> None:
115115
"""
@@ -119,25 +119,20 @@ async def kick(self, message: BrokerMessage) -> None:
119119
120120
:param message: incomming message.
121121
122-
:raises ResultSetError: if cannot save results in result backend.
123122
:raises TaskiqError: if someone wants to kick unknown task.
124123
"""
125124
target_task = self.available_tasks.get(message.task_name)
126-
taskiq_message = self.formatter.loads(message=message)
127125
if target_task is None:
128126
raise TaskiqError("Unknown task.")
129-
result = await run_task(
130-
target=target_task.original_func,
131-
signature=inspect.signature(target_task.original_func),
132-
message=taskiq_message,
133-
log_collector_format=self.logs_format,
134-
executor=self.executor,
135-
middlewares=self.middlewares,
136-
)
137-
try:
138-
await self.result_backend.set_result(message.task_id, result)
139-
except Exception as exc:
140-
raise ResultSetError("Cannot set result.") from exc
127+
if self.receiver.task_signatures:
128+
if not self.receiver.task_signatures.get(target_task.task_name):
129+
self.receiver.task_signatures[
130+
target_task.task_name
131+
] = inspect.signature(
132+
target_task.original_func,
133+
)
134+
135+
await self.receiver.callback(message=message)
141136

142137
async def listen(
143138
self,

taskiq/brokers/zmq_broker.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
from logging import getLogger
13
from typing import Any, Callable, Coroutine, Optional, TypeVar
24

35
from taskiq.abc.broker import AsyncBroker
@@ -12,6 +14,8 @@
1214

1315
_T = TypeVar("_T") # noqa: WPS111
1416

17+
logger = getLogger(__name__)
18+
1519

1620
class ZeroMQBroker(AsyncBroker):
1721
"""
@@ -67,6 +71,13 @@ async def listen(
6771
6872
:param callback: function to call when message received.
6973
"""
70-
while True: # noqa: WPS457
74+
loop = asyncio.get_event_loop()
75+
while True:
7176
with self.socket.connect(self.sub_host) as sock:
72-
await callback(BrokerMessage.parse_raw(await sock.recv_string()))
77+
received_str = await sock.recv_string()
78+
try:
79+
broker_msg = BrokerMessage.parse_raw(received_str)
80+
except ValueError:
81+
logger.warning("Cannot parse received message %s", received_str)
82+
continue
83+
loop.create_task(callback(broker_msg))

taskiq/cli/args.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@ class TaskiqArgs:
1919
"""Taskiq worker CLI arguments."""
2020

2121
broker: str
22-
tasks_pattern: str
2322
modules: List[str]
24-
fs_discover: bool
25-
log_level: str
26-
workers: int
27-
log_collector_format: str
28-
max_threadpool_threads: int
29-
no_parse: bool
30-
shutdown_timeout: float
31-
reload: bool
32-
no_gitignore: bool
23+
tasks_pattern: str = "tasks.py"
24+
fs_discover: bool = False
25+
log_level: str = "INFO"
26+
workers: int = 2
27+
log_collector_format: str = (
28+
"[%(asctime)s][%(levelname)-7s][%(module)s:%(funcName)s:%(lineno)d] %(message)s"
29+
)
30+
max_threadpool_threads: int = 10
31+
no_parse: bool = False
32+
shutdown_timeout: float = 5
33+
reload: bool = False
34+
no_gitignore: bool = False
3335

3436
@classmethod
3537
def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213
@@ -128,8 +130,5 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP
128130
help="Do not use gitignore to check for updated files.",
129131
)
130132

131-
if args is None:
132-
namespace = parser.parse_args(args)
133-
else:
134-
namespace = parser.parse_args()
133+
namespace = parser.parse_args(args)
135134
return TaskiqArgs(**namespace.__dict__)

0 commit comments

Comments
 (0)