Skip to content

Commit e98d825

Browse files
authored
Added test for retry middleware. (#55)
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent da3044f commit e98d825

21 files changed

+170
-177
lines changed

pyproject.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ warn_unused_ignores = false
8282
profile = "black"
8383
multi_line_output = 3
8484

85+
[tool.coverage.run]
86+
omit = [
87+
"taskiq/__main__.py",
88+
"taskiq/abc/cmd.py",
89+
"taskiq/cli/scheduler/args.py",
90+
"taskiq/cli/scheduler/cmd.py",
91+
"taskiq/cli/utils.py",
92+
"taskiq/cli/worker/args.py",
93+
"taskiq/cli/worker/async_task_runner.py",
94+
"taskiq/cli/worker/cmd.py",
95+
]
96+
8597
[build-system]
8698
requires = ["poetry-core>=1.0.0"]
8799
build-backend = "poetry.core.masonry.api"

taskiq/__main__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from taskiq.abc.cmd import TaskiqCMD
88

99

10-
def main() -> None: # noqa: C901, WPS210
10+
def main() -> None: # noqa: C901, WPS210 # pragma: no cover
1111
"""
1212
Main entrypoint of the taskiq.
1313
@@ -72,5 +72,5 @@ def main() -> None: # noqa: C901, WPS210
7272
command.exec(sys.argv[1:])
7373

7474

75-
if __name__ == "__main__":
75+
if __name__ == "__main__": # pragma: no cover
7676
main()

taskiq/abc/broker.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,15 @@ async def listen(
167167
def task(
168168
self,
169169
task_name: Callable[_FuncParams, _ReturnType],
170+
**lavels: Any,
170171
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: # pragma: no cover
171172
...
172173

173174
@overload
174175
def task(
175176
self,
176177
task_name: Optional[str] = None,
177-
**labels: Union[str, int],
178+
**labels: Any,
178179
) -> Callable[
179180
[Callable[_FuncParams, _ReturnType]],
180181
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
@@ -184,7 +185,7 @@ def task(
184185
def task( # type: ignore[misc]
185186
self,
186187
task_name: Optional[str] = None,
187-
**labels: Union[str, int],
188+
**labels: Any,
188189
) -> Any:
189190
"""
190191
Decorator that turns function into a task.
@@ -223,7 +224,7 @@ def inner(
223224
if inner_task_name is None:
224225
fmodule = func.__module__
225226
if fmodule == "__main__": # pragma: no cover
226-
fmodule = ".".join( # noqa: WPS220
227+
fmodule = ".".join(
227228
sys.argv[0]
228229
.removesuffix(
229230
".py",

taskiq/abc/cmd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Sequence
33

44

5-
class TaskiqCMD(ABC):
5+
class TaskiqCMD(ABC): # pragma: no cover
66
"""Base class for new commands."""
77

88
short_help = ""

taskiq/decor.py

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from typing_extensions import ParamSpec
1313

1414
from taskiq.kicker import AsyncKicker
15-
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
15+
from taskiq.task import AsyncTaskiqTask
1616

1717
if TYPE_CHECKING: # pragma: no cover
1818
from taskiq.abc.broker import AsyncBroker
@@ -93,40 +93,6 @@ async def kiq(
9393
"""
9494
return await self.kicker().kiq(*args, **kwargs)
9595

96-
@overload
97-
def kiq_sync(
98-
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
99-
*args: _FuncParams.args,
100-
**kwargs: _FuncParams.kwargs,
101-
) -> SyncTaskiqTask[_T]:
102-
...
103-
104-
@overload
105-
def kiq_sync(
106-
self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
107-
*args: _FuncParams.args,
108-
**kwargs: _FuncParams.kwargs,
109-
) -> SyncTaskiqTask[_ReturnType]:
110-
...
111-
112-
def kiq_sync(
113-
self,
114-
*args: _FuncParams.args,
115-
**kwargs: _FuncParams.kwargs,
116-
) -> Any:
117-
"""
118-
This method sends function call over the network.
119-
120-
It gets current broker and calls it's kick method,
121-
returning what it returns.
122-
123-
:param args: function's arguments.
124-
:param kwargs: function's key word arguments.
125-
126-
:returns: taskiq task.
127-
"""
128-
return self.kicker().kiq_sync(*args, **kwargs)
129-
13096
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
13197
"""
13298
This function returns kicker object.

taskiq/dependencies.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,6 @@ def __eq__(self, rhs: object) -> bool:
140140
rhs.kwargs,
141141
)
142142

143-
def __str__(self) -> str:
144-
if self.dependency is None:
145-
dep_name = "<from hint>"
146-
else:
147-
dep_name = (
148-
f"{self.dependency.__module__}:" # noqa: WPS237
149-
f"{self.dependency.__name__}"
150-
)
151-
return (
152-
f"TaskiqDepends({dep_name}, "
153-
f"use_cache={self.use_cache}, "
154-
f"kwargs={self.kwargs})"
155-
)
156-
157143

158144
class DependencyResolveContext:
159145
"""

taskiq/kicker.py

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from taskiq.abc.middleware import TaskiqMiddleware
1919
from taskiq.exceptions import SendTaskError
2020
from taskiq.message import TaskiqMessage
21-
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
22-
from taskiq.utils import maybe_awaitable, run_sync
21+
from taskiq.task import AsyncTaskiqTask
22+
from taskiq.utils import maybe_awaitable
2323

2424
if TYPE_CHECKING: # pragma: no cover
2525
from taskiq.abc.broker import AsyncBroker
@@ -142,40 +142,6 @@ async def kiq( # noqa: C901
142142
result_backend=self.broker.result_backend,
143143
)
144144

145-
@overload
146-
def kiq_sync(
147-
self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
148-
*args: _FuncParams.args,
149-
**kwargs: _FuncParams.kwargs,
150-
) -> SyncTaskiqTask[_T]:
151-
...
152-
153-
@overload
154-
def kiq_sync(
155-
self: "AsyncKicker[_FuncParams, _ReturnType]",
156-
*args: _FuncParams.args,
157-
**kwargs: _FuncParams.kwargs,
158-
) -> SyncTaskiqTask[_ReturnType]:
159-
...
160-
161-
def kiq_sync(
162-
self,
163-
*args: _FuncParams.args,
164-
**kwargs: _FuncParams.kwargs,
165-
) -> Any:
166-
"""
167-
This method sends function call over the network.
168-
169-
It just wraps async kiq call in run_sync
170-
funcion.
171-
172-
:param args: function's arguments.
173-
:param kwargs: function's key word arguments.
174-
175-
:returns: sync taskiq task.
176-
"""
177-
return SyncTaskiqTask(run_sync(self.kiq(*args, **kwargs)))
178-
179145
@classmethod
180146
def _prepare_arg(cls, arg: Any) -> Any:
181147
"""

taskiq/schedule_sources/label_based.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ async def get_schedules(self) -> List["ScheduledTask"]:
2828
if task.broker != self.broker:
2929
continue
3030
for schedule in task.labels.get("schedule", []):
31+
if "cron" not in schedule:
32+
continue
3133
labels = schedule.get("labels", {})
3234
labels.update(task.labels)
3335
schedules.append(

taskiq/task.py

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
ResultIsReadyError,
99
TaskiqResultTimeoutError,
1010
)
11-
from taskiq.utils import run_sync
1211

1312
if TYPE_CHECKING: # pragma: no cover
1413
from taskiq.abc.result_backend import AsyncResultBackend
@@ -67,60 +66,6 @@ def wait_result( # noqa: WPS234
6766
"""
6867

6968

70-
class SyncTaskiqTask(_Task[_ReturnType]):
71-
"""Sync wrapper over AsyncTaskiqTask."""
72-
73-
def __init__(self, async_task: "AsyncTaskiqTask[_ReturnType]") -> None:
74-
self.async_task = async_task
75-
76-
def is_ready(self) -> bool:
77-
"""
78-
Checks if task is completed.
79-
80-
:return: True if task is completed.
81-
"""
82-
return run_sync(self.async_task.is_ready())
83-
84-
def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType]":
85-
"""
86-
Get result of a task from result backend.
87-
88-
:param with_logs: whether you want to fetch logs from worker.
89-
90-
:return: task's return value.
91-
"""
92-
return run_sync(self.async_task.get_result(with_logs=with_logs))
93-
94-
def wait_result(
95-
self,
96-
check_interval: float = 0.2,
97-
timeout: float = -1,
98-
with_logs: bool = False,
99-
) -> "TaskiqResult[_ReturnType]":
100-
"""
101-
Waits until result is ready.
102-
103-
This method just checks whether the task is
104-
ready. And if it is it returns the result.
105-
106-
It may throw TaskiqResultTimeoutError if
107-
task didn't became ready in provided
108-
period of time.
109-
110-
:param check_interval: How often checks are performed.
111-
:param timeout: timeout for the result.
112-
:param with_logs: whether you want to fetch logs from worker.
113-
:return: task's return value.
114-
"""
115-
return run_sync(
116-
self.async_task.wait_result(
117-
check_interval=check_interval,
118-
timeout=timeout,
119-
with_logs=with_logs,
120-
),
121-
)
122-
123-
12469
class AsyncTaskiqTask(_Task[_ReturnType]):
12570
"""AsyncTask for AsyncResultBackend."""
12671

taskiq/utils.py

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,11 @@
1-
import asyncio
21
import inspect
3-
from concurrent.futures import ThreadPoolExecutor
4-
from typing import Any, Awaitable, Coroutine, TypeVar, Union
2+
from typing import Any, Coroutine, TypeVar, Union
53

64
_T = TypeVar("_T") # noqa: WPS111
75

86

9-
def run_sync(coroutine: "Coroutine[Any, Any, _T]") -> _T:
10-
"""
11-
Run the coroutine synchronously.
12-
13-
This function tries to run corouting using asyncio.run.
14-
15-
If it's not possible, it manually creates executor and
16-
runs async function returns it's result.
17-
18-
1. When called within a coroutine.
19-
2. When called from ``python -m asyncio``, or iPython with %autoawait
20-
enabled, which means an event loop may already be running in the
21-
current thread.
22-
23-
:param coroutine: awaitable to execute.
24-
:returns: the same type as if it were awaited.
25-
"""
26-
try:
27-
# We try this first, as in most situations this will work.
28-
return asyncio.run(coroutine)
29-
except RuntimeError:
30-
# An event loop already exists.
31-
with ThreadPoolExecutor(max_workers=1) as executor:
32-
future = executor.submit(asyncio.run, coroutine)
33-
return future.result()
34-
35-
367
async def maybe_awaitable(
37-
possible_coroutine: "Union[_T, Awaitable[_T]]",
8+
possible_coroutine: "Union[_T, Coroutine[Any, Any, _T]]",
389
) -> _T:
3910
"""
4011
Awaits coroutine if needed.

0 commit comments

Comments
 (0)