Skip to content

Commit 4b922c0

Browse files
Sobes76rusAnton
andauthored
feat: exceptions serialization (#135)
Co-authored-by: Anton <[email protected]>
1 parent 4ae1712 commit 4b922c0

File tree

10 files changed

+869
-5
lines changed

10 files changed

+869
-5
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ ignore =
103103
F403,
104104
; Found wrong metadata variable
105105
WPS410,
106+
; Found commented out cod
107+
E800,
106108

107109
per-file-ignores =
108110
; all tests

taskiq/abc/middleware.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def on_error(
9797
self,
9898
message: "TaskiqMessage",
9999
result: "TaskiqResult[Any]",
100-
exception: Exception,
100+
exception: BaseException,
101101
) -> "Union[None, Coroutine[Any, Any, None]]":
102102
"""
103103
This function is called when exception is found.

taskiq/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,7 @@ class ResultSetError(ResultBackendError):
2828

2929
class ResultIsReadyError(ResultBackendError):
3030
"""Error if ResultBackend was unable to find out if the task is ready."""
31+
32+
33+
class SecurityError(TaskiqError):
34+
"""Security related exception."""

taskiq/middlewares/retry_middleware.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async def on_error(
2222
self,
2323
message: "TaskiqMessage",
2424
result: "TaskiqResult[Any]",
25-
exception: Exception,
25+
exception: BaseException,
2626
) -> None:
2727
"""
2828
Retry on error.

taskiq/receiver/receiver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ async def run_task( # noqa: C901, WPS210
201201
target,
202202
message,
203203
)
204-
except Exception as exc:
204+
except BaseException as exc: # noqa: WPS424
205205
found_exception = exc
206206
logger.error(
207207
"Exception found while executing function: %s",
@@ -219,6 +219,7 @@ async def run_task( # noqa: C901, WPS210
219219
log=None,
220220
return_value=returned,
221221
execution_time=round(execution_time, 2),
222+
error=found_exception,
222223
)
223224
# If exception is found we execute middlewares.
224225
if found_exception is not None:

taskiq/result.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
1-
from typing import Generic, Optional, TypeVar
1+
import json
2+
import pickle # noqa: S403
3+
from functools import partial
4+
from typing import Any, Callable, Dict, Generic, Optional, TypeVar
25

6+
from pydantic import validator
37
from pydantic.generics import GenericModel
8+
from typing_extensions import Self
9+
10+
from taskiq.serialization import exception_to_python, prepare_exception
411

512
_ReturnType = TypeVar("_ReturnType")
613

714

15+
def _json_encoder(value: Any, default: Callable[[Any], Any]) -> Any:
16+
if isinstance(value, BaseException):
17+
return prepare_exception(value, json)
18+
19+
return default(value)
20+
21+
22+
def _json_dumps(value: Any, *, default: Callable[[Any], Any], **kwargs: Any) -> str:
23+
return json.dumps(value, default=partial(_json_encoder, default=default), **kwargs)
24+
25+
826
class TaskiqResult(GenericModel, Generic[_ReturnType]):
927
"""Result of a remote task invocation."""
1028

@@ -15,3 +33,37 @@ class TaskiqResult(GenericModel, Generic[_ReturnType]):
1533
log: Optional[str] = None
1634
return_value: _ReturnType
1735
execution_time: float
36+
37+
error: Optional[BaseException] = None
38+
39+
class Config:
40+
arbitrary_types_allowed = True
41+
json_dumps = _json_dumps # type: ignore
42+
json_loads = json.loads
43+
44+
def raise_for_error(self) -> "Self":
45+
"""Raise exception if `error`.
46+
47+
:raises error: task execution exception
48+
:returns: TaskiqResult
49+
"""
50+
if self.error is not None:
51+
raise self.error
52+
return self
53+
54+
def __getstate__(self) -> Dict[Any, Any]:
55+
dict = super().__getstate__() # noqa: WPS125
56+
vals: Dict[str, Any] = dict["__dict__"]
57+
58+
if "error" in vals and vals["error"] is not None:
59+
vals["error"] = prepare_exception(
60+
vals["error"],
61+
pickle,
62+
)
63+
64+
return dict
65+
66+
@validator("error", pre=True)
67+
@classmethod
68+
def _validate_error(cls, value: Any) -> Optional[BaseException]:
69+
return exception_to_python(value)

0 commit comments

Comments
 (0)