Skip to content

Commit 54e93b9

Browse files
committed
converted to RPC
1 parent 3c126c6 commit 54e93b9

File tree

2 files changed

+32
-27
lines changed

2 files changed

+32
-27
lines changed

services/web/server/src/simcore_service_webserver/storage/api.py

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,37 @@
11
"""Storage subsystem's API: responsible of communication with storage service"""
22

3-
import asyncio
3+
import datetime
44
import logging
55
import urllib.parse
66
from collections.abc import AsyncGenerator
77
from typing import Any, Final
88

99
from aiohttp import ClientError, ClientSession, ClientTimeout, web
10+
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
11+
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
1012
from models_library.api_schemas_storage.storage_schemas import (
1113
FileLocation,
1214
FileLocationArray,
1315
FileMetaDataGet,
16+
FoldersBody,
1417
PresignedLink,
1518
)
1619
from models_library.generics import Envelope
1720
from models_library.projects import ProjectID
1821
from models_library.projects_nodes_io import LocationID, NodeID, SimCoreFileLink
1922
from models_library.users import UserID
20-
from models_library.utils.fastapi_encoders import jsonable_encoder
2123
from pydantic import ByteSize, HttpUrl, TypeAdapter
2224
from servicelib.aiohttp.client_session import get_client_session
23-
from servicelib.aiohttp.long_running_tasks.client import (
24-
LRTask,
25-
long_running_task_request,
26-
)
2725
from servicelib.logging_utils import get_log_record_extra, log_context
26+
from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
27+
AsyncJobComposedResult,
28+
submit_and_wait,
29+
)
2830
from yarl import URL
2931

3032
from ..projects.models import ProjectDict
3133
from ..projects.utils import NodesMap
34+
from ..rabbitmq import get_rabbitmq_rpc_client
3235
from .settings import StorageSettings, get_plugin_settings
3336

3437
_logger = logging.getLogger(__name__)
@@ -104,23 +107,25 @@ async def copy_data_folders_from_project(
104107
destination_project: ProjectDict,
105108
nodes_map: NodesMap,
106109
user_id: UserID,
107-
) -> AsyncGenerator[LRTask, None]:
108-
session, api_endpoint = _get_storage_client(app)
109-
_logger.debug("Copying %d nodes", len(nodes_map))
110-
# /simcore-s3/folders:
111-
async for lr_task in long_running_task_request(
112-
session,
113-
(api_endpoint / "simcore-s3/folders").with_query(user_id=user_id),
114-
json=jsonable_encoder(
115-
{
116-
"source": source_project,
117-
"destination": destination_project,
118-
"nodes_map": nodes_map,
119-
}
120-
),
121-
client_timeout=_TOTAL_TIMEOUT_TO_COPY_DATA_SECS,
122-
):
123-
yield lr_task
110+
) -> AsyncGenerator[AsyncJobComposedResult, None]:
111+
product_name = "osparc" # TODO fix it
112+
with log_context(_logger, logging.DEBUG, msg=f"copy {nodes_map=}"):
113+
rabbitmq_client = get_rabbitmq_rpc_client(app)
114+
async for job_composed_result in submit_and_wait(
115+
rabbitmq_client,
116+
method_name="copy_folders_from_project",
117+
rpc_namespace=STORAGE_RPC_NAMESPACE,
118+
job_id_data=AsyncJobNameData(user_id=user_id, product_name=product_name),
119+
body=TypeAdapter(FoldersBody).validate_python(
120+
{
121+
"source": source_project,
122+
"destination": destination_project,
123+
"nodes_map": nodes_map,
124+
},
125+
),
126+
client_timeout=datetime.timedelta(seconds=_TOTAL_TIMEOUT_TO_COPY_DATA_SECS),
127+
):
128+
yield job_composed_result
124129

125130

126131
async def _delete(session, target_url):
@@ -164,7 +169,7 @@ async def is_healthy(app: web.Application) -> bool:
164169
timeout=ClientTimeout(total=2, connect=1),
165170
)
166171
return True
167-
except (ClientError, asyncio.TimeoutError) as err:
172+
except (TimeoutError, ClientError) as err:
168173
# ClientResponseError, ClientConnectionError, ClientPayloadError, InValidURL
169174
_logger.debug("Storage is NOT healthy: %s", err)
170175
return False

services/web/server/src/simcore_service_webserver/studies_dispatcher/_studies_access.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" handles access to *public* studies
1+
"""handles access to *public* studies
22
33
Handles a request to share a given sharable study via '/study/{id}'
44
@@ -205,9 +205,9 @@ async def copy_study_to_account(
205205
f"{template_project['uuid']=}",
206206
f"{project['uuid']}",
207207
f"{user['id']}",
208-
f"{lr_task.progress=}",
208+
f"{lr_task.status.progress=}",
209209
)
210-
if lr_task.done():
210+
if lr_task.done:
211211
await lr_task.result()
212212
await create_or_update_pipeline(
213213
request.app, user["id"], project["uuid"], product_name

0 commit comments

Comments
 (0)