# pylint:disable=unused-variable


-import logging
-import random
from collections.abc import Awaitable, Callable
from pathlib import Path
-from typing import Any, TypeAlias
+from unittest import mock

-import httpx
import pytest
-from celery import Celery, Task
from faker import Faker
from fastapi import FastAPI
-from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobId
-from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
-from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
-from models_library.projects_nodes_io import LocationID, NodeID, SimcoreS3FileID
+from models_library.projects_nodes_io import LocationID
from models_library.users import UserID
-from pydantic import ByteSize, TypeAdapter
-from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
+from pytest_mock import MockerFixture
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
-from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
-    get_result,
-    get_status,
-)
from servicelib.rabbitmq.rpc_interfaces.storage.paths import compute_path_size
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
-from tenacity import (
-    before_sleep_log,
-    retry,
-    retry_if_exception_type,
-    stop_after_delay,
-    wait_fixed,
-)

pytest_simcore_core_services_selection = ["postgres", "rabbit"]
pytest_simcore_ops_services_selection = ["adminer"]

-_IsFile: TypeAlias = bool
-

@pytest.fixture
async def storage_rabbitmq_rpc_client(
@@ -55,98 +34,15 @@ async def storage_rabbitmq_rpc_client(
    return rpc_client

-def _filter_and_group_paths_one_level_deeper(
-    paths: list[Path], prefix: Path
-) -> list[tuple[Path, _IsFile]]:
-    relative_paths = (path for path in paths if path.is_relative_to(prefix))
-    return sorted(
-        {
-            (
-                (path, len(path.relative_to(prefix).parts) == 1)
-                if len(path.relative_to(prefix).parts) == 1
-                else (prefix / path.relative_to(prefix).parts[0], False)
-            )
-            for path in relative_paths
-        },
-        key=lambda x: x[0],
-    )
-
-
-_logger = logging.getLogger(__name__)
-
-
-async def _assert_compute_path_size(
-    rpc_client: RabbitMQRPCClient,
-    location_id: LocationID,
-    user_id: UserID,
-    *,
-    path: Path,
-    expected_total_size: int,
-) -> ByteSize:
-    received, job_id_data = await compute_path_size(
-        rpc_client, user_id=user_id, product_name="", location_id=location_id, path=path
-    )
-
-    assert isinstance(received, StorageAsyncJobGet)
-
-    @retry(
-        wait=wait_fixed(1),
-        stop=stop_after_delay(10),
-        retry=retry_if_exception_type(AssertionError),
-        before_sleep=before_sleep_log(_logger, logging.WARNING),
-    )
-    async def _wait_for_job_completion(job_id: AsyncJobId) -> None:
-        job_status = await get_status(
-            rpc_client,
-            rpc_namespace=STORAGE_RPC_NAMESPACE,
-            job_id=job_id,
-            job_id_data=job_id_data,
-        )
-        assert job_status.done
-
-    await _wait_for_job_completion(received.job_id)
-    job_result = await get_result(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        job_id=received.job_id,
-        job_id_data=job_id_data,
-    )
-    assert job_result.result is not None
-    assert job_result.error is None
-    response = job_result.result
-    assert isinstance(response, ByteSize)
-    assert response == expected_total_size
-    return response
-
-
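Note on what was dropped: the deleted `_assert_compute_path_size` helper polled the async job with a tenacity retry loop until `job_status.done`. For reference, a minimal sketch of that retry-until-done pattern on its own (`fetch_status` is a hypothetical stand-in for the real `get_status` RPC call):

```python
# Sketch of the retry-until-done polling pattern the removed helper used.
# `fetch_status` is a hypothetical callable standing in for the get_status RPC.
import logging

from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_delay,
    wait_fixed,
)

_logger = logging.getLogger(__name__)


@retry(
    wait=wait_fixed(1),  # poll once per second
    stop=stop_after_delay(10),  # give up after 10 seconds
    retry=retry_if_exception_type(AssertionError),
    before_sleep=before_sleep_log(_logger, logging.WARNING),
)
async def wait_until_done(fetch_status) -> None:
    status = await fetch_status()
    assert status.done  # AssertionError triggers another retry attempt
```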
-@pytest.fixture(scope="session")
-def celery_config() -> dict[str, Any]:
-    return {
-        "broker_url": "amqp://admin:adminadmin@localhost",
-        "result_backend": "redis://redis:6789",
-        "result_extended": True,
-        "pool": "threads",
-    }
-
-
-@pytest.fixture(scope="session")
-def celery_app(celery_config):
-    app = Celery(**celery_config)
-    return app
-
-
@pytest.fixture
-def fake_storage_worker(celery_app: Celery, celery_worker):
-    @celery_app.task
-    def compute_path_size(
-        task: Task, user_id: UserID, location_id: LocationID, path: Path
-    ):
-        return ByteSize(23243)
-
-    assert celery_worker.ready()
-
+async def mock_celery_send_task(mocker: MockerFixture, faker: Faker) -> mock.AsyncMock:
+    def mocked_send_task(*args, **kwargs):
+        return faker.uuid4()

-def test_fake_storage_worker(fake_storage_worker): ...
+    return mocker.patch(
+        "simcore_service_storage.modules.celery.client.CeleryTaskQueueClient.send_task",
+        side_effect=mocked_send_task,
+    )
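For readers unfamiliar with pytest-mock: `mocker.patch` swaps out the target for the duration of the test and returns the mock, so this fixture hands the patched `send_task` back to the test for later assertions; the `side_effect` makes each call return a fresh fake task id. A self-contained sketch of the same patch-with-side_effect pattern (the `Client` class below is a hypothetical stand-in, not this repository's `CeleryTaskQueueClient`):

```python
# Self-contained sketch of patching a method with a side_effect via pytest-mock.
# `Client` is a hypothetical stand-in used only for illustration.
class Client:
    def send_task(self, name: str) -> str:
        raise RuntimeError("would hit the real broker")


def test_send_task_is_patched(mocker):
    patched = mocker.patch.object(
        Client, "send_task", side_effect=lambda *args, **kwargs: "fake-task-id"
    )
    assert Client().send_task("compute_path_size") == "fake-task-id"
    patched.assert_called_once()
```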


@pytest.mark.parametrize(
@@ -155,143 +51,21 @@ def test_fake_storage_worker(fake_storage_worker): ...
    ids=[SimcoreS3DataManager.get_location_name()],
    indirect=True,
)
-@pytest.mark.parametrize(
-    "project_params",
-    [
-        ProjectWithFilesParams(
-            num_nodes=5,
-            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
-            workspace_files_count=10,
-        )
-    ],
-    ids=str,
-)
-async def test_path_compute_size(
-    fake_storage_worker,
+async def test_path_compute_size_calls_in_celery(
    initialized_app: FastAPI,
    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
    location_id: LocationID,
    user_id: UserID,
-    with_random_project_with_files: tuple[
-        dict[str, Any],
-        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
-    ],
-    project_params: ProjectWithFilesParams,
-):
-    assert (
-        len(project_params.allowed_file_sizes) == 1
-    ), "test preconditions are not filled! allowed file sizes should have only 1 option for this test"
-    project, list_of_files = with_random_project_with_files
-
-    total_num_files = sum(
-        len(files_in_node) for files_in_node in list_of_files.values()
-    )
-
-    # get size of a full project
-    expected_total_size = project_params.allowed_file_sizes[0] * total_num_files
-    path = Path(project["uuid"])
-    await _assert_compute_path_size(
-        storage_rabbitmq_rpc_client,
-        location_id,
-        user_id,
-        path=path,
-        expected_total_size=expected_total_size,
-    )
-
-    # get size of one of the nodes
-    selected_node_id = NodeID(random.choice(list(project["workbench"])))  # noqa: S311
-    path = Path(project["uuid"]) / f"{selected_node_id}"
-    selected_node_s3_keys = [
-        Path(s3_object_id) for s3_object_id in list_of_files[selected_node_id]
-    ]
-    expected_total_size = project_params.allowed_file_sizes[0] * len(
-        selected_node_s3_keys
-    )
-    await _assert_compute_path_size(
-        storage_rabbitmq_rpc_client,
-        location_id,
-        user_id,
-        path=path,
-        expected_total_size=expected_total_size,
-    )
-
-    # get size of the outputs of one of the nodes
-    path = Path(project["uuid"]) / f"{selected_node_id}" / "outputs"
-    selected_node_s3_keys = [
-        Path(s3_object_id)
-        for s3_object_id in list_of_files[selected_node_id]
-        if s3_object_id.startswith(f"{path}")
-    ]
-    expected_total_size = project_params.allowed_file_sizes[0] * len(
-        selected_node_s3_keys
-    )
-    await _assert_compute_path_size(
-        storage_rabbitmq_rpc_client,
-        location_id,
-        user_id,
-        path=path,
-        expected_total_size=expected_total_size,
-    )
-
-    # get size of workspace in one of the nodes (this is semi-cached in the DB)
-    path = Path(project["uuid"]) / f"{selected_node_id}" / "workspace"
-    selected_node_s3_keys = [
-        Path(s3_object_id)
-        for s3_object_id in list_of_files[selected_node_id]
-        if s3_object_id.startswith(f"{path}")
-    ]
-    expected_total_size = project_params.allowed_file_sizes[0] * len(
-        selected_node_s3_keys
-    )
-    workspace_total_size = await _assert_compute_path_size(
-        storage_rabbitmq_rpc_client,
-        location_id,
-        user_id,
-        path=path,
-        expected_total_size=expected_total_size,
-    )
-
-    # get size of folders inside the workspace
-    folders_inside_workspace = [
-        p[0]
-        for p in _filter_and_group_paths_one_level_deeper(selected_node_s3_keys, path)
-        if p[1] is False
-    ]
-    accumulated_subfolder_size = 0
-    for workspace_subfolder in folders_inside_workspace:
-        selected_node_s3_keys = [
-            Path(s3_object_id)
-            for s3_object_id in list_of_files[selected_node_id]
-            if s3_object_id.startswith(f"{workspace_subfolder}")
-        ]
-        expected_total_size = project_params.allowed_file_sizes[0] * len(
-            selected_node_s3_keys
-        )
-        accumulated_subfolder_size += await _assert_compute_path_size(
-            storage_rabbitmq_rpc_client,
-            location_id,
-            user_id,
-            path=workspace_subfolder,
-            expected_total_size=expected_total_size,
-        )
-
-    assert workspace_total_size == accumulated_subfolder_size
-
-
-@pytest.mark.xfail(reason="in development")
-async def test_path_compute_size_inexistent_path(
-    initialized_app: FastAPI,
-    client: httpx.AsyncClient,
-    storage_rabbitmq_rpc_client: RabbitMQRPCClient,
-    location_id: LocationID,
-    user_id: UserID,
    faker: Faker,
-    fake_datcore_tokens: tuple[str, str],
+    mock_celery_send_task: mock.AsyncMock,
):
-    await _assert_compute_path_size(
+    received, job_id_data = await compute_path_size(
        storage_rabbitmq_rpc_client,
-        location_id,
-        user_id,
+        user_id=user_id,
+        product_name=faker.name(),
+        location_id=location_id,
        path=Path(faker.file_path(absolute=False)),
-        expected_total_size=0,
    )
+    mock_celery_send_task.assert_called_once()
+    assert received
+    assert job_id_data
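`assert_called_once()` checks only the call count; if the dispatch payload is stable, the test could be tightened with `assert_called_once_with` or a targeted look at `call_args`. A self-contained illustration using plain `unittest.mock` (the task name and arguments below are made up for the example):

```python
# Illustration of call-count vs payload assertions on a mock.
# The task name and arguments are hypothetical, not this codebase's signature.
from unittest import mock

m = mock.Mock(side_effect=lambda *args, **kwargs: "fake-task-id")
m("compute_path_size", user_id=42)

m.assert_called_once()  # call count only
m.assert_called_once_with("compute_path_size", user_id=42)  # count + payload
assert m.call_args.kwargs["user_id"] == 42  # targeted inspection
```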