Skip to content

Commit 4c530c7

Browse files
GitHKAndrei Neagu
andauthored
🐛 Fixed encoding unpicklable errors (#8263)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 4f37444 commit 4c530c7

File tree

4 files changed

+77
-6
lines changed

4 files changed

+77
-6
lines changed

packages/service-library/src/servicelib/long_running_tasks/_serialization.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,6 @@ def loads(obj_str: str) -> Any:
7878
msg = f"Could not reconstruct object from data: {data}"
7979
raise ValueError(msg) from e
8080

81+
if isinstance(data, Exception):
82+
raise data
8183
return data

packages/service-library/src/servicelib/long_running_tasks/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ class TaskExceptionError(BaseLongRunningError):
3434
)
3535

3636

37+
class TaskRaisedUnserializableError(BaseLongRunningError):
38+
msg_template: str = (
39+
"Task {task_id} raised an exception that could not be serialized.\n"
40+
"Original exception: '{original_exception_str}'\n"
41+
"As a consequence, the following error was raised: '{exception}'"
42+
)
43+
44+
3745
class TaskClientTimeoutError(BaseLongRunningError):
3846
msg_template: str = (
3947
"Timed out after {timeout} seconds while awaiting '{task_id}' to complete"

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
TaskNotCompletedError,
3333
TaskNotFoundError,
3434
TaskNotRegisteredError,
35+
TaskRaisedUnserializableError,
3536
)
3637
from .models import (
3738
LRTNamespace,
@@ -353,7 +354,30 @@ async def _tasks_monitor(self) -> None:
353354
},
354355
),
355356
)
356-
result_field = ResultField(str_error=dumps(e))
357+
try:
358+
result_field = ResultField(str_error=dumps(e))
359+
except (
360+
Exception # pylint:disable=broad-except
361+
) as serialization_error:
362+
_logger.exception(
363+
**create_troubleshootting_log_kwargs(
364+
(
365+
f"Execution of {task_id=} finished with an error "
366+
f"which could not be serialized"
367+
),
368+
error=serialization_error,
369+
tip="Check the error above for more details",
370+
),
371+
)
372+
result_field = ResultField(
373+
str_error=dumps(
374+
TaskRaisedUnserializableError(
375+
task_id=task_id,
376+
exception=serialization_error,
377+
original_exception_str=f"{e}",
378+
)
379+
)
380+
)
357381

358382
# update and store in Redis
359383
updates = {"is_done": is_done, "result_field": task_data.result_field}

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
TaskNotCompletedError,
2323
TaskNotFoundError,
2424
TaskNotRegisteredError,
25+
TaskRaisedUnserializableError,
2526
)
2627
from servicelib.long_running_tasks.manager import (
2728
LongRunningManager,
@@ -37,7 +38,7 @@
3738
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
3839
from settings_library.rabbit import RabbitSettings
3940
from settings_library.redis import RedisSettings
40-
from tenacity import TryAgain
41+
from tenacity import TryAgain, retry, stop_after_attempt
4142
from tenacity.asyncio import AsyncRetrying
4243
from tenacity.retry import retry_if_exception_type
4344
from tenacity.stop import stop_after_delay
@@ -77,19 +78,30 @@ async def a_background_task(
7778

7879

7980
async def fast_background_task(progress: TaskProgress) -> int:
80-
"""this task does nothing and returns a constant"""
8181
return 42
8282

8383

8484
async def failing_background_task(progress: TaskProgress):
85-
"""this task does nothing and returns a constant"""
8685
msg = "failing asap"
8786
raise _TetingError(msg)
8887

8988

89+
async def failing_unpicklable_background_task(progress: TaskProgress):
90+
@retry(
91+
stop=stop_after_attempt(2),
92+
reraise=False,
93+
)
94+
async def _innter_fail() -> None:
95+
msg = "always fails with retry"
96+
raise _TetingError(msg)
97+
98+
await _innter_fail()
99+
100+
90101
TaskRegistry.register(a_background_task)
91102
TaskRegistry.register(fast_background_task)
92103
TaskRegistry.register(failing_background_task)
104+
TaskRegistry.register(failing_unpicklable_background_task)
93105

94106

95107
@pytest.fixture
@@ -380,9 +392,34 @@ async def test_get_result_finished_with_error(
380392
task_id, with_task_context=empty_context
381393
)
382394
assert result.str_error is not None # nosec
383-
error = loads(result.str_error)
384395
with pytest.raises(_TetingError, match="failing asap"):
385-
raise error
396+
loads(result.str_error)
397+
398+
399+
async def test_get_result_finished_with_unpicklable_error(
400+
long_running_manager: LongRunningManager, empty_context: TaskContext
401+
):
402+
task_id = await lrt_api.start_task(
403+
long_running_manager.rpc_client,
404+
long_running_manager.lrt_namespace,
405+
failing_unpicklable_background_task.__name__,
406+
task_context=empty_context,
407+
)
408+
# wait for result
409+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
410+
with attempt:
411+
assert (
412+
await long_running_manager.tasks_manager.get_task_status(
413+
task_id, with_task_context=empty_context
414+
)
415+
).done
416+
417+
result = await long_running_manager.tasks_manager.get_task_result(
418+
task_id, with_task_context=empty_context
419+
)
420+
assert result.str_error is not None # nosec
421+
with pytest.raises(TaskRaisedUnserializableError, match="cannot pickle"):
422+
loads(result.str_error)
386423

387424

388425
async def test_cancel_task_from_different_manager(

0 commit comments

Comments
 (0)