Skip to content

Commit 6667795

Browse files
committed
moving copy to RPC
1 parent 0f78b04 commit 6667795

File tree

4 files changed

+157
-54
lines changed

4 files changed

+157
-54
lines changed

services/storage/src/simcore_service_storage/modules/celery/_task.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ def wrapper(task: AbortableTask, *args: Any, **kwargs: Any) -> Any:
2828
exc_traceback = traceback.format_exc().split("\n")
2929

3030
_logger.exception(
31-
"Task %s failed with exception: %s",
31+
"Task %s failed with exception: %s:%s",
3232
task.request.id,
33+
exc_type,
3334
exc_message,
3435
)
3536

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ def abort_task( # pylint: disable=R6301
8282
@make_async()
8383
def get_task_result(self, task_context: TaskContext, task_uuid: TaskUUID) -> Any:
8484
task_id = _build_task_id(task_context, task_uuid)
85-
return self._celery_app.AsyncResult(task_id).result
85+
with log_context(
86+
_logger,
87+
logging.DEBUG,
88+
msg=f"Getting task {task_id=} result",
89+
):
90+
return AbortableAsyncResult(task_id).result
8691

8792
def _get_progress_report(
8893
self, task_context: TaskContext, task_uuid: TaskUUID

services/storage/tests/unit/test_handlers_simcore_s3.py

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import logging
1010
import sys
1111
from collections.abc import Awaitable, Callable
12-
from copy import deepcopy
1312
from pathlib import Path
1413
from typing import Any, Literal
1514

@@ -41,7 +40,6 @@
4140
assert_file_meta_data_in_db,
4241
)
4342
from pytest_simcore.helpers.storage_utils_project import clone_project_data
44-
from pytest_simcore.helpers.typing_env import EnvVarsDict
4543
from servicelib.aiohttp import status
4644
from servicelib.fastapi.long_running_tasks.client import long_running_task_request
4745
from settings_library.s3 import S3Settings
@@ -51,8 +49,8 @@
5149
from sqlalchemy.ext.asyncio import AsyncEngine
5250
from yarl import URL
5351

54-
pytest_simcore_core_services_selection = ["postgres"]
55-
pytest_simcore_ops_services_selection = ["adminer", "minio"]
52+
pytest_simcore_core_services_selection = ["postgres", "rabbit"]
53+
pytest_simcore_ops_services_selection = ["adminer"]
5654

5755

5856
CURRENT_DIR = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent
@@ -125,53 +123,6 @@ async def _request_copy_folders(
125123
pytest.fail(reason="Copy folders failed!")
126124

127125

128-
async def test_copy_folders_from_non_existing_project(
129-
initialized_app: FastAPI,
130-
client: httpx.AsyncClient,
131-
user_id: UserID,
132-
create_project: Callable[[], Awaitable[dict[str, Any]]],
133-
faker: Faker,
134-
):
135-
src_project = await create_project()
136-
incorrect_src_project = deepcopy(src_project)
137-
incorrect_src_project["uuid"] = faker.uuid4()
138-
dst_project = await create_project()
139-
incorrect_dst_project = deepcopy(dst_project)
140-
incorrect_dst_project["uuid"] = faker.uuid4()
141-
142-
with pytest.raises(httpx.HTTPStatusError, match="404") as exc_info:
143-
await _request_copy_folders(
144-
initialized_app,
145-
client,
146-
user_id,
147-
incorrect_src_project,
148-
dst_project,
149-
nodes_map={},
150-
)
151-
assert_status(
152-
exc_info.value.response,
153-
status.HTTP_404_NOT_FOUND,
154-
None,
155-
expected_msg=f"{incorrect_src_project['uuid']} was not found",
156-
)
157-
158-
with pytest.raises(httpx.HTTPStatusError, match="404") as exc_info:
159-
await _request_copy_folders(
160-
initialized_app,
161-
client,
162-
user_id,
163-
src_project,
164-
incorrect_dst_project,
165-
nodes_map={},
166-
)
167-
assert_status(
168-
exc_info.value.response,
169-
status.HTTP_404_NOT_FOUND,
170-
None,
171-
expected_msg=f"{incorrect_dst_project['uuid']} was not found",
172-
)
173-
174-
175126
async def test_copy_folders_from_empty_project(
176127
initialized_app: FastAPI,
177128
client: httpx.AsyncClient,
@@ -540,7 +491,6 @@ async def test_create_and_delete_folders_from_project(
540491
@pytest.mark.parametrize("num_concurrent_calls", [50])
541492
async def test_create_and_delete_folders_from_project_burst(
542493
set_log_levels_for_noisy_libraries: None,
543-
minio_s3_settings_envs: EnvVarsDict,
544494
initialized_app: FastAPI,
545495
client: httpx.AsyncClient,
546496
user_id: UserID,
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import datetime
2+
import logging
3+
from collections.abc import Awaitable, Callable
4+
from copy import deepcopy
5+
from typing import Any
6+
7+
import pytest
8+
import sqlalchemy as sa
9+
from aws_library.s3 import SimcoreS3API
10+
from faker import Faker
11+
from fastapi import FastAPI
12+
from fastapi.encoders import jsonable_encoder
13+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobResult
14+
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
15+
from models_library.api_schemas_storage.storage_schemas import FoldersBody
16+
from models_library.projects_nodes_io import NodeID
17+
from models_library.users import UserID
18+
from pytest_simcore.helpers.logging_tools import log_context
19+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
20+
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import wait_and_get_result
21+
from servicelib.rabbitmq.rpc_interfaces.storage.simcore_s3 import (
22+
copy_folders_from_project,
23+
)
24+
from simcore_postgres_database.storage_models import file_meta_data
25+
from sqlalchemy.ext.asyncio import AsyncEngine
26+
27+
pytest_simcore_core_services_selection = ["postgres", "rabbit"]
28+
pytest_simcore_ops_services_selection = ["adminer"]
29+
30+
31+
async def _request_copy_folders(
32+
rpc_client: RabbitMQRPCClient,
33+
user_id: UserID,
34+
product_name: str,
35+
source_project: dict[str, Any],
36+
dst_project: dict[str, Any],
37+
nodes_map: dict[NodeID, NodeID],
38+
) -> dict[str, Any]:
39+
with log_context(
40+
logging.INFO,
41+
f"Copying folders from {source_project['uuid']} to {dst_project['uuid']}",
42+
) as ctx:
43+
async_job_get, async_job_name = await copy_folders_from_project(
44+
rpc_client,
45+
user_id=user_id,
46+
product_name=product_name,
47+
body=FoldersBody(
48+
source=source_project, destination=dst_project, nodes_map=nodes_map
49+
),
50+
)
51+
52+
async for async_job_result in wait_and_get_result(
53+
rpc_client,
54+
rpc_namespace=STORAGE_RPC_NAMESPACE,
55+
method_name=copy_folders_from_project.__name__,
56+
job_id=async_job_get.job_id,
57+
job_id_data=async_job_name,
58+
client_timeout=datetime.timedelta(seconds=60),
59+
):
60+
ctx.logger.info("%s", f"<-- current state is {async_job_result=}")
61+
if async_job_result.done:
62+
result = await async_job_result.result()
63+
assert isinstance(result, AsyncJobResult)
64+
return result.result
65+
66+
pytest.fail(reason="Copy folders failed!")
67+
68+
69+
@pytest.mark.xfail(reason="There is something fishy here MB, GC")
70+
async def test_copy_folders_from_non_existing_project(
71+
initialized_app: FastAPI,
72+
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
73+
user_id: UserID,
74+
product_name: str,
75+
create_project: Callable[..., Awaitable[dict[str, Any]]],
76+
faker: Faker,
77+
):
78+
src_project = await create_project()
79+
incorrect_src_project = deepcopy(src_project)
80+
incorrect_src_project["uuid"] = faker.uuid4()
81+
dst_project = await create_project()
82+
incorrect_dst_project = deepcopy(dst_project)
83+
incorrect_dst_project["uuid"] = faker.uuid4()
84+
85+
with pytest.raises(RuntimeError, match="404") as exc_info:
86+
await _request_copy_folders(
87+
storage_rabbitmq_rpc_client,
88+
user_id,
89+
product_name,
90+
incorrect_src_project,
91+
dst_project,
92+
nodes_map={},
93+
)
94+
# assert_status(
95+
# exc_info.value.response,
96+
# status.HTTP_404_NOT_FOUND,
97+
# None,
98+
# expected_msg=f"{incorrect_src_project['uuid']} was not found",
99+
# )
100+
101+
with pytest.raises(RuntimeError, match="404") as exc_info:
102+
await _request_copy_folders(
103+
storage_rabbitmq_rpc_client,
104+
user_id,
105+
product_name,
106+
src_project,
107+
incorrect_dst_project,
108+
nodes_map={},
109+
)
110+
# assert_status(
111+
# exc_info.value.response,
112+
# status.HTTP_404_NOT_FOUND,
113+
# None,
114+
# expected_msg=f"{incorrect_dst_project['uuid']} was not found",
115+
# )
116+
117+
118+
async def test_copy_folders_from_empty_project(
119+
initialized_app: FastAPI,
120+
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
121+
user_id: UserID,
122+
product_name: str,
123+
create_project: Callable[[], Awaitable[dict[str, Any]]],
124+
sqlalchemy_async_engine: AsyncEngine,
125+
storage_s3_client: SimcoreS3API,
126+
):
127+
# we will copy from src to dst
128+
src_project = await create_project()
129+
dst_project = await create_project()
130+
131+
data = await _request_copy_folders(
132+
storage_rabbitmq_rpc_client,
133+
user_id,
134+
product_name,
135+
src_project,
136+
dst_project,
137+
nodes_map={},
138+
)
139+
assert data == jsonable_encoder(dst_project)
140+
# check there is nothing in the dst project
141+
async with sqlalchemy_async_engine.connect() as conn:
142+
num_entries = await conn.scalar(
143+
sa.select(sa.func.count())
144+
.select_from(file_meta_data)
145+
.where(file_meta_data.c.project_id == dst_project["uuid"])
146+
)
147+
assert num_entries == 0

0 commit comments

Comments
 (0)