Skip to content

Commit b18792d

Browse files
committed
further improvements
1 parent f93631d commit b18792d

File tree

6 files changed

+165
-21
lines changed

6 files changed

+165
-21
lines changed

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class BaseAsyncjobRpcError(OsparcErrorMixin, RuntimeError):
66

77

88
class JobSchedulerError(BaseAsyncjobRpcError):
9-
msg_template: str = "Celery exception: {exc}"
9+
msg_template: str = "Async job scheduler exception: {exc}"
1010

1111

1212
class JobMissingError(BaseAsyncjobRpcError):
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Annotated
2+
3+
from fastapi import Depends
4+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
5+
6+
from ...services_rpc.async_jobs import AsyncJobClient
7+
from .rabbitmq import get_rabbitmq_rpc_client
8+
9+
10+
def get_async_jobs_client(
11+
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
12+
) -> AsyncJobClient:
13+
return AsyncJobClient(_rabbitmq_rpc_client=rabbitmq_rpc_client)

services/api-server/src/simcore_service_api_server/api/routes/tasks.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,15 @@
1212
AsyncJobId,
1313
AsyncJobNameData,
1414
)
15-
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
1615
from models_library.products import ProductName
1716
from models_library.users import UserID
1817
from servicelib.fastapi.dependencies import get_app
19-
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
20-
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
2118
from simcore_service_api_server.models.schemas.tasks import ApiServerEnvelope
2219

20+
from ...services_rpc.async_jobs import AsyncJobClient
2321
from ..dependencies.authentication import get_current_user_id
24-
from ..dependencies.rabbitmq import get_rabbitmq_rpc_client
2522
from ..dependencies.services import get_product_name
23+
from ..dependencies.tasks import get_async_jobs_client
2624

2725
router = APIRouter()
2826
_logger = logging.getLogger(__name__)
@@ -37,11 +35,9 @@ async def get_async_jobs(
3735
app: Annotated[FastAPI, Depends(get_app)],
3836
user_id: Annotated[UserID, Depends(get_current_user_id)],
3937
product_name: Annotated[ProductName, Depends(get_product_name)],
40-
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
38+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
4139
):
4240
user_async_jobs = await async_jobs.list_jobs(
43-
rabbitmq_rpc_client=rabbitmq_rpc_client,
44-
rpc_namespace=STORAGE_RPC_NAMESPACE,
4541
job_id_data=_get_job_id_data(user_id, product_name),
4642
filter_="",
4743
)
@@ -70,11 +66,9 @@ async def get_async_job_status(
7066
task_id: AsyncJobId,
7167
user_id: Annotated[UserID, Depends(get_current_user_id)],
7268
product_name: Annotated[ProductName, Depends(get_product_name)],
73-
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
69+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
7470
):
7571
async_job_rpc_status = await async_jobs.status(
76-
rabbitmq_rpc_client=rabbitmq_rpc_client,
77-
rpc_namespace=STORAGE_RPC_NAMESPACE,
7872
job_id=task_id,
7973
job_id_data=_get_job_id_data(user_id, product_name),
8074
)
@@ -95,11 +89,9 @@ async def cancel_async_job(
9589
task_id: AsyncJobId,
9690
user_id: Annotated[UserID, Depends(get_current_user_id)],
9791
product_name: Annotated[ProductName, Depends(get_product_name)],
98-
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
92+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
9993
):
10094
await async_jobs.cancel(
101-
rabbitmq_rpc_client=rabbitmq_rpc_client,
102-
rpc_namespace=STORAGE_RPC_NAMESPACE,
10395
job_id=task_id,
10496
job_id_data=_get_job_id_data(user_id, product_name),
10597
)
@@ -110,11 +102,9 @@ async def get_async_job_result(
110102
task_id: AsyncJobId,
111103
user_id: Annotated[UserID, Depends(get_current_user_id)],
112104
product_name: Annotated[ProductName, Depends(get_product_name)],
113-
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
105+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
114106
):
115107
async_job_rpc_result = await async_jobs.result(
116-
rabbitmq_rpc_client=rabbitmq_rpc_client,
117-
rpc_namespace=STORAGE_RPC_NAMESPACE,
118108
job_id=task_id,
119109
job_id_data=_get_job_id_data(user_id, product_name),
120110
)
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,30 @@
1+
import logging
2+
3+
from common_library.error_codes import create_error_code
4+
from fastapi import status
5+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
16
from starlette.requests import Request
27
from starlette.responses import JSONResponse
38

49
from ...exceptions.backend_errors import BaseBackEndError
510
from ._utils import create_error_json_response
611

12+
_logger = logging.getLogger(__name__)
13+
714

815
async def backend_error_handler(request: Request, exc: Exception) -> JSONResponse:
916
assert request # nosec
1017
assert isinstance(exc, BaseBackEndError)
11-
12-
return create_error_json_response(f"{exc}", status_code=exc.status_code)
18+
user_error_msg = f"{exc}"
19+
if not exc.status_code >= status.HTTP_500_INTERNAL_SERVER_ERROR:
20+
oec = create_error_code(exc)
21+
user_error_msg += f" [{oec}]"
22+
_logger.exception(
23+
**create_troubleshootting_log_kwargs(
24+
user_error_msg,
25+
error=exc,
26+
error_code=oec,
27+
tip="Unexpected error",
28+
)
29+
)
30+
return create_error_json_response(user_error_msg, status_code=exc.status_code)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from fastapi import status
2+
3+
from .backend_errors import BaseBackEndError
4+
5+
6+
class TaskBaseError(BaseBackEndError):
7+
pass
8+
9+
10+
class TaskSchedulerError(TaskBaseError):
11+
msg_template: str = "TaskScheduler error"
12+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
13+
14+
15+
class TaskMissingError(TaskBaseError):
16+
msg_template: str = "Task {job_id} does not exist"
17+
status_code = status.HTTP_404_NOT_FOUND
18+
19+
20+
class TaskStatusError(TaskBaseError):
21+
msg_template: str = "Could not get status of task {job_id}"
22+
status_code = status.HTTP_404_NOT_FOUND
23+
24+
25+
class TaskNotDoneError(TaskBaseError):
26+
msg_template: str = "Task {job_id} not done"
27+
status_code = status.HTTP_409_CONFLICT
28+
29+
30+
class TaskCancelledError(TaskBaseError):
31+
msg_template: str = "Task {job_id} cancelled"
32+
status_code = status.HTTP_409_CONFLICT
33+
34+
35+
class TaskError(TaskBaseError):
36+
msg_template: str = "Task '{job_id}' failed"
37+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,93 @@
1+
import functools
2+
from dataclasses import dataclass
3+
4+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
5+
AsyncJobGet,
6+
AsyncJobId,
7+
AsyncJobNameData,
8+
AsyncJobResult,
9+
AsyncJobStatus,
10+
)
11+
from models_library.api_schemas_rpc_async_jobs.exceptions import (
12+
JobAbortedError,
13+
JobError,
14+
JobMissingError,
15+
JobNotDoneError,
16+
JobSchedulerError,
17+
JobStatusError,
18+
)
19+
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
20+
from servicelib.long_running_tasks.errors import TaskCancelledError
121
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
22+
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
23+
from simcore_service_api_server.exceptions.task_errors import (
24+
TaskError,
25+
TaskMissingError,
26+
TaskNotDoneError,
27+
TaskSchedulerError,
28+
TaskStatusError,
29+
)
30+
31+
from ..exceptions.service_errors_utils import service_exception_mapper
32+
33+
_exception_mapper = functools.partial(
34+
service_exception_mapper, service_name="Async jobs"
35+
)
36+
37+
_exception_map = {
38+
JobSchedulerError: TaskSchedulerError,
39+
JobMissingError: TaskMissingError,
40+
JobStatusError: TaskStatusError,
41+
JobNotDoneError: TaskNotDoneError,
42+
JobAbortedError: TaskCancelledError,
43+
JobError: TaskError,
44+
}
245

346

47+
@dataclass
448
class AsyncJobClient:
49+
_rabbitmq_rpc_client: RabbitMQRPCClient
50+
51+
@_exception_mapper(rpc_exception_map=_exception_map)
52+
async def cancel(
53+
self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData
54+
) -> None:
55+
return await async_jobs.cancel(
56+
self._rabbitmq_rpc_client,
57+
rpc_namespace=STORAGE_RPC_NAMESPACE,
58+
job_id=job_id,
59+
job_id_data=job_id_data,
60+
)
61+
62+
@_exception_mapper(rpc_exception_map=_exception_map)
63+
async def status(
64+
self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData
65+
) -> AsyncJobStatus:
66+
return await async_jobs.status(
67+
self._rabbitmq_rpc_client,
68+
rpc_namespace=STORAGE_RPC_NAMESPACE,
69+
job_id=job_id,
70+
job_id_data=job_id_data,
71+
)
72+
73+
@_exception_mapper(rpc_exception_map=_exception_map)
74+
async def result(
75+
self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData
76+
) -> AsyncJobResult:
77+
return await async_jobs.result(
78+
self._rabbitmq_rpc_client,
79+
rpc_namespace=STORAGE_RPC_NAMESPACE,
80+
job_id=job_id,
81+
job_id_data=job_id_data,
82+
)
583

6-
def __init__(self, rabbitmq_rpc_client: RabbitMQRPCClient):
7-
self._rabbitmq_rpc_client = rabbitmq_rpc_client
84+
@_exception_mapper(rpc_exception_map=_exception_map)
85+
async def list_jobs(
86+
self, *, filter_: str, job_id_data: AsyncJobNameData
87+
) -> list[AsyncJobGet]:
88+
return await async_jobs.list_jobs(
89+
self._rabbitmq_rpc_client,
90+
rpc_namespace=STORAGE_RPC_NAMESPACE,
91+
filter_=filter_,
92+
job_id_data=job_id_data,
93+
)

0 commit comments

Comments
 (0)