Skip to content

Commit 25dd38a

Browse files
authored
Merge pull request #16 from taskiq-python/issue-13
Add errors
2 parents e1c8374 + c28efb7 commit 25dd38a

File tree

4 files changed

+58
-5
lines changed

4 files changed

+58
-5
lines changed

taskiq/cli/async_task_runner.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,9 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
275275
try:
276276
await broker.result_backend.set_result(message.task_id, result)
277277
except Exception as exc:
278-
logger.exception(exc)
278+
logger.exception(
279+
"Can't set result in %s result backend: \n%s",
280+
broker.result_backend.__class__.__name__,
281+
exc,
282+
exc_info=True,
283+
)

taskiq/exceptions.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,27 @@ class TaskiqError(Exception):
44

55
class TaskiqResultTimeoutError(TaskiqError):
66
"""Waiting for task results has timed out."""
7+
8+
9+
class BrokerError(TaskiqError):
10+
"""Base class for all broker errors."""
11+
12+
13+
class SendTaskError(BrokerError):
14+
"""Error if the broker was unable to send the task to the queue."""
15+
16+
17+
class ResultBackendError(TaskiqError):
18+
"""Base class for all ResultBackend errors."""
19+
20+
21+
class ResultGetError(ResultBackendError):
22+
"""Error if ResultBackend was unable to get result."""
23+
24+
25+
class ResultSetError(ResultBackendError):
26+
"""Error if ResultBackend was unable to set result."""
27+
28+
29+
class ResultIsReadyError(ResultBackendError):
30+
"""Error if ResultBackend was unable to find out if the task is ready."""

taskiq/kicker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from pydantic import BaseModel
77

8+
from taskiq.exceptions import SendTaskError
89
from taskiq.message import TaskiqMessage
910
from taskiq.task import AsyncTaskiqTask
1011
from taskiq.types_helpers import T_, FuncParams_, ReturnType_
@@ -105,13 +106,18 @@ async def kiq(
105106
:param args: function's arguments.
106107
:param kwargs: function's key word arguments.
107108
109+
:raises SendTaskError: if we can't send task to the broker.
110+
108111
:returns: taskiq task.
109112
"""
110113
logger.debug(
111114
f"Kicking {self.task_name} with args={args} and kwargs={kwargs}.",
112115
)
113116
message = self._prepare_message(*args, **kwargs)
114-
await self.broker.kick(message)
117+
try:
118+
await self.broker.kick(message)
119+
except Exception as exc:
120+
raise SendTaskError() from exc
115121
return self.broker.result_backend.generate_task(message.task_id)
116122

117123
@classmethod

taskiq/task.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
from time import time
33
from typing import TYPE_CHECKING, Generic
44

5-
from taskiq.exceptions import TaskiqResultTimeoutError
5+
from taskiq.exceptions import (
6+
ResultGetError,
7+
ResultIsReadyError,
8+
TaskiqResultTimeoutError,
9+
)
610
from taskiq.types_helpers import ReturnType_
711

812
if TYPE_CHECKING:
@@ -25,18 +29,32 @@ async def is_ready(self) -> bool:
2529
"""
2630
Checks if task is completed.
2731
32+
:raises ResultIsReadyError: if we can't get info about task readyness.
33+
2834
:return: True if task is completed.
2935
"""
30-
return await self.result_backend.is_result_ready(self.task_id)
36+
try:
37+
return await self.result_backend.is_result_ready(self.task_id)
38+
except Exception as exc:
39+
raise ResultIsReadyError() from exc
3140

3241
async def get_result(self, with_logs: bool = False) -> "TaskiqResult[ReturnType_]":
3342
"""
3443
Get result of a task from result backend.
3544
3645
:param with_logs: whether you want to fetch logs from worker.
46+
47+
:raises ResultGetError: if we can't get result from ResultBackend.
48+
3749
:return: task's return value.
3850
"""
39-
return await self.result_backend.get_result(self.task_id, with_logs=with_logs)
51+
try:
52+
return await self.result_backend.get_result(
53+
self.task_id,
54+
with_logs=with_logs,
55+
)
56+
except Exception as exc:
57+
raise ResultGetError() from exc
4058

4159
async def wait_result(
4260
self,

0 commit comments

Comments
 (0)