Skip to content

Commit d5b77c2

Browse files
committed
test worker task
1 parent bc0c209 commit d5b77c2

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
# pylint:disable=no-name-in-module
2+
# pylint:disable=protected-access
3+
# pylint:disable=redefined-outer-name
4+
# pylint:disable=too-many-arguments
5+
# pylint:disable=too-many-positional-arguments
6+
# pylint:disable=unused-argument
7+
# pylint:disable=unused-variable
8+
9+
10+
import random
11+
from pathlib import Path
12+
from typing import Any, TypeAlias
13+
14+
import httpx
15+
import pytest
16+
from celery import Celery, Task
17+
from faker import Faker
18+
from fastapi import FastAPI
19+
from models_library.projects_nodes_io import LocationID, NodeID, SimcoreS3FileID
20+
from models_library.users import UserID
21+
from pydantic import ByteSize, TypeAdapter
22+
from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
23+
from simcore_service_storage.api._worker_tasks._paths import compute_path_size
24+
from simcore_service_storage.modules.celery.utils import set_fastapi_app
25+
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
26+
27+
pytest_simcore_core_services_selection = ["postgres"]
28+
pytest_simcore_ops_services_selection = ["adminer"]
29+
30+
_IsFile: TypeAlias = bool
31+
32+
33+
def _filter_and_group_paths_one_level_deeper(
34+
paths: list[Path], prefix: Path
35+
) -> list[tuple[Path, _IsFile]]:
36+
relative_paths = (path for path in paths if path.is_relative_to(prefix))
37+
return sorted(
38+
{
39+
(
40+
(path, len(path.relative_to(prefix).parts) == 1)
41+
if len(path.relative_to(prefix).parts) == 1
42+
else (prefix / path.relative_to(prefix).parts[0], False)
43+
)
44+
for path in relative_paths
45+
},
46+
key=lambda x: x[0],
47+
)
48+
49+
50+
async def _assert_compute_path_size(
    celery_task: Task,
    location_id: LocationID,
    user_id: UserID,
    *,
    path: Path,
    expected_total_size: int,
) -> ByteSize:
    """Run the compute_path_size worker task and verify its result.

    Asserts the task returns a ``ByteSize`` equal to *expected_total_size*
    and hands it back so callers can accumulate sizes across calls.
    """
    computed_size = await compute_path_size(
        celery_task, user_id=user_id, location_id=location_id, path=path
    )
    assert isinstance(computed_size, ByteSize)
    assert computed_size == expected_total_size
    return computed_size
64+
65+
66+
@pytest.fixture
def fake_celery_task(celery_app: Celery, initialized_app: FastAPI) -> Task:
    """A bare Celery task bound to the app, so worker functions run in-process."""
    set_fastapi_app(celery_app, initialized_app)
    task = Task()
    task.app = celery_app
    return task
72+
73+
74+
@pytest.mark.parametrize(
    "location_id",
    [SimcoreS3DataManager.get_location_id()],
    ids=[SimcoreS3DataManager.get_location_name()],
    indirect=True,
)
@pytest.mark.parametrize(
    "project_params",
    [
        ProjectWithFilesParams(
            num_nodes=5,
            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
            workspace_files_count=10,
        )
    ],
    ids=str,
)
async def test_path_compute_size(
    fake_celery_task: Task,
    initialized_app: FastAPI,
    client: httpx.AsyncClient,
    location_id: LocationID,
    user_id: UserID,
    with_random_project_with_files: tuple[
        dict[str, Any],
        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
    ],
    project_params: ProjectWithFilesParams,
):
    """compute_path_size returns the correct size at every level of a project tree.

    Checks the size of: the whole project, one randomly picked node, that
    node's ``outputs`` and ``workspace`` folders, and each first-level
    subfolder of the workspace (whose sizes must add up to the workspace
    size).
    """
    assert (
        len(project_params.allowed_file_sizes) == 1
    ), "test preconditions are not filled! allowed file sizes should have only 1 option for this test"
    project, list_of_files = with_random_project_with_files

    # every file has the same size, so an expected size is just count * file_size
    file_size = project_params.allowed_file_sizes[0]

    def _node_keys(node_id: NodeID, prefix: str | None = None) -> list[Path]:
        # s3 object keys of *node_id*, optionally restricted to those under *prefix*
        return [
            Path(s3_object_id)
            for s3_object_id in list_of_files[node_id]
            if prefix is None or s3_object_id.startswith(prefix)
        ]

    # get size of a full project
    total_num_files = sum(
        len(files_in_node) for files_in_node in list_of_files.values()
    )
    path = Path(project["uuid"])
    await _assert_compute_path_size(
        fake_celery_task,
        location_id,
        user_id,
        path=path,
        expected_total_size=file_size * total_num_files,
    )

    # get size of one of the nodes
    selected_node_id = NodeID(random.choice(list(project["workbench"])))  # noqa: S311
    path = Path(project["uuid"]) / f"{selected_node_id}"
    await _assert_compute_path_size(
        fake_celery_task,
        location_id,
        user_id,
        path=path,
        expected_total_size=file_size * len(_node_keys(selected_node_id)),
    )

    # get size of the outputs of one of the nodes
    path = Path(project["uuid"]) / f"{selected_node_id}" / "outputs"
    await _assert_compute_path_size(
        fake_celery_task,
        location_id,
        user_id,
        path=path,
        expected_total_size=file_size * len(_node_keys(selected_node_id, f"{path}")),
    )

    # get size of workspace in one of the nodes (this is semi-cached in the DB)
    path = Path(project["uuid"]) / f"{selected_node_id}" / "workspace"
    workspace_keys = _node_keys(selected_node_id, f"{path}")
    workspace_total_size = await _assert_compute_path_size(
        fake_celery_task,
        location_id,
        user_id,
        path=path,
        expected_total_size=file_size * len(workspace_keys),
    )

    # get size of folders inside the workspace; their sizes must add up
    # to the workspace total computed above
    folders_inside_workspace = [
        grouped_path
        for grouped_path, is_file in _filter_and_group_paths_one_level_deeper(
            workspace_keys, path
        )
        if not is_file
    ]
    accumulated_subfolder_size = 0
    for workspace_subfolder in folders_inside_workspace:
        accumulated_subfolder_size += await _assert_compute_path_size(
            fake_celery_task,
            location_id,
            user_id,
            path=workspace_subfolder,
            expected_total_size=file_size
            * len(_node_keys(selected_node_id, f"{workspace_subfolder}")),
        )

    assert workspace_total_size == accumulated_subfolder_size
201+
202+
203+
async def test_path_compute_size_inexistent_path(
    fake_celery_task: Task,
    initialized_app: FastAPI,
    client: httpx.AsyncClient,
    location_id: LocationID,
    user_id: UserID,
    faker: Faker,
    fake_datcore_tokens: tuple[str, str],
):
    """A path that exists nowhere in the location computes to a size of 0."""
    missing_path = Path(faker.file_path(absolute=False))
    await _assert_compute_path_size(
        fake_celery_task,
        location_id,
        user_id,
        path=missing_path,
        expected_total_size=0,
    )

0 commit comments

Comments
 (0)