Skip to content

Commit b5230f1

Browse files
bisgaard-itisgiancarloromeosanderegg
authored
Introduce links for async jobs actions (hateos style) in webserver (ITISFoundation#7320)
Co-authored-by: Giancarlo Romeo <[email protected]> Co-authored-by: Sylvain <[email protected]>
1 parent fb2ccf3 commit b5230f1

File tree

29 files changed

+1611
-1135
lines changed

29 files changed

+1611
-1135
lines changed

api/specs/web-server/_long_running_tasks.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44
# pylint: disable=too-many-arguments
55

66

7-
from typing import Annotated
7+
from typing import Annotated, Any
88

99
from fastapi import APIRouter, Depends, status
1010
from models_library.generics import Envelope
11+
from models_library.rest_error import EnvelopedError
1112
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
1213
from servicelib.long_running_tasks._models import TaskGet, TaskStatus
1314
from simcore_service_webserver._meta import API_VTAG
15+
from simcore_service_webserver.tasks._exception_handlers import (
16+
_TO_HTTP_ERROR_MAP as data_export_http_error_map,
17+
)
1418

1519
router = APIRouter(
1620
prefix=f"/{API_VTAG}",
@@ -19,37 +23,52 @@
1923
],
2024
)
2125

26+
_data_export_responses: dict[int | str, dict[str, Any]] = {
27+
i.status_code: {"model": EnvelopedError}
28+
for i in data_export_http_error_map.values()
29+
}
30+
2231

2332
@router.get(
2433
"/tasks",
2534
response_model=Envelope[list[TaskGet]],
35+
name="list_tasks",
36+
description="Lists all long running tasks",
37+
responses=_data_export_responses,
2638
)
27-
def list_tasks():
28-
...
39+
def get_async_jobs(): ...
2940

3041

3142
@router.get(
3243
"/tasks/{task_id}",
3344
response_model=Envelope[TaskStatus],
45+
name="get_task_status",
46+
description="Retrieves the status of a task",
47+
responses=_data_export_responses,
3448
)
35-
def get_task_status(
49+
def get_async_job_status(
3650
_path_params: Annotated[_PathParam, Depends()],
37-
):
38-
...
51+
): ...
3952

4053

4154
@router.delete(
4255
"/tasks/{task_id}",
56+
name="cancel_and_delete_task",
57+
description="Cancels and deletes a task",
58+
responses=_data_export_responses,
4359
status_code=status.HTTP_204_NO_CONTENT,
4460
)
45-
def cancel_and_delete_task(
61+
def abort_async_job(
4662
_path_params: Annotated[_PathParam, Depends()],
47-
):
48-
...
63+
): ...
4964

5065

51-
@router.get("/tasks/{task_id}/result")
52-
def get_task_result(
66+
@router.get(
67+
"/tasks/{task_id}/result",
68+
name="get_task_result",
69+
description="Retrieves the result of a task",
70+
responses=_data_export_responses,
71+
)
72+
def get_async_job_result(
5373
_path_params: Annotated[_PathParam, Depends()],
54-
):
55-
...
74+
): ...

api/specs/web-server/_storage.py

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
# pylint: disable=too-many-arguments
55

66

7-
from typing import Annotated, TypeAlias
8-
from uuid import UUID
7+
from typing import Annotated, Any, TypeAlias
98

109
from fastapi import APIRouter, Depends, Query, status
10+
from models_library.api_schemas_long_running_tasks.tasks import (
11+
TaskGet,
12+
)
1113
from models_library.api_schemas_storage.storage_schemas import (
1214
FileLocation,
1315
FileMetaDataGet,
@@ -22,19 +24,19 @@
2224
from models_library.api_schemas_webserver.storage import (
2325
DataExportPost,
2426
ListPathsQueryParams,
25-
StorageAsyncJobGet,
26-
StorageAsyncJobResult,
27-
StorageAsyncJobStatus,
2827
StorageLocationPathParams,
2928
StoragePathComputeSizeParams,
3029
)
3130
from models_library.generics import Envelope
3231
from models_library.projects_nodes_io import LocationID
33-
from models_library.users import UserID
32+
from models_library.rest_error import EnvelopedError
3433
from pydantic import AnyUrl, ByteSize
3534
from servicelib.fastapi.rest_pagination import CustomizedPathsCursorPage
3635
from simcore_service_webserver._meta import API_VTAG
3736
from simcore_service_webserver.storage.schemas import DatasetMetaData, FileMetaData
37+
from simcore_service_webserver.tasks._exception_handlers import (
38+
_TO_HTTP_ERROR_MAP as data_export_http_error_map,
39+
)
3840

3941
router = APIRouter(
4042
prefix=f"/{API_VTAG}",
@@ -71,7 +73,7 @@ async def list_storage_paths(
7173

7274
@router.post(
7375
"/storage/locations/{location_id}/paths/{path}:size",
74-
response_model=Envelope[StorageAsyncJobGet],
76+
response_model=Envelope[TaskGet],
7577
status_code=status.HTTP_202_ACCEPTED,
7678
)
7779
async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depends()]):
@@ -205,46 +207,18 @@ async def is_completed_upload_file(
205207

206208

207209
# data export
210+
_data_export_responses: dict[int | str, dict[str, Any]] = {
211+
i.status_code: {"model": EnvelopedError}
212+
for i in data_export_http_error_map.values()
213+
}
214+
215+
208216
@router.post(
209217
"/storage/locations/{location_id}/export-data",
210-
response_model=Envelope[StorageAsyncJobGet],
218+
response_model=Envelope[TaskGet],
211219
name="export_data",
212220
description="Export data",
221+
responses=_data_export_responses,
213222
)
214223
async def export_data(data_export: DataExportPost, location_id: LocationID):
215224
"""Trigger data export. Returns async job id for getting status and results"""
216-
217-
218-
@router.get(
219-
"/storage/async-jobs/{job_id}/status",
220-
response_model=Envelope[StorageAsyncJobStatus],
221-
name="get_async_job_status",
222-
)
223-
async def get_async_job_status(job_id: UUID):
224-
"""Get async job status"""
225-
226-
227-
@router.post(
228-
"/storage/async-jobs/{job_id}:abort",
229-
name="abort_async_job",
230-
)
231-
async def abort_async_job(job_id: UUID):
232-
"""aborts execution of an async job"""
233-
234-
235-
@router.get(
236-
"/storage/async-jobs/{job_id}/result",
237-
response_model=Envelope[StorageAsyncJobResult],
238-
name="get_async_job_result",
239-
)
240-
async def get_async_job_result(job_id: UUID):
241-
"""Get the result of the async job"""
242-
243-
244-
@router.get(
245-
"/storage/async-jobs",
246-
response_model=Envelope[list[StorageAsyncJobGet]],
247-
name="get_async_jobs",
248-
)
249-
async def get_async_jobs(user_id: UserID):
250-
"""Retrunsa list of async jobs for the user"""

api/specs/web-server/_tasks.py

Lines changed: 0 additions & 43 deletions
This file was deleted.

packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class TaskStatus(BaseModel):
1111
task_progress: TaskProgress
1212
done: bool
13-
started: datetime
13+
started: datetime | None
1414

1515

1616
class TaskResult(BaseModel):

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ class AsyncJobStatus(BaseModel):
1616

1717

1818
class AsyncJobResult(BaseModel):
19-
result: Any | None
20-
error: Any | None
19+
result: Any
2120

2221

2322
class AsyncJobGet(BaseModel):

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,27 @@ class BaseAsyncjobRpcError(OsparcErrorMixin, RuntimeError):
55
pass
66

77

8-
class StatusError(BaseAsyncjobRpcError):
8+
class JobSchedulerError(BaseAsyncjobRpcError):
9+
msg_template: str = "Celery exception: {exc}"
10+
11+
12+
class JobMissingError(BaseAsyncjobRpcError):
13+
msg_template: str = "Job {job_id} does not exist"
14+
15+
16+
class JobStatusError(BaseAsyncjobRpcError):
917
msg_template: str = "Could not get status of job {job_id}"
1018

1119

12-
class ResultError(BaseAsyncjobRpcError):
13-
msg_template: str = "Could not get results of job {job_id}"
20+
class JobNotDoneError(BaseAsyncjobRpcError):
21+
msg_template: str = "Job {job_id} not done"
22+
23+
24+
class JobAbortedError(BaseAsyncjobRpcError):
25+
msg_template: str = "Job {job_id} aborted"
26+
27+
28+
class JobError(BaseAsyncjobRpcError):
29+
msg_template: str = (
30+
"Job {job_id} failed with exception type {exc_type} and message {exc_msg}"
31+
)

packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,11 @@ class StorageRpcBaseError(OsparcErrorMixin, RuntimeError):
1717
pass
1818

1919

20-
class InvalidLocationIdError(StorageRpcBaseError):
21-
msg_template: str = "Invalid location_id {location_id}"
22-
23-
2420
class InvalidFileIdentifierError(StorageRpcBaseError):
2521
msg_template: str = "Could not find the file {file_id}"
2622

2723

2824
class AccessRightError(StorageRpcBaseError):
29-
msg_template: str = "User {user_id} does not have access to file {file_id} with location {location_id}"
30-
31-
32-
class DataExportError(StorageRpcBaseError):
33-
msg_template: str = "Could not complete data export job with id {job_id}"
25+
msg_template: str = (
26+
"User {user_id} does not have access to file {file_id} with location {location_id}"
27+
)
Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,18 @@
11
from pathlib import Path
2-
from typing import Annotated, Any
2+
from typing import Annotated
33

44
from pydantic import BaseModel, Field
55

6-
from ..api_schemas_rpc_async_jobs.async_jobs import (
7-
AsyncJobGet,
8-
AsyncJobId,
9-
AsyncJobResult,
10-
AsyncJobStatus,
11-
)
126
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
137
from ..api_schemas_storage.storage_schemas import (
148
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
159
MAX_NUMBER_OF_PATHS_PER_PAGE,
1610
)
17-
from ..progress_bar import ProgressReport
1811
from ..projects_nodes_io import LocationID, StorageFileID
1912
from ..rest_pagination import (
2013
CursorQueryParameters,
2114
)
22-
from ._base import InputSchema, OutputSchema
15+
from ._base import InputSchema
2316

2417

2518
class StorageLocationPathParams(BaseModel):
@@ -51,40 +44,3 @@ def to_rpc_schema(self, location_id: LocationID) -> DataExportTaskStartInput:
5144
file_and_folder_ids=self.paths,
5245
location_id=location_id,
5346
)
54-
55-
56-
class StorageAsyncJobGet(OutputSchema):
57-
job_id: AsyncJobId
58-
59-
@classmethod
60-
def from_rpc_schema(cls, async_job_rpc_get: AsyncJobGet) -> "StorageAsyncJobGet":
61-
return StorageAsyncJobGet(job_id=async_job_rpc_get.job_id)
62-
63-
64-
class StorageAsyncJobStatus(OutputSchema):
65-
job_id: AsyncJobId
66-
progress: ProgressReport
67-
done: bool
68-
69-
@classmethod
70-
def from_rpc_schema(
71-
cls, async_job_rpc_status: AsyncJobStatus
72-
) -> "StorageAsyncJobStatus":
73-
return StorageAsyncJobStatus(
74-
job_id=async_job_rpc_status.job_id,
75-
progress=async_job_rpc_status.progress,
76-
done=async_job_rpc_status.done,
77-
)
78-
79-
80-
class StorageAsyncJobResult(OutputSchema):
81-
result: Any | None
82-
error: Any | None
83-
84-
@classmethod
85-
def from_rpc_schema(
86-
cls, async_job_rpc_result: AsyncJobResult
87-
) -> "StorageAsyncJobResult":
88-
return StorageAsyncJobResult(
89-
result=async_job_rpc_result.result, error=async_job_rpc_result.error
90-
)

0 commit comments

Comments
 (0)