Skip to content

Commit b9233da

Browse files
author
Andrei Neagu
committed
fixed test
1 parent 6b91cf1 commit b9233da

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
JobNotDoneError,
2020
JobSchedulerError,
2121
)
22-
from servicelib.logging_utils import log_catch
2322
from servicelib.rabbitmq import RPCRouter
2423

2524
from ...modules.celery import get_celery_client
@@ -96,14 +95,10 @@ async def result(
9695
if _status.task_state == TaskState.ABORTED:
9796
raise JobAbortedError(job_id=job_id)
9897
if _status.task_state == TaskState.ERROR:
99-
exc_type = ""
100-
exc_msg = ""
101-
with log_catch(logger=_logger, reraise=False):
102-
# NOTE: recover original error from wrapped error
103-
exception = pickle.loads(base64.b64decode(_result.args[0]))
104-
exc_type = type(exception).__name__
105-
exc_msg = f"{exception}"
106-
98+
# NOTE: recover original error from wrapped error
99+
exception = pickle.loads(base64.b64decode(_result.args[0]))
100+
exc_type = type(exception).__name__
101+
exc_msg = f"{exception}"
107102
raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg, exc=exception)
108103

109104
return AsyncJobResult(result=_result)

services/storage/src/simcore_service_storage/modules/celery/_task.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@
2828
R = TypeVar("R")
2929

3030

31+
def _wrap_exception(exc: Exception) -> Exception:
32+
return Exception(
33+
base64.b64encode(pickle.dumps(exc, protocol=pickle.HIGHEST_PROTOCOL)).decode(
34+
"ascii"
35+
)
36+
)
37+
38+
3139
def _async_task_wrapper(
3240
app: Celery,
3341
) -> Callable[
@@ -73,15 +81,10 @@ def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
7381
# by running it's magic, it looses the context of some errors
7482
# this allows to recreate the same error in the caller side excatly as
7583
# it was raised in this context
76-
wrapping_exception = Exception(
77-
base64.b64encode(
78-
pickle.dumps(exc, protocol=pickle.HIGHEST_PROTOCOL)
79-
).decode("ascii")
80-
)
8184
raise task.retry(
8285
max_retries=max_retries,
8386
countdown=delay_between_retries.total_seconds(),
84-
exc=wrapping_exception,
87+
exc=_wrap_exception(exc),
8588
)
8689

8790
return wrapper

services/storage/tests/unit/test_async_jobs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pytest_mock import MockerFixture
2929
from servicelib.rabbitmq import RabbitMQRPCClient
3030
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
31+
from simcore_service_storage.modules.celery._task import _wrap_exception
3132
from simcore_service_storage.modules.celery.client import TaskUUID
3233
from simcore_service_storage.modules.celery.models import TaskState, TaskStatus
3334

@@ -68,7 +69,7 @@ async def get_task_result(self, *args, **kwargs) -> Any:
6869
_ = kwargs
6970
assert self.get_task_result_object is not None
7071
if isinstance(self.get_task_result_object, Exception):
71-
raise self.get_task_result_object
72+
return _wrap_exception(self.get_task_result_object)
7273
return self.get_task_result_object
7374

7475
async def get_task_uuids(self, *args, **kwargs) -> set[TaskUUID]:
@@ -318,7 +319,7 @@ async def test_async_jobs_result_success(
318319
task_state=TaskState.ERROR,
319320
progress_report=ProgressReport(actual_value=1.0, total=1.0),
320321
),
321-
"get_task_result_object": _faker.text(),
322+
"get_task_result_object": Exception("generic exception"),
322323
"get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
323324
},
324325
JobError,

0 commit comments

Comments
 (0)