Skip to content

Commit 06fde28

Browse files
committed
add log_catch to every rpc endpoint in storage
1 parent a34eb4b commit 06fde28

File tree

3 files changed

+114
-95
lines changed

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 73 additions & 63 deletions
Original file line numberDiff line numberDiff line change
async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
    """Abort the Celery task backing the async job *job_id*.

    Raises:
        JobMissingError: presumably raised by ``_assert_job_exists`` when the
            job is unknown -- confirm against its definition.
        JobSchedulerError: when the Celery backend fails.
    """
    assert app  # nosec
    assert job_id_data  # nosec
    # NOTE(review): log_catch appears intended to log anything escaping this
    # body before it propagates to the RPC caller -- confirm its reraise
    # default in servicelib.logging_utils.
    with log_catch(logger=_logger):
        client = get_celery_client(app)
        try:
            await _assert_job_exists(
                job_id=job_id,
                job_id_data=job_id_data,
                celery_client=client,
            )
            await client.abort_task(
                task_context=job_id_data.model_dump(),
                task_uuid=job_id,
            )
        except CeleryError as exc:
            # Normalize low-level Celery failures into the RPC-exposed domain error.
            raise JobSchedulerError(exc=f"{exc}") from exc
6063

6164

6265
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
@@ -66,22 +69,25 @@ async def status(
6669
assert app # nosec
6770
assert job_id_data # nosec
6871

69-
try:
70-
await _assert_job_exists(
71-
job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
72+
with log_catch(logger=_logger):
73+
try:
74+
await _assert_job_exists(
75+
job_id=job_id,
76+
job_id_data=job_id_data,
77+
celery_client=get_celery_client(app),
78+
)
79+
task_status = await get_celery_client(app).get_task_status(
80+
task_context=job_id_data.model_dump(),
81+
task_uuid=job_id,
82+
)
83+
except CeleryError as exc:
84+
raise JobSchedulerError(exc=f"{exc}") from exc
85+
86+
return AsyncJobStatus(
87+
job_id=job_id,
88+
progress=task_status.progress_report,
89+
done=task_status.is_done,
7290
)
73-
task_status = await get_celery_client(app).get_task_status(
74-
task_context=job_id_data.model_dump(),
75-
task_uuid=job_id,
76-
)
77-
except CeleryError as exc:
78-
raise JobSchedulerError(exc=f"{exc}") from exc
79-
80-
return AsyncJobStatus(
81-
job_id=job_id,
82-
progress=task_status.progress_report,
83-
done=task_status.is_done,
84-
)
8591

8692

8793
@router.expose(
@@ -100,48 +106,52 @@ async def result(
100106
assert job_id # nosec
101107
assert job_id_data # nosec
102108

103-
try:
104-
await _assert_job_exists(
105-
job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
106-
)
107-
_status = await get_celery_client(app).get_task_status(
108-
task_context=job_id_data.model_dump(),
109-
task_uuid=job_id,
110-
)
111-
if not _status.is_done:
112-
raise JobNotDoneError(job_id=job_id)
113-
_result = await get_celery_client(app).get_task_result(
114-
task_context=job_id_data.model_dump(),
115-
task_uuid=job_id,
116-
)
117-
except CeleryError as exc:
118-
raise JobSchedulerError(exc=f"{exc}") from exc
119-
120-
if _status.task_state == TaskState.ABORTED:
121-
raise JobAbortedError(job_id=job_id)
122-
if _status.task_state == TaskState.ERROR:
123-
exc_type = ""
124-
exc_msg = ""
125-
with log_catch(logger=_logger, reraise=False):
126-
task_error = TaskError.model_validate(_result)
127-
exc_type = task_error.exc_type
128-
exc_msg = task_error.exc_msg
129-
raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
130-
131-
return AsyncJobResult(result=_result)
109+
with log_catch(logger=_logger):
110+
try:
111+
await _assert_job_exists(
112+
job_id=job_id,
113+
job_id_data=job_id_data,
114+
celery_client=get_celery_client(app),
115+
)
116+
_status = await get_celery_client(app).get_task_status(
117+
task_context=job_id_data.model_dump(),
118+
task_uuid=job_id,
119+
)
120+
if not _status.is_done:
121+
raise JobNotDoneError(job_id=job_id)
122+
_result = await get_celery_client(app).get_task_result(
123+
task_context=job_id_data.model_dump(),
124+
task_uuid=job_id,
125+
)
126+
except CeleryError as exc:
127+
raise JobSchedulerError(exc=f"{exc}") from exc
128+
129+
if _status.task_state == TaskState.ABORTED:
130+
raise JobAbortedError(job_id=job_id)
131+
if _status.task_state == TaskState.ERROR:
132+
exc_type = ""
133+
exc_msg = ""
134+
with log_catch(logger=_logger, reraise=False):
135+
task_error = TaskError.model_validate(_result)
136+
exc_type = task_error.exc_type
137+
exc_msg = task_error.exc_msg
138+
raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
139+
140+
return AsyncJobResult(result=_result)
132141

133142

134143
@router.expose(reraise_if_error_type=(JobSchedulerError,))
async def list_jobs(
    app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
) -> list[AsyncJobGet]:
    """Return an ``AsyncJobGet`` for every Celery task in this caller's context.

    NOTE(review): ``filter_`` is only asserted truthy and never applied to the
    listing -- confirm whether filtering is implemented elsewhere or pending.
    """
    assert app  # nosec
    assert filter_  # nosec
    with log_catch(logger=_logger):
        try:
            uuids = await get_celery_client(app).get_task_uuids(
                task_context=job_id_data.model_dump(),
            )
        except CeleryError as exc:
            # Surface backend errors as the domain error declared in the decorator.
            raise JobSchedulerError(exc=f"{exc}") from exc

    return [AsyncJobGet(job_id=uuid) for uuid in uuids]

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
13
from celery.exceptions import CeleryError # type: ignore[import-untyped]
24
from fastapi import FastAPI
35
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
@@ -10,6 +12,7 @@
1012
DataExportTaskStartInput,
1113
InvalidFileIdentifierError,
1214
)
15+
from servicelib.logging_utils import log_catch
1316
from servicelib.rabbitmq import RPCRouter
1417

1518
from ...datcore_dsm import DatCoreDataManager
@@ -19,6 +22,8 @@
1922
from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
2023
from ...simcore_s3_dsm import SimcoreS3DataManager
2124

25+
_logger = logging.getLogger(__name__)
26+
2227
router = RPCRouter()
2328

2429

@@ -36,30 +41,31 @@ async def start_data_export(
3641
) -> AsyncJobGet:
3742
assert app # nosec
3843

39-
dsm = get_dsm_provider(app).get(data_export_start.location_id)
44+
with log_catch(_logger):
45+
dsm = get_dsm_provider(app).get(data_export_start.location_id)
4046

41-
try:
42-
for _id in data_export_start.file_and_folder_ids:
43-
if isinstance(dsm, DatCoreDataManager):
44-
_ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
45-
elif isinstance(dsm, SimcoreS3DataManager):
46-
await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)
47+
try:
48+
for _id in data_export_start.file_and_folder_ids:
49+
if isinstance(dsm, DatCoreDataManager):
50+
_ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
51+
elif isinstance(dsm, SimcoreS3DataManager):
52+
await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)
4753

48-
except (FileAccessRightError, DatcoreAdapterError) as err:
49-
raise AccessRightError(
50-
user_id=job_id_data.user_id,
51-
file_id=_id,
52-
location_id=data_export_start.location_id,
53-
) from err
54+
except (FileAccessRightError, DatcoreAdapterError) as err:
55+
raise AccessRightError(
56+
user_id=job_id_data.user_id,
57+
file_id=_id,
58+
location_id=data_export_start.location_id,
59+
) from err
5460

55-
try:
56-
task_uuid = await get_celery_client(app).send_task(
57-
"export_data",
58-
task_context=job_id_data.model_dump(),
59-
files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature
61+
try:
62+
task_uuid = await get_celery_client(app).send_task(
63+
"export_data",
64+
task_context=job_id_data.model_dump(),
65+
files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature
66+
)
67+
except CeleryError as exc:
68+
raise JobSchedulerError(exc=f"{exc}") from exc
69+
return AsyncJobGet(
70+
job_id=task_uuid,
6071
)
61-
except CeleryError as exc:
62-
raise JobSchedulerError(exc=f"{exc}") from exc
63-
return AsyncJobGet(
64-
job_id=task_uuid,
65-
)
Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from pathlib import Path
23

34
from fastapi import FastAPI
@@ -6,11 +7,13 @@
67
AsyncJobNameData,
78
)
89
from models_library.projects_nodes_io import LocationID
10+
from servicelib.logging_utils import log_catch
911
from servicelib.rabbitmq import RPCRouter
1012

1113
from ...modules.celery import get_celery_client
1214
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
1315

16+
_logger = logging.getLogger(__name__)
1417
router = RPCRouter()
1518

1619

@@ -23,13 +26,13 @@ async def compute_path_size(
2326
path: Path,
2427
) -> AsyncJobGet:
2528
assert app # nosec
29+
with log_catch(logger=_logger):
30+
task_uuid = await get_celery_client(app).send_task(
31+
remote_compute_path_size.__name__,
32+
task_context=job_id_data.model_dump(),
33+
user_id=job_id_data.user_id,
34+
location_id=location_id,
35+
path=path,
36+
)
2637

27-
task_uuid = await get_celery_client(app).send_task(
28-
remote_compute_path_size.__name__,
29-
task_context=job_id_data.model_dump(),
30-
user_id=job_id_data.user_id,
31-
location_id=location_id,
32-
path=path,
33-
)
34-
35-
return AsyncJobGet(job_id=task_uuid)
38+
return AsyncJobGet(job_id=task_uuid)

0 commit comments

Comments
 (0)