Skip to content

Commit 2699607

Browse files
✨ Add zipping celery task which returns a download link instead of a path (#8089)
1 parent 359ff33 commit 2699607

File tree

6 files changed

+116
-11
lines changed

6 files changed

+116
-11
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Literal
2+
13
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
24
AsyncJobFilter,
35
AsyncJobGet,
@@ -41,6 +43,7 @@ async def start_export_data(
4143
user_id: UserID,
4244
product_name: ProductName,
4345
paths_to_export: list[PathToExport],
46+
export_as: Literal["path", "download_link"],
4447
) -> tuple[AsyncJobGet, AsyncJobFilter]:
4548
job_filter = get_async_job_filter(user_id=user_id, product_name=product_name)
4649
async_job_rpc_get = await submit(
@@ -49,5 +52,6 @@ async def start_export_data(
4952
method_name=TypeAdapter(RPCMethodName).validate_python("start_export_data"),
5053
job_filter=job_filter,
5154
paths_to_export=paths_to_export,
55+
export_as=export_as,
5256
)
5357
return async_job_rpc_get, job_filter

services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
from aws_library.s3._models import S3ObjectKey
66
from celery import Task # type: ignore[import-untyped]
77
from celery_library.utils import get_app_server
8-
from models_library.api_schemas_storage.storage_schemas import FoldersBody
8+
from models_library.api_schemas_storage.storage_schemas import (
9+
FoldersBody,
10+
LinkType,
11+
PresignedLink,
12+
)
913
from models_library.api_schemas_webserver.storage import PathToExport
1014
from models_library.progress_bar import ProgressReport
1115
from models_library.projects_nodes_io import StorageFileID
@@ -100,3 +104,27 @@ async def _progress_cb(report: ProgressReport) -> None:
100104
return await dsm.create_s3_export(
101105
user_id, object_keys, progress_bar=progress_bar
102106
)
107+
108+
109+
async def export_data_as_download_link(
110+
task: Task,
111+
task_id: TaskID,
112+
*,
113+
user_id: UserID,
114+
paths_to_export: list[PathToExport],
115+
) -> PresignedLink:
116+
"""
117+
AccessRightError: in case user can't access project
118+
"""
119+
s3_object = await export_data(
120+
task=task, task_id=task_id, user_id=user_id, paths_to_export=paths_to_export
121+
)
122+
123+
dsm = get_dsm_provider(get_app_server(task.app).app).get(
124+
SimcoreS3DataManager.get_location_id()
125+
)
126+
127+
download_link = await dsm.create_file_download_link(
128+
user_id=user_id, file_id=s3_object, link_type=LinkType.PRESIGNED
129+
)
130+
return PresignedLink(link=download_link)

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,33 @@
77
from models_library.api_schemas_storage.storage_schemas import (
88
FileUploadCompletionBody,
99
FoldersBody,
10+
PresignedLink,
1011
)
1112
from servicelib.logging_utils import log_context
1213

1314
from ...models import FileMetaData
1415
from ._files import complete_upload_file
1516
from ._paths import compute_path_size, delete_paths
16-
from ._simcore_s3 import deep_copy_files_from_project, export_data
17+
from ._simcore_s3 import (
18+
deep_copy_files_from_project,
19+
export_data,
20+
export_data_as_download_link,
21+
)
1722

1823
_logger = logging.getLogger(__name__)
1924

2025

2126
def setup_worker_tasks(app: Celery) -> None:
2227
register_celery_types()
23-
register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)
28+
register_pydantic_types(
29+
FileUploadCompletionBody, FileMetaData, FoldersBody, PresignedLink
30+
)
2431

2532
with log_context(_logger, logging.INFO, msg="worker task registration"):
2633
register_task(app, export_data, dont_autoretry_for=(AccessRightError,))
34+
register_task(
35+
app, export_data_as_download_link, dont_autoretry_for=(AccessRightError,)
36+
)
2737
register_task(app, compute_path_size)
2838
register_task(app, complete_upload_file)
2939
register_task(app, delete_paths)

services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Literal
2+
13
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
24
AsyncJobFilter,
35
AsyncJobGet,
@@ -8,7 +10,11 @@
810
from servicelib.celery.task_manager import TaskManager
911
from servicelib.rabbitmq import RPCRouter
1012

11-
from .._worker_tasks._simcore_s3 import deep_copy_files_from_project, export_data
13+
from .._worker_tasks._simcore_s3 import (
14+
deep_copy_files_from_project,
15+
export_data,
16+
export_data_as_download_link,
17+
)
1218

1319
router = RPCRouter()
1420

@@ -38,8 +44,14 @@ async def start_export_data(
3844
task_manager: TaskManager,
3945
job_filter: AsyncJobFilter,
4046
paths_to_export: list[PathToExport],
47+
export_as: Literal["path", "download_link"],
4148
) -> AsyncJobGet:
42-
task_name = export_data.__name__
49+
if export_as == "path":
50+
task_name = export_data.__name__
51+
elif export_as == "download_link":
52+
task_name = export_data_as_download_link.__name__
53+
else:
54+
raise ValueError(f"Invalid export_as value: {export_as}")
4355
task_filter = TaskFilter.model_validate(job_filter.model_dump())
4456
task_uuid = await task_manager.submit_task(
4557
task_metadata=TaskMetadata(

services/storage/tests/unit/test_rpc_handlers_simcore_s3.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from collections.abc import Awaitable, Callable
1414
from copy import deepcopy
1515
from pathlib import Path
16-
from typing import Any
16+
from typing import Any, Literal
1717
from unittest.mock import Mock
1818

1919
import httpx
@@ -30,6 +30,7 @@
3030
from models_library.api_schemas_storage.storage_schemas import (
3131
FileMetaDataGet,
3232
FoldersBody,
33+
PresignedLink,
3334
)
3435
from models_library.api_schemas_webserver.storage import PathToExport
3536
from models_library.basic_types import SHA256Str
@@ -52,6 +53,7 @@
5253
from pytest_simcore.helpers.storage_utils_project import clone_project_data
5354
from servicelib.aiohttp import status
5455
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
56+
from servicelib.rabbitmq._errors import RPCServerError
5557
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import wait_and_get_result
5658
from servicelib.rabbitmq.rpc_interfaces.storage.simcore_s3 import (
5759
copy_folders_from_project,
@@ -514,9 +516,10 @@ async def _request_start_export_data(
514516
user_id: UserID,
515517
product_name: ProductName,
516518
paths_to_export: list[PathToExport],
519+
export_as: Literal["path", "download_link"],
517520
*,
518521
client_timeout: datetime.timedelta = datetime.timedelta(seconds=60),
519-
) -> dict[str, Any]:
522+
) -> str:
520523
with log_context(
521524
logging.INFO,
522525
f"Data export form {paths_to_export=}",
@@ -526,6 +529,7 @@ async def _request_start_export_data(
526529
user_id=user_id,
527530
product_name=product_name,
528531
paths_to_export=paths_to_export,
532+
export_as=export_as,
529533
)
530534

531535
async for async_job_result in wait_and_get_result(
@@ -572,6 +576,10 @@ def task_progress_spy(mocker: MockerFixture) -> Mock:
572576
],
573577
ids=str,
574578
)
579+
@pytest.mark.parametrize(
580+
"export_as",
581+
["path", "download_link"],
582+
)
575583
async def test_start_export_data(
576584
initialized_app: FastAPI,
577585
short_dsm_cleaner_interval: int,
@@ -589,6 +597,7 @@ async def test_start_export_data(
589597
],
590598
project_params: ProjectWithFilesParams,
591599
task_progress_spy: Mock,
600+
export_as: Literal["path", "download_link"],
592601
):
593602
_, src_projects_list = await random_project_with_files(project_params)
594603

@@ -606,18 +615,32 @@ async def test_start_export_data(
606615
user_id,
607616
product_name,
608617
paths_to_export=list(nodes_in_project_to_export),
618+
export_as=export_as,
609619
)
610620

611-
assert re.fullmatch(
612-
rf"^exports/{user_id}/[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\.zip$",
613-
result,
614-
)
621+
if export_as == "path":
622+
assert re.fullmatch(
623+
rf"^exports/{user_id}/[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\.zip$",
624+
result,
625+
)
626+
elif export_as == "download_link":
627+
link = PresignedLink.model_validate(result).link
628+
assert re.search(
629+
rf"exports/{user_id}/[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\.zip",
630+
f"{link}",
631+
)
632+
else:
633+
pytest.fail(f"Unexpected export_as value: {export_as}")
615634

616635
progress_updates = [x[0][2].actual_value for x in task_progress_spy.call_args_list]
617636
assert progress_updates[0] == 0
618637
assert progress_updates[-1] == 1
619638

620639

640+
@pytest.mark.parametrize(
641+
"export_as",
642+
["path", "download_link"],
643+
)
621644
async def test_start_export_data_access_error(
622645
initialized_app: FastAPI,
623646
short_dsm_cleaner_interval: int,
@@ -626,6 +649,7 @@ async def test_start_export_data_access_error(
626649
user_id: UserID,
627650
product_name: ProductName,
628651
faker: Faker,
652+
export_as: Literal["path", "download_link"],
629653
):
630654
path_to_export = TypeAdapter(PathToExport).validate_python(
631655
f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"
@@ -637,9 +661,35 @@ async def test_start_export_data_access_error(
637661
product_name,
638662
paths_to_export=[path_to_export],
639663
client_timeout=datetime.timedelta(seconds=60),
664+
export_as=export_as,
640665
)
641666

642667
assert isinstance(exc.value, JobError)
643668
assert exc.value.exc_type == "AccessRightError"
644669
assert f" {user_id} " in f"{exc.value}"
645670
assert f" {path_to_export} " in f"{exc.value}"
671+
672+
673+
async def test_start_export_invalid_export_format(
674+
initialized_app: FastAPI,
675+
short_dsm_cleaner_interval: int,
676+
with_storage_celery_worker: TestWorkController,
677+
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
678+
user_id: UserID,
679+
product_name: ProductName,
680+
faker: Faker,
681+
):
682+
path_to_export = TypeAdapter(PathToExport).validate_python(
683+
f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"
684+
)
685+
with pytest.raises(RPCServerError) as exc:
686+
await _request_start_export_data(
687+
storage_rabbitmq_rpc_client,
688+
user_id,
689+
product_name,
690+
paths_to_export=[path_to_export],
691+
client_timeout=datetime.timedelta(seconds=60),
692+
export_as="invalid_format", # type: ignore
693+
)
694+
695+
assert exc.value.exc_type == "builtins.ValueError"

services/web/server/src/simcore_service_webserver/storage/_rest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ def allow_only_simcore(cls, v: int) -> int:
498498
user_id=_req_ctx.user_id,
499499
product_name=_req_ctx.product_name,
500500
paths_to_export=export_data_post.paths,
501+
export_as="path",
501502
)
502503
_job_id = f"{async_job_rpc_get.job_id}"
503504
return create_data_response(

0 commit comments

Comments
 (0)