Skip to content

Commit d9bca89

Browse files
authored
Added exception catches from dependencies. (#153)
1 parent 8afb1c9 commit d9bca89

File tree

5 files changed

+66
-15
lines changed

5 files changed

+66
-15
lines changed

.flake8

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,6 @@ ignore =
9595
; Found too many public instance attributes
9696
WPS230,
9797

98-
; all init files
99-
__init__.py:
100-
; ignore not used imports
101-
F401,
102-
; ignore import with wildcard
103-
F403,
104-
; Found wrong metadata variable
105-
WPS410,
106-
; Found commented out cod
107-
E800,
108-
10998
per-file-ignores =
11099
; all tests
111100
test_*.py,tests.py,tests_*.py,*/tests/*:
@@ -120,6 +109,21 @@ per-file-ignores =
120109
; Found complex default value
121110
WPS404,
122111

112+
; all init files
113+
__init__.py:
114+
; ignore not used imports
115+
F401,
116+
; ignore import with wildcard
117+
F403,
118+
; Found wrong metadata variable
119+
WPS410,
120+
; Found commented out cod
121+
E800,
122+
123+
taskiq/serialization.py:
124+
; Found commented out code
125+
E800,
126+
123127
exclude =
124128
./.git,
125129
./venv,

taskiq/abc/broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def __init__(
8686
"Setting result backend with constructor is deprecated. "
8787
"Please use `with_result_backend` instead.",
8888
TaskiqDeprecationWarning,
89+
stacklevel=2,
8990
)
9091
if task_id_generator is None:
9192
task_id_generator = default_id_generator
@@ -94,6 +95,7 @@ def __init__(
9495
"Setting id generator with constructor is deprecated. "
9596
"Please use `with_id_generator` instead.",
9697
TaskiqDeprecationWarning,
98+
stacklevel=2,
9799
)
98100
self.middlewares: "List[TaskiqMiddleware]" = []
99101
self.result_backend = result_backend

taskiq/receiver/receiver.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,19 @@ async def run_task( # noqa: C901, WPS210
206206
)
207207
dep_ctx = dependency_graph.async_ctx(broker_ctx)
208208
# Resolve all function's dependencies.
209-
kwargs = await dep_ctx.resolve_kwargs()
210-
211-
# We udpate kwargs with kwargs from network.
212-
kwargs.update(message.kwargs)
213209

214210
# Start a timer.
215211
start_time = time()
212+
216213
try:
214+
# We put kwargs resolving here,
215+
# to be able to catch any exception (for example ),
216+
# that happen while resolving dependencies.
217+
if dep_ctx:
218+
kwargs = await dep_ctx.resolve_kwargs()
219+
# We udpate kwargs with kwargs from network.
220+
kwargs.update(message.kwargs)
221+
217222
# If the function is a coroutine, we await it.
218223
if asyncio.iscoroutinefunction(target):
219224
returned = await target(*message.args, **kwargs)

tests/conftest.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
from typing import Generator
2+
13
import pytest
24

5+
from taskiq.abc.broker import AsyncBroker
6+
37

48
@pytest.fixture(scope="session")
59
def anyio_backend() -> str:
@@ -10,3 +14,17 @@ def anyio_backend() -> str:
1014
:return: backend name.
1115
"""
1216
return "asyncio"
17+
18+
19+
@pytest.fixture(autouse=True)
20+
def reset_broker() -> Generator[None, None, None]:
21+
"""
22+
Restore async broker.
23+
24+
This fixtures sets some global
25+
broker variables to default state.
26+
"""
27+
yield
28+
AsyncBroker.available_tasks = {}
29+
AsyncBroker.is_worker_process = False
30+
AsyncBroker.is_scheduler_process = False

tests/test_requeue.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,25 @@ async def task(context: Context = TaskiqDepends()) -> None:
2020
await kicked.wait_result()
2121

2222
assert runs_count == 2
23+
24+
25+
@pytest.mark.anyio
26+
async def test_requeue_from_dependency() -> None:
27+
broker = InMemoryBroker()
28+
29+
runs_count = 0
30+
31+
async def dep_func(context: Context = TaskiqDepends()) -> None:
32+
nonlocal runs_count
33+
runs_count += 1
34+
if runs_count < 2:
35+
await context.requeue()
36+
37+
@broker.task
38+
async def task(_: None = TaskiqDepends(dep_func)) -> None:
39+
return None
40+
41+
kicked = await task.kiq()
42+
await kicked.wait_result()
43+
44+
assert runs_count == 2

0 commit comments

Comments
 (0)