Skip to content

Commit 801eb85

Browse files
authored
Merge branch 'master' into autoscaling/refactoring
2 parents 567266b + 0eb2563 commit 801eb85

File tree

25 files changed

+709
-51
lines changed

25 files changed

+709
-51
lines changed

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class BaseAsyncjobRpcError(OsparcErrorMixin, RuntimeError):
66

77

88
class JobSchedulerError(BaseAsyncjobRpcError):
9-
msg_template: str = "Celery exception: {exc}"
9+
msg_template: str = "Async job scheduler exception: {exc}"
1010

1111

1212
class JobMissingError(BaseAsyncjobRpcError):
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# pylint: disable=unused-argument
2+
3+
from dataclasses import dataclass
4+
5+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
6+
AsyncJobGet,
7+
AsyncJobId,
8+
AsyncJobNameData,
9+
AsyncJobResult,
10+
AsyncJobStatus,
11+
)
12+
from models_library.api_schemas_rpc_async_jobs.exceptions import BaseAsyncjobRpcError
13+
from models_library.progress_bar import ProgressReport
14+
from models_library.rabbitmq_basic_types import RPCNamespace
15+
from pydantic import validate_call
16+
from pytest_mock import MockType
17+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
18+
19+
20+
@dataclass
21+
class AsyncJobSideEffects:
22+
exception: BaseAsyncjobRpcError | None = None
23+
24+
@validate_call(config={"arbitrary_types_allowed": True})
25+
async def cancel(
26+
self,
27+
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
28+
*,
29+
rpc_namespace: RPCNamespace,
30+
job_id: AsyncJobId,
31+
job_id_data: AsyncJobNameData,
32+
) -> None:
33+
if self.exception is not None:
34+
raise self.exception
35+
return None
36+
37+
@validate_call(config={"arbitrary_types_allowed": True})
38+
async def status(
39+
self,
40+
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
41+
*,
42+
rpc_namespace: RPCNamespace,
43+
job_id: AsyncJobId,
44+
job_id_data: AsyncJobNameData,
45+
) -> AsyncJobStatus:
46+
if self.exception is not None:
47+
raise self.exception
48+
49+
return AsyncJobStatus(
50+
job_id=job_id,
51+
progress=ProgressReport(
52+
actual_value=50.0,
53+
total=100.0,
54+
attempt=1,
55+
),
56+
done=False,
57+
)
58+
59+
@validate_call(config={"arbitrary_types_allowed": True})
60+
async def result(
61+
self,
62+
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
63+
*,
64+
rpc_namespace: RPCNamespace,
65+
job_id: AsyncJobId,
66+
job_id_data: AsyncJobNameData,
67+
) -> AsyncJobResult:
68+
if self.exception is not None:
69+
raise self.exception
70+
return AsyncJobResult(result="Success")
71+
72+
@validate_call(config={"arbitrary_types_allowed": True})
73+
async def list_jobs(
74+
self,
75+
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
76+
*,
77+
rpc_namespace: RPCNamespace,
78+
job_id_data: AsyncJobNameData,
79+
filter_: str = "",
80+
) -> list[AsyncJobGet]:
81+
if self.exception is not None:
82+
raise self.exception
83+
return [
84+
AsyncJobGet(
85+
job_id=AsyncJobId("123e4567-e89b-12d3-a456-426614174000"),
86+
job_name="Example Job",
87+
)
88+
]

services/api-server/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.9.0
1+
0.9.1

services/api-server/openapi.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"info": {
44
"title": "osparc.io public API",
55
"description": "osparc-simcore public API specifications",
6-
"version": "0.9.0"
6+
"version": "0.9.1"
77
},
88
"paths": {
99
"/v0/meta": {
@@ -7977,6 +7977,18 @@
79777977
"items": {},
79787978
"type": "array",
79797979
"title": "Errors"
7980+
},
7981+
"support_id": {
7982+
"anyOf": [
7983+
{
7984+
"type": "string",
7985+
"pattern": "OEC:([a-fA-F0-9]{12})-(\\d{13,14})"
7986+
},
7987+
{
7988+
"type": "null"
7989+
}
7990+
],
7991+
"title": "Support Id"
79807992
}
79817993
},
79827994
"type": "object",

services/api-server/setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.9.0
2+
current_version = 0.9.1
33
commit = True
44
message = services/api-server version: {current_version} → {new_version}
55
tag = False
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Annotated
2+
3+
from fastapi import Depends
4+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
5+
6+
from ...services_rpc.async_jobs import AsyncJobClient
7+
from .rabbitmq import get_rabbitmq_rpc_client
8+
9+
10+
def get_async_jobs_client(
11+
rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)],
12+
) -> AsyncJobClient:
13+
return AsyncJobClient(_rabbitmq_rpc_client=rabbitmq_rpc_client)

services/api-server/src/simcore_service_api_server/api/root.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
solvers_jobs_read,
1919
studies,
2020
studies_jobs,
21+
tasks,
2122
users,
2223
wallets,
2324
)
@@ -65,6 +66,7 @@ def create_router(settings: ApplicationSettings):
6566
router.include_router(
6667
functions_routes.function_router, tags=["functions"], prefix=_FUNCTIONS_PREFIX
6768
)
69+
router.include_router(tasks.router, tags=["tasks"], prefix="/tasks")
6870

6971
# NOTE: multiple-files upload is currently disabled
7072
# Web form to upload files at http://localhost:8000/v0/upload-form-view
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import logging
2+
from typing import Annotated, Any
3+
4+
from fastapi import APIRouter, Depends, FastAPI, status
5+
from models_library.api_schemas_long_running_tasks.base import TaskProgress
6+
from models_library.api_schemas_long_running_tasks.tasks import (
7+
TaskGet,
8+
TaskResult,
9+
TaskStatus,
10+
)
11+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
12+
AsyncJobId,
13+
AsyncJobNameData,
14+
)
15+
from models_library.products import ProductName
16+
from models_library.users import UserID
17+
from servicelib.fastapi.dependencies import get_app
18+
19+
from ...models.schemas.base import ApiServerEnvelope
20+
from ...models.schemas.errors import ErrorGet
21+
from ...services_rpc.async_jobs import AsyncJobClient
22+
from ..dependencies.authentication import get_current_user_id, get_product_name
23+
from ..dependencies.tasks import get_async_jobs_client
24+
from ._constants import (
25+
FMSG_CHANGELOG_NEW_IN_VERSION,
26+
create_route_description,
27+
)
28+
29+
router = APIRouter()
30+
_logger = logging.getLogger(__name__)
31+
32+
33+
def _get_job_id_data(user_id: UserID, product_name: ProductName) -> AsyncJobNameData:
34+
return AsyncJobNameData(user_id=user_id, product_name=product_name)
35+
36+
37+
_DEFAULT_TASK_STATUS_CODES: dict[int | str, dict[str, Any]] = {
38+
status.HTTP_500_INTERNAL_SERVER_ERROR: {
39+
"description": "Internal server error",
40+
"model": ErrorGet,
41+
},
42+
}
43+
44+
45+
@router.get(
46+
"",
47+
response_model=ApiServerEnvelope[list[TaskGet]],
48+
responses=_DEFAULT_TASK_STATUS_CODES,
49+
description=create_route_description(
50+
base="List all tasks",
51+
changelog=[
52+
FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"),
53+
],
54+
),
55+
include_in_schema=False, # TO BE RELEASED in 0.10-rc1
56+
)
57+
async def list_tasks(
58+
app: Annotated[FastAPI, Depends(get_app)],
59+
user_id: Annotated[UserID, Depends(get_current_user_id)],
60+
product_name: Annotated[ProductName, Depends(get_product_name)],
61+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
62+
):
63+
user_async_jobs = await async_jobs.list_jobs(
64+
job_id_data=_get_job_id_data(user_id, product_name),
65+
filter_="",
66+
)
67+
app_router = app.router
68+
data = [
69+
TaskGet(
70+
task_id=f"{job.job_id}",
71+
task_name=job.job_name,
72+
status_href=app_router.url_path_for(
73+
"get_task_status", task_id=f"{job.job_id}"
74+
),
75+
abort_href=app_router.url_path_for("cancel_task", task_id=f"{job.job_id}"),
76+
result_href=app_router.url_path_for(
77+
"get_task_result", task_id=f"{job.job_id}"
78+
),
79+
)
80+
for job in user_async_jobs
81+
]
82+
return ApiServerEnvelope(data=data)
83+
84+
85+
@router.get(
86+
"/{task_id}",
87+
response_model=TaskStatus,
88+
responses=_DEFAULT_TASK_STATUS_CODES,
89+
description=create_route_description(
90+
base="Get task status",
91+
changelog=[
92+
FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"),
93+
],
94+
),
95+
include_in_schema=False, # TO BE RELEASED in 0.10-rc1
96+
)
97+
async def get_task_status(
98+
task_id: AsyncJobId,
99+
user_id: Annotated[UserID, Depends(get_current_user_id)],
100+
product_name: Annotated[ProductName, Depends(get_product_name)],
101+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
102+
):
103+
async_job_rpc_status = await async_jobs.status(
104+
job_id=task_id,
105+
job_id_data=_get_job_id_data(user_id, product_name),
106+
)
107+
_task_id = f"{async_job_rpc_status.job_id}"
108+
return TaskStatus(
109+
task_progress=TaskProgress(
110+
task_id=_task_id, percent=async_job_rpc_status.progress.percent_value
111+
),
112+
done=async_job_rpc_status.done,
113+
started=None,
114+
)
115+
116+
117+
@router.post(
118+
"/{task_id}:cancel",
119+
status_code=status.HTTP_204_NO_CONTENT,
120+
responses=_DEFAULT_TASK_STATUS_CODES,
121+
description=create_route_description(
122+
base="Cancel task",
123+
changelog=[
124+
FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"),
125+
],
126+
),
127+
include_in_schema=False, # TO BE RELEASED in 0.10-rc1
128+
)
129+
async def cancel_task(
130+
task_id: AsyncJobId,
131+
user_id: Annotated[UserID, Depends(get_current_user_id)],
132+
product_name: Annotated[ProductName, Depends(get_product_name)],
133+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
134+
):
135+
await async_jobs.cancel(
136+
job_id=task_id,
137+
job_id_data=_get_job_id_data(user_id, product_name),
138+
)
139+
140+
141+
@router.get(
142+
"/{task_id}/result",
143+
response_model=TaskResult,
144+
responses={
145+
status.HTTP_404_NOT_FOUND: {
146+
"description": "Task result not found",
147+
"model": ErrorGet,
148+
},
149+
status.HTTP_409_CONFLICT: {
150+
"description": "Task is cancelled",
151+
"model": ErrorGet,
152+
},
153+
**_DEFAULT_TASK_STATUS_CODES,
154+
},
155+
description=create_route_description(
156+
base="Get task result",
157+
changelog=[
158+
FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"),
159+
],
160+
),
161+
include_in_schema=False, # TO BE RELEASED in 0.10-rc1
162+
)
163+
async def get_task_result(
164+
task_id: AsyncJobId,
165+
user_id: Annotated[UserID, Depends(get_current_user_id)],
166+
product_name: Annotated[ProductName, Depends(get_product_name)],
167+
async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)],
168+
):
169+
async_job_rpc_result = await async_jobs.result(
170+
job_id=task_id,
171+
job_id_data=_get_job_id_data(user_id, product_name),
172+
)
173+
return TaskResult(result=async_job_rpc_result.result, error=None)
Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,32 @@
1+
import logging
2+
3+
from common_library.error_codes import create_error_code
4+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
5+
from servicelib.status_codes_utils import is_5xx_server_error
16
from starlette.requests import Request
27
from starlette.responses import JSONResponse
38

49
from ...exceptions.backend_errors import BaseBackEndError
510
from ._utils import create_error_json_response
611

12+
_logger = logging.getLogger(__name__)
13+
714

815
async def backend_error_handler(request: Request, exc: Exception) -> JSONResponse:
916
assert request # nosec
1017
assert isinstance(exc, BaseBackEndError)
11-
12-
return create_error_json_response(f"{exc}", status_code=exc.status_code)
18+
user_error_msg = f"{exc}"
19+
support_id = None
20+
if is_5xx_server_error(exc.status_code):
21+
support_id = create_error_code(exc)
22+
_logger.exception(
23+
**create_troubleshootting_log_kwargs(
24+
user_error_msg,
25+
error=exc,
26+
error_code=support_id,
27+
tip="Unexpected error",
28+
)
29+
)
30+
return create_error_json_response(
31+
user_error_msg, status_code=exc.status_code, support_id=support_id
32+
)

services/api-server/src/simcore_service_api_server/exceptions/handlers/_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from collections.abc import Awaitable, Callable
22
from typing import Any, TypeAlias
33

4+
from common_library.error_codes import ErrorCodeStr
45
from fastapi.encoders import jsonable_encoder
56
from fastapi.requests import Request
67
from fastapi.responses import JSONResponse
@@ -13,13 +14,13 @@
1314

1415

1516
def create_error_json_response(
16-
*errors: Any, status_code: int, **kwargs
17+
*errors: Any, status_code: int, support_id: ErrorCodeStr | None = None, **kwargs
1718
) -> JSONResponse:
1819
"""
1920
Converts errors to Error response model defined in the OAS
2021
"""
2122

22-
error_model = ErrorGet(errors=list(errors))
23+
error_model = ErrorGet(errors=list(errors), support_id=support_id, **kwargs)
2324
return JSONResponse(
2425
content=jsonable_encoder(error_model),
2526
status_code=status_code,

0 commit comments

Comments
 (0)