Skip to content

Commit d91899e

Browse files
author
Andrei Neagu
committed
updated interface and tests
1 parent 4185157 commit d91899e

File tree

6 files changed

+137
-94
lines changed

6 files changed

+137
-94
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/simcore_s3.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ async def start_data_export(
3737
user_id: UserID,
3838
product_name: str,
3939
paths_to_export: list[StorageFileID],
40-
) -> AsyncJobGet:
40+
) -> tuple[AsyncJobGet, AsyncJobNameData]:
4141
job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
42-
return await submit(
42+
async_job_rpc_get = await submit(
4343
rabbitmq_rpc_client,
4444
rpc_namespace=STORAGE_RPC_NAMESPACE,
4545
method_name=TypeAdapter(RPCMethodName).validate_python("start_data_export"),
4646
job_id_data=job_id_data,
4747
paths_to_export=paths_to_export,
4848
)
49+
return async_job_rpc_get, job_id_data

services/storage/tests/unit/api/_worker_tasks/conftest.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

services/storage/tests/unit/api/_worker_tasks/test__data_export.py

Lines changed: 0 additions & 58 deletions
This file was deleted.

services/storage/tests/unit/test_rpc_handlers_simcore_s3.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import asyncio
1010
import datetime
1111
import logging
12+
import re
1213
from collections.abc import Awaitable, Callable
1314
from copy import deepcopy
1415
from pathlib import Path
@@ -29,9 +30,11 @@
2930
)
3031
from models_library.basic_types import SHA256Str
3132
from models_library.products import ProductName
33+
from models_library.progress_bar import ProgressReport, ProgressStructuredMessage
3234
from models_library.projects_nodes_io import NodeID, NodeIDStr, SimcoreS3FileID
3335
from models_library.users import UserID
3436
from pydantic import ByteSize, TypeAdapter
37+
from pytest_mock import MockerFixture
3538
from pytest_simcore.helpers.fastapi import url_from_operation_id
3639
from pytest_simcore.helpers.httpx_assert_checks import assert_status
3740
from pytest_simcore.helpers.logging_tools import log_context
@@ -49,6 +52,7 @@
4952
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import wait_and_get_result
5053
from servicelib.rabbitmq.rpc_interfaces.storage.simcore_s3 import (
5154
copy_folders_from_project,
55+
start_data_export,
5256
)
5357
from simcore_postgres_database.storage_models import file_meta_data
5458
from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
@@ -501,3 +505,132 @@ async def test_create_and_delete_folders_from_project(
501505
for _ in range(num_concurrent_calls)
502506
]
503507
)
508+
509+
510+
async def _request_start_data_export(
511+
rpc_client: RabbitMQRPCClient,
512+
user_id: UserID,
513+
product_name: ProductName,
514+
paths_to_export: list[SimcoreS3FileID],
515+
*,
516+
client_timeout: datetime.timedelta = datetime.timedelta(seconds=60),
517+
) -> dict[str, Any]:
518+
with log_context(
519+
logging.INFO,
520+
f"Data export form {paths_to_export=}",
521+
) as ctx:
522+
async_job_get, async_job_name = await start_data_export(
523+
rpc_client,
524+
user_id=user_id,
525+
product_name=product_name,
526+
paths_to_export=[],
527+
)
528+
529+
async for async_job_result in wait_and_get_result(
530+
rpc_client,
531+
rpc_namespace=STORAGE_RPC_NAMESPACE,
532+
method_name=copy_folders_from_project.__name__,
533+
job_id=async_job_get.job_id,
534+
job_id_data=async_job_name,
535+
client_timeout=client_timeout,
536+
):
537+
ctx.logger.info("%s", f"<-- current state is {async_job_result=}")
538+
if async_job_result.done:
539+
result = await async_job_result.result()
540+
assert isinstance(result, AsyncJobResult)
541+
return result.result
542+
543+
pytest.fail(reason="data export failed!")
544+
545+
546+
@pytest.fixture
547+
def mock_task_progress(mocker: MockerFixture) -> list[ProgressReport]:
548+
progress_updates = []
549+
550+
async def _progress(*args, **_) -> None:
551+
progress_updates.append(args[2])
552+
553+
mocker.patch(
554+
"simcore_service_storage.modules.celery.worker.CeleryTaskQueueWorker.set_task_progress",
555+
side_effect=_progress,
556+
)
557+
return progress_updates
558+
559+
560+
@pytest.mark.parametrize(
561+
"location_id",
562+
[SimcoreS3DataManager.get_location_id()],
563+
ids=[SimcoreS3DataManager.get_location_name()],
564+
indirect=True,
565+
)
566+
@pytest.mark.parametrize(
567+
"project_params",
568+
[
569+
ProjectWithFilesParams(
570+
num_nodes=1,
571+
allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("210Mib"),),
572+
allowed_file_checksums=(
573+
TypeAdapter(SHA256Str).validate_python(
574+
"0b3216d95ec5a36c120ba16c88911dcf5ff655925d0fbdbc74cf95baf86de6fc"
575+
),
576+
),
577+
workspace_files_count=0,
578+
),
579+
],
580+
ids=str,
581+
)
582+
async def test_start_data_export(
583+
initialized_app: FastAPI,
584+
short_dsm_cleaner_interval: int,
585+
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
586+
user_id: UserID,
587+
product_name: ProductName,
588+
create_project: Callable[[], Awaitable[dict[str, Any]]],
589+
sqlalchemy_async_engine: AsyncEngine,
590+
random_project_with_files: Callable[
591+
[ProjectWithFilesParams],
592+
Awaitable[
593+
tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]]]
594+
],
595+
],
596+
project_params: ProjectWithFilesParams,
597+
mock_task_progress: list[ProgressReport],
598+
):
599+
_, src_projects_list = await random_project_with_files(project_params)
600+
601+
paths_to_export: set[SimcoreS3FileID] = set()
602+
for x in src_projects_list.values():
603+
paths_to_export |= x.keys()
604+
605+
result = await _request_start_data_export(
606+
storage_rabbitmq_rpc_client,
607+
user_id,
608+
product_name,
609+
paths_to_export=list(paths_to_export),
610+
)
611+
612+
assert re.fullmatch(
613+
rf"^exports/{user_id}/[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\.zip$",
614+
result,
615+
)
616+
617+
assert mock_task_progress == [
618+
ProgressReport(
619+
actual_value=0.0,
620+
total=1.0,
621+
attempt=0,
622+
unit=None,
623+
message=ProgressStructuredMessage(
624+
description="data export", current=0.0, total=1, unit=None, sub=None
625+
),
626+
),
627+
ProgressReport(
628+
actual_value=1.0,
629+
total=1.0,
630+
attempt=0,
631+
unit=None,
632+
message=ProgressStructuredMessage(
633+
description="data export", current=1.0, total=1, unit=None, sub=None
634+
),
635+
),
636+
]

services/web/server/src/simcore_service_webserver/storage/_rest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ def allow_only_simcore(cls, v: int) -> int:
458458
data_export_post = await parse_request_body_as(
459459
model_schema_cls=DataExportPost, request=request
460460
)
461-
async_job_rpc_get = await start_data_export(
461+
async_job_rpc_get, _ = await start_data_export(
462462
rabbitmq_rpc_client=rabbitmq_rpc_client,
463463
user_id=_req_ctx.user_id,
464464
product_name=_req_ctx.product_name,

0 commit comments

Comments
 (0)