Skip to content

Commit fdcdca0

Browse files
committed
now completion is a worker task
1 parent d29a277 commit fdcdca0

File tree

1 file changed

+39
-41
lines changed
  • services/storage/src/simcore_service_storage/api/rest

1 file changed

+39
-41
lines changed

services/storage/src/simcore_service_storage/api/rest/_files.py

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import asyncio
21
import logging
32
from typing import Annotated, cast
43
from urllib.parse import quote
54

6-
from fastapi import APIRouter, Depends, Header, HTTPException, Request
5+
from fastapi import APIRouter, Depends, Header, Request
6+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
77
from models_library.api_schemas_storage.storage_schemas import (
88
FileMetaDataGet,
99
FileMetaDataGetv010,
@@ -34,9 +34,11 @@
3434
StorageQueryParamsBase,
3535
UploadLinks,
3636
)
37-
from ...modules.long_running_tasks import get_completed_upload_tasks
37+
from ...modules.celery.client import CeleryTaskQueueClient
38+
from ...modules.celery.models import TaskUUID
3839
from ...simcore_s3_dsm import SimcoreS3DataManager
39-
from ...utils.utils import create_upload_completion_task_name
40+
from .._worker_tasks._files import complete_upload_file as remote_complete_upload_file
41+
from .dependencies.celery import get_celery_client
4042

4143
_logger = logging.getLogger(__name__)
4244

@@ -263,21 +265,27 @@ async def abort_upload_file(
263265
status_code=status.HTTP_202_ACCEPTED,
264266
)
265267
async def complete_upload_file(
268+
celery_client: Annotated[CeleryTaskQueueClient, Depends(get_celery_client)],
266269
query_params: Annotated[StorageQueryParamsBase, Depends()],
267270
location_id: LocationID,
268271
file_id: StorageFileID,
269272
body: FileUploadCompletionBody,
270273
request: Request,
271274
):
272-
dsm = get_dsm_provider(request.app).get(location_id)
273275
# NOTE: completing a multipart upload on AWS can take up to several minutes
274276
# if it returns slow we return a 202 - Accepted, the client will have to check later
275277
# for completeness
276-
task = asyncio.create_task(
277-
dsm.complete_file_upload(file_id, query_params.user_id, body.parts),
278-
name=create_upload_completion_task_name(query_params.user_id, file_id),
278+
async_job_name_data = AsyncJobNameData(
279+
user_id=query_params.user_id, product_name="osparc"
280+
)
281+
task_uuid = await celery_client.send_task(
282+
remote_complete_upload_file.__name__,
283+
task_context=async_job_name_data.model_dump(),
284+
user_id=async_job_name_data.user_id,
285+
location_id=location_id,
286+
file_id=file_id,
287+
body=body,
279288
)
280-
get_completed_upload_tasks(request.app)[task.get_name()] = task
281289

282290
route = (
283291
URL(f"{request.url}")
@@ -287,7 +295,7 @@ async def complete_upload_file(
287295
"is_completed_upload_file",
288296
location_id=f"{location_id}",
289297
file_id=file_id,
290-
future_id=task.get_name(),
298+
future_id=f"{task_uuid}",
291299
),
292300
safe=":/",
293301
),
@@ -310,48 +318,38 @@ async def complete_upload_file(
310318
response_model=Envelope[FileUploadCompleteFutureResponse],
311319
)
312320
async def is_completed_upload_file(
321+
celery_client: Annotated[CeleryTaskQueueClient, Depends(get_celery_client)],
313322
query_params: Annotated[StorageQueryParamsBase, Depends()],
314323
location_id: LocationID,
315324
file_id: StorageFileID,
316325
future_id: str,
317-
request: Request,
318326
):
319327
# NOTE: completing a multipart upload on AWS can take up to several minutes
320328
# therefore we wait a bit to see if it completes fast and return a 204
321329
# if it returns slow we return a 202 - Accepted, the client will have to check later
322330
# for completeness
323-
task_name = create_upload_completion_task_name(query_params.user_id, file_id)
324-
assert task_name == future_id # nosec # NOTE: fastapi auto-decode path parameters
331+
async_job_name_data = AsyncJobNameData(
332+
user_id=query_params.user_id, product_name="osparc"
333+
)
334+
task_status = await celery_client.get_task_status(
335+
task_context=async_job_name_data.model_dump(), task_uuid=TaskUUID(future_id)
336+
)
325337
# first check if the task is in the app
326-
if task := get_completed_upload_tasks(request.app).get(task_name):
327-
if task.done():
328-
new_fmd: FileMetaData = task.result()
329-
get_completed_upload_tasks(request.app).pop(task_name)
330-
response = FileUploadCompleteFutureResponse(
331-
state=FileUploadCompleteState.OK, e_tag=new_fmd.entity_tag
332-
)
333-
else:
334-
# the task is still running
335-
response = FileUploadCompleteFutureResponse(
336-
state=FileUploadCompleteState.NOK
337-
)
338-
return Envelope[FileUploadCompleteFutureResponse](data=response)
339-
# there is no task, either wrong call or storage was restarted
340-
# we try to get the file to see if it exists in S3
341-
dsm = get_dsm_provider(request.app).get(location_id)
342-
if fmd := await dsm.get_file(
343-
user_id=query_params.user_id,
344-
file_id=file_id,
345-
):
346-
return Envelope[FileUploadCompleteFutureResponse](
347-
data=FileUploadCompleteFutureResponse(
348-
state=FileUploadCompleteState.OK, e_tag=fmd.entity_tag
349-
)
338+
if task_status.is_done:
339+
task_result = await celery_client.get_task_result(
340+
task_context=async_job_name_data.model_dump(), task_uuid=TaskUUID(future_id)
350341
)
351-
raise HTTPException(
352-
status.HTTP_404_NOT_FOUND,
353-
detail="Not found. Upload could not be completed. Please try again and contact support if it fails again.",
354-
)
342+
assert isinstance(task_result, FileMetaData), f"{task_result=}" # nosec
343+
new_fmd = task_result
344+
assert new_fmd.location_id == location_id # nosec
345+
assert new_fmd.file_id == file_id # nosec
346+
response = FileUploadCompleteFutureResponse(
347+
state=FileUploadCompleteState.OK, e_tag=new_fmd.entity_tag
348+
)
349+
else:
350+
# the task is still running
351+
response = FileUploadCompleteFutureResponse(state=FileUploadCompleteState.NOK)
352+
return Envelope[FileUploadCompleteFutureResponse](data=response)
355353

356354

357355
@router.delete(

0 commit comments

Comments
 (0)