Skip to content

Commit 5f1c5d7

Browse files
authored
✨Storage: batchDelete (#7450)
1 parent bc543e5 commit 5f1c5d7

File tree

13 files changed

+353
-20
lines changed

13 files changed

+353
-20
lines changed

api/specs/web-server/_storage.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
PresignedLink,
2323
)
2424
from models_library.api_schemas_webserver.storage import (
25+
BatchDeletePathsBodyParams,
2526
DataExportPost,
2627
ListPathsQueryParams,
2728
StorageLocationPathParams,
@@ -80,6 +81,19 @@ async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depen
8081
"""Compute the size of a path"""
8182

8283

84+
@router.post(
    "/storage/locations/{location_id}/-/paths:batchDelete",
    response_model=Envelope[TaskGet],
    status_code=status.HTTP_202_ACCEPTED,
    description="Deletes Paths",
)
async def batch_delete_paths(
    _path: Annotated[StorageLocationPathParams, Depends()],
    _body: Annotated[BatchDeletePathsBodyParams, Depends()],
):
    """Batch-delete files/folders of a storage location.

    Specification-only stub: it has no body and only declares the route,
    its parameters and its ``202 Accepted`` response (an enveloped ``TaskGet``).
    Paths are deleted only if the user has the rights to do so.
    """
95+
96+
8397
@router.get(
8498
"/storage/locations/{location_id}/datasets",
8599
response_model=Envelope[list[DatasetMetaData]],

packages/models-library/src/models_library/api_schemas_webserver/storage.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ class ListPathsQueryParams(InputSchema, CursorQueryParameters):
3636
] = DEFAULT_NUMBER_OF_PATHS_PER_PAGE
3737

3838

39+
class BatchDeletePathsBodyParams(InputSchema):
    # Input schema carrying the paths of a batch-delete request.
    # NOTE: documented with comments on purpose; a class docstring could end up
    # in the generated schema description.
    # Declared as a set, so a path repeated in the payload is kept only once.
    paths: set[Path]
41+
42+
3943
class DataExportPost(InputSchema):
4044
paths: list[StorageFileID]
4145

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,23 @@ async def compute_path_size(
3232
path=path,
3333
)
3434
return async_job_rpc_get, job_id_data
35+
36+
37+
async def delete_paths(
    client: RabbitMQRPCClient,
    *,
    user_id: UserID,
    product_name: ProductName,
    location_id: LocationID,
    paths: set[Path],
) -> tuple[AsyncJobGet, AsyncJobNameData]:
    """Submit a ``delete_paths`` asynchronous job in the storage RPC namespace.

    Args:
        client: RabbitMQ RPC client used to submit the job.
        user_id: user on whose behalf the job is submitted.
        product_name: product on whose behalf the job is submitted.
        location_id: storage location the paths belong to.
        paths: paths to delete.

    Returns:
        The value returned by ``submit`` together with the ``AsyncJobNameData``
        (user + product) the job was submitted under.
    """
    job_owner = AsyncJobNameData(user_id=user_id, product_name=product_name)
    submitted_job = await submit(
        rabbitmq_rpc_client=client,
        rpc_namespace=STORAGE_RPC_NAMESPACE,
        method_name=RPCMethodName("delete_paths"),
        job_id_data=job_owner,
        location_id=location_id,
        paths=paths,
    )
    return submitted_job, job_owner

services/storage/src/simcore_service_storage/api/_worker_tasks/_paths.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
from pathlib import Path
33

44
from celery import Task # type: ignore[import-untyped]
5-
from models_library.projects_nodes_io import LocationID
5+
from models_library.projects_nodes_io import LocationID, StorageFileID
66
from models_library.users import UserID
7-
from pydantic import ByteSize
7+
from pydantic import ByteSize, TypeAdapter
88
from servicelib.logging_utils import log_context
9+
from servicelib.utils import limited_gather
910

11+
from ...constants import MAX_CONCURRENT_S3_TASKS
1012
from ...dsm import get_dsm_provider
1113
from ...modules.celery.models import TaskId
1214
from ...modules.celery.utils import get_fastapi_app
@@ -25,3 +27,26 @@ async def compute_path_size(
2527
):
2628
dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
2729
return await dsm.compute_path_size(user_id, path=Path(path))
30+
31+
32+
async def delete_paths(
    task: Task,
    task_id: TaskId,
    user_id: UserID,
    location_id: LocationID,
    paths: set[Path],
) -> None:
    """Worker task: delete every given path from a storage location.

    Args:
        task: task instance; its ``app`` is used to reach the FastAPI app.
        task_id: identifier of this task (only asserted to be truthy).
        user_id: user on whose behalf the files are deleted.
        location_id: storage location used to pick the data-storage manager.
        paths: paths to delete; each one is validated as a ``StorageFileID``.

    Raises:
        Whatever ``StorageFileID`` validation or ``dsm.delete_file`` raise.
    """
    assert task_id  # nosec
    with log_context(
        _logger,
        logging.INFO,
        msg=f"delete {paths=} in {location_id=} for {user_id=}",
    ):
        dsm = get_dsm_provider(get_fastapi_app(task.app)).get(location_id)
        # build the adapter once instead of once per path
        file_id_adapter = TypeAdapter(StorageFileID)
        files_ids: set[StorageFileID] = {
            file_id_adapter.validate_python(f"{path}") for path in paths
        }
        # deletions go through limited_gather with limit=MAX_CONCURRENT_S3_TASKS
        # (presumably a cap on concurrent S3 operations -- confirm in servicelib)
        await limited_gather(
            *[dsm.delete_file(user_id, file_id) for file_id in files_ids],
            limit=MAX_CONCURRENT_S3_TASKS,
        )

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from ...modules.celery._task import define_task
88
from ...modules.celery.tasks import export_data
99
from ._files import complete_upload_file
10-
from ._paths import compute_path_size
10+
from ._paths import compute_path_size, delete_paths
1111
from ._simcore_s3 import deep_copy_files_from_project
1212

1313
_logger = logging.getLogger(__name__)
@@ -22,5 +22,6 @@ def setup_worker_tasks(app: Celery) -> None:
2222
):
2323
define_task(app, export_data)
2424
define_task(app, compute_path_size)
25+
define_task(app, delete_paths)
2526
define_task(app, complete_upload_file)
2627
define_task(app, deep_copy_files_from_project)

services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from ...modules.celery import get_celery_client
1313
from .._worker_tasks._paths import compute_path_size as remote_compute_path_size
14+
from .._worker_tasks._paths import delete_paths as remote_delete_paths
1415

1516
_logger = logging.getLogger(__name__)
1617
router = RPCRouter()
@@ -20,7 +21,6 @@
2021
async def compute_path_size(
2122
app: FastAPI,
2223
job_id_data: AsyncJobNameData,
23-
# user_id: UserID,
2424
location_id: LocationID,
2525
path: Path,
2626
) -> AsyncJobGet:
@@ -33,3 +33,20 @@ async def compute_path_size(
3333
)
3434

3535
return AsyncJobGet(job_id=task_uuid)
36+
37+
38+
@router.expose(reraise_if_error_type=None)
async def delete_paths(
    app: FastAPI,
    job_id_data: AsyncJobNameData,
    location_id: LocationID,
    paths: set[Path],
) -> AsyncJobGet:
    """RPC entry point: send the ``delete_paths`` worker task to celery.

    The task is sent under the worker function's name, with the dumped
    ``job_id_data`` as task context and its ``user_id`` as the acting user.

    Returns:
        An ``AsyncJobGet`` whose ``job_id`` is the value returned by ``send_task``.
    """
    celery_client = get_celery_client(app)
    submitted_task_uuid = await celery_client.send_task(
        remote_delete_paths.__name__,
        task_context=job_id_data.model_dump(),
        user_id=job_id_data.user_id,
        location_id=location_id,
        paths=paths,
    )
    return AsyncJobGet(job_id=submitted_task_uuid)

services/storage/src/simcore_service_storage/modules/celery/_celery_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@ def register_celery_types() -> None:
5959
_register_pydantic_types(FileMetaData)
6060
_register_pydantic_types(FoldersBody)
6161
_register_pydantic_types(TaskError)
62+
register_type(set, _class_full_name(set), encoder=list, decoder=set)

services/storage/tests/unit/test_rpc_handlers_paths.py

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
# pylint:disable=unused-variable
88

99

10-
import asyncio
1110
import datetime
1211
import random
1312
from pathlib import Path
@@ -31,7 +30,10 @@
3130
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
3231
wait_and_get_result,
3332
)
34-
from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
33+
from servicelib.rabbitmq.rpc_interfaces.storage.paths import (
34+
compute_path_size,
35+
delete_paths,
36+
)
3537
from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
3638
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
3739

@@ -74,7 +76,6 @@ async def _assert_compute_path_size(
7476
location_id=location_id,
7577
path=path,
7678
)
77-
await asyncio.sleep(1)
7879
async for job_composed_result in wait_and_get_result(
7980
storage_rpc_client,
8081
rpc_namespace=STORAGE_RPC_NAMESPACE,
@@ -91,6 +92,39 @@ async def _assert_compute_path_size(
9192
return received_size
9293

9394
pytest.fail("Job did not finish")
95+
return ByteSize(0) # for mypy
96+
97+
98+
async def _assert_delete_paths(
    storage_rpc_client: RabbitMQRPCClient,
    location_id: LocationID,
    user_id: UserID,
    product_name: ProductName,
    *,
    paths: set[Path],
) -> None:
    """Submit a ``delete_paths`` job and assert that it completes with no result.

    Iterates over ``wait_and_get_result`` until a result reports ``done``, then
    checks it is an ``AsyncJobResult`` whose ``result`` is ``None``. The test
    fails if the iteration ends without any ``done`` result.
    """
    # only the job handle is needed; the returned job-name data is rebuilt below
    async_job, _ = await delete_paths(
        storage_rpc_client,
        product_name=product_name,
        user_id=user_id,
        location_id=location_id,
        paths=paths,
    )
    async for job_composed_result in wait_and_get_result(
        storage_rpc_client,
        rpc_namespace=STORAGE_RPC_NAMESPACE,
        # must name the RPC method that was actually submitted above
        method_name=RPCMethodName(delete_paths.__name__),
        job_id=async_job.job_id,
        job_id_data=AsyncJobNameData(user_id=user_id, product_name=product_name),
        client_timeout=datetime.timedelta(seconds=120),
    ):
        if job_composed_result.done:
            response = await job_composed_result.result()
            assert isinstance(response, AsyncJobResult)
            assert response.result is None
            return

    pytest.fail("Job did not finish")
94128

95129

96130
@pytest.mark.parametrize(
@@ -246,3 +280,107 @@ async def test_path_compute_size_inexistent_path(
246280
expected_total_size=0,
247281
product_name=product_name,
248282
)
283+
284+
285+
@pytest.mark.parametrize(
    "location_id",
    [SimcoreS3DataManager.get_location_id()],
    ids=[SimcoreS3DataManager.get_location_name()],
    indirect=True,
)
async def test_delete_paths_empty_set(
    initialized_app: FastAPI,
    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
    user_id: UserID,
    location_id: LocationID,
    product_name: ProductName,
    with_storage_celery_worker: CeleryTaskQueueWorker,
):
    """Deleting an empty set of paths must still complete successfully."""
    nothing_to_delete: set[Path] = set()
    await _assert_delete_paths(
        storage_rabbitmq_rpc_client,
        location_id,
        user_id,
        product_name,
        paths=nothing_to_delete,
    )
306+
307+
308+
@pytest.mark.parametrize(
    "location_id",
    [SimcoreS3DataManager.get_location_id()],
    ids=[SimcoreS3DataManager.get_location_name()],
    indirect=True,
)
@pytest.mark.parametrize(
    "project_params",
    [
        ProjectWithFilesParams(
            num_nodes=1,
            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
            workspace_files_count=15,
        )
    ],
    ids=str,
)
async def test_delete_paths(
    initialized_app: FastAPI,
    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
    user_id: UserID,
    location_id: LocationID,
    with_random_project_with_files: tuple[
        dict[str, Any],
        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
    ],
    project_params: ProjectWithFilesParams,
    product_name: ProductName,
    with_storage_celery_worker: CeleryTaskQueueWorker,
):
    """Delete a random subset of a project's files and check the size shrinks.

    The project size is computed before and after the deletion; every file
    has the same size, so the expected drop is ``deleted files * file size``.
    """
    assert (
        len(project_params.allowed_file_sizes) == 1
    ), "test preconditions are not filled! allowed file sizes should have only 1 option for this test"
    project, list_of_files = with_random_project_with_files
    single_file_size = project_params.allowed_file_sizes[0]

    total_num_files = sum(
        len(files_in_node) for files_in_node in list_of_files.values()
    )

    # size of the full, untouched project
    expected_total_size = single_file_size * total_num_files
    project_path = Path(project["uuid"])
    await _assert_compute_path_size(
        storage_rabbitmq_rpc_client,
        location_id,
        user_id,
        path=project_path,
        expected_total_size=expected_total_size,
        product_name=product_name,
    )

    # pick a random node, then several of its files to delete
    chosen_node_id = NodeID(random.choice(list(project["workbench"])))  # noqa: S311
    files_of_chosen_node = list(list_of_files[chosen_node_id])
    selected_paths = random.sample(
        files_of_chosen_node,
        round(project_params.workspace_files_count / 2),
    )

    await _assert_delete_paths(
        storage_rabbitmq_rpc_client,
        location_id,
        user_id,
        product_name,
        paths={Path(selected) for selected in selected_paths},
    )

    # the project size must have dropped by the size of the deleted files
    size_of_deleted_files = len(selected_paths) * single_file_size
    await _assert_compute_path_size(
        storage_rabbitmq_rpc_client,
        location_id,
        user_id,
        path=project_path,
        expected_total_size=expected_total_size - size_of_deleted_files,
        product_name=product_name,
    )

services/web/server/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.61.4
1+
0.61.5

services/web/server/setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.61.4
2+
current_version = 0.61.5
33
commit = True
44
message = services/webserver api version: {current_version} → {new_version}
55
tag = False

0 commit comments

Comments
 (0)