Skip to content

Commit 6a8873f

Browse files
author
Andrei Neagu
committed
handle unpicklable errors
1 parent 35e7048 commit 6a8873f

File tree

3 files changed

+75
-4
lines changed

3 files changed

+75
-4
lines changed

packages/service-library/src/servicelib/long_running_tasks/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ class TaskExceptionError(BaseLongRunningError):
3434
)
3535

3636

37+
class TaskRaisedUnserializableError(BaseLongRunningError):
38+
msg_template: str = (
39+
"Task {task_id} finished with an unserializable exception: '{exception}'\n{traceback}"
40+
)
41+
42+
3743
class TaskClientTimeoutError(BaseLongRunningError):
3844
msg_template: str = (
3945
"Timed out after {timeout} seconds while awaiting '{task_id}' to complete"

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
TaskNotCompletedError,
3232
TaskNotFoundError,
3333
TaskNotRegisteredError,
34+
TaskRaisedUnserializableError,
3435
)
3536
from .models import (
3637
LRTNamespace,
@@ -345,7 +346,32 @@ async def _tasks_monitor(self) -> None:
345346
},
346347
),
347348
)
348-
result_field = ResultField(str_error=dumps(e))
349+
try:
350+
result_field = ResultField(str_error=dumps(e))
351+
except (
352+
Exception # pylint:disable=broad-except
353+
) as serialization_error:
354+
_logger.exception(
355+
**create_troubleshootting_log_kwargs(
356+
(
357+
f"Execution of {task_id=} finished with error "
358+
f"which could not be serialized"
359+
),
360+
error=serialization_error,
361+
error_context={
362+
"task_id": task_id,
363+
"task_data": task_data,
364+
"namespace": self.lrt_namespace,
365+
},
366+
),
367+
)
368+
result_field = ResultField(
369+
str_error=dumps(
370+
TaskRaisedUnserializableError(
371+
task_id=task_id, exception=serialization_error
372+
)
373+
)
374+
)
349375

350376
# update and store in Redis
351377
updates = {"is_done": is_done, "result_field": task_data.result_field}

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
TaskNotCompletedError,
2626
TaskNotFoundError,
2727
TaskNotRegisteredError,
28+
TaskRaisedUnserializableError,
2829
)
2930
from servicelib.long_running_tasks.models import (
3031
LRTNamespace,
@@ -37,7 +38,7 @@
3738
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
3839
from settings_library.rabbit import RabbitSettings
3940
from settings_library.redis import RedisSettings
40-
from tenacity import TryAgain
41+
from tenacity import TryAgain, retry, stop_after_attempt
4142
from tenacity.asyncio import AsyncRetrying
4243
from tenacity.retry import retry_if_exception_type
4344
from tenacity.stop import stop_after_delay
@@ -77,19 +78,30 @@ async def a_background_task(
7778

7879

7980
async def fast_background_task(progress: TaskProgress) -> int:
80-
"""this task does nothing and returns a constant"""
8181
return 42
8282

8383

8484
async def failing_background_task(progress: TaskProgress):
85-
"""this task does nothing and returns a constant"""
8685
msg = "failing asap"
8786
raise _TetingError(msg)
8887

8988

89+
async def failing_unpicklable_background_task(progress: TaskProgress):
90+
@retry(
91+
stop=stop_after_attempt(2),
92+
reraise=False,
93+
)
94+
async def _innter_fail() -> None:
95+
msg = "always fails with retry"
96+
raise _TetingError(msg)
97+
98+
await _innter_fail()
99+
100+
90101
TaskRegistry.register(a_background_task)
91102
TaskRegistry.register(fast_background_task)
92103
TaskRegistry.register(failing_background_task)
104+
TaskRegistry.register(failing_unpicklable_background_task)
93105

94106

95107
@pytest.fixture
@@ -385,6 +397,33 @@ async def test_get_result_finished_with_error(
385397
raise error
386398

387399

400+
async def test_get_result_finished_with_unpicklable_error(
401+
long_running_manager: BaseLongRunningManager, empty_context: TaskContext
402+
):
403+
task_id = await lrt_api.start_task(
404+
long_running_manager.rpc_client,
405+
long_running_manager.lrt_namespace,
406+
failing_unpicklable_background_task.__name__,
407+
task_context=empty_context,
408+
)
409+
# wait for result
410+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
411+
with attempt:
412+
assert (
413+
await long_running_manager.tasks_manager.get_task_status(
414+
task_id, with_task_context=empty_context
415+
)
416+
).done
417+
418+
result = await long_running_manager.tasks_manager.get_task_result(
419+
task_id, with_task_context=empty_context
420+
)
421+
assert result.str_error is not None # nosec
422+
error = loads(result.str_error)
423+
with pytest.raises(TaskRaisedUnserializableError, match="cannot pickle"):
424+
raise error
425+
426+
388427
async def test_cancel_task_from_different_manager(
389428
rabbit_service: RabbitSettings,
390429
use_in_memory_redis: RedisSettings,

0 commit comments

Comments
 (0)