Skip to content

Commit 482dc74

Browse files
fix tasks endpoints
1 parent 56a8623 commit 482dc74

File tree

5 files changed

+174
-70
lines changed

5 files changed

+174
-70
lines changed

services/web/server/src/simcore_service_webserver/storage/_rest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
from ..models import AuthenticatedRequestContext, WebServerOwnerMetadata
5858
from ..rabbitmq import get_rabbitmq_rpc_client
5959
from ..security.decorators import permission_required
60-
from ..tasks._controller._rest_exceptions import handle_export_data_exceptions
60+
from ..tasks._controller._rest_exceptions import handle_rest_requests_exceptions
6161
from .schemas import StorageFileIDStr
6262
from .settings import StorageSettings, get_plugin_settings
6363

@@ -486,7 +486,7 @@ class _PathParams(BaseModel):
486486
)
487487
@login_required
488488
@permission_required("storage.files.*")
489-
@handle_export_data_exceptions
489+
@handle_rest_requests_exceptions
490490
async def export_data(request: web.Request) -> web.Response:
491491
class _PathParams(BaseModel):
492492
location_id: LocationID
Lines changed: 42 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
"""Handlers exposed by subsystem"""
2-
31
import logging
42
from typing import Final
5-
from uuid import UUID
63

74
from aiohttp import web
85
from models_library.api_schemas_long_running_tasks.base import TaskProgress
@@ -11,11 +8,6 @@
118
TaskResult,
129
TaskStatus,
1310
)
14-
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
15-
AsyncJobId,
16-
)
17-
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
18-
from pydantic import BaseModel
1911
from servicelib.aiohttp import status
2012
from servicelib.aiohttp.long_running_tasks.server import (
2113
get_long_running_manager,
@@ -26,15 +18,15 @@
2618
from servicelib.aiohttp.rest_responses import create_data_response
2719
from servicelib.celery.models import OwnerMetadata
2820
from servicelib.long_running_tasks import lrt_api
29-
from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
21+
from simcore_service_webserver.tasks._controller._rest_schemas import TaskPathParams
3022

3123
from ..._meta import API_VTAG
24+
from ...celery import get_task_manager
3225
from ...login.decorators import login_required
3326
from ...long_running_tasks.plugin import webserver_request_context_decorator
3427
from ...models import AuthenticatedRequestContext, WebServerOwnerMetadata
35-
from ...rabbitmq import get_rabbitmq_rpc_client
36-
from ...security.decorators import permission_required
37-
from ._rest_exceptions import handle_export_data_exceptions
28+
from .. import _service
29+
from ._rest_exceptions import handle_rest_requests_exceptions
3830

3931
log = logging.getLogger(__name__)
4032

@@ -46,13 +38,12 @@
4638

4739
@routes.get(
4840
_task_prefix,
49-
name="get_async_jobs",
41+
name="get_tasks",
5042
)
5143
@login_required
52-
@permission_required("storage.files.*")
53-
@handle_export_data_exceptions
44+
@handle_rest_requests_exceptions
5445
@webserver_request_context_decorator
55-
async def get_async_jobs(request: web.Request) -> web.Response:
46+
async def get_tasks(request: web.Request) -> web.Response:
5647
inprocess_long_running_manager = get_long_running_manager(request.app)
5748
inprocess_tracked_tasks = await lrt_api.list_tasks(
5849
inprocess_long_running_manager.rpc_client,
@@ -62,28 +53,26 @@ async def get_async_jobs(request: web.Request) -> web.Response:
6253

6354
_req_ctx = AuthenticatedRequestContext.model_validate(request)
6455

65-
rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
66-
67-
user_async_jobs = await async_jobs.list_jobs(
68-
rabbitmq_rpc_client=rabbitmq_rpc_client,
69-
rpc_namespace=STORAGE_RPC_NAMESPACE,
56+
tasks = await _service.list_tasks(
57+
task_manager=get_task_manager(request.app),
7058
owner_metadata=OwnerMetadata.model_validate(
7159
WebServerOwnerMetadata(
7260
user_id=_req_ctx.user_id,
7361
product_name=_req_ctx.product_name,
7462
).model_dump()
7563
),
7664
)
65+
7766
return create_data_response(
7867
[
7968
TaskGet(
80-
task_id=f"{job.job_id}",
81-
task_name=job.job_name,
82-
status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(job.job_id))))}",
83-
abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=str(job.job_id))))}",
84-
result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(job.job_id))))}",
69+
task_id=f"{task.job_id}",
70+
task_name=task.job_name,
71+
status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(task.job_id))))}",
72+
abort_href=f"{request.url.with_path(str(request.app.router['cancel_async_job'].url_for(task_id=str(task.job_id))))}",
73+
result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(task.job_id))))}",
8574
)
86-
for job in user_async_jobs
75+
for task in tasks
8776
]
8877
+ [
8978
TaskGet(
@@ -98,40 +87,35 @@ async def get_async_jobs(request: web.Request) -> web.Response:
9887
)
9988

10089

101-
class _StorageAsyncJobId(BaseModel):
102-
task_id: AsyncJobId
103-
104-
10590
@routes.get(
10691
_task_prefix + "/{task_id}",
107-
name="get_async_job_status",
92+
name="get_task_status",
10893
)
10994
@login_required
110-
@handle_export_data_exceptions
111-
async def get_async_job_status(request: web.Request) -> web.Response:
95+
@handle_rest_requests_exceptions
96+
async def get_task_status(request: web.Request) -> web.Response:
11297

11398
_req_ctx = AuthenticatedRequestContext.model_validate(request)
114-
rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
99+
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
115100

116-
async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request)
117-
async_job_rpc_status = await async_jobs.status(
118-
rabbitmq_rpc_client=rabbitmq_rpc_client,
119-
rpc_namespace=STORAGE_RPC_NAMESPACE,
120-
job_id=async_job_get.task_id,
101+
task_status = await _service.get_task_status(
102+
task_manager=get_task_manager(request.app),
121103
owner_metadata=OwnerMetadata.model_validate(
122104
WebServerOwnerMetadata(
123105
user_id=_req_ctx.user_id,
124106
product_name=_req_ctx.product_name,
125107
).model_dump()
126108
),
109+
task_uuid=_path_params.task_id,
127110
)
128-
_task_id = f"{async_job_rpc_status.job_id}"
111+
112+
_task_id = f"{task_status.job_id}"
129113
return create_data_response(
130114
TaskStatus(
131115
task_progress=TaskProgress(
132-
task_id=_task_id, percent=async_job_rpc_status.progress.percent_value
116+
task_id=_task_id, percent=task_status.progress.percent_value
133117
),
134-
done=async_job_rpc_status.done,
118+
done=task_status.done,
135119
started=None,
136120
),
137121
status=status.HTTP_200_OK,
@@ -140,61 +124,52 @@ async def get_async_job_status(request: web.Request) -> web.Response:
140124

141125
@routes.delete(
142126
_task_prefix + "/{task_id}",
143-
name="cancel_async_job",
127+
name="cancel_task",
144128
)
145129
@login_required
146-
@permission_required("storage.files.*")
147-
@handle_export_data_exceptions
148-
async def cancel_async_job(request: web.Request) -> web.Response:
130+
@handle_rest_requests_exceptions
131+
async def cancel_task(request: web.Request) -> web.Response:
149132

150133
_req_ctx = AuthenticatedRequestContext.model_validate(request)
134+
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
151135

152-
rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
153-
async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request)
154-
155-
await async_jobs.cancel(
156-
rabbitmq_rpc_client=rabbitmq_rpc_client,
157-
rpc_namespace=STORAGE_RPC_NAMESPACE,
158-
job_id=async_job_get.task_id,
136+
await _service.cancel_task(
137+
task_manager=get_task_manager(request.app),
159138
owner_metadata=OwnerMetadata.model_validate(
160139
WebServerOwnerMetadata(
161140
user_id=_req_ctx.user_id,
162141
product_name=_req_ctx.product_name,
163142
).model_dump()
164143
),
144+
task_uuid=_path_params.task_id,
165145
)
166146

167147
return web.Response(status=status.HTTP_204_NO_CONTENT)
168148

169149

170150
@routes.get(
171151
_task_prefix + "/{task_id}/result",
172-
name="get_async_job_result",
152+
name="get_task_result",
173153
)
174154
@login_required
175-
@permission_required("storage.files.*")
176-
@handle_export_data_exceptions
177-
async def get_async_job_result(request: web.Request) -> web.Response:
178-
class _PathParams(BaseModel):
179-
task_id: UUID
155+
@handle_rest_requests_exceptions
156+
async def get_task_result(request: web.Request) -> web.Response:
180157

181158
_req_ctx = AuthenticatedRequestContext.model_validate(request)
159+
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
182160

183-
rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
184-
async_job_get = parse_request_path_parameters_as(_PathParams, request)
185-
async_job_rpc_result = await async_jobs.result(
186-
rabbitmq_rpc_client=rabbitmq_rpc_client,
187-
rpc_namespace=STORAGE_RPC_NAMESPACE,
188-
job_id=async_job_get.task_id,
161+
task_result = await _service.get_task_result(
162+
task_manager=get_task_manager(request.app),
189163
owner_metadata=OwnerMetadata.model_validate(
190164
WebServerOwnerMetadata(
191165
user_id=_req_ctx.user_id,
192166
product_name=_req_ctx.product_name,
193167
).model_dump()
194168
),
169+
task_uuid=_path_params.task_id,
195170
)
196171

197172
return create_data_response(
198-
TaskResult(result=async_job_rpc_result.result, error=None),
173+
TaskResult(result=task_result.result, error=None),
199174
status=status.HTTP_200_OK,
200175
)

services/web/server/src/simcore_service_webserver/tasks/_controller/_rest_exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,6 @@
6969
}
7070

7171

72-
handle_export_data_exceptions = exception_handling_decorator(
72+
handle_rest_requests_exceptions = exception_handling_decorator(
7373
to_exceptions_handlers_map(_TO_HTTP_ERROR_MAP)
7474
)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from uuid import UUID
2+
3+
from pydantic import BaseModel
4+
from simcore_service_webserver.models import ConfigDict
5+
6+
7+
class TaskPathParams(BaseModel):
8+
task_id: UUID
9+
model_config = ConfigDict(extra="forbid", frozen=True)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import logging
2+
3+
from celery_library.errors import (
4+
TaskManagerError,
5+
TaskNotFoundError,
6+
TransferrableCeleryError,
7+
decode_celery_transferrable_error,
8+
)
9+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
10+
AsyncJobGet,
11+
AsyncJobResult,
12+
AsyncJobStatus,
13+
)
14+
from models_library.api_schemas_rpc_async_jobs.exceptions import (
15+
JobError,
16+
JobMissingError,
17+
JobNotDoneError,
18+
JobSchedulerError,
19+
)
20+
from servicelib.celery.models import OwnerMetadata, TaskState, TaskUUID
21+
from servicelib.celery.task_manager import TaskManager
22+
from servicelib.logging_utils import log_catch
23+
24+
_logger = logging.getLogger(__name__)
25+
26+
27+
async def cancel_task(
28+
task_manager: TaskManager,
29+
owner_metadata: OwnerMetadata,
30+
task_uuid: TaskUUID,
31+
):
32+
try:
33+
await task_manager.cancel_task(
34+
owner_metadata=owner_metadata,
35+
task_uuid=task_uuid,
36+
)
37+
except TaskNotFoundError as exc:
38+
raise JobMissingError(job_id=task_uuid) from exc
39+
except TaskManagerError as exc:
40+
raise JobSchedulerError(exc=f"{exc}") from exc
41+
42+
43+
async def get_task_result(
44+
task_manager: TaskManager,
45+
owner_metadata: OwnerMetadata,
46+
task_uuid: TaskUUID,
47+
) -> AsyncJobResult:
48+
try:
49+
_status = await task_manager.get_task_status(
50+
owner_metadata=owner_metadata,
51+
task_uuid=task_uuid,
52+
)
53+
if not _status.is_done:
54+
raise JobNotDoneError(job_id=task_uuid)
55+
_result = await task_manager.get_task_result(
56+
owner_metadata=owner_metadata,
57+
task_uuid=task_uuid,
58+
)
59+
except TaskNotFoundError as exc:
60+
raise JobMissingError(job_id=task_uuid) from exc
61+
except InternalError as exc:
62+
raise JobSchedulerError(exc=f"{exc}") from exc
63+
64+
if _status.task_state == TaskState.FAILURE:
65+
# fallback exception to report
66+
exc_type = type(_result).__name__
67+
exc_msg = f"{_result}"
68+
69+
# try to recover the original error
70+
exception = None
71+
with log_catch(_logger, reraise=False):
72+
assert isinstance(_result, TransferrableCeleryError) # nosec
73+
exception = decode_celery_transferrable_error(_result)
74+
exc_type = type(exception).__name__
75+
exc_msg = f"{exception}"
76+
77+
if exception is None:
78+
_logger.warning("Was not expecting '%s': '%s'", exc_type, exc_msg)
79+
80+
raise JobError(job_id=task_uuid, exc_type=exc_type, exc_msg=exc_msg)
81+
82+
return AsyncJobResult(result=_result)
83+
84+
85+
async def get_task_status(
86+
task_manager: TaskManager,
87+
owner_metadata: OwnerMetadata,
88+
task_uuid: TaskUUID,
89+
) -> AsyncJobStatus:
90+
try:
91+
task_status = await task_manager.get_task_status(
92+
owner_metadata=owner_metadata,
93+
task_uuid=task_uuid,
94+
)
95+
except TaskNotFoundError as exc:
96+
raise JobMissingError(job_id=task_uuid) from exc
97+
except InternalError as exc:
98+
raise JobSchedulerError(exc=f"{exc}") from exc
99+
100+
return AsyncJobStatus(
101+
job_id=task_uuid,
102+
progress=task_status.progress_report,
103+
done=task_status.is_done,
104+
)
105+
106+
107+
async def list_tasks(
108+
task_manager: TaskManager,
109+
owner_metadata: OwnerMetadata,
110+
) -> list[AsyncJobGet]:
111+
try:
112+
tasks = await task_manager.list_tasks(
113+
owner_metadata=owner_metadata,
114+
)
115+
except InternalError as exc:
116+
raise JobSchedulerError(exc=f"{exc}") from exc
117+
118+
return [
119+
AsyncJobGet(job_id=task.uuid, job_name=task.metadata.name) for task in tasks
120+
]

0 commit comments

Comments
 (0)