Skip to content

Commit 78e82d6

Browse files
committed
call into celery
1 parent df93afe commit 78e82d6

File tree

1 file changed

+29
-57
lines changed

1 file changed

+29
-57
lines changed

services/storage/src/simcore_service_storage/api/rest/_simcore_s3.py

Lines changed: 29 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
1-
import asyncio
21
import logging
3-
from typing import Annotated, Any, cast
2+
from typing import Annotated, cast
43

5-
from fastapi import APIRouter, Depends, FastAPI, Request
6-
from models_library.api_schemas_long_running_tasks.base import TaskProgress
4+
from fastapi import APIRouter, Depends, Request
75
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
6+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
87
from models_library.api_schemas_storage.storage_schemas import (
98
FileMetaDataGet,
109
FoldersBody,
1110
)
1211
from models_library.generics import Envelope
1312
from models_library.projects import ProjectID
1413
from servicelib.aiohttp import status
15-
from servicelib.fastapi.long_running_tasks._dependencies import get_tasks_manager
16-
from servicelib.logging_utils import log_context
17-
from servicelib.long_running_tasks._task import start_task
1814
from settings_library.s3 import S3Settings
1915
from yarl import URL
2016

@@ -26,7 +22,12 @@
2622
StorageQueryParamsBase,
2723
)
2824
from ...modules import sts
25+
from ...modules.celery.client import CeleryTaskQueueClient
2926
from ...simcore_s3_dsm import SimcoreS3DataManager
27+
from .._worker_tasks._simcore_s3 import (
28+
deep_copy_files_from_project,
29+
)
30+
from .dependencies.celery import get_celery_client
3031

3132
_logger = logging.getLogger(__name__)
3233

@@ -50,68 +51,39 @@ async def get_or_create_temporary_s3_access(
5051
return Envelope[S3Settings](data=s3_settings)
5152

5253

53-
async def _copy_folders_from_project(
54-
progress: TaskProgress,
55-
app: FastAPI,
56-
query_params: StorageQueryParamsBase,
57-
body: FoldersBody,
58-
) -> Envelope[dict[str, Any]]:
59-
dsm = cast(
60-
SimcoreS3DataManager,
61-
get_dsm_provider(app).get(SimcoreS3DataManager.get_location_id()),
62-
)
63-
with log_context(
64-
_logger,
65-
logging.INFO,
66-
msg=f"copying {body.source['uuid']} -> {body.destination['uuid']}",
67-
):
68-
await dsm.deep_copy_project_simcore_s3(
69-
query_params.user_id,
70-
body.source,
71-
body.destination,
72-
body.nodes_map,
73-
task_progress=progress,
74-
)
75-
76-
return Envelope[dict[str, Any]](data=body.destination)
77-
78-
7954
@router.post(
8055
"/simcore-s3/folders",
8156
response_model=Envelope[TaskGet],
8257
status_code=status.HTTP_202_ACCEPTED,
8358
)
8459
async def copy_folders_from_project(
60+
celery_client: Annotated[CeleryTaskQueueClient, Depends(get_celery_client)],
8561
query_params: Annotated[StorageQueryParamsBase, Depends()],
8662
body: FoldersBody,
8763
request: Request,
8864
):
89-
task_id = None
90-
try:
91-
task_id = start_task(
92-
get_tasks_manager(request),
93-
_copy_folders_from_project,
94-
app=request.app,
95-
query_params=query_params,
96-
body=body,
97-
)
98-
relative_url = URL(f"{request.url}").relative()
65+
async_job_name_data = AsyncJobNameData(
66+
user_id=query_params.user_id,
67+
product_name="osparc", # TODO: fix this
68+
)
69+
task_uuid = await celery_client.send_task(
70+
deep_copy_files_from_project.__name__,
71+
task_context=async_job_name_data.model_dump(),
72+
user_id=async_job_name_data.user_id,
73+
body=body,
74+
)
75+
76+
relative_url = URL(f"{request.url}").relative()
9977

100-
return Envelope[TaskGet](
101-
data=TaskGet(
102-
task_id=task_id,
103-
task_name=f"{request.method} {relative_url}",
104-
status_href=f"{request.url_for('get_task_status', task_id=task_id)}",
105-
result_href=f"{request.url_for('get_task_result', task_id=task_id)}",
106-
abort_href=f"{request.url_for('cancel_and_delete_task', task_id=task_id)}",
107-
)
78+
return Envelope[TaskGet](
79+
data=TaskGet(
80+
task_id=f"{task_uuid}",
81+
task_name=f"{request.method} {relative_url}",
82+
status_href=f"{request.url_for('get_task_status', task_id=f'{task_uuid}')}",
83+
result_href=f"{request.url_for('get_task_result', task_id=f'{task_uuid}')}",
84+
abort_href=f"{request.url_for('cancel_and_delete_task', task_id=f'{task_uuid}')}",
10885
)
109-
except asyncio.CancelledError:
110-
if task_id:
111-
await get_tasks_manager(request).cancel_task(
112-
task_id, with_task_context=None
113-
)
114-
raise
86+
)
11587

11688

11789
@router.delete(

0 commit comments

Comments
 (0)