Skip to content

Commit 7c604d4

Browse files
author
Andrei Neagu
committed
added rpc interface for create_output_dirs
1 parent a674736 commit 7c604d4

File tree

3 files changed

+92
-0
lines changed

3 files changed

+92
-0
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/container_extensions.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from models_library.projects_nodes_io import NodeID
44
from models_library.rabbitmq_basic_types import RPCMethodName
5+
from models_library.services import ServiceOutput
56
from pydantic import TypeAdapter
67

78
from ....logging_utils import log_decorator
@@ -27,3 +28,19 @@ async def toggle_ports_io(
2728
enable_inputs=enable_inputs,
2829
)
2930
assert result is None # nosec
31+
32+
33+
@log_decorator(_logger, level=logging.DEBUG)
34+
async def create_output_dirs(
35+
rabbitmq_rpc_client: RabbitMQRPCClient,
36+
*,
37+
node_id: NodeID,
38+
outputs_labels: dict[str, ServiceOutput]
39+
) -> None:
40+
rpc_namespace = get_rpc_namespace(node_id)
41+
result = await rabbitmq_rpc_client.request(
42+
rpc_namespace,
43+
TypeAdapter(RPCMethodName).validate_python("create_output_dirs"),
44+
outputs_labels=outputs_labels,
45+
)
46+
assert result is None # nosec

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/_containers_extension.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from fastapi import FastAPI
2+
from models_library.services import ServiceOutput
23
from servicelib.rabbitmq import RPCRouter
34

45
from ...services import container_extensions
@@ -13,3 +14,10 @@ async def toggle_ports_io(
1314
await container_extensions.toggle_ports_io(
1415
app, enable_outputs=enable_outputs, enable_inputs=enable_inputs
1516
)
17+
18+
19+
@router.expose()
20+
async def create_output_dirs(
21+
app: FastAPI, *, outputs_labels: dict[str, ServiceOutput]
22+
) -> None:
23+
await container_extensions.create_output_dirs(app, outputs_labels=outputs_labels)

services/dynamic-sidecar/tests/unit/api/rpc/test__container_extensions.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
# pylint:disable=unused-argument
22
# pylint:disable=redefined-outer-name
3+
# pylint:disable=protected-access
34

5+
import asyncio
6+
from typing import Final
47
from unittest.mock import AsyncMock
58

69
import pytest
710
from fastapi import FastAPI
11+
from models_library.services import ServiceOutput
12+
from pydantic import TypeAdapter
813
from pytest_mock import MockerFixture
914
from servicelib.rabbitmq import RabbitMQRPCClient
1015
from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar import container_extensions
16+
from simcore_service_dynamic_sidecar.core.application import AppState
1117
from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings
1218
from simcore_service_dynamic_sidecar.modules.inputs import InputsState
1319
from simcore_service_dynamic_sidecar.modules.outputs._watcher import OutputsWatcher
@@ -16,6 +22,8 @@
1622
"rabbit",
1723
]
1824

25+
_WAIT_FOR_OUTPUTS_WATCHER: Final[float] = 0.1
26+
1927

2028
def _assert_inputs_pulling(app: FastAPI, is_enabled: bool) -> None:
2129
inputs_state: InputsState = app.state.inputs_state
@@ -64,3 +72,62 @@ async def test_toggle_ports_io(
6472

6573
_assert_inputs_pulling(app, enabled)
6674
_assert_outputs_event_propagation(spy_output_watcher, enabled)
75+
76+
77+
@pytest.fixture
78+
def mock_outputs_labels() -> dict[str, ServiceOutput]:
79+
return {
80+
"output_port_1": TypeAdapter(ServiceOutput).validate_python(
81+
ServiceOutput.model_json_schema()["examples"][3]
82+
),
83+
"output_port_2": TypeAdapter(ServiceOutput).validate_python(
84+
ServiceOutput.model_json_schema()["examples"][3]
85+
),
86+
}
87+
88+
89+
@pytest.fixture
90+
def mock_event_filter_enqueue(
91+
app: FastAPI, monkeypatch: pytest.MonkeyPatch
92+
) -> AsyncMock:
93+
mock = AsyncMock(return_value=None)
94+
outputs_watcher: OutputsWatcher = app.state.outputs_watcher
95+
monkeypatch.setattr(outputs_watcher._event_filter, "enqueue", mock) # noqa: SLF001
96+
return mock
97+
98+
99+
async def test_container_create_outputs_dirs(
100+
app: FastAPI,
101+
rpc_client: RabbitMQRPCClient,
102+
mock_outputs_labels: dict[str, ServiceOutput],
103+
mock_event_filter_enqueue: AsyncMock,
104+
):
105+
app_state = AppState(app)
106+
107+
# by default outputs-watcher it is disabled
108+
result = await container_extensions.toggle_ports_io(
109+
rpc_client,
110+
node_id=app_state.settings.DY_SIDECAR_NODE_ID,
111+
enable_outputs=True,
112+
enable_inputs=True,
113+
)
114+
assert result is None
115+
await asyncio.sleep(_WAIT_FOR_OUTPUTS_WATCHER)
116+
117+
assert mock_event_filter_enqueue.call_count == 0
118+
119+
result = await container_extensions.create_output_dirs(
120+
rpc_client,
121+
node_id=app_state.settings.DY_SIDECAR_NODE_ID,
122+
outputs_labels=mock_outputs_labels,
123+
)
124+
125+
for dir_name in mock_outputs_labels:
126+
assert (app_state.mounted_volumes.disk_outputs_path / dir_name).is_dir()
127+
128+
await asyncio.sleep(_WAIT_FOR_OUTPUTS_WATCHER)
129+
EXPECT_EVENTS_WHEN_CREATING_OUTPUT_PORT_KEY_DIRS = 0
130+
assert (
131+
mock_event_filter_enqueue.call_count
132+
== EXPECT_EVENTS_WHEN_CREATING_OUTPUT_PORT_KEY_DIRS
133+
)

0 commit comments

Comments
 (0)