From 220b7b65a919daee066ba45ceaec10ec598d6f80 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 11:17:19 +0200 Subject: [PATCH 01/21] extracting notifications as seprate module --- .../core/application.py | 2 ++ .../modules/notifications/__init__.py | 7 +++++++ .../_notifier.py | 0 .../modules/notifications/_setup.py | 15 +++++++++++++++ .../_socketio.py | 0 .../modules/system_monitor/_disk_usage.py | 2 +- .../modules/system_monitor/_setup.py | 4 ---- .../unit/test_modules_system_monitor__notifier.py | 4 +--- 8 files changed, 26 insertions(+), 8 deletions(-) create mode 100644 services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py rename services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/{system_monitor => notifications}/_notifier.py (100%) create mode 100644 services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py rename services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/{system_monitor => notifications}/_socketio.py (100%) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index f5910ffbffee..20029cac7fcb 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -19,6 +19,7 @@ from ..modules.attribute_monitor import setup_attribute_monitor from ..modules.inputs import setup_inputs from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs +from ..modules.notifications import setup_notifications from ..modules.outputs import setup_outputs from ..modules.prometheus_metrics import setup_prometheus_metrics from ..modules.resource_tracking import setup_resource_tracking @@ -172,6 +173,7 @@ def create_app(): setup_rabbitmq(app) setup_background_log_fetcher(app) setup_resource_tracking(app) + setup_notifications(app) setup_system_monitor(app) setup_mounted_fs(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py new file mode 100644 index 000000000000..85bdfb8139d1 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py @@ -0,0 +1,7 @@ +from ._notifier import publish_disk_usage +from ._setup import setup_notifications + +__all__: tuple[str, ...] = ( + "publish_disk_usage", + "setup_notifications", +) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py similarity index 100% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py new file mode 100644 index 000000000000..6de0fae307f1 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py @@ -0,0 +1,15 @@ +import logging + +from fastapi import FastAPI +from servicelib.logging_utils import log_context + +from ..notifications._notifier import setup_notifier +from ..notifications._socketio import setup_socketio + +_logger = logging.getLogger(__name__) + + +def setup_notifications(app: FastAPI) -> None: + with log_context(_logger, logging.INFO, "setup notifications"): + setup_socketio(app) + setup_notifier(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_socketio.py similarity index 100% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_socketio.py diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py index 1ecc04fdaea0..90b06450e6f0 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -15,7 +15,7 @@ from ...core.settings import ApplicationSettings from ..mounted_fs import MountedVolumes -from ._notifier import publish_disk_usage +from ..notifications import publish_disk_usage _logger = logging.getLogger(__name__) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py index e460f7a9ee30..aa0d36a72b9b 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py @@ -5,8 +5,6 @@ from ...core.settings import SystemMonitorSettings from ._disk_usage import setup_disk_usage -from ._notifier import setup_notifier -from ._socketio import setup_socketio _logger = logging.getLogger(__name__) @@ -19,6 +17,4 @@ def setup_system_monitor(app: FastAPI) -> None: _logger.warning("system monitor disabled") return - setup_socketio(app) # required by notifier - setup_notifier(app) setup_disk_usage(app) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py index 73184a1b3cba..db0aac139922 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py @@ -28,12 +28,10 @@ from settings_library.rabbit import RabbitSettings from simcore_service_dynamic_sidecar.core.application import create_app from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings +from simcore_service_dynamic_sidecar.modules.notifications import publish_disk_usage from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( DiskUsageMonitor, ) -from simcore_service_dynamic_sidecar.modules.system_monitor._notifier import ( - publish_disk_usage, -) from socketio import AsyncServer from tenacity import AsyncRetrying from tenacity.stop import stop_after_delay From b419e649c96382690c74bb9f1805ff8cbc4a0b11 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 11:28:13 +0200 Subject: [PATCH 02/21] refactor internals --- .../modules/notifications/__init__.py | 2 +- .../notifications/_notifications_ports.py | 0 .../_notifications_system_monitor.py | 17 +++++++++++++++++ .../modules/notifications/_notifier.py | 9 --------- 4 files changed, 18 insertions(+), 10 deletions(-) create mode 100644 services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py create mode 100644 services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py index 85bdfb8139d1..cd75e0b340a7 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py @@ -1,4 +1,4 @@ -from ._notifier import publish_disk_usage +from ._notifications_system_monitor import publish_disk_usage from ._setup import setup_notifications __all__: tuple[str, ...] = ( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py new file mode 100644 index 000000000000..840c47d729ef --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py @@ -0,0 +1,17 @@ +from pathlib import Path + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID + +from ._notifier import Notifier + + +async def publish_disk_usage( + app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] +) -> None: + notifier: Notifier = Notifier.get_from_app_state(app) + await notifier.notify_service_disk_usage( + user_id=user_id, node_id=node_id, usage=usage + ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py index 9f97a889baca..eb1483e636ec 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py @@ -33,15 +33,6 @@ async def notify_service_disk_usage( ) -async def publish_disk_usage( - app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] -) -> None: - notifier: Notifier = Notifier.get_from_app_state(app) - await notifier.notify_service_disk_usage( - user_id=user_id, node_id=node_id, usage=usage - ) - - def setup_notifier(app: FastAPI): async def _on_startup() -> None: assert app.state.external_socketio # nosec From d1c838e883922fa7e908ad20b6f20368fc7f6b5c Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 13:29:30 +0200 Subject: [PATCH 03/21] rename module --- ...es_system_monitor__notifier.py => test_modules_notifier.py} | 3 +++ 1 file changed, 3 insertions(+) rename services/dynamic-sidecar/tests/unit/{test_modules_system_monitor__notifier.py => test_modules_notifier.py} (99%) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py similarity index 99% rename from services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py rename to services/dynamic-sidecar/tests/unit/test_modules_notifier.py index db0aac139922..02102e359a6e 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -200,3 +200,6 @@ async def test_notifier_publish_message( ) await _assert_call_count(server_disconnect, call_count=number_of_clients) + + +# TODO: add tests for the two new types of messages From fc88f8d279f8075f398793ffb9bf4de842d0573f Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 13:31:28 +0200 Subject: [PATCH 04/21] extend notifications interface --- .../api_schemas_dynamic_sidecar/ports.py | 31 ++++++++ .../api_schemas_dynamic_sidecar/socketio.py | 2 + .../modules/notifications/__init__.py | 2 + .../notifications/_notifications_ports.py | 71 +++++++++++++++++++ .../modules/notifications/_notifier.py | 41 +++++++++++ 5 files changed, 147 insertions(+) create mode 100644 packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py new file mode 100644 index 000000000000..0f26ffdf80a6 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py @@ -0,0 +1,31 @@ +from enum import Enum, auto + +from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey +from pydantic import BaseModel + + +class OutputStatus(str, Enum): + DOWNLOAD_STARTED = auto() + DOWNLOAD_FINISHED_SUCCESSFULLY = auto() + DOWNLOAD_FINISHED_WITH_ERRROR = auto() + + +class InputStatus(str, Enum): + UPLOAD_STARTED = auto() + UPLOAD_WAS_ABORTED = auto() + UPLOAD_FINISHED_SUCCESSFULLY = auto() + UPLOAD_FINISHED_WITH_ERRROR = auto() + + +class _PortStatusCommon(BaseModel): + node_id: NodeID + port_key: ServicePortKey + + +class OutputPortStatus(_PortStatusCommon): + status: OutputStatus + + +class InputPortSatus(_PortStatusCommon): + status: InputStatus diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py index 054b0834bc4d..93e34a1682bb 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py @@ -1,3 +1,5 @@ from typing import Final SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage" +SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts" +SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts" diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py index cd75e0b340a7..18254b1d23c1 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py @@ -1,7 +1,9 @@ +from ._notifications_ports import PortNotifier from ._notifications_system_monitor import publish_disk_usage from ._setup import setup_notifications __all__: tuple[str, ...] = ( + "PortNotifier", "publish_disk_usage", "setup_notifications", ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py index e69de29bb2d1..e2420d002021 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.ports import InputStatus, OutputStatus +from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey +from models_library.users import UserID + +from ._notifier import Notifier + + +@dataclass +class PortNotifier: + app: FastAPI + user_id: UserID + node_id: NodeID + + async def _send_output_port_status( + self, port_key: ServicePortKey, status: OutputStatus + ) -> None: + notifier: Notifier = Notifier.get_from_app_state(self.app) + await notifier.notify_output_port_status( + self.user_id, self.node_id, port_key, status + ) + + async def _send_input_port_status( + self, port_key: ServicePortKey, status: InputStatus + ) -> None: + notifier: Notifier = Notifier.get_from_app_state(self.app) + await notifier.notify_input_port_status( + self.user_id, self.node_id, port_key, status + ) + + async def send_output_port_download_sarted(self, port_key: ServicePortKey) -> None: + await self._send_output_port_status(port_key, OutputStatus.DOWNLOAD_STARTED) + + async def send_output_port_download_finished_successfully( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status( + port_key, OutputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY + ) + + async def send_output_port_download_finished_with_error( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status( + port_key, OutputStatus.DOWNLOAD_FINISHED_WITH_ERRROR + ) + + async def send_input_port_upload_started(self, port_key: ServicePortKey) -> None: + await self._send_input_port_status(port_key, InputStatus.UPLOAD_STARTED) + + async def send_input_port_upload_was_aborted( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status(port_key, InputStatus.UPLOAD_WAS_ABORTED) + + async def send_input_port_upload_finished_succesfully( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status( + port_key, InputStatus.UPLOAD_FINISHED_SUCCESSFULLY + ) + + async def send_input_port_upload_finished_with_error( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status( + port_key, InputStatus.UPLOAD_FINISHED_WITH_ERRROR + ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py index eb1483e636ec..4724b1dffe42 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py @@ -4,8 +4,16 @@ import socketio # type: ignore[import-untyped] from fastapi import FastAPI from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.ports import ( + InputPortSatus, + InputStatus, + OutputPortStatus, + OutputStatus, +) from models_library.api_schemas_dynamic_sidecar.socketio import ( SOCKET_IO_SERVICE_DISK_USAGE_EVENT, + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, ) from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, @@ -13,6 +21,7 @@ ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from servicelib.fastapi.app_state import SingletonInAppStateMixin @@ -32,6 +41,38 @@ async def notify_service_disk_usage( room=SocketIORoomStr.from_user_id(user_id), ) + async def notify_output_port_status( + self, + user_id: UserID, + node_id: NodeID, + port_key: ServicePortKey, + output_status: OutputStatus, + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, + data=jsonable_encoder( + OutputPortStatus( + node_id=node_id, port_key=port_key, status=output_status + ) + ), + room=SocketIORoomStr.from_user_id(user_id), + ) + + async def notify_input_port_status( + self, + user_id: UserID, + node_id: NodeID, + port_key: ServicePortKey, + input_status: InputStatus, + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + data=jsonable_encoder( + InputPortSatus(node_id=node_id, port_key=port_key, status=input_status) + ), + room=SocketIORoomStr.from_user_id(user_id), + ) + def setup_notifier(app: FastAPI): async def _on_startup() -> None: From 6bd6d15e0e9fc9a2e9eb3600fdada559d470c02a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 14:04:23 +0200 Subject: [PATCH 05/21] added notifications tests --- .../tests/unit/test_modules_notifier.py | 228 ++++++++++++++++-- 1 file changed, 205 insertions(+), 23 deletions(-) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py index 02102e359a6e..ea8d2c2f72ce 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -4,6 +4,7 @@ from collections.abc import AsyncIterable, Callable from contextlib import AsyncExitStack, _AsyncGeneratorContextManager from pathlib import Path +from typing import Final from unittest.mock import AsyncMock import pytest @@ -11,8 +12,16 @@ from asgi_lifespan import LifespanManager from fastapi import FastAPI from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.ports import ( + InputPortSatus, + InputStatus, + OutputPortStatus, + OutputStatus, +) from models_library.api_schemas_dynamic_sidecar.socketio import ( SOCKET_IO_SERVICE_DISK_USAGE_EVENT, + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, ) from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, @@ -20,6 +29,7 @@ ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from pydantic import ByteSize, NonNegativeInt, parse_obj_as from pytest_mock import MockerFixture @@ -28,7 +38,10 @@ from settings_library.rabbit import RabbitSettings from simcore_service_dynamic_sidecar.core.application import create_app from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings -from simcore_service_dynamic_sidecar.modules.notifications import publish_disk_usage +from simcore_service_dynamic_sidecar.modules.notifications import ( + PortNotifier, + publish_disk_usage, +) from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( DiskUsageMonitor, ) @@ -41,6 +54,8 @@ "rabbit", ] +_NUMBER_OF_CLIENTS: Final[NonNegativeInt] = 10 + @pytest.fixture def mock_environment( @@ -104,20 +119,6 @@ def room_name(user_id: UserID) -> SocketIORoomStr: return SocketIORoomStr.from_user_id(user_id) -def _get_on_service_disk_usage_event( - socketio_client: socketio.AsyncClient, -) -> AsyncMock: - # emulates front-end receiving message - - async def on_service_status(data): - assert parse_obj_as(ServiceDiskUsage, data) is not None - - on_event_spy = AsyncMock(wraps=on_service_status) - socketio_client.on(SOCKET_IO_SERVICE_DISK_USAGE_EVENT, on_event_spy) - - return on_event_spy - - async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: async for attempt in AsyncRetrying( wait=wait_fixed(0.1), stop=stop_after_delay(5), reraise=True @@ -135,6 +136,20 @@ def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: ) +def _get_on_service_disk_usage_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_SERVICE_DISK_USAGE_EVENT, on_event_spy) + + return on_event_spy + + @pytest.mark.parametrize( "usage", [ @@ -149,7 +164,7 @@ def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: ), ], ) -async def test_notifier_publish_message( +async def test_notifier_publish_disk_usage( disk_usage_monitor: DiskUsageMonitor, socketio_server_events: dict[str, AsyncMock], app: FastAPI, @@ -165,15 +180,14 @@ async def test_notifier_publish_message( server_disconnect = socketio_server_events["disconnect"] server_on_check = socketio_server_events["on_check"] - number_of_clients: NonNegativeInt = 10 async with AsyncExitStack() as socketio_frontend_clients: frontend_clients: list[socketio.AsyncClient] = await logged_gather( *[ socketio_frontend_clients.enter_async_context(socketio_client_factory()) - for _ in range(number_of_clients) + for _ in range(_NUMBER_OF_CLIENTS) ] ) - await _assert_call_count(server_connect, call_count=number_of_clients) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) # client emits and check it was received await logged_gather( @@ -182,11 +196,11 @@ async def test_notifier_publish_message( for frontend_client in frontend_clients ] ) - await _assert_call_count(server_on_check, call_count=number_of_clients) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) # attach spy to client on_service_disk_usage_events: list[AsyncMock] = [ - _get_on_service_disk_usage_event(c) for c in frontend_clients + _get_on_service_disk_usage_spy(c) for c in frontend_clients ] # server publishes a message @@ -199,7 +213,175 @@ async def test_notifier_publish_message( jsonable_encoder(ServiceDiskUsage(node_id=node_id, usage=usage)) ) - await _assert_call_count(server_disconnect, call_count=number_of_clients) + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) + + +@pytest.fixture +def port_key() -> ServicePortKey: + return ServicePortKey("test_port") + + +def _get_on_input_port_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_STATE_INPUT_PORTS_EVENT, on_event_spy) + + return on_event_spy + + +@pytest.mark.parametrize("input_status", InputStatus) +async def test_notifier_send_input_port_status( + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + node_id: NodeID, + port_key: ServicePortKey, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], + input_status: InputStatus, +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(_NUMBER_OF_CLIENTS) + ] + ) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) + + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) + + # attach spy to client + on_input_port_events: list[AsyncMock] = [ + _get_on_input_port_spy(c) for c in frontend_clients + ] + + port_notifier = PortNotifier(app, user_id, node_id) + + # server publishes a message + match input_status: + case InputStatus.UPLOAD_STARTED: + await port_notifier.send_input_port_upload_started(port_key) + case InputStatus.UPLOAD_WAS_ABORTED: + await port_notifier.send_input_port_upload_was_aborted(port_key) + case InputStatus.UPLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_input_port_upload_finished_succesfully( + port_key + ) + case InputStatus.UPLOAD_FINISHED_WITH_ERRROR: + await port_notifier.send_input_port_upload_finished_with_error(port_key) + + # check that all clients received it + for on_input_port_event in on_input_port_events: + await _assert_call_count(on_input_port_event, call_count=1) + on_input_port_event.assert_awaited_once_with( + jsonable_encoder( + InputPortSatus( + node_id=node_id, port_key=port_key, status=input_status + ) + ) + ) + + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) + + +def _get_on_output_port_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, on_event_spy) + + return on_event_spy + + +@pytest.mark.parametrize("output_status", OutputStatus) +async def test_notifier_send_output_port_status( + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + node_id: NodeID, + port_key: ServicePortKey, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], + output_status: OutputStatus, +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(_NUMBER_OF_CLIENTS) + ] + ) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) + + # attach spy to client + on_output_port_events: list[AsyncMock] = [ + _get_on_output_port_spy(c) for c in frontend_clients + ] + + port_notifier = PortNotifier(app, user_id, node_id) + + # server publishes a message + match output_status: + case OutputStatus.DOWNLOAD_STARTED: + await port_notifier.send_output_port_download_sarted(port_key) + case OutputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_output_port_download_finished_successfully( + port_key + ) + case OutputStatus.DOWNLOAD_FINISHED_WITH_ERRROR: + await port_notifier.send_output_port_download_finished_with_error( + port_key + ) + + # check that all clients received it + for on_output_port_event in on_output_port_events: + await _assert_call_count(on_output_port_event, call_count=1) + on_output_port_event.assert_awaited_once_with( + jsonable_encoder( + OutputPortStatus( + node_id=node_id, port_key=port_key, status=output_status + ) + ) + ) -# TODO: add tests for the two new types of messages + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) From 249dda7850acd5229c813b39e3b0a363446a648c Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 16:19:38 +0200 Subject: [PATCH 06/21] refactor to work as expected --- .../api_schemas_dynamic_sidecar/ports.py | 17 ++-- .../simcore_sdk/node_ports_v2/nodeports_v2.py | 42 +++++++++- .../test_node_ports_v2_nodeports2.py | 4 + .../unit/test_node_ports_v2_nodeports_v2.py | 1 + .../api/containers_long_running_tasks.py | 2 + .../modules/long_running_tasks.py | 8 ++ .../modules/nodeports.py | 83 +++++++++++++++++-- .../notifications/_notifications_ports.py | 33 ++++---- .../modules/outputs/_manager.py | 9 ++ .../tests/unit/test_modules_notifier.py | 32 +++---- 10 files changed, 186 insertions(+), 45 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py index 0f26ffdf80a6..285063fbc475 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py @@ -1,23 +1,24 @@ -from enum import Enum, auto +from enum import StrEnum, auto from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from pydantic import BaseModel -class OutputStatus(str, Enum): - DOWNLOAD_STARTED = auto() - DOWNLOAD_FINISHED_SUCCESSFULLY = auto() - DOWNLOAD_FINISHED_WITH_ERRROR = auto() - - -class InputStatus(str, Enum): +class OutputStatus(StrEnum): UPLOAD_STARTED = auto() UPLOAD_WAS_ABORTED = auto() UPLOAD_FINISHED_SUCCESSFULLY = auto() UPLOAD_FINISHED_WITH_ERRROR = auto() +class InputStatus(StrEnum): + DOWNLOAD_STARTED = auto() + DOWNLOAD_WAS_ABORTED = auto() + DOWNLOAD_FINISHED_SUCCESSFULLY = auto() + DOWNLOAD_FINISHED_WITH_ERRROR = auto() + + class _PortStatusCommon(BaseModel): node_id: NodeID port_key: ServicePortKey diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 8c78e28a0661..d72ea21c49f0 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -1,4 +1,6 @@ import logging +from abc import ABC, abstractmethod +from asyncio import CancelledError from collections.abc import Callable, Coroutine from pathlib import Path from typing import Any @@ -27,6 +29,20 @@ log = logging.getLogger(__name__) +class OutputsCallbacks(ABC): + @abstractmethod + async def aborted(self, key: ServicePortKey) -> None: + pass + + @abstractmethod + async def finished_succesfully(self, key: ServicePortKey) -> None: + pass + + @abstractmethod + async def finished_with_error(self, key: ServicePortKey) -> None: + pass + + class Nodeports(BaseModel): """ Represents a node in a project and all its input/output ports @@ -148,6 +164,7 @@ async def set_multiple( ], *, progress_bar: ProgressBarData, + outputs_callbacks: OutputsCallbacks | None, ) -> None: """ Sets the provided values to the respective input or output ports @@ -156,6 +173,27 @@ async def set_multiple( raises ValidationError """ + + async def _set_output_with_notifications( + port_key: ServicePortKey, + value: ItemConcreteValue | None, + set_kwargs: SetKWargs | None, + sub_progress: ProgressBarData, + ) -> None: + assert outputs_callbacks is not None # nosec + try: + # pylint: disable=protected-access + await self.internal_outputs[port_key]._set( # noqa: SLF001 + value, set_kwargs=set_kwargs, progress_bar=sub_progress + ) + await outputs_callbacks.finished_succesfully(port_key) + except CancelledError: + await outputs_callbacks.aborted(port_key) + raise + except Exception: + await outputs_callbacks.finished_with_error(port_key) + raise + tasks = [] async with progress_bar.sub_progress( steps=len(port_values.items()), description=IDStr("set multiple") @@ -164,8 +202,8 @@ async def set_multiple( # pylint: disable=protected-access try: tasks.append( - self.internal_outputs[port_key]._set( - value, set_kwargs=set_kwargs, progress_bar=sub_progress + _set_output_with_notifications( + port_key, value, set_kwargs, sub_progress ) ) except UnboundPortError: diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index a9016609d130..1be7f5a9c737 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -13,6 +13,7 @@ from collections.abc import Awaitable, Callable, Iterable from pathlib import Path from typing import Any +from unittest.mock import AsyncMock from uuid import uuid4 import np_helpers @@ -777,6 +778,7 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.outputs).values()) }, progress_bar=progress_bar, + outputs_callbacks=AsyncMock(), ) # pylint: disable=protected-access assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 @@ -786,6 +788,7 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, + outputs_callbacks=None, ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 @@ -807,4 +810,5 @@ async def test_batch_update_inputs_outputs( await PORTS.set_multiple( {ServicePortKey("missing_key_in_both"): (123132, None)}, progress_bar=progress_bar, + outputs_callbacks=AsyncMock(), ) diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py index 91609476b9cf..81531e00c339 100644 --- a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py +++ b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py @@ -138,6 +138,7 @@ async def mock_node_port_creator_cb(*args, **kwargs): + list(original_outputs.values()) }, progress_bar=progress_bar, + outputs_callbacks=None, ) assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py index ae04a620c8a2..52b0e2e7ad64 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py @@ -209,6 +209,7 @@ async def ports_inputs_pull_task( request: Request, tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], app: Annotated[FastAPI, Depends(get_application)], + settings: Annotated[ApplicationSettings, Depends(get_settings)], mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)], inputs_state: Annotated[InputsState, Depends(get_inputs_state)], port_keys: list[str] | None = None, @@ -223,6 +224,7 @@ async def ports_inputs_pull_task( port_keys=port_keys, mounted_volumes=mounted_volumes, app=app, + settings=settings, inputs_pulling_enabled=inputs_state.inputs_pulling_enabled, ) except TaskAlreadyRunningError as e: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index a8277415b06b..1fcdff35384a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -19,6 +19,9 @@ from servicelib.progress_bar import ProgressBarData from servicelib.utils import logged_gather from simcore_sdk.node_data import data_manager +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_result @@ -472,6 +475,7 @@ async def task_ports_inputs_pull( port_keys: list[str] | None, mounted_volumes: MountedVolumes, app: FastAPI, + settings: ApplicationSettings, *, inputs_pulling_enabled: bool, ) -> int: @@ -505,6 +509,9 @@ async def task_ports_inputs_pull( post_sidecar_log_message, app, log_level=logging.INFO ), progress_bar=root_progress, + port_notifier=PortNotifier( + app, settings.DY_SIDECAR_USER_ID, settings.DY_SIDECAR_NODE_ID + ), ) await post_sidecar_log_message( app, "Finished pulling inputs", log_level=logging.INFO @@ -541,6 +548,7 @@ async def task_ports_outputs_pull( post_sidecar_log_message, app, log_level=logging.INFO ), progress_bar=root_progress, + port_notifier=None, ) await post_sidecar_log_message( app, "Finished pulling outputs", log_level=logging.INFO diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 2213dd1d4ac9..5a7716e2a0b4 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -4,6 +4,7 @@ import shutil import sys import time +from asyncio import CancelledError from collections import deque from collections.abc import Coroutine from contextlib import AsyncExitStack @@ -29,11 +30,12 @@ from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB from simcore_sdk.node_ports_v2 import Port from simcore_sdk.node_ports_v2.links import ItemConcreteValue -from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports +from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks from simcore_sdk.node_ports_v2.port import SetKWargs from simcore_sdk.node_ports_v2.port_utils import is_file_type from ..core.settings import ApplicationSettings, get_settings +from ..modules.notifications import PortNotifier class PortTypeName(str, Enum): @@ -70,6 +72,20 @@ def _get_size_of_value(value: tuple[ItemConcreteValue | None, SetKWargs | None]) ) +class OutputCallbacksWrapper(OutputsCallbacks): + def __init__(self, port_notifier: PortNotifier) -> None: + self.port_notifier = port_notifier + + async def aborted(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_was_aborted(key) + + async def finished_succesfully(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_finished_successfully(key) + + async def finished_with_error(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_finished_with_error(key) + + # NOTE: outputs_manager guarantees that no parallel calls # to this function occur async def upload_outputs( @@ -77,6 +93,7 @@ async def upload_outputs( port_keys: list[str], io_log_redirect_cb: LogRedirectCB | None, progress_bar: ProgressBarData, + port_notifier: PortNotifier, ) -> None: # pylint: disable=too-many-branches logger.debug("uploading data to simcore...") @@ -97,12 +114,16 @@ async def upload_outputs( ServicePortKey, tuple[ItemConcreteValue | None, SetKWargs | None] ] = {} archiving_tasks: deque[Coroutine[None, None, None]] = deque() - ports_to_set = [ + ports_to_set: list[Port] = [ port_value for port_value in (await PORTS.outputs).values() if (not port_keys) or (port_value.key in port_keys) ] + await logged_gather( + *(port_notifier.send_output_port_upload_sarted(p.key) for p in ports_to_set) + ) + async with AsyncExitStack() as stack: sub_progress = await stack.enter_async_context( progress_bar.sub_progress( @@ -176,9 +197,32 @@ async def upload_outputs( logger.debug("No file %s to fetch port values from", data_file) if archiving_tasks: - await logged_gather(*archiving_tasks) + # NOTE: if one archiving task fails/cancelled all the ports are affected + # setting all other ports as finished with error/cancelled + try: + await logged_gather(*archiving_tasks) + except CancelledError: + await logged_gather( + *( + port_notifier.send_output_port_upload_was_aborted(p.key) + for p in ports_to_set + ) + ) + raise + except Exception: + await logged_gather( + *( + port_notifier.send_output_port_upload_finished_with_error(p.key) + for p in ports_to_set + ) + ) + raise - await PORTS.set_multiple(ports_values, progress_bar=sub_progress) + await PORTS.set_multiple( + ports_values, + progress_bar=sub_progress, + outputs_callbacks=OutputCallbacksWrapper(port_notifier), + ) elapsed_time = time.perf_counter() - start_time total_bytes = sum(_get_size_of_value(x) for x in ports_values.values()) @@ -264,6 +308,7 @@ async def download_target_ports( port_keys: list[str], io_log_redirect_cb: LogRedirectCB, progress_bar: ProgressBarData, + port_notifier: PortNotifier | None, ) -> ByteSize: logger.debug("retrieving data from simcore...") start_time = time.perf_counter() @@ -279,18 +324,42 @@ async def download_target_ports( ) # let's gather all the data - ports_to_get = [ + ports_to_get: list[Port] = [ port_value for port_value in (await getattr(PORTS, port_type_name.value)).values() if (not port_keys) or (port_value.key in port_keys) ] + + async def _get_date_from_port_notified( + port: Port, progress_bar: ProgressBarData + ) -> tuple[Port, ItemConcreteValue | None, ByteSize]: + assert port_notifier is not None + await port_notifier.send_input_port_download_started(port.key) + try: + result = await _get_data_from_port( + port, target_dir=target_dir, progress_bar=progress_bar + ) + await port_notifier.send_input_port_download_finished_succesfully(port.key) + return result + + except CancelledError: + await port_notifier.send_input_port_download_was_aborted(port.key) + raise + except Exception: + await port_notifier.send_input_port_download_finished_with_error(port.key) + raise + async with progress_bar.sub_progress( steps=len(ports_to_get), description=IDStr("downloading") ) as sub_progress: results = await logged_gather( *[ - _get_data_from_port( - port, target_dir=target_dir, progress_bar=sub_progress + ( + _get_data_from_port( + port, target_dir=target_dir, progress_bar=sub_progress + ) + if port_type_name == PortTypeName.OUTPUTS + else _get_date_from_port_notified(port, progress_bar=sub_progress) ) for port in ports_to_get ], diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py index e2420d002021..65f2ca9de4a2 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py @@ -31,41 +31,46 @@ async def _send_input_port_status( self.user_id, self.node_id, port_key, status ) - async def send_output_port_download_sarted(self, port_key: ServicePortKey) -> None: - await self._send_output_port_status(port_key, OutputStatus.DOWNLOAD_STARTED) + async def send_output_port_upload_sarted(self, port_key: ServicePortKey) -> None: + await self._send_output_port_status(port_key, OutputStatus.UPLOAD_STARTED) - async def send_output_port_download_finished_successfully( + async def send_output_port_upload_was_aborted( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status(port_key, OutputStatus.UPLOAD_WAS_ABORTED) + + async def send_output_port_upload_finished_successfully( self, port_key: ServicePortKey ) -> None: await self._send_output_port_status( - port_key, OutputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY + port_key, OutputStatus.UPLOAD_FINISHED_SUCCESSFULLY ) - async def send_output_port_download_finished_with_error( + async def send_output_port_upload_finished_with_error( self, port_key: ServicePortKey ) -> None: await self._send_output_port_status( - port_key, OutputStatus.DOWNLOAD_FINISHED_WITH_ERRROR + port_key, OutputStatus.UPLOAD_FINISHED_WITH_ERRROR ) - async def send_input_port_upload_started(self, port_key: ServicePortKey) -> None: - await self._send_input_port_status(port_key, InputStatus.UPLOAD_STARTED) + async def send_input_port_download_started(self, port_key: ServicePortKey) -> None: + await self._send_input_port_status(port_key, InputStatus.DOWNLOAD_STARTED) - async def send_input_port_upload_was_aborted( + async def send_input_port_download_was_aborted( self, port_key: ServicePortKey ) -> None: - await self._send_input_port_status(port_key, InputStatus.UPLOAD_WAS_ABORTED) + await self._send_input_port_status(port_key, InputStatus.DOWNLOAD_WAS_ABORTED) - async def send_input_port_upload_finished_succesfully( + async def send_input_port_download_finished_succesfully( self, port_key: ServicePortKey ) -> None: await self._send_input_port_status( - port_key, InputStatus.UPLOAD_FINISHED_SUCCESSFULLY + port_key, InputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY ) - async def send_input_port_upload_finished_with_error( + async def send_input_port_download_finished_with_error( self, port_key: ServicePortKey ) -> None: await self._send_input_port_status( - port_key, InputStatus.UPLOAD_FINISHED_WITH_ERRROR + port_key, InputStatus.DOWNLOAD_FINISHED_WITH_ERRROR ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index 307f8b3d9337..4c89f50bbfcd 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -15,6 +15,9 @@ from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from ...core.rabbitmq import post_log_message, post_progress_message from ...core.settings import ApplicationSettings @@ -100,6 +103,7 @@ class OutputsManager: # pylint: disable=too-many-instance-attributes def __init__( self, outputs_context: OutputsContext, + port_notifier: PortNotifier, io_log_redirect_cb: LogRedirectCB | None, progress_cb: progress_bar.AsyncReportCB | None, *, @@ -108,6 +112,7 @@ def __init__( task_monitor_interval_s: PositiveFloat = 1.0, ): self.outputs_context = outputs_context + self.port_notifier = port_notifier self.io_log_redirect_cb = io_log_redirect_cb self.upload_upon_api_request = upload_upon_api_request self.task_cancellation_timeout_s = task_cancellation_timeout_s @@ -138,6 +143,7 @@ async def _upload_ports() -> None: port_keys=port_keys, io_log_redirect_cb=self.io_log_redirect_cb, progress_bar=root_progress, + port_notifier=self.port_notifier, ) task_name = f"outputs_manager_port_keys-{'_'.join(port_keys)}" @@ -271,6 +277,9 @@ async def on_startup() -> None: progress_cb=partial( post_progress_message, app, ProgressType.SERVICE_OUTPUTS_PUSHING ), + port_notifier=PortNotifier( + app, settings.DY_SIDECAR_USER_ID, settings.DY_SIDECAR_NODE_ID + ), ) await outputs_manager.start() diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py index ea8d2c2f72ce..9a69403d424b 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -279,16 +279,18 @@ async def test_notifier_send_input_port_status( # server publishes a message match input_status: - case InputStatus.UPLOAD_STARTED: - await port_notifier.send_input_port_upload_started(port_key) - case InputStatus.UPLOAD_WAS_ABORTED: - await port_notifier.send_input_port_upload_was_aborted(port_key) - case InputStatus.UPLOAD_FINISHED_SUCCESSFULLY: - await port_notifier.send_input_port_upload_finished_succesfully( + case InputStatus.DOWNLOAD_STARTED: + await port_notifier.send_input_port_download_started(port_key) + case InputStatus.DOWNLOAD_WAS_ABORTED: + await port_notifier.send_input_port_download_was_aborted(port_key) + case InputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_input_port_download_finished_succesfully( + port_key + ) + case InputStatus.DOWNLOAD_FINISHED_WITH_ERRROR: + await port_notifier.send_input_port_download_finished_with_error( port_key ) - case InputStatus.UPLOAD_FINISHED_WITH_ERRROR: - await port_notifier.send_input_port_upload_finished_with_error(port_key) # check that all clients received it for on_input_port_event in on_input_port_events: @@ -362,14 +364,16 @@ async def test_notifier_send_output_port_status( # server publishes a message match output_status: - case OutputStatus.DOWNLOAD_STARTED: - await port_notifier.send_output_port_download_sarted(port_key) - case OutputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY: - await port_notifier.send_output_port_download_finished_successfully( + case OutputStatus.UPLOAD_STARTED: + await port_notifier.send_output_port_upload_sarted(port_key) + case OutputStatus.UPLOAD_WAS_ABORTED: + await port_notifier.send_output_port_upload_was_aborted(port_key) + case OutputStatus.UPLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_output_port_upload_finished_successfully( port_key ) - case OutputStatus.DOWNLOAD_FINISHED_WITH_ERRROR: - await port_notifier.send_output_port_download_finished_with_error( + case OutputStatus.UPLOAD_FINISHED_WITH_ERRROR: + await port_notifier.send_output_port_upload_finished_with_error( port_key ) From 0c2153c1b5d1866f6a6a4fc3a8fd4104eff81c98 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 16:28:50 +0200 Subject: [PATCH 07/21] added missing project_id to notification --- .../api_schemas_dynamic_sidecar/ports.py | 2 ++ .../modules/long_running_tasks.py | 5 ++++- .../notifications/_notifications_ports.py | 6 ++++-- .../modules/notifications/_notifier.py | 15 +++++++++++++-- .../modules/outputs/_manager.py | 5 ++++- .../tests/unit/test_modules_notifier.py | 17 +++++++++++++---- 6 files changed, 40 insertions(+), 10 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py index 285063fbc475..a0bfb42a3128 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py @@ -1,5 +1,6 @@ from enum import StrEnum, auto +from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from pydantic import BaseModel @@ -20,6 +21,7 @@ class InputStatus(StrEnum): class _PortStatusCommon(BaseModel): + project_id: ProjectID node_id: NodeID port_key: ServicePortKey diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 1fcdff35384a..c355bafbc492 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -510,7 +510,10 @@ async def task_ports_inputs_pull( ), progress_bar=root_progress, port_notifier=PortNotifier( - app, settings.DY_SIDECAR_USER_ID, settings.DY_SIDECAR_NODE_ID + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, ), ) await post_sidecar_log_message( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py index 65f2ca9de4a2..ae48f19a973f 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py @@ -2,6 +2,7 @@ from fastapi import FastAPI from models_library.api_schemas_dynamic_sidecar.ports import InputStatus, OutputStatus +from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID @@ -13,6 +14,7 @@ class PortNotifier: app: FastAPI user_id: UserID + project_id: ProjectID node_id: NodeID async def _send_output_port_status( @@ -20,7 +22,7 @@ async def _send_output_port_status( ) -> None: notifier: Notifier = Notifier.get_from_app_state(self.app) await notifier.notify_output_port_status( - self.user_id, self.node_id, port_key, status + self.user_id, self.project_id, self.node_id, port_key, status ) async def _send_input_port_status( @@ -28,7 +30,7 @@ async def _send_input_port_status( ) -> None: notifier: Notifier = Notifier.get_from_app_state(self.app) await notifier.notify_input_port_status( - self.user_id, self.node_id, port_key, status + self.user_id, self.project_id, self.node_id, port_key, status ) async def send_output_port_upload_sarted(self, port_key: ServicePortKey) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py index 4724b1dffe42..0d61e1b388ba 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py @@ -20,6 +20,7 @@ ServiceDiskUsage, ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID @@ -44,6 +45,7 @@ async def notify_service_disk_usage( async def notify_output_port_status( self, user_id: UserID, + project_id: ProjectID, node_id: NodeID, port_key: ServicePortKey, output_status: OutputStatus, @@ -52,7 +54,10 @@ async def notify_output_port_status( SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, data=jsonable_encoder( OutputPortStatus( - node_id=node_id, port_key=port_key, status=output_status + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=output_status, ) ), room=SocketIORoomStr.from_user_id(user_id), @@ -61,6 +66,7 @@ async def notify_output_port_status( async def notify_input_port_status( self, user_id: UserID, + project_id: ProjectID, node_id: NodeID, port_key: ServicePortKey, input_status: InputStatus, @@ -68,7 +74,12 @@ async def notify_input_port_status( await self._sio_manager.emit( SOCKET_IO_STATE_INPUT_PORTS_EVENT, data=jsonable_encoder( - InputPortSatus(node_id=node_id, port_key=port_key, status=input_status) + InputPortSatus( + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=input_status, + ) ), room=SocketIORoomStr.from_user_id(user_id), ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index 4c89f50bbfcd..0f0bc8d7febf 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -278,7 +278,10 @@ async def on_startup() -> None: post_progress_message, app, ProgressType.SERVICE_OUTPUTS_PUSHING ), port_notifier=PortNotifier( - app, settings.DY_SIDECAR_USER_ID, settings.DY_SIDECAR_NODE_ID + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, ), ) await outputs_manager.start() diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py index 9a69403d424b..654d2bb16191 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -28,6 +28,7 @@ ServiceDiskUsage, ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey from models_library.users import UserID @@ -240,6 +241,7 @@ async def test_notifier_send_input_port_status( socketio_server_events: dict[str, AsyncMock], app: FastAPI, user_id: UserID, + project_id: ProjectID, node_id: NodeID, port_key: ServicePortKey, socketio_client_factory: Callable[ @@ -275,7 +277,7 @@ async def test_notifier_send_input_port_status( _get_on_input_port_spy(c) for c in frontend_clients ] - port_notifier = PortNotifier(app, user_id, node_id) + port_notifier = PortNotifier(app, user_id, project_id, node_id) # server publishes a message match input_status: @@ -298,7 +300,10 @@ async def test_notifier_send_input_port_status( on_input_port_event.assert_awaited_once_with( jsonable_encoder( InputPortSatus( - node_id=node_id, port_key=port_key, status=input_status + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=input_status, ) ) ) @@ -325,6 +330,7 @@ async def test_notifier_send_output_port_status( socketio_server_events: dict[str, AsyncMock], app: FastAPI, user_id: UserID, + project_id: ProjectID, node_id: NodeID, port_key: ServicePortKey, socketio_client_factory: Callable[ @@ -360,7 +366,7 @@ async def test_notifier_send_output_port_status( _get_on_output_port_spy(c) for c in frontend_clients ] - port_notifier = PortNotifier(app, user_id, node_id) + port_notifier = PortNotifier(app, user_id, project_id, node_id) # server publishes a message match output_status: @@ -383,7 +389,10 @@ async def test_notifier_send_output_port_status( on_output_port_event.assert_awaited_once_with( jsonable_encoder( OutputPortStatus( - node_id=node_id, port_key=port_key, status=output_status + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=output_status, ) ) ) From 4ed8e2a72ce42ec937cfe5fae4c5ec473698ddd3 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 16:55:08 +0200 Subject: [PATCH 08/21] adding missing port notifier to constructor --- services/dynamic-sidecar/tests/unit/conftest.py | 15 +++++++++++++++ .../unit/test_modules_outputs_event_filter.py | 10 ++++++++-- .../unit/test_modules_outputs_event_handler.py | 10 ++++++++-- .../tests/unit/test_modules_outputs_manager.py | 6 +++++- .../tests/unit/test_modules_outputs_watcher.py | 6 +++++- 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/services/dynamic-sidecar/tests/unit/conftest.py b/services/dynamic-sidecar/tests/unit/conftest.py index b6e590f71ebb..ee2c106bb695 100644 --- a/services/dynamic-sidecar/tests/unit/conftest.py +++ b/services/dynamic-sidecar/tests/unit/conftest.py @@ -17,6 +17,10 @@ docker_compose_down, ) from simcore_service_dynamic_sidecar.core.docker_utils import docker_client +from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from tenacity import retry from tenacity.after import after_log from tenacity.stop import stop_after_delay @@ -142,3 +146,14 @@ def mock_rabbitmq_envs( }, ) return mock_environment + + +@pytest.fixture +def port_notifier(app: FastAPI) -> PortNotifier: + settings: ApplicationSettings = app.state.settings + return PortNotifier( + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, + ) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 024d966e424b..38b217bab8f5 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -9,6 +9,9 @@ import pytest from pydantic import ByteSize, NonNegativeFloat, NonNegativeInt, parse_obj_as from pytest_mock.plugin import MockerFixture +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext from simcore_service_dynamic_sidecar.modules.outputs._event_filter import ( BaseDelayPolicy, @@ -56,10 +59,13 @@ async def outputs_context(outputs_path: Path, port_keys: list[str]) -> OutputsCo @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterator[OutputsManager]: outputs_manager = OutputsManager( - outputs_context=outputs_context, io_log_redirect_cb=None, progress_cb=None + outputs_context=outputs_context, + port_notifier=port_notifier, + io_log_redirect_cb=None, + progress_cb=None, ) await outputs_manager.start() yield outputs_manager diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py index 5f02a500a4da..35ccc7d72df7 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py @@ -10,6 +10,9 @@ import pytest from aioprocessing.queues import AioQueue from pydantic import PositiveFloat +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext from simcore_service_dynamic_sidecar.modules.outputs._event_handler import ( EventHandlerObserver, @@ -39,10 +42,13 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterable[OutputsManager]: outputs_manager = OutputsManager( - outputs_context, io_log_redirect_cb=None, progress_cb=None + outputs_context, + port_notifier=port_notifier, + io_log_redirect_cb=None, + progress_cb=None, ) await outputs_manager.start() diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index 40a3db6d3f94..3bf17d09f925 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -22,6 +22,9 @@ from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings from simcore_service_dynamic_sidecar.modules.mounted_fs import MountedVolumes +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import ( OutputsContext, setup_outputs_context, @@ -165,10 +168,11 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterator[OutputsManager]: outputs_manager = OutputsManager( outputs_context=outputs_context, + port_notifier=port_notifier, io_log_redirect_cb=None, task_monitor_interval_s=0.01, progress_cb=None, diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py index f209e4877a75..7f9b81587c25 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py @@ -26,6 +26,9 @@ ) from pytest_mock import MockerFixture from simcore_service_dynamic_sidecar.modules.mounted_fs import MountedVolumes +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs import ( _watcher as outputs_watcher_core, ) @@ -90,10 +93,11 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterable[OutputsManager]: outputs_manager = OutputsManager( outputs_context=outputs_context, + port_notifier=port_notifier, io_log_redirect_cb=None, task_monitor_interval_s=TICK_INTERVAL, progress_cb=None, From d9950fae189497872a042c061f10b8f411044032 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 19 Sep 2024 17:08:19 +0200 Subject: [PATCH 09/21] refactor broken tests --- .../simcore_sdk/node_ports_v2/nodeports_v2.py | 27 ++++++++----------- .../unit/test_node_ports_v2_nodeports_v2.py | 3 ++- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index d72ea21c49f0..4697150d5679 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -174,7 +174,7 @@ async def set_multiple( raises ValidationError """ - async def _set_output_with_notifications( + async def _set_with_notifications( port_key: ServicePortKey, value: ItemConcreteValue | None, set_kwargs: SetKWargs | None, @@ -187,6 +187,13 @@ async def _set_output_with_notifications( value, set_kwargs=set_kwargs, progress_bar=sub_progress ) await outputs_callbacks.finished_succesfully(port_key) + except UnboundPortError: + # not available try inputs + # if this fails it will raise another exception + # pylint: disable=protected-access + await self.internal_inputs[port_key]._set( # noqa: SLF001 + value, set_kwargs=set_kwargs, progress_bar=sub_progress + ) except CancelledError: await outputs_callbacks.aborted(port_key) raise @@ -199,21 +206,9 @@ async def _set_output_with_notifications( steps=len(port_values.items()), description=IDStr("set multiple") ) as sub_progress: for port_key, (value, set_kwargs) in port_values.items(): - # pylint: disable=protected-access - try: - tasks.append( - _set_output_with_notifications( - port_key, value, set_kwargs, sub_progress - ) - ) - except UnboundPortError: - # not available try inputs - # if this fails it will raise another exception - tasks.append( - self.internal_inputs[port_key]._set( - value, set_kwargs=set_kwargs, progress_bar=sub_progress - ) - ) + tasks.append( + _set_with_notifications(port_key, value, set_kwargs, sub_progress) + ) results = await logged_gather(*tasks) await self.save_to_db_cb(self) diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py index 81531e00c339..f8d09836213a 100644 --- a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py +++ b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any, Callable +from unittest.mock import AsyncMock import pytest from faker import Faker @@ -138,7 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs): + list(original_outputs.values()) }, progress_bar=progress_bar, - outputs_callbacks=None, + outputs_callbacks=AsyncMock(), ) assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 From 9973d6889d0f06531b8e7eb3ae47b4875aea0d94 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 20 Sep 2024 09:28:10 +0200 Subject: [PATCH 10/21] fixed tests --- .../tests/integration/test_node_ports_v2_nodeports2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index 1be7f5a9c737..e5b89687c17e 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -788,7 +788,7 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, - outputs_callbacks=None, + outputs_callbacks=AsyncMock(), ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 From 9f2854fc8e5d0e7ec86469fa187922e88a9323d0 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 20 Sep 2024 15:10:00 +0200 Subject: [PATCH 11/21] using str auto enum --- .../models_library/api_schemas_dynamic_sidecar/ports.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py index a0bfb42a3128..5863b53b2bc6 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py @@ -1,19 +1,20 @@ -from enum import StrEnum, auto +from enum import auto from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.services_types import ServicePortKey +from models_library.utils.enums import StrAutoEnum from pydantic import BaseModel -class OutputStatus(StrEnum): +class OutputStatus(StrAutoEnum): UPLOAD_STARTED = auto() UPLOAD_WAS_ABORTED = auto() UPLOAD_FINISHED_SUCCESSFULLY = auto() UPLOAD_FINISHED_WITH_ERRROR = auto() -class InputStatus(StrEnum): +class InputStatus(StrAutoEnum): DOWNLOAD_STARTED = auto() DOWNLOAD_WAS_ABORTED = auto() DOWNLOAD_FINISHED_SUCCESSFULLY = auto() From d0a7698c206d027c4d05157972073937e9eb9107 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 10:36:10 +0200 Subject: [PATCH 12/21] fixed relative imports --- .../modules/long_running_tasks.py | 4 +--- .../modules/outputs/_manager.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index c355bafbc492..0134d481f78e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -19,9 +19,6 @@ from servicelib.progress_bar import ProgressBarData from servicelib.utils import logged_gather from simcore_sdk.node_data import data_manager -from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( - PortNotifier, -) from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_result @@ -55,6 +52,7 @@ from ..models.shared_store import SharedStore from ..modules import nodeports, user_services_preferences from ..modules.mounted_fs import MountedVolumes +from ..modules.notifications._notifications_ports import PortNotifier from ..modules.outputs import OutputsManager, event_propagation_disabled from .long_running_tasksutils import run_before_shutdown_actions from .resource_tracking import send_service_started, send_service_stopped diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index 0f0bc8d7febf..d4a8ac8d07ad 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -15,12 +15,10 @@ from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB -from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( - PortNotifier, -) from ...core.rabbitmq import post_log_message, post_progress_message from ...core.settings import ApplicationSettings +from ...modules.notifications._notifications_ports import PortNotifier from ..nodeports import upload_outputs from ._context import OutputsContext From 525bce11dd4f5752d4acbc0afeb5ea48578f65e8 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 11:23:34 +0200 Subject: [PATCH 13/21] refactor using function --- .../modules/nodeports.py | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 5a7716e2a0b4..338bd0b3ee0e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -168,13 +168,34 @@ async def upload_outputs( # when having multiple directories it is important to # run the compression in parallel to guarantee better performance + async def _archive_dir_notified( + dir_to_compress: Path, destination: Path, port_key: ServicePortKey + ) -> None: + # Errors and cancellation can also be triggered from archving as well + try: + await archive_dir( + dir_to_compress=dir_to_compress, + destination=destination, + compress=False, + store_relative_path=True, + progress_bar=sub_progress, + ) + except CancelledError: + await port_notifier.send_output_port_upload_was_aborted( + port_key + ) + raise + except Exception: + await port_notifier.send_output_port_upload_finished_with_error( + port_key + ) + raise + archiving_tasks.append( - archive_dir( + _archive_dir_notified( dir_to_compress=src_folder, destination=tmp_file, - compress=False, - store_relative_path=True, - progress_bar=sub_progress, + port_key=port.key, ) ) ports_values[port.key] = ( @@ -197,26 +218,7 @@ async def upload_outputs( logger.debug("No file %s to fetch port values from", data_file) if archiving_tasks: - # NOTE: if one archiving task fails/cancelled all the ports are affected - # setting all other ports as finished with error/cancelled - try: - await logged_gather(*archiving_tasks) - except CancelledError: - await logged_gather( - *( - port_notifier.send_output_port_upload_was_aborted(p.key) - for p in ports_to_set - ) - ) - raise - except Exception: - await logged_gather( - *( - port_notifier.send_output_port_upload_finished_with_error(p.key) - for p in ports_to_set - ) - ) - raise + await logged_gather(*archiving_tasks) await PORTS.set_multiple( ports_values, From bf7408a77fb719dfcdfc9461de809a9cfe110089 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 11:28:52 +0200 Subject: [PATCH 14/21] using limited_gather --- .../modules/nodeports.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 338bd0b3ee0e..a140920a04e3 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -25,7 +25,7 @@ from servicelib.file_utils import remove_directory from servicelib.logging_utils import log_context from servicelib.progress_bar import ProgressBarData -from servicelib.utils import logged_gather +from servicelib.utils import limited_gather from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB from simcore_sdk.node_ports_v2 import Port @@ -120,8 +120,9 @@ async def upload_outputs( if (not port_keys) or (port_value.key in port_keys) ] - await logged_gather( - *(port_notifier.send_output_port_upload_sarted(p.key) for p in ports_to_set) + await limited_gather( + *(port_notifier.send_output_port_upload_sarted(p.key) for p in ports_to_set), + limit=4, ) async with AsyncExitStack() as stack: @@ -218,7 +219,7 @@ async def _archive_dir_notified( logger.debug("No file %s to fetch port values from", data_file) if archiving_tasks: - await logged_gather(*archiving_tasks) + await limited_gather(*archiving_tasks, limit=4) await PORTS.set_multiple( ports_values, @@ -354,7 +355,7 @@ async def _get_date_from_port_notified( async with progress_bar.sub_progress( steps=len(ports_to_get), description=IDStr("downloading") ) as sub_progress: - results = await logged_gather( + results = await limited_gather( *[ ( _get_data_from_port( @@ -365,7 +366,7 @@ async def _get_date_from_port_notified( ) for port in ports_to_get ], - max_concurrency=2, + limit=2, ) # parse results data = { From 0085f4b7691fe4ed483a0e2f4837e633ed1a71cf Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 11:42:51 +0200 Subject: [PATCH 15/21] outputs_callback is now totally optional --- .../src/simcore_sdk/node_ports_v2/nodeports_v2.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 4697150d5679..9da016b4cea9 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -180,13 +180,13 @@ async def _set_with_notifications( set_kwargs: SetKWargs | None, sub_progress: ProgressBarData, ) -> None: - assert outputs_callbacks is not None # nosec try: # pylint: disable=protected-access await self.internal_outputs[port_key]._set( # noqa: SLF001 value, set_kwargs=set_kwargs, progress_bar=sub_progress ) - await outputs_callbacks.finished_succesfully(port_key) + if outputs_callbacks: + await outputs_callbacks.finished_succesfully(port_key) except UnboundPortError: # not available try inputs # if this fails it will raise another exception @@ -195,10 +195,12 @@ async def _set_with_notifications( value, set_kwargs=set_kwargs, progress_bar=sub_progress ) except CancelledError: - await outputs_callbacks.aborted(port_key) + if outputs_callbacks: + await outputs_callbacks.aborted(port_key) raise except Exception: - await outputs_callbacks.finished_with_error(port_key) + if outputs_callbacks: + await outputs_callbacks.finished_with_error(port_key) raise tasks = [] From 05ed52e8acc743ea7f9a32e49c192a8ed7158d5b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 14:26:22 +0200 Subject: [PATCH 16/21] added tests --- .../test_node_ports_v2_nodeports2.py | 58 ++++++++++++++++--- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index e5b89687c17e..aa75fe67e773 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -29,13 +29,14 @@ SimcoreS3FileID, ) from models_library.services_types import ServicePortKey +from pytest_mock import MockerFixture from servicelib.progress_bar import ProgressBarData from settings_library.r_clone import RCloneSettings from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.exceptions import UnboundPortError from simcore_sdk.node_ports_v2 import exceptions from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink -from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports +from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks from simcore_sdk.node_ports_v2.port import Port from utils_port_v2 import CONSTANT_UUID @@ -750,6 +751,34 @@ async def _upload_create_task(item_key: str) -> None: ) +class _Callbacks(OutputsCallbacks): + async def aborted(self, key: ServicePortKey) -> None: + pass + + async def finished_succesfully(self, key: ServicePortKey) -> None: + pass + + async def finished_with_error(self, key: ServicePortKey) -> None: + pass + + +@pytest.fixture +async def output_callbacks() -> _Callbacks: + return _Callbacks() + + +@pytest.fixture +async def spy_outputs_callbaks( + mocker: MockerFixture, output_callbacks: _Callbacks +) -> dict[str, AsyncMock]: + + return { + "aborted": mocker.spy(output_callbacks, "aborted"), + "finished_succesfully": mocker.spy(output_callbacks, "finished_succesfully"), + "finished_with_error": mocker.spy(output_callbacks, "finished_with_error"), + } + + async def test_batch_update_inputs_outputs( user_id: int, project_id: str, @@ -758,6 +787,8 @@ async def test_batch_update_inputs_outputs( port_count: int, option_r_clone_settings: RCloneSettings | None, faker: Faker, + output_callbacks: _Callbacks, + spy_outputs_callbaks: dict[str, AsyncMock], ) -> None: outputs = [(f"value_out_{i}", "integer", None) for i in range(port_count)] inputs = [(f"value_in_{i}", "integer", None) for i in range(port_count)] @@ -772,13 +803,14 @@ async def test_batch_update_inputs_outputs( await check_config_valid(PORTS, config_dict) async with ProgressBarData(num_steps=2, description=faker.pystr()) as progress_bar: + port_values = (await PORTS.outputs).values() await PORTS.set_multiple( - { - ServicePortKey(port.key): (k, None) - for k, port in enumerate((await PORTS.outputs).values()) - }, + {ServicePortKey(port.key): (k, None) for k, port in enumerate(port_values)}, progress_bar=progress_bar, - outputs_callbacks=AsyncMock(), + outputs_callbacks=output_callbacks, + ) + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( + port_values ) # pylint: disable=protected-access assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 @@ -788,7 +820,11 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, - outputs_callbacks=AsyncMock(), + outputs_callbacks=output_callbacks, + ) + # inputs do not trigger callbacks + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( + port_values ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 @@ -810,5 +846,11 @@ async def test_batch_update_inputs_outputs( await PORTS.set_multiple( {ServicePortKey("missing_key_in_both"): (123132, None)}, progress_bar=progress_bar, - outputs_callbacks=AsyncMock(), + outputs_callbacks=output_callbacks, ) + + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( + port_values + ) + assert len(spy_outputs_callbaks["aborted"].call_args_list) == 0 + assert len(spy_outputs_callbaks["finished_with_error"].call_args_list) == 0 From 5d7d1a33a9805ae9342b5d9576e5c7929323ca42 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 14:34:04 +0200 Subject: [PATCH 17/21] refactor tests --- .../test_node_ports_v2_nodeports2.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index aa75fe67e773..2da7011e9b0b 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -771,7 +771,6 @@ async def output_callbacks() -> _Callbacks: async def spy_outputs_callbaks( mocker: MockerFixture, output_callbacks: _Callbacks ) -> dict[str, AsyncMock]: - return { "aborted": mocker.spy(output_callbacks, "aborted"), "finished_succesfully": mocker.spy(output_callbacks, "finished_succesfully"), @@ -779,6 +778,7 @@ async def spy_outputs_callbaks( } +@pytest.mark.parametrize("use_output_callbacks", [True, False]) async def test_batch_update_inputs_outputs( user_id: int, project_id: str, @@ -789,7 +789,10 @@ async def test_batch_update_inputs_outputs( faker: Faker, output_callbacks: _Callbacks, spy_outputs_callbaks: dict[str, AsyncMock], + use_output_callbacks: bool, ) -> None: + callbacks = output_callbacks if use_output_callbacks else None + outputs = [(f"value_out_{i}", "integer", None) for i in range(port_count)] inputs = [(f"value_in_{i}", "integer", None) for i in range(port_count)] config_dict, _, _ = create_special_configuration(inputs=inputs, outputs=outputs) @@ -807,10 +810,10 @@ async def test_batch_update_inputs_outputs( await PORTS.set_multiple( {ServicePortKey(port.key): (k, None) for k, port in enumerate(port_values)}, progress_bar=progress_bar, - outputs_callbacks=output_callbacks, + outputs_callbacks=callbacks, ) - assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( - port_values + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 ) # pylint: disable=protected-access assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 @@ -820,11 +823,11 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, - outputs_callbacks=output_callbacks, + outputs_callbacks=callbacks, ) # inputs do not trigger callbacks - assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( - port_values + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 @@ -846,11 +849,11 @@ async def test_batch_update_inputs_outputs( await PORTS.set_multiple( {ServicePortKey("missing_key_in_both"): (123132, None)}, progress_bar=progress_bar, - outputs_callbacks=output_callbacks, + outputs_callbacks=callbacks, ) - assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == len( - port_values + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 ) assert len(spy_outputs_callbaks["aborted"].call_args_list) == 0 assert len(spy_outputs_callbaks["finished_with_error"].call_args_list) == 0 From 4136cdea3c54ae74cfcb5936df816bfd694e21d2 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 16:13:56 +0200 Subject: [PATCH 18/21] fix flaky test --- .../unit/test_modules_outputs_event_filter.py | 79 +++++++++---------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 38b217bab8f5..3baa8922b3a0 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -1,9 +1,9 @@ # pylint:disable=redefined-outer-name # pylint:disable=unused-argument -import asyncio +from collections.abc import AsyncIterator from pathlib import Path -from typing import AsyncIterator, Iterator +from typing import Any, Final from unittest.mock import AsyncMock import pytest @@ -24,12 +24,12 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -_TENACITY_RETRY_PARAMS = dict( - reraise=True, - retry=retry_if_exception_type(AssertionError), - stop=stop_after_delay(10), - wait=wait_fixed(0.01), -) +_TENACITY_RETRY_PARAMS: Final[dict[str, Any]] = { + "reraise": True, + "retry": retry_if_exception_type(AssertionError), + "stop": stop_after_delay(10), + "wait": wait_fixed(0.01), +} # FIXTURES @@ -75,11 +75,11 @@ async def outputs_manager( @pytest.fixture def mocked_port_key_content_changed( mocker: MockerFixture, outputs_manager: OutputsManager -) -> Iterator[AsyncMock]: +) -> AsyncMock: async def _mock_upload_outputs(*args, **kwargs) -> None: pass - yield mocker.patch.object( + return mocker.patch.object( outputs_manager, "port_key_content_changed", side_effect=_mock_upload_outputs ) @@ -101,8 +101,8 @@ def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat: @pytest.fixture -def mock_get_directory_total_size(mocker: MockerFixture) -> Iterator[AsyncMock]: - yield mocker.patch( +def mock_get_directory_total_size(mocker: MockerFixture) -> AsyncMock: + return mocker.patch( "simcore_service_dynamic_sidecar.modules.outputs._event_filter.get_directory_total_size", return_value=1, ) @@ -120,17 +120,6 @@ async def event_filter( await event_filter.shutdown() -# UTILS - - -async def _wait_for_event_to_trigger(event_filter: EventFilter) -> None: - await asyncio.sleep(event_filter.delay_policy.get_min_interval() * 5) - - -async def _wait_for_event_to_trigger_big_directory(event_filter: EventFilter) -> None: - await asyncio.sleep(event_filter.delay_policy.get_wait_interval(1) * 2) - - # TESTS @@ -141,13 +130,16 @@ async def test_event_triggers_once( ): # event triggers once await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger(event_filter) - assert mocked_port_key_content_changed.call_count == 1 + + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mocked_port_key_content_changed.call_count == 1 # event triggers a second time await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger(event_filter) - assert mocked_port_key_content_changed.call_count == 2 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mocked_port_key_content_changed.call_count == 2 async def test_trigger_once_after_event_chain( @@ -157,8 +149,9 @@ async def test_trigger_once_after_event_chain( ): for _ in range(100): await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger(event_filter) - assert mocked_port_key_content_changed.call_count == 1 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mocked_port_key_content_changed.call_count == 1 async def test_always_trigger_after_delay( @@ -170,8 +163,9 @@ async def test_always_trigger_after_delay( # event trigger after correct interval delay correctly for expected_call_count in range(1, 10): await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger_big_directory(event_filter) - assert mocked_port_key_content_changed.call_count == expected_call_count + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mocked_port_key_content_changed.call_count == expected_call_count async def test_minimum_amount_of_get_directory_total_size_calls( @@ -183,14 +177,12 @@ async def test_minimum_amount_of_get_directory_total_size_calls( await event_filter.enqueue(port_key_1) # wait a bit for the vent to be picked up # by the workers and processed - await _wait_for_event_to_trigger(event_filter) async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): with attempt: assert mock_get_directory_total_size.call_count == 1 assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched - await _wait_for_event_to_trigger_big_directory(event_filter) async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): with attempt: assert mock_get_directory_total_size.call_count == 2 @@ -206,9 +198,10 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_ await event_filter.enqueue(port_key_1) # wait a bit for the vent to be picked up # by the workers and processed - await _wait_for_event_to_trigger(event_filter) - assert mock_get_directory_total_size.call_count == 1 - assert mocked_port_key_content_changed.call_count == 0 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mock_get_directory_total_size.call_count == 1 + assert mocked_port_key_content_changed.call_count == 0 # while changes keep piling up, keep extending the duration # no event will trigger @@ -216,14 +209,16 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_ VERY_LONG_EVENT_CHAIN = 1000 for _ in range(VERY_LONG_EVENT_CHAIN): await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger(event_filter) - assert mock_get_directory_total_size.call_count == 1 - assert mocked_port_key_content_changed.call_count == 0 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mock_get_directory_total_size.call_count == 1 + assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched - await _wait_for_event_to_trigger_big_directory(event_filter) - assert mock_get_directory_total_size.call_count == 2 - assert mocked_port_key_content_changed.call_count == 1 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mock_get_directory_total_size.call_count == 2 + assert mocked_port_key_content_changed.call_count == 1 def test_default_delay_policy(): From ab3ace946c4a3364907e9fc3a27c86882f732a3f Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 23 Sep 2024 16:16:15 +0200 Subject: [PATCH 19/21] pylint --- .../src/simcore_service_dynamic_sidecar/modules/nodeports.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index a140920a04e3..0ad00f2c18da 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -86,9 +86,8 @@ async def finished_with_error(self, key: ServicePortKey) -> None: await self.port_notifier.send_output_port_upload_finished_with_error(key) -# NOTE: outputs_manager guarantees that no parallel calls -# to this function occur -async def upload_outputs( +# NOTE: outputs_manager guarantees that no parallel calls to this function occur +async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915, C901 outputs_path: Path, port_keys: list[str], io_log_redirect_cb: LogRedirectCB | None, From 82b4701d68382b3edddbd4f0228982e53601026b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 24 Sep 2024 08:41:20 +0200 Subject: [PATCH 20/21] revert changes --- .../unit/test_modules_outputs_event_filter.py | 79 ++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 3baa8922b3a0..38b217bab8f5 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -1,9 +1,9 @@ # pylint:disable=redefined-outer-name # pylint:disable=unused-argument -from collections.abc import AsyncIterator +import asyncio from pathlib import Path -from typing import Any, Final +from typing import AsyncIterator, Iterator from unittest.mock import AsyncMock import pytest @@ -24,12 +24,12 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -_TENACITY_RETRY_PARAMS: Final[dict[str, Any]] = { - "reraise": True, - "retry": retry_if_exception_type(AssertionError), - "stop": stop_after_delay(10), - "wait": wait_fixed(0.01), -} +_TENACITY_RETRY_PARAMS = dict( + reraise=True, + retry=retry_if_exception_type(AssertionError), + stop=stop_after_delay(10), + wait=wait_fixed(0.01), +) # FIXTURES @@ -75,11 +75,11 @@ async def outputs_manager( @pytest.fixture def mocked_port_key_content_changed( mocker: MockerFixture, outputs_manager: OutputsManager -) -> AsyncMock: +) -> Iterator[AsyncMock]: async def _mock_upload_outputs(*args, **kwargs) -> None: pass - return mocker.patch.object( + yield mocker.patch.object( outputs_manager, "port_key_content_changed", side_effect=_mock_upload_outputs ) @@ -101,8 +101,8 @@ def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat: @pytest.fixture -def mock_get_directory_total_size(mocker: MockerFixture) -> AsyncMock: - return mocker.patch( +def mock_get_directory_total_size(mocker: MockerFixture) -> Iterator[AsyncMock]: + yield mocker.patch( "simcore_service_dynamic_sidecar.modules.outputs._event_filter.get_directory_total_size", return_value=1, ) @@ -120,6 +120,17 @@ async def event_filter( await event_filter.shutdown() +# UTILS + + +async def _wait_for_event_to_trigger(event_filter: EventFilter) -> None: + await asyncio.sleep(event_filter.delay_policy.get_min_interval() * 5) + + +async def _wait_for_event_to_trigger_big_directory(event_filter: EventFilter) -> None: + await asyncio.sleep(event_filter.delay_policy.get_wait_interval(1) * 2) + + # TESTS @@ -130,16 +141,13 @@ async def test_event_triggers_once( ): # event triggers once await event_filter.enqueue(port_key_1) - - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mocked_port_key_content_changed.call_count == 1 + await _wait_for_event_to_trigger(event_filter) + assert mocked_port_key_content_changed.call_count == 1 # event triggers a second time await event_filter.enqueue(port_key_1) - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mocked_port_key_content_changed.call_count == 2 + await _wait_for_event_to_trigger(event_filter) + assert mocked_port_key_content_changed.call_count == 2 async def test_trigger_once_after_event_chain( @@ -149,9 +157,8 @@ async def test_trigger_once_after_event_chain( ): for _ in range(100): await event_filter.enqueue(port_key_1) - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mocked_port_key_content_changed.call_count == 1 + await _wait_for_event_to_trigger(event_filter) + assert mocked_port_key_content_changed.call_count == 1 async def test_always_trigger_after_delay( @@ -163,9 +170,8 @@ async def test_always_trigger_after_delay( # event trigger after correct interval delay correctly for expected_call_count in range(1, 10): await event_filter.enqueue(port_key_1) - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mocked_port_key_content_changed.call_count == expected_call_count + await _wait_for_event_to_trigger_big_directory(event_filter) + assert mocked_port_key_content_changed.call_count == expected_call_count async def test_minimum_amount_of_get_directory_total_size_calls( @@ -177,12 +183,14 @@ async def test_minimum_amount_of_get_directory_total_size_calls( await event_filter.enqueue(port_key_1) # wait a bit for the vent to be picked up # by the workers and processed + await _wait_for_event_to_trigger(event_filter) async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): with attempt: assert mock_get_directory_total_size.call_count == 1 assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched + await _wait_for_event_to_trigger_big_directory(event_filter) async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): with attempt: assert mock_get_directory_total_size.call_count == 2 @@ -198,10 +206,9 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_ await event_filter.enqueue(port_key_1) # wait a bit for the vent to be picked up # by the workers and processed - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mock_get_directory_total_size.call_count == 1 - assert mocked_port_key_content_changed.call_count == 0 + await _wait_for_event_to_trigger(event_filter) + assert mock_get_directory_total_size.call_count == 1 + assert mocked_port_key_content_changed.call_count == 0 # while changes keep piling up, keep extending the duration # no event will trigger @@ -209,16 +216,14 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_ VERY_LONG_EVENT_CHAIN = 1000 for _ in range(VERY_LONG_EVENT_CHAIN): await event_filter.enqueue(port_key_1) - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mock_get_directory_total_size.call_count == 1 - assert mocked_port_key_content_changed.call_count == 0 + await _wait_for_event_to_trigger(event_filter) + assert mock_get_directory_total_size.call_count == 1 + assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched - async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): - with attempt: - assert mock_get_directory_total_size.call_count == 2 - assert mocked_port_key_content_changed.call_count == 1 + await _wait_for_event_to_trigger_big_directory(event_filter) + assert mock_get_directory_total_size.call_count == 2 + assert mocked_port_key_content_changed.call_count == 1 def test_default_delay_policy(): From 4c32ec06e1e9fef56c7ebe960117ce4b0d5a6932 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 24 Sep 2024 10:37:07 +0200 Subject: [PATCH 21/21] restore flaky marker --- .../02/test_mixed_dynamic_sidecar_and_legacy_project.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index 646cb788ad7f..d590985680d9 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -229,6 +229,7 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]: ) +@pytest.mark.flaky(max_runs=3) async def test_legacy_and_dynamic_sidecar_run( initialized_app: FastAPI, wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],