Skip to content

Commit 3bd5424

Browse files
author
Andrei Neagu
committed
rerouted retrieve via dynamic-scheduler
1 parent e75e816 commit 3bd5424

File tree

13 files changed

+155
-61
lines changed

13 files changed

+155
-61
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import logging
22
from typing import Final
33

4-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
4+
from models_library.api_schemas_directorv2.dynamic_services import (
5+
DynamicServiceGet,
6+
RetrieveDataOutEnveloped,
7+
)
58
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
69
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
710
DynamicServiceStart,
@@ -10,6 +13,7 @@
1013
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
1114
from models_library.projects_nodes_io import NodeID
1215
from models_library.rabbitmq_basic_types import RPCMethodName
16+
from models_library.services_types import ServicePortKey
1317
from pydantic import NonNegativeInt, TypeAdapter
1418
from servicelib.logging_utils import log_decorator
1519
from servicelib.rabbitmq import RabbitMQRPCClient
@@ -73,3 +77,22 @@ async def stop_dynamic_service(
7377
timeout_s=timeout_s,
7478
)
7579
assert result is None # nosec
80+
81+
82+
@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_data_on_ports(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout_s: NonNegativeInt,
) -> RetrieveDataOutEnveloped:
    """Issue the ``retrieve_data_on_ports`` RPC request to the dynamic-scheduler.

    The request is sent in ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` and forwards
    ``node_id`` and ``port_keys`` as keyword arguments; ``timeout_s`` is
    handed to the RPC client's ``request`` call.

    Returns:
        The reply of the remote handler, which is expected to be a
        ``RetrieveDataOutEnveloped``.
    """
    method_name = _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_data_on_ports")
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        method_name,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_s,
    )
    # Sanity check on the remote reply type (stripped under -O).
    assert isinstance(reply, RetrieveDataOutEnveloped)  # nosec
    return reply

services/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ services:
568568
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
569569
DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
570570
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
571+
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
571572
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
572573
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
573574
static-webserver:

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from fastapi import FastAPI
2-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
2+
from models_library.api_schemas_directorv2.dynamic_services import (
3+
DynamicServiceGet,
4+
RetrieveDataOutEnveloped,
5+
)
36
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
47
DynamicServiceStart,
58
DynamicServiceStop,
69
)
710
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
811
from models_library.projects_nodes_io import NodeID
12+
from models_library.services_types import ServicePortKey
913
from servicelib.rabbitmq import RPCRouter
1014
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
1115
ServiceWaitingForManualInterventionError,
@@ -45,3 +49,12 @@ async def stop_dynamic_service(
4549
return await scheduler_interface.stop_dynamic_service(
4650
app, dynamic_service_stop=dynamic_service_stop
4751
)
52+
53+
54+
@router.expose()
async def retrieve_data_on_ports(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """RPC-exposed handler that delegates to the scheduler interface.

    ``node_id`` and ``port_keys`` are passed through unchanged to
    ``scheduler_interface.retrieve_data_on_ports`` and its result is returned.
    """
    retrieved: RetrieveDataOutEnveloped = (
        await scheduler_interface.retrieve_data_on_ports(
            app, node_id=node_id, port_keys=port_keys
        )
    )
    return retrieved

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
6262

6363
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
6464
default=datetime.timedelta(minutes=60),
65+
validation_alias=AliasChoices(
66+
"DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
67+
"DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
68+
),
6569
description=(
6670
"Time to wait before timing out when stopping a dynamic service. "
6771
"Since services require data to be stopped, this operation is timed out after 1 hour"
6872
),
6973
)
7074

75+
DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
76+
default=datetime.timedelta(minutes=60),
77+
description=(
78+
"When dynamic services upload and download data from storage, "
79+
"sometimes very big payloads are involved. In order to handle "
80+
"such payloads it is required to have long timeouts which "
81+
"allow the service to finish the operation."
82+
),
83+
)
84+
7185
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
7286
default=False,
7387
description=(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22
from typing import Any
33

44
from fastapi import FastAPI, status
5-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
5+
from models_library.api_schemas_directorv2.dynamic_services import (
6+
DynamicServiceGet,
7+
RetrieveDataOutEnveloped,
8+
)
69
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
710
DynamicServiceStart,
811
)
912
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
1013
from models_library.projects_nodes_io import NodeID
14+
from models_library.services_types import ServicePortKey
1115
from pydantic import TypeAdapter
1216
from servicelib.fastapi.app_state import SingletonInAppStateMixin
1317
from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface
@@ -73,7 +77,7 @@ async def stop_dynamic_service(
7377
node_id: NodeID,
7478
simcore_user_agent: str,
7579
save_state: bool,
76-
timeout: datetime.timedelta
80+
timeout: datetime.timedelta # noqa: ASYNC109
7781
) -> None:
7882
try:
7983
await self.thin_client.delete_dynamic_service(
@@ -98,6 +102,19 @@ async def stop_dynamic_service(
98102

99103
raise
100104

105+
async def retrieve_data_on_ports(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta  # noqa: ASYNC109
) -> RetrieveDataOutEnveloped:
    """Call the thin client's retrieve request and parse its JSON reply.

    Args:
        node_id: node whose ports are targeted.
        port_keys: keys of the ports to retrieve data on.
        timeout: forwarded as-is to the thin client.

    Returns:
        The response body validated as ``RetrieveDataOutEnveloped``.
    """
    raw_response = await self.thin_client.dynamic_service_retrieve(
        node_id=node_id, port_keys=port_keys, timeout=timeout
    )
    payload: dict[str, Any] = raw_response.json()
    adapter = TypeAdapter(RetrieveDataOutEnveloped)
    return adapter.validate_python(payload)
117+
101118

102119
def setup_director_v2(app: FastAPI) -> None:
103120
public_client = DirectorV2Client(app)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from models_library.projects_nodes_io import NodeID
1111
from models_library.services_resources import ServiceResourcesDictHelpers
12+
from models_library.services_types import ServicePortKey
1213
from servicelib.common_headers import (
1314
X_DYNAMIC_SIDECAR_REQUEST_DNS,
1415
X_DYNAMIC_SIDECAR_REQUEST_SCHEME,
@@ -88,7 +89,7 @@ async def delete_dynamic_service(
8889
node_id: NodeID,
8990
simcore_user_agent: str,
9091
save_state: bool,
91-
timeout: datetime.timedelta,
92+
timeout: datetime.timedelta, # noqa: ASYNC109
9293
) -> Response:
9394
@retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
9495
@expect_status(status.HTTP_204_NO_CONTENT)
@@ -108,3 +109,17 @@ async def _(
108109
)
109110

110111
return await _(self)
112+
113+
# Without a status check an error reply would be handed back as if it were a
# successful one; this mirrors the `expect_status` guard already used by
# `delete_dynamic_service` in this client.
# NOTE(review): assumes director-v2 answers `:retrieve` with 200 on success -- confirm
@expect_status(status.HTTP_200_OK)
async def dynamic_service_retrieve(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> Response:
    """POST to ``/dynamic_services/{node_id}:retrieve`` with the given port keys.

    Args:
        node_id: node whose ``:retrieve`` endpoint is called.
        port_keys: sent in the JSON body under the ``"port_keys"`` key.
        timeout: request timeout; converted to seconds for the HTTP client.

    Returns:
        The HTTP response of the request.

    Raises:
        Whatever ``expect_status`` raises when the response status code is
        not the expected one.
    """
    post_data = {"port_keys": port_keys}
    return await self.client.post(
        f"/dynamic_services/{node_id}:retrieve",
        content=json_dumps(post_data),
        timeout=timeout.total_seconds(),
    )

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from fastapi import FastAPI
2-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
2+
from models_library.api_schemas_directorv2.dynamic_services import (
3+
DynamicServiceGet,
4+
RetrieveDataOutEnveloped,
5+
)
36
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
47
DynamicServiceStart,
58
DynamicServiceStop,
69
)
710
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
811
from models_library.projects_nodes_io import NodeID
12+
from models_library.services_types import ServicePortKey
913

1014
from ..core.settings import ApplicationSettings
1115
from .director_v2 import DirectorV2Client
@@ -58,3 +62,18 @@ async def stop_dynamic_service(
5862
)
5963

6064
await set_request_as_stopped(app, dynamic_service_stop)
65+
66+
67+
async def retrieve_data_on_ports(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Retrieve data on the ports of a node through director-v2.

    The request is sent with the
    ``DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT`` application setting
    as timeout.

    Raises:
        NotImplementedError: when ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER``
            is enabled.
    """
    app_settings: ApplicationSettings = app.state.settings

    # The internal scheduler has no implementation of this operation.
    if app_settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        raise NotImplementedError

    client = DirectorV2Client.get_from_app_state(app)
    upload_download_timeout = (
        app_settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT
    )
    return await client.retrieve_data_on_ports(
        node_id=node_id,
        port_keys=port_keys,
        timeout=upload_download_timeout,
    )

services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99
from aiohttp import web
1010
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
1111
from models_library.projects import ProjectID
12-
from models_library.services import ServicePortKey
1312
from pydantic import BaseModel, NonNegativeInt, TypeAdapter
1413
from pydantic.types import PositiveInt
1514
from servicelib.logging_utils import log_decorator
1615
from yarl import URL
1716

1817
from ._core_base import DataType, request_director_v2
19-
from .exceptions import DirectorServiceError
2018
from .settings import DirectorV2Settings, get_plugin_settings
2119

2220
_log = logging.getLogger(__name__)
@@ -52,51 +50,6 @@ async def list_dynamic_services(
5250
return TypeAdapter(list[DynamicServiceGet]).validate_python(services)
5351

5452

55-
# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
56-
@log_decorator(logger=_log)
57-
async def retrieve(
58-
app: web.Application, service_uuid: str, port_keys: list[ServicePortKey]
59-
) -> DataType:
60-
"""Pulls data from connections to the dynamic service inputs"""
61-
settings: DirectorV2Settings = get_plugin_settings(app)
62-
result = await request_director_v2(
63-
app,
64-
"POST",
65-
url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve",
66-
data={"port_keys": port_keys},
67-
timeout=settings.get_service_retrieve_timeout(),
68-
)
69-
assert isinstance(result, dict) # nosec
70-
return result
71-
72-
73-
# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
74-
# notice that this function is identical to retrieve except that it does NOT raises
75-
@log_decorator(logger=_log)
76-
async def request_retrieve_dyn_service(
77-
app: web.Application, service_uuid: str, port_keys: list[str]
78-
) -> None:
79-
settings: DirectorV2Settings = get_plugin_settings(app)
80-
body = {"port_keys": port_keys}
81-
82-
try:
83-
await request_director_v2(
84-
app,
85-
"POST",
86-
url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve",
87-
data=body,
88-
timeout=settings.get_service_retrieve_timeout(),
89-
)
90-
except DirectorServiceError as exc:
91-
_log.warning(
92-
"Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s:%s]",
93-
service_uuid,
94-
port_keys,
95-
exc.status,
96-
exc.reason,
97-
)
98-
99-
10053
@log_decorator(logger=_log)
10154
async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None:
10255
"""User restart the dynamic dynamic service started in the node_uuid

services/web/server/src/simcore_service_webserver/director_v2/api.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
from ._core_dynamic_services import (
2020
get_project_inactivity,
2121
list_dynamic_services,
22-
request_retrieve_dyn_service,
2322
restart_dynamic_service,
24-
retrieve,
2523
update_dynamic_service_networks_in_project,
2624
)
2725
from ._core_utils import is_healthy
@@ -40,9 +38,7 @@
4038
"is_healthy",
4139
"is_pipeline_running",
4240
"list_dynamic_services",
43-
"request_retrieve_dyn_service",
4441
"restart_dynamic_service",
45-
"retrieve",
4642
"set_project_run_policy",
4743
"stop_pipeline",
4844
"update_dynamic_service_networks_in_project",

services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
from functools import partial
44

55
from aiohttp import web
6-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
6+
from models_library.api_schemas_directorv2.dynamic_services import (
7+
DynamicServiceGet,
8+
RetrieveDataOutEnveloped,
9+
)
710
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
811
DynamicServiceStart,
912
DynamicServiceStop,
@@ -18,6 +21,7 @@
1821
from models_library.projects import ProjectID
1922
from models_library.projects_nodes_io import NodeID
2023
from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType
24+
from models_library.services import ServicePortKey
2125
from pydantic.types import PositiveInt
2226
from servicelib.progress_bar import ProgressBarData
2327
from servicelib.rabbitmq import RabbitMQClient, RPCServerError
@@ -133,3 +137,18 @@ async def stop_dynamic_services_in_project(
133137
]
134138

135139
await logged_gather(*services_to_stop)
140+
141+
142+
# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
143+
async def retrieve(
    app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Request, via the dynamic-scheduler RPC interface, data retrieval on a node's ports.

    The RPC timeout is the plugin setting
    ``DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT`` truncated to whole
    seconds.
    """
    plugin_settings: DynamicSchedulerSettings = get_plugin_settings(app)

    rpc_client = get_rabbitmq_rpc_client(app)
    upload_download_timeout = (
        plugin_settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT
    )
    # The RPC interface takes an integer number of seconds.
    timeout_in_seconds = int(upload_download_timeout.total_seconds())

    return await services.retrieve_data_on_ports(
        rpc_client,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_in_seconds,
    )

0 commit comments

Comments
 (0)