Skip to content

Commit 501a672

Browse files
author
Andrei Neagu
committed
changed how error is transferred
1 parent 6fe3901 commit 501a672

File tree

6 files changed

+50
-84
lines changed

6 files changed

+50
-84
lines changed

packages/service-library/src/servicelib/long_running_tasks/_lrt_client.py

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55
from models_library.rabbitmq_basic_types import RPCMethodName
66
from pydantic import PositiveInt, TypeAdapter
77

8-
from ..logging_errors import create_troubleshootting_log_kwargs
98
from ..logging_utils import log_decorator
109
from ..rabbitmq._client_rpc import RabbitMQRPCClient
1110
from ._rabbit_namespace import get_rabbit_namespace
1211
from ._serialization import string_to_object
12+
from .errors import RPCTransferrableTaskError
1313
from .models import (
14-
ErrorResponse,
1514
LRTNamespace,
1615
RegisteredTaskName,
1716
TaskBase,
@@ -97,34 +96,19 @@ async def get_task_result(
9796
task_context: TaskContext,
9897
task_id: TaskId,
9998
) -> Any:
100-
serialized_result = await rabbitmq_rpc_client.request(
101-
get_rabbit_namespace(namespace),
102-
TypeAdapter(RPCMethodName).validate_python("get_task_result"),
103-
task_context=task_context,
104-
task_id=task_id,
105-
timeout_s=_RPC_TIMEOUT_SHORT_REQUESTS,
106-
)
107-
assert isinstance(serialized_result, ErrorResponse | str) # nosec
108-
if isinstance(serialized_result, ErrorResponse):
109-
error = string_to_object(serialized_result.str_error_object)
110-
_logger.info(
111-
**create_troubleshootting_log_kwargs(
112-
f"Task '{task_id}' raised the following error:\n{serialized_result.str_traceback}",
113-
error=error,
114-
error_context={
115-
"task_id": task_id,
116-
"namespace": namespace,
117-
"task_context": task_context,
118-
},
119-
tip=(
120-
f"The caller of this function should handle the exception. "
121-
f"To figure out where it was running check {namespace=}"
122-
),
123-
)
99+
try:
100+
serialized_result = await rabbitmq_rpc_client.request(
101+
get_rabbit_namespace(namespace),
102+
TypeAdapter(RPCMethodName).validate_python("get_task_result"),
103+
task_context=task_context,
104+
task_id=task_id,
105+
timeout_s=_RPC_TIMEOUT_SHORT_REQUESTS,
124106
)
125-
raise error
126-
127-
return string_to_object(serialized_result)
107+
assert isinstance(serialized_result, str) # nosec
108+
return string_to_object(serialized_result)
109+
except RPCTransferrableTaskError as e:
110+
decoded_error = string_to_object(f"{e}")
111+
raise decoded_error from None
128112

129113

130114
@log_decorator(_logger, level=logging.DEBUG)

packages/service-library/src/servicelib/long_running_tasks/_lrt_server.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22
from contextlib import suppress
33
from typing import TYPE_CHECKING, Any
44

5-
from ..logging_errors import create_troubleshootting_log_kwargs
65
from ..rabbitmq import RPCRouter
7-
from ._serialization import string_to_object
8-
from .errors import BaseLongRunningError, TaskNotFoundError
6+
from .errors import BaseLongRunningError, RPCTransferrableTaskError, TaskNotFoundError
97
from .models import (
10-
ErrorResponse,
118
RegisteredTaskName,
129
TaskBase,
1310
TaskContext,
@@ -66,43 +63,19 @@ async def get_task_status(
6663
)
6764

6865

69-
@router.expose(reraise_if_error_type=(BaseLongRunningError,))
66+
@router.expose(reraise_if_error_type=(BaseLongRunningError, RPCTransferrableTaskError))
7067
async def get_task_result(
7168
long_running_manager: "BaseLongRunningManager",
7269
*,
7370
task_context: TaskContext,
7471
task_id: TaskId,
75-
) -> ErrorResponse | str:
72+
) -> str:
7673
try:
7774
result_field = await long_running_manager.tasks_manager.get_task_result(
7875
task_id, with_task_context=task_context
7976
)
80-
if result_field.error_response is not None:
81-
task_raised_error_traceback = result_field.error_response.str_traceback
82-
task_raised_error = string_to_object(
83-
result_field.error_response.str_error_object
84-
)
85-
_logger.info(
86-
**create_troubleshootting_log_kwargs(
87-
f"Execution of {task_id=} finished with error:\n{task_raised_error_traceback}",
88-
error=task_raised_error,
89-
error_context={
90-
"task_id": task_id,
91-
"task_context": task_context,
92-
"namespace": long_running_manager.lrt_namespace,
93-
},
94-
tip="This exception is logged for debugging purposes, the client side will handle it",
95-
)
96-
)
97-
allowed_errors = (
98-
await long_running_manager.tasks_manager.get_allowed_errors(
99-
task_id, with_task_context=task_context
100-
)
101-
)
102-
if type(task_raised_error) in allowed_errors:
103-
return result_field.error_response
104-
105-
raise task_raised_error
77+
if result_field.str_error is not None:
78+
raise RPCTransferrableTaskError(result_field.str_error)
10679

10780
if result_field.str_result is not None:
10881
return result_field.str_result

packages/service-library/src/servicelib/long_running_tasks/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,10 @@ class GenericClientError(BaseLongRunningError):
4444
msg_template: str = (
4545
"Unexpected error while '{action}' for '{task_id}': status={status} body={body}"
4646
)
47+
48+
49+
class RPCTransferrableTaskError(Exception):
50+
"""
51+
The message contains the task's exception serialized as string.
52+
Decode it and raise to obtain the task's original exception.
53+
"""

packages/service-library/src/servicelib/long_running_tasks/models.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,13 @@
3333
RegisteredTaskName: TypeAlias = str
3434

3535

36-
class ErrorResponse(BaseModel):
37-
str_error_object: str
38-
str_traceback: str
39-
40-
4136
class ResultField(BaseModel):
4237
str_result: str | None = None
43-
error_response: ErrorResponse | None = None
38+
str_error: str | None = None
4439

4540
@model_validator(mode="after")
4641
def validate_mutually_exclusive(self) -> "ResultField":
47-
if self.str_result is not None and self.error_response is not None:
42+
if self.str_result is not None and self.str_error is not None:
4843
msg = "Cannot set both 'result' and 'error' - they are mutually exclusive"
4944
raise ValueError(msg)
5045
return self

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import functools
44
import inspect
55
import logging
6-
import traceback
76
import urllib.parse
87
from contextlib import suppress
98
from typing import Any, ClassVar, Final, Protocol, TypeAlias
@@ -22,6 +21,7 @@
2221
)
2322

2423
from ..background_task import create_periodic_task
24+
from ..logging_errors import create_troubleshootting_log_kwargs
2525
from ..redis import RedisClientSDK, exclusive
2626
from ._redis_store import RedisStore
2727
from ._serialization import object_to_string
@@ -33,7 +33,6 @@
3333
TaskNotRegisteredError,
3434
)
3535
from .models import (
36-
ErrorResponse,
3736
LRTNamespace,
3837
RegisteredTaskName,
3938
ResultField,
@@ -321,22 +320,30 @@ async def _tasks_monitor(self) -> None:
321320
except asyncio.InvalidStateError:
322321
# task was not completed try again next time and see if it is done
323322
continue
324-
except asyncio.CancelledError as e:
323+
except asyncio.CancelledError:
325324
result_field = ResultField(
326-
error_response=ErrorResponse(
327-
str_error_object=object_to_string(
328-
TaskCancelledError(task_id=task_id)
329-
),
330-
str_traceback="".join(traceback.format_tb(e.__traceback__)),
331-
)
325+
str_error=object_to_string(TaskCancelledError(task_id=task_id))
332326
)
333327
except Exception as e: # pylint:disable=broad-except
334-
result_field = ResultField(
335-
error_response=ErrorResponse(
336-
str_error_object=object_to_string(e),
337-
str_traceback="".join(traceback.format_tb(e.__traceback__)),
338-
)
328+
allowed_errors = TaskRegistry.get_allowed_errors(
329+
task_data.registered_task_name
339330
)
331+
if type(e) not in allowed_errors:
332+
_logger.exception(
333+
**create_troubleshootting_log_kwargs(
334+
(
335+
f"Execution of {task_id=} finished with unexpected error, "
336+
f"only the following are {allowed_errors=} are permitted"
337+
),
338+
error=e,
339+
error_context={
340+
"task_id": task_id,
341+
"task_data": task_data,
342+
"namespace": self.lrt_namespace,
343+
},
344+
),
345+
)
346+
result_field = ResultField(str_error=object_to_string(e))
340347

341348
# update and store in Redis
342349
updates = {"is_done": is_done, "result_field": task_data.result_field}

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,8 @@ async def test_get_result_finished_with_error(
378378
result = await long_running_manager.tasks_manager.get_task_result(
379379
task_id, with_task_context=empty_context
380380
)
381-
assert result.error_response is not None # nosec
382-
error = string_to_object(result.error_response.str_error_object)
381+
assert result.str_error is not None # nosec
382+
error = string_to_object(result.str_error)
383383
with pytest.raises(_TetingError, match="failing asap"):
384384
raise error
385385

0 commit comments

Comments
 (0)