diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_scheduler/socketio.py b/packages/models-library/src/models_library/api_schemas_dynamic_scheduler/socketio.py new file mode 100644 index 000000000000..89a493a56cce --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_dynamic_scheduler/socketio.py @@ -0,0 +1,3 @@ +from typing import Final + +SOCKET_IO_SERVICE_STATUS_EVENT: Final[str] = "serviceStatus" diff --git a/packages/models-library/src/models_library/api_schemas_webserver/projects_nodes.py b/packages/models-library/src/models_library/api_schemas_webserver/projects_nodes.py index 25a6f5fb0dd8..0c2bdd07c7fd 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/projects_nodes.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/projects_nodes.py @@ -93,19 +93,36 @@ class NodeGet(OutputSchema): class Config: schema_extra: ClassVar[dict[str, Any]] = { - "example": { - "published_port": 30000, - "entrypoint": "/the/entry/point/is/here", - "service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", - "service_key": "simcore/services/comp/itis/sleeper", - "service_version": "1.2.3", - "service_host": "jupyter_E1O2E-LAH", - "service_port": 8081, - "service_basepath": "/x/E1O2E-LAH", - "service_state": "pending", - "service_message": "no suitable node (insufficient resources on 1 node)", - "user_id": 123, - } + "examples": [ + # computational + { + "published_port": 30000, + "entrypoint": "/the/entry/point/is/here", + "service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "service_key": "simcore/services/comp/itis/sleeper", + "service_version": "1.2.3", + "service_host": "jupyter_E1O2E-LAH", + "service_port": 8081, + "service_basepath": "/x/E1O2E-LAH", + "service_state": "pending", + "service_message": "no suitable node (insufficient resources on 1 node)", + "user_id": 123, + }, + # dynamic + { + "published_port": 30000, + "entrypoint": "/the/entry/point/is/here", + "service_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "service_key": "simcore/services/dynamic/some-dynamic-service", + "service_version": "1.2.3", + "service_host": "jupyter_E1O2E-LAH", + "service_port": 8081, + "service_basepath": "/x/E1O2E-LAH", + "service_state": "pending", + "service_message": "no suitable node (insufficient resources on 1 node)", + "user_id": 123, + }, + ] } diff --git a/packages/models-library/src/models_library/services_enums.py b/packages/models-library/src/models_library/services_enums.py index 50a83313482e..ec5414218e3c 100644 --- a/packages/models-library/src/models_library/services_enums.py +++ b/packages/models-library/src/models_library/services_enums.py @@ -11,14 +11,18 @@ class ServiceBootType(str, Enum): @functools.total_ordering @unique class ServiceState(Enum): + FAILED = "failed" + PENDING = "pending" PULLING = "pulling" STARTING = "starting" RUNNING = "running" - COMPLETE = "complete" - FAILED = "failed" + STOPPING = "stopping" + COMPLETE = "complete" + IDLE = "idle" + def __lt__(self, other): if self.__class__ is other.__class__: comparison_order = ServiceState.comparison_order() @@ -39,6 +43,7 @@ def comparison_order() -> dict["ServiceState", int]: ServiceState.RUNNING: 4, ServiceState.STOPPING: 5, ServiceState.COMPLETE: 6, + ServiceState.IDLE: 7, } diff --git a/packages/models-library/tests/conftest.py b/packages/models-library/tests/conftest.py index 9169e570b510..8bf433b901d7 100644 --- a/packages/models-library/tests/conftest.py +++ b/packages/models-library/tests/conftest.py @@ -9,6 +9,7 @@ 
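# --- Editorial note (illustrative sketch, not part of the patch above) ---
# The reordered ServiceState members together with the extended comparison_order()
# keep the enum totally ordered via @functools.total_ordering. Assuming the previous
# ranks (FAILED=0 ... COMPLETE=6) are unchanged and IDLE is simply appended with rank 7,
# a quick sanity check would look like:
from models_library.services_enums import ServiceState

assert ServiceState.PENDING < ServiceState.STARTING < ServiceState.RUNNING
assert ServiceState.RUNNING < ServiceState.STOPPING < ServiceState.COMPLETE
assert ServiceState.COMPLETE < ServiceState.IDLE  # IDLE now sorts last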
import pytest pytest_plugins = [ + "pytest_simcore.faker_projects_data", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", "pytest_simcore.repository_paths", diff --git a/packages/models-library/tests/test_utils_nodes.py b/packages/models-library/tests/test_utils_nodes.py index 47465ce236d3..b4634770a97e 100644 --- a/packages/models-library/tests/test_utils_nodes.py +++ b/packages/models-library/tests/test_utils_nodes.py @@ -16,12 +16,6 @@ from models_library.utils.nodes import compute_node_hash from pydantic import AnyUrl, parse_obj_as - -@pytest.fixture() -def node_id() -> NodeID: - return uuid4() - - ANOTHER_NODE_ID = uuid4() ANOTHER_NODE_OUTPUT_KEY = "the_output_link" ANOTHER_NODE_PAYLOAD = {"outputs": {ANOTHER_NODE_OUTPUT_KEY: 36}} diff --git a/packages/service-library/src/servicelib/deferred_tasks/_redis_task_tracker.py b/packages/service-library/src/servicelib/deferred_tasks/_redis_task_tracker.py index 69762108e716..bbe45ccc39ac 100644 --- a/packages/service-library/src/servicelib/deferred_tasks/_redis_task_tracker.py +++ b/packages/service-library/src/servicelib/deferred_tasks/_redis_task_tracker.py @@ -1,3 +1,4 @@ +import pickle from typing import Final from uuid import uuid4 @@ -33,13 +34,15 @@ async def get_new_unique_identifier(self) -> TaskUID: async def _get_raw(self, redis_key: str) -> TaskScheduleModel | None: found_data = await self.redis_client_sdk.redis.get(redis_key) - return None if found_data is None else TaskScheduleModel.parse_raw(found_data) + return None if found_data is None else pickle.loads(found_data) # noqa: S301 async def get(self, task_uid: TaskUID) -> TaskScheduleModel | None: return await self._get_raw(_get_key(task_uid)) async def save(self, task_uid: TaskUID, task_schedule: TaskScheduleModel) -> None: - await self.redis_client_sdk.redis.set(_get_key(task_uid), task_schedule.json()) + await self.redis_client_sdk.redis.set( + _get_key(task_uid), pickle.dumps(task_schedule) + ) async def remove(self, task_uid: TaskUID) -> None: await self.redis_client_sdk.redis.delete(_get_key(task_uid)) diff --git a/packages/service-library/src/servicelib/services_utils.py b/packages/service-library/src/servicelib/services_utils.py index 60a9caf92a53..98aace49c6c6 100644 --- a/packages/service-library/src/servicelib/services_utils.py +++ b/packages/service-library/src/servicelib/services_utils.py @@ -1,5 +1,11 @@ import urllib.parse +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_webserver.projects_nodes import ( + NodeGet, + NodeGetIdle, + NodeGetUnknown, +) from models_library.services import ServiceType @@ -9,3 +15,14 @@ def get_service_from_key(service_key: str) -> ServiceType: if encoded_service_type == "comp": encoded_service_type = "computational" return ServiceType(encoded_service_type) + + +def get_status_as_dict( + status: NodeGetIdle | NodeGetUnknown | DynamicServiceGet | NodeGet, +) -> dict: + """shared between different backend services to guarantee same result to frontend""" + return ( + status.dict(by_alias=True) + if isinstance(status, DynamicServiceGet) + else status.dict() + ) diff --git a/packages/service-library/tests/conftest.py b/packages/service-library/tests/conftest.py index f069aeedd768..712746ccce97 100644 --- a/packages/service-library/tests/conftest.py +++ b/packages/service-library/tests/conftest.py @@ -76,9 +76,11 @@ async def get_redis_client_sdk( Callable[[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]] ]: @asynccontextmanager - 
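# --- Editorial note (illustrative sketch, not part of the patch) ---
# The task tracker above now stores TaskScheduleModel with pickle.dumps()/pickle.loads()
# instead of .json()/.parse_raw(), which is why the Redis fixtures below grow a
# decode_response flag: pickled payloads must come back as raw bytes, so the client has
# to be built with decode_responses=False. A minimal round-trip with redis-py's asyncio
# client (connection parameters omitted/defaulted here) would be:
import pickle

from redis.asyncio import Redis


async def _pickle_roundtrip_example() -> None:
    client = Redis(decode_responses=False)  # bytes in / bytes out
    await client.set("example:key", pickle.dumps({"state": "pending"}))
    raw = await client.get("example:key")
    assert raw is not None
    assert pickle.loads(raw) == {"state": "pending"}  # noqa: S301
    await client.close()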
async def _(database: RedisDatabase) -> AsyncIterator[RedisClientSDK]: + async def _( + database: RedisDatabase, decode_response: bool = True # noqa: FBT002 + ) -> AsyncIterator[RedisClientSDK]: redis_resources_dns = redis_service.build_redis_dsn(database) - client = RedisClientSDK(redis_resources_dns) + client = RedisClientSDK(redis_resources_dns, decode_responses=decode_response) assert client assert client.redis_dsn == redis_resources_dns await client.setup() diff --git a/packages/service-library/tests/deferred_tasks/conftest.py b/packages/service-library/tests/deferred_tasks/conftest.py index 642a67336b6b..00881e614715 100644 --- a/packages/service-library/tests/deferred_tasks/conftest.py +++ b/packages/service-library/tests/deferred_tasks/conftest.py @@ -9,8 +9,10 @@ @pytest.fixture async def redis_client_sdk_deferred_tasks( get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] + [RedisDatabase, bool], AbstractAsyncContextManager[RedisClientSDK] ] ) -> AsyncIterator[RedisClientSDK]: - async with get_redis_client_sdk(RedisDatabase.DEFERRED_TASKS) as client: + async with get_redis_client_sdk( + RedisDatabase.DEFERRED_TASKS, decode_response=False + ) as client: yield client diff --git a/packages/service-library/tests/deferred_tasks/example_app.py b/packages/service-library/tests/deferred_tasks/example_app.py index 75850fddc2e3..0ba848178d8e 100644 --- a/packages/service-library/tests/deferred_tasks/example_app.py +++ b/packages/service-library/tests/deferred_tasks/example_app.py @@ -8,6 +8,7 @@ from uuid import uuid4 from pydantic import NonNegativeInt +from redis.asyncio import Redis from servicelib.deferred_tasks import ( BaseDeferredHandler, DeferredContext, @@ -54,21 +55,22 @@ async def on_result(cls, result: str, context: DeferredContext) -> None: class InMemoryLists: def __init__(self, redis_settings: RedisSettings, port: int) -> None: - self.redis_client_sdk = RedisClientSDK( - redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS) - ) + # NOTE: RedisClientSDK is not required here but it's used to easily construct + # a redis connection + self.redis: Redis = RedisClientSDK( + redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS), + decode_responses=True, + ).redis self.port = port def _get_queue_name(self, queue_name: str) -> str: return f"in_memory_lists::{queue_name}.{self.port}" async def append_to(self, queue_name: str, value: Any) -> None: - await self.redis_client_sdk.redis.rpush(self._get_queue_name(queue_name), value) # type: ignore + await self.redis.rpush(self._get_queue_name(queue_name), value) # type: ignore async def get_all_from(self, queue_name: str) -> list: - return await self.redis_client_sdk.redis.lrange( - self._get_queue_name(queue_name), 0, -1 - ) # type: ignore + return await self.redis.lrange(self._get_queue_name(queue_name), 0, -1) # type: ignore class ExampleApp: @@ -79,18 +81,19 @@ def __init__( in_memory_lists: InMemoryLists, max_workers: NonNegativeInt, ) -> None: - self._redis_client_sdk = RedisClientSDK( - redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS) + self._redis_client = RedisClientSDK( + redis_settings.build_redis_dsn(RedisDatabase.DEFERRED_TASKS), + decode_responses=False, ) self._manager = DeferredManager( rabbit_settings, - self._redis_client_sdk, + self._redis_client, globals_context={"in_memory_lists": in_memory_lists}, max_workers=max_workers, ) async def setup(self) -> None: - await self._redis_client_sdk.setup() + await self._redis_client.setup() await 
self._manager.setup() diff --git a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py index 9f3451058bfb..a5b45ed80d95 100644 --- a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py +++ b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py @@ -52,7 +52,10 @@ class MockKeys(StrAutoEnum): async def redis_client_sdk( redis_service: RedisSettings, ) -> AsyncIterable[RedisClientSDK]: - sdk = RedisClientSDK(redis_service.build_redis_dsn(RedisDatabase.DEFERRED_TASKS)) + sdk = RedisClientSDK( + redis_service.build_redis_dsn(RedisDatabase.DEFERRED_TASKS), + decode_responses=False, + ) await sdk.setup() yield sdk await sdk.shutdown() diff --git a/packages/settings-library/src/settings_library/redis.py b/packages/settings-library/src/settings_library/redis.py index 656ffdd2e716..b4873665dd1e 100644 --- a/packages/settings-library/src/settings_library/redis.py +++ b/packages/settings-library/src/settings_library/redis.py @@ -17,6 +17,7 @@ class RedisDatabase(IntEnum): ANNOUNCEMENTS = 5 DISTRIBUTED_IDENTIFIERS = 6 DEFERRED_TASKS = 7 + DYNAMIC_SERVICES = 8 class RedisSettings(BaseCustomSettings): diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index f107cfa54f5b..937ba4a3f307 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -42,12 +42,12 @@ "pytest_simcore.docker_registry", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", + "pytest_simcore.faker_projects_data", "pytest_simcore.faker_users_data", "pytest_simcore.minio_service", "pytest_simcore.postgres_service", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", - "pytest_simcore.socketio", "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", "pytest_simcore.repository_paths", @@ -55,6 +55,7 @@ "pytest_simcore.simcore_dask_service", "pytest_simcore.simcore_services", "pytest_simcore.simcore_storage_service", + "pytest_simcore.socketio", ] logger = logging.getLogger(__name__) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index a01980027c02..f63381c538bc 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -284,11 +284,6 @@ def project_id() -> ProjectID: return uuid4() -@pytest.fixture -def node_id() -> NodeID: - return uuid4() - - @dataclass class ImageParams: image: Image diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index 358b22fb8aba..9beacf76c343 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -93,7 +93,8 @@ services: user_notifications:${REDIS_HOST}:${REDIS_PORT}:4:${REDIS_PASSWORD}, announcements:${REDIS_HOST}:${REDIS_PORT}:5:${REDIS_PASSWORD}, distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD}, - deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD} + deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD}, + dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD} # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml ports: - "18081:8081" diff --git a/services/docker-compose.yml b/services/docker-compose.yml index af73de611b41..8e8f02db8a23 100644 --- a/services/docker-compose.yml +++ 
b/services/docker-compose.yml @@ -1168,7 +1168,19 @@ services: # also aof (append only) is also enabled such that we get full durability at the expense # of backup size. The backup is written into /data. # https://redis.io/topics/persistence - [ "redis-server", "--save", "60 1", "--loglevel", "verbose", "--databases", "8", "--appendonly", "yes", "--requirepass", "${REDIS_PASSWORD}" ] + [ + "redis-server", + "--save", + "60 1", + "--loglevel", + "verbose", + "--databases", + "9", + "--appendonly", + "yes", + "--requirepass", + "${REDIS_PASSWORD}" + ] networks: - default - autoscaling_subnet diff --git a/services/dynamic-scheduler/requirements/_base.in b/services/dynamic-scheduler/requirements/_base.in index 74bc0519c820..ab95aec0daa5 100644 --- a/services/dynamic-scheduler/requirements/_base.in +++ b/services/dynamic-scheduler/requirements/_base.in @@ -14,9 +14,10 @@ --requirement ../../../packages/service-library/requirements/_fastapi.in - +arrow fastapi httpx packaging +python-socketio typer[all] uvicorn[standard] diff --git a/services/dynamic-scheduler/requirements/_base.txt b/services/dynamic-scheduler/requirements/_base.txt index bab6a9c099e0..f60e814f088c 100644 --- a/services/dynamic-scheduler/requirements/_base.txt +++ b/services/dynamic-scheduler/requirements/_base.txt @@ -47,6 +47,8 @@ attrs==23.2.0 # aiohttp # jsonschema # referencing +bidict==0.23.1 + # via python-socketio certifi==2024.2.2 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -107,6 +109,7 @@ h11==0.14.0 # via # httpcore # uvicorn + # wsproto httpcore==1.0.5 # via httpx httptools==0.6.1 @@ -265,6 +268,10 @@ python-dateutil==2.9.0.post0 # via arrow python-dotenv==1.0.1 # via uvicorn +python-engineio==4.9.1 + # via python-socketio +python-socketio==5.11.2 + # via -r requirements/_base.in pyyaml==6.0.1 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -306,6 +313,8 @@ setuptools==74.0.0 # via opentelemetry-instrumentation shellingham==1.5.4 # via typer +simple-websocket==1.0.0 + # via python-engineio six==1.16.0 # via python-dateutil sniffio==1.3.1 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_dependencies.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_dependencies.py index 088745a07c30..ce43766f5a33 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_dependencies.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_dependencies.py @@ -3,7 +3,8 @@ from servicelib.fastapi.dependencies import get_app, get_reverse_url_mapper from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient from servicelib.redis import RedisClientSDK -from simcore_service_dynamic_scheduler.services.redis import get_redis_client +from settings_library.redis import RedisDatabase +from simcore_service_dynamic_scheduler.services.redis import get_all_redis_clients from ...services.rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_server @@ -19,8 +20,10 @@ def get_rabbitmq_rpc_server_from_request(request: Request) -> RabbitMQRPCClient: return get_rabbitmq_rpc_server(request.app) -def get_redis_client_from_request(request: Request) -> RedisClientSDK: - return get_redis_client(request.app) +def get_redis_clients_from_request( + request: Request, +) -> dict[RedisDatabase, RedisClientSDK]: + return get_all_redis_clients(request.app) __all__: tuple[str, ...] 
= ( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_health.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_health.py index 515602aef7c2..7e87c57fd06e 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_health.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rest/_health.py @@ -9,11 +9,12 @@ ) from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase from ._dependencies import ( get_rabbitmq_client_from_request, get_rabbitmq_rpc_server_from_request, - get_redis_client_from_request, + get_redis_clients_from_request, ) router = APIRouter() @@ -29,12 +30,17 @@ async def healthcheck( rabbit_rpc_server: Annotated[ RabbitMQRPCClient, Depends(get_rabbitmq_rpc_server_from_request) ], - redis_client_sdk: Annotated[RedisClientSDK, Depends(get_redis_client_from_request)], + redis_client_sdks: Annotated[ + dict[RedisDatabase, RedisClientSDK], + Depends(get_redis_clients_from_request), + ], ): if not rabbit_client.healthy or not rabbit_rpc_server.healthy: raise HealthCheckError(RABBITMQ_CLIENT_UNHEALTHY_MSG) - if not redis_client_sdk.is_healthy: + if not all( + redis_client_sdk.is_healthy for redis_client_sdk in redis_client_sdks.values() + ): raise HealthCheckError(REDIS_CLIENT_UNHEALTHY_MSG) return f"{__name__}@{arrow.utcnow().isoformat()}" diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 991aa004703b..0687c58bac12 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -14,6 +14,7 @@ from ...core.settings import ApplicationSettings from ...services.director_v2 import DirectorV2Client +from ...services.service_tracker import set_request_as_running, set_request_as_stopped router = RPCRouter() @@ -37,6 +38,7 @@ async def run_dynamic_service( response: NodeGet | DynamicServiceGet = ( await director_v2_client.run_dynamic_service(dynamic_service_start) ) + await set_request_as_running(app, dynamic_service_start) return response @@ -59,4 +61,5 @@ async def stop_dynamic_service( timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT, ) ) + await set_request_as_stopped(app, dynamic_service_stop) return response diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/application.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/application.py index f1c00211386f..e6ba2bbb53f7 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/application.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/application.py @@ -17,9 +17,13 @@ ) from ..api.rest.routes import setup_rest_api from ..api.rpc.routes import setup_rpc_api_routes +from ..services.deferred_manager import setup_deferred_manager from ..services.director_v2 import setup_director_v2 +from ..services.notifier import setup_notifier from ..services.rabbitmq import setup_rabbitmq from ..services.redis import setup_redis +from ..services.service_tracker import setup_service_tracker +from ..services.status_monitor import setup_status_monitor from .settings import ApplicationSettings @@ -57,10 +61,18 @@ def create_app(settings: ApplicationSettings 
| None = None) -> FastAPI: # PLUGINS SETUP setup_director_v2(app) + setup_rabbitmq(app) setup_rpc_api_routes(app) + setup_redis(app) + setup_notifier(app) + + setup_service_tracker(app) + setup_deferred_manager(app) + setup_status_monitor(app) + setup_rest_api(app) # ERROR HANDLERS diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/deferred_manager.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/deferred_manager.py new file mode 100644 index 000000000000..8544c0f38e6f --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/deferred_manager.py @@ -0,0 +1,24 @@ +from fastapi import FastAPI +from servicelib.deferred_tasks import DeferredManager +from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisDatabase + +from .redis import get_redis_client + + +def setup_deferred_manager(app: FastAPI) -> None: + async def on_startup() -> None: + rabbit_settings: RabbitSettings = app.state.settings.DYNAMIC_SCHEDULER_RABBITMQ + + redis_client_sdk = get_redis_client(app, RedisDatabase.DEFERRED_TASKS) + app.state.deferred_manager = manager = DeferredManager( + rabbit_settings, redis_client_sdk, globals_context={"app": app} + ) + await manager.setup() + + async def on_shutdown() -> None: + manager: DeferredManager = app.state.deferred_manager + await manager.shutdown() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/__init__.py new file mode 100644 index 000000000000..8cd33e12808f --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/__init__.py @@ -0,0 +1,7 @@ +from ._notifier import notify_service_status_change +from ._setup import setup_notifier + +__all__: tuple[str, ...] 
= ( + "setup_notifier", + "notify_service_status_change", +) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_notifier.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_notifier.py new file mode 100644 index 000000000000..0b8690a96766 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_notifier.py @@ -0,0 +1,55 @@ +import contextlib + +import socketio # type: ignore[import-untyped] +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_dynamic_scheduler.socketio import ( + SOCKET_IO_SERVICE_STATUS_EVENT, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.users import UserID +from servicelib.fastapi.app_state import SingletonInAppStateMixin +from servicelib.services_utils import get_status_as_dict + + +class Notifier(SingletonInAppStateMixin): + app_state_name: str = "notifier" + + def __init__(self, sio_manager: socketio.AsyncAioPikaManager): + self._sio_manager = sio_manager + + async def notify_service_status( + self, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_SERVICE_STATUS_EVENT, + data=jsonable_encoder(get_status_as_dict(status)), + room=SocketIORoomStr.from_user_id(user_id), + ) + + +async def notify_service_status_change( + app: FastAPI, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle +) -> None: + notifier: Notifier = Notifier.get_from_app_state(app) + await notifier.notify_service_status(user_id=user_id, status=status) + + +def setup(app: FastAPI): + async def _on_startup() -> None: + assert app.state.external_socketio # nosec + + notifier = Notifier( + sio_manager=app.state.external_socketio, + ) + notifier.set_to_app_state(app) + assert Notifier.get_from_app_state(app) == notifier # nosec + + async def _on_shutdown() -> None: + with contextlib.suppress(AttributeError): + Notifier.pop_from_app_state(app) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_setup.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_setup.py new file mode 100644 index 000000000000..1542afa8a87d --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_setup.py @@ -0,0 +1,8 @@ +from fastapi import FastAPI + +from . 
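# --- Editorial note (illustrative sketch, not part of the patch) ---
# The Notifier above emits through a write-only socket.io AsyncAioPikaManager: events are
# published to RabbitMQ and the webserver's socket.io server forwards them to the user's
# browser sessions. Stripped down to its essentials (the payload here is made up):
import socketio  # type: ignore[import-untyped]
from models_library.api_schemas_dynamic_scheduler.socketio import (
    SOCKET_IO_SERVICE_STATUS_EVENT,
)
from models_library.api_schemas_webserver.socketio import SocketIORoomStr
from models_library.users import UserID


async def _emit_example(rabbitmq_dsn: str, user_id: UserID) -> None:
    external_sio = socketio.AsyncAioPikaManager(url=rabbitmq_dsn, write_only=True)
    await external_sio.emit(
        SOCKET_IO_SERVICE_STATUS_EVENT,
        data={"service_state": "running"},  # illustrative payload
        room=SocketIORoomStr.from_user_id(user_id),
    )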
import _notifier, _socketio + + +def setup_notifier(app: FastAPI): + _socketio.setup(app) + _notifier.setup(app) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_socketio.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_socketio.py new file mode 100644 index 000000000000..2f0abfbd3af1 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/notifier/_socketio.py @@ -0,0 +1,32 @@ +import logging + +import socketio # type: ignore[import-untyped] +from fastapi import FastAPI +from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager + +from ...core.settings import ApplicationSettings + +_logger = logging.getLogger(__name__) + + +def setup(app: FastAPI): + settings: ApplicationSettings = app.state.settings + + async def _on_startup() -> None: + assert app.state.rabbitmq_client # nosec + + # Connect to the as an external process in write-only mode + # SEE https://python-socketio.readthedocs.io/en/stable/server.html#emitting-from-external-processes + assert settings.DYNAMIC_SCHEDULER_RABBITMQ # nosec + app.state.external_socketio = socketio.AsyncAioPikaManager( + url=settings.DYNAMIC_SCHEDULER_RABBITMQ.dsn, logger=_logger, write_only=True + ) + + async def _on_shutdown() -> None: + if external_socketio := getattr(app.state, "external_socketio"): # noqa: B009 + await cleanup_socketio_async_pubsub_manager( + server_manager=external_socketio + ) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py index 7904d5e1a5df..84131eaf54bf 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py @@ -1,25 +1,46 @@ +from typing import Final + from fastapi import FastAPI -from servicelib.redis import RedisClientSDK +from servicelib.redis import RedisClientSDK, RedisClientsManager, RedisManagerDBConfig from settings_library.redis import RedisDatabase, RedisSettings +_DECODE_DBS: Final[set[RedisDatabase]] = { + RedisDatabase.LOCKS, +} + +_BINARY_DBS: Final[set[RedisDatabase]] = { + RedisDatabase.DEFERRED_TASKS, + RedisDatabase.DYNAMIC_SERVICES, +} + +_ALL_REDIS_DATABASES: Final[set[RedisDatabase]] = _DECODE_DBS | _BINARY_DBS + def setup_redis(app: FastAPI) -> None: settings: RedisSettings = app.state.settings.DYNAMIC_SCHEDULER_REDIS async def on_startup() -> None: - redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) - app.state.redis_client_sdk = client = RedisClientSDK(redis_locks_dsn) - await client.setup() + app.state.redis_clients_manager = manager = RedisClientsManager( + {RedisManagerDBConfig(x, decode_responses=False) for x in _BINARY_DBS} + | {RedisManagerDBConfig(x, decode_responses=True) for x in _DECODE_DBS}, + settings, + ) + await manager.setup() async def on_shutdown() -> None: - redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk - if redis_client_sdk: - await redis_client_sdk.shutdown() + manager: RedisClientsManager = app.state.redis_clients_manager + await manager.shutdown() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -def get_redis_client(app: FastAPI) -> RedisClientSDK: - redis_client_sdk: RedisClientSDK = 
app.state.redis_client_sdk - return redis_client_sdk +def get_redis_client(app: FastAPI, database: RedisDatabase) -> RedisClientSDK: + manager: RedisClientsManager = app.state.redis_clients_manager + return manager.client(database) + + +def get_all_redis_clients( + app: FastAPI, +) -> dict[RedisDatabase, RedisClientSDK]: + return {d: get_redis_client(app, d) for d in _ALL_REDIS_DATABASES} diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/__init__.py new file mode 100644 index 000000000000..abf543d1befa --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/__init__.py @@ -0,0 +1,33 @@ +from ._api import ( + NORMAL_RATE_POLL_INTERVAL, + get_all_tracked_services, + get_tracked_service, + get_user_id_for_service, + remove_tracked_service, + set_frontned_notified_for_service, + set_if_status_changed_for_service, + set_request_as_running, + set_request_as_stopped, + set_service_scheduled_to_run, + set_service_status_task_uid, + should_notify_frontend_for_service, +) +from ._models import TrackedServiceModel +from ._setup import setup_service_tracker + +__all__: tuple[str, ...] = ( + "get_all_tracked_services", + "get_tracked_service", + "get_user_id_for_service", + "NORMAL_RATE_POLL_INTERVAL", + "remove_tracked_service", + "set_frontned_notified_for_service", + "set_if_status_changed_for_service", + "set_request_as_running", + "set_request_as_stopped", + "set_service_scheduled_to_run", + "set_service_status_task_uid", + "setup_service_tracker", + "should_notify_frontend_for_service", + "TrackedServiceModel", +) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_api.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_api.py new file mode 100644 index 000000000000..1b1b4a0d9f8f --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_api.py @@ -0,0 +1,248 @@ +import inspect +import logging +from datetime import timedelta +from typing import Final + +import arrow +from fastapi import FastAPI +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.projects_nodes_io import NodeID +from models_library.services_enums import ServiceState +from models_library.users import UserID +from servicelib.deferred_tasks import TaskUID + +from ._models import SchedulerServiceState, TrackedServiceModel, UserRequestedState +from ._setup import get_tracker + +_logger = logging.getLogger(__name__) + + +_LOW_RATE_POLL_INTERVAL: Final[timedelta] = timedelta(seconds=1) +NORMAL_RATE_POLL_INTERVAL: Final[timedelta] = timedelta(seconds=5) +_MAX_PERIOD_WITHOUT_SERVICE_STATUS_UPDATES: Final[timedelta] = timedelta(seconds=60) + + +async def set_request_as_running( + app: FastAPI, + dynamic_service_start: DynamicServiceStart, +) -> None: + """Stores intention to `start` request""" + await get_tracker(app).save( + dynamic_service_start.node_uuid, + TrackedServiceModel( + dynamic_service_start=dynamic_service_start, + requested_state=UserRequestedState.RUNNING, + project_id=dynamic_service_start.project_id, + 
user_id=dynamic_service_start.user_id, + ), + ) + + +async def set_request_as_stopped( + app: FastAPI, dynamic_service_stop: DynamicServiceStop +) -> None: + """Stores intention to `stop` request""" + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(dynamic_service_stop.node_id) + + if model is None: + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=dynamic_service_stop.user_id, + project_id=dynamic_service_stop.project_id, + requested_state=UserRequestedState.STOPPED, + ) + + model.requested_state = UserRequestedState.STOPPED + await tracker.save(dynamic_service_stop.node_id, model) + + +def _get_service_state( + status: NodeGet | DynamicServiceGet | NodeGetIdle, +) -> ServiceState: + # Attributes where to find the state + # NodeGet -> service_state + # DynamicServiceGet -> state + # NodeGetIdle -> service_state + state_key = "state" if isinstance(status, DynamicServiceGet) else "service_state" + + state: ServiceState | str = getattr(status, state_key) + result: str = state.value if isinstance(state, ServiceState) else state + return ServiceState(result) + + +def _get_poll_interval(status: NodeGet | DynamicServiceGet | NodeGetIdle) -> timedelta: + if _get_service_state(status) != ServiceState.RUNNING: + return _LOW_RATE_POLL_INTERVAL + + return NORMAL_RATE_POLL_INTERVAL + + +def _get_current_scheduler_service_state( + requested_state: UserRequestedState, + status: NodeGet | DynamicServiceGet | NodeGetIdle, +) -> SchedulerServiceState: + """ + Computes the `SchedulerServiceState` used internally by the scheduler + to decide about a service's future. + """ + + if isinstance(status, NodeGetIdle): + return SchedulerServiceState.IDLE + + service_state: ServiceState = _get_service_state(status) + + if requested_state == UserRequestedState.RUNNING: + if service_state == ServiceState.RUNNING: + return SchedulerServiceState.RUNNING + + if ( + ServiceState.PENDING # type:ignore[operator] + <= service_state + <= ServiceState.STARTING + ): + return SchedulerServiceState.STARTING + + if service_state < ServiceState.PENDING or service_state > ServiceState.RUNNING: + return SchedulerServiceState.UNEXPECTED_OUTCOME + + if requested_state == UserRequestedState.STOPPED: + if service_state >= ServiceState.RUNNING: # type:ignore[operator] + return SchedulerServiceState.STOPPING + + if service_state < ServiceState.RUNNING: + return SchedulerServiceState.UNEXPECTED_OUTCOME + + msg = f"Could not determine current_state from: '{requested_state=}', '{status=}'" + raise TypeError(msg) + + +def _log_skipping_operation(node_id: NodeID) -> None: + # the caller is at index 1 (index 0 is the current function) + caller_name = inspect.stack()[1].function + + _logger.info( + "Could not find a %s entry for node_id %s: skipping %s", + TrackedServiceModel.__name__, + node_id, + caller_name, + ) + + +async def set_if_status_changed_for_service( + app: FastAPI, node_id: NodeID, status: NodeGet | DynamicServiceGet | NodeGetIdle +) -> bool: + """returns ``True`` if the tracker detected a status change""" + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(node_id) + if model is None: + _log_skipping_operation(node_id) + return False + + # set new polling interval in the future + model.set_check_status_after_to(_get_poll_interval(status)) + model.service_status_task_uid = None + model.scheduled_to_run = False + + # check if model changed + json_status = status.json() + if model.service_status != json_status: + model.service_status = 
json_status + model.current_state = _get_current_scheduler_service_state( + model.requested_state, status + ) + await tracker.save(node_id, model) + return True + + return False + + +async def should_notify_frontend_for_service( + app: FastAPI, node_id: NodeID, *, status_changed: bool +) -> bool: + """ + Checks if it's time to notify the frontend. + The frontend will be notified at regular intervals and on changes + Avoids sending too many updates. + """ + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(node_id) + + if model is None: + return False + + # check if too much time has passed since the last time an update was sent + return ( + status_changed + or arrow.utcnow().timestamp() - model.last_status_notification + > _MAX_PERIOD_WITHOUT_SERVICE_STATUS_UPDATES.total_seconds() + ) + + +async def set_frontned_notified_for_service(app: FastAPI, node_id: NodeID) -> None: + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(node_id) + if model is None: + _log_skipping_operation(node_id) + return + + model.set_last_status_notification_to_now() + await tracker.save(node_id, model) + + +async def set_service_scheduled_to_run( + app: FastAPI, node_id: NodeID, delay_from_now: timedelta +) -> None: + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(node_id) + if model is None: + _log_skipping_operation(node_id) + return + + model.scheduled_to_run = True + model.set_check_status_after_to(delay_from_now) + await tracker.save(node_id, model) + + +async def set_service_status_task_uid( + app: FastAPI, node_id: NodeID, task_uid: TaskUID +) -> None: + tracker = get_tracker(app) + model: TrackedServiceModel | None = await tracker.load(node_id) + if model is None: + _log_skipping_operation(node_id) + return + + model.service_status_task_uid = task_uid + await tracker.save(node_id, model) + + +async def remove_tracked_service(app: FastAPI, node_id: NodeID) -> None: + """ + Removes the service from tracking (usually after stop completes) + # NOTE: does not raise if node_id is not found + """ + await get_tracker(app).delete(node_id) + + +async def get_tracked_service( + app: FastAPI, node_id: NodeID +) -> TrackedServiceModel | None: + """Returns information about the tracked service""" + return await get_tracker(app).load(node_id) + + +async def get_all_tracked_services(app: FastAPI) -> dict[NodeID, TrackedServiceModel]: + """Returns all tracked services""" + return await get_tracker(app).all() + + +async def get_user_id_for_service(app: FastAPI, node_id: NodeID) -> UserID | None: + """returns user_id for the service""" + model: TrackedServiceModel | None = await get_tracker(app).load(node_id) + return model.user_id if model else None diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py new file mode 100644 index 000000000000..985ca8feef5a --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py @@ -0,0 +1,123 @@ +import pickle +from dataclasses import dataclass, field +from datetime import timedelta +from enum import auto + +import arrow +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, +) +from models_library.projects import ProjectID +from models_library.users import UserID +from models_library.utils.enums import StrAutoEnum +from 
servicelib.deferred_tasks import TaskUID + + +class UserRequestedState(StrAutoEnum): + RUNNING = auto() + STOPPED = auto() + + +class SchedulerServiceState(StrAutoEnum): + # service was started and is running as expected + RUNNING = auto() + # service is not present + IDLE = auto() + # something went wrong while starting/stopping service + UNEXPECTED_OUTCOME = auto() + + # service is being started + STARTING = auto() + # service is being stopped + STOPPING = auto() + + # service status has not been determined + UNKNOWN = auto() + + +@dataclass +class TrackedServiceModel: # pylint:disable=too-many-instance-attributes + + dynamic_service_start: DynamicServiceStart | None = field( + metadata={ + "description": ( + "used to create the service in any given moment if the requested_state is RUNNING" + "can be set to None only when stopping the service" + ) + } + ) + + user_id: UserID | None = field( + metadata={ + "description": "required for propagating status changes to the frontend" + } + ) + project_id: ProjectID | None = field( + metadata={ + "description": "required for propagating status changes to the frontend" + } + ) + + requested_state: UserRequestedState = field( + metadata={ + "description": ( + "status of the service desidered by the user RUNNING or STOPPED" + ) + } + ) + + current_state: SchedulerServiceState = field( + default=SchedulerServiceState.UNKNOWN, + metadata={ + "description": "to set after parsing the incoming state via the API calls" + }, + ) + + ############################# + ### SERVICE STATUS UPDATE ### + ############################# + + scheduled_to_run: bool = field( + default=False, + metadata={"description": "set when a job will be immediately scheduled"}, + ) + + service_status: str = field( + default="", + metadata={ + "description": "stored for debug mainly this is used to compute ``current_state``" + }, + ) + service_status_task_uid: TaskUID | None = field( + default=None, + metadata={"description": "uid of the job currently fetching the status"}, + ) + + check_status_after: float = field( + default_factory=lambda: arrow.utcnow().timestamp(), + metadata={"description": "used to determine when to poll the status again"}, + ) + + last_status_notification: float = field( + default=0, + metadata={ + "description": "used to determine when was the last time the status was notified" + }, + ) + + def set_check_status_after_to(self, delay_from_now: timedelta) -> None: + self.check_status_after = (arrow.utcnow() + delay_from_now).timestamp() + + def set_last_status_notification_to_now(self) -> None: + self.last_status_notification = arrow.utcnow().timestamp() + + ##################### + ### SERIALIZATION ### + ##################### + + def to_bytes(self) -> bytes: + return pickle.dumps(self) + + @classmethod + def from_bytes(cls, data: bytes) -> "TrackedServiceModel": + return pickle.loads(data) # type: ignore # noqa: S301 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_setup.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_setup.py new file mode 100644 index 000000000000..40a47bb8becc --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_setup.py @@ -0,0 +1,19 @@ +from fastapi import FastAPI +from settings_library.redis import RedisDatabase + +from ..redis import get_redis_client +from ._tracker import Tracker + + +def setup_service_tracker(app: FastAPI) -> None: + async def on_startup() -> None: + 
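# --- Editorial note (illustrative sketch, not part of the patch) ---
# TrackedServiceModel serializes itself with pickle (to_bytes()/from_bytes()), which is
# why the DYNAMIC_SERVICES Redis client is configured with decode_responses=False.
# A self-contained round-trip, using None for the optional fields:
from simcore_service_dynamic_scheduler.services.service_tracker._models import (
    TrackedServiceModel,
    UserRequestedState,
)

_model = TrackedServiceModel(
    dynamic_service_start=None,
    user_id=None,
    project_id=None,
    requested_state=UserRequestedState.STOPPED,
)
assert TrackedServiceModel.from_bytes(_model.to_bytes()) == _model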
app.state.service_tracker = Tracker( + get_redis_client(app, RedisDatabase.DYNAMIC_SERVICES) + ) + + app.add_event_handler("startup", on_startup) + + +def get_tracker(app: FastAPI) -> Tracker: + tracker: Tracker = app.state.service_tracker + return tracker diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py new file mode 100644 index 000000000000..489cee153105 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass +from typing import Final + +from models_library.projects_nodes_io import NodeID +from servicelib.redis import RedisClientSDK + +from ._models import TrackedServiceModel + +_KEY_PREFIX: Final[str] = "t::" + + +def _get_key(node_id: NodeID) -> str: + return f"{_KEY_PREFIX}{node_id}" + + +@dataclass +class Tracker: + redis_client_sdk: RedisClientSDK + + async def save(self, node_id: NodeID, model: TrackedServiceModel) -> None: + await self.redis_client_sdk.redis.set(_get_key(node_id), model.to_bytes()) + + async def load(self, node_id: NodeID) -> TrackedServiceModel | None: + model_as_bytes: bytes | None = await self.redis_client_sdk.redis.get( + _get_key(node_id) + ) + return ( + None + if model_as_bytes is None + else TrackedServiceModel.from_bytes(model_as_bytes) + ) + + async def delete(self, node_id: NodeID) -> None: + await self.redis_client_sdk.redis.delete(_get_key(node_id)) + + async def all(self) -> dict[NodeID, TrackedServiceModel]: + found_keys = await self.redis_client_sdk.redis.keys(f"{_KEY_PREFIX}*") + found_values = await self.redis_client_sdk.redis.mget(found_keys) + + return { + NodeID(k.decode().lstrip(_KEY_PREFIX)): TrackedServiceModel.from_bytes(v) + for k, v in zip(found_keys, found_values, strict=True) + if v is not None + } diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/__init__.py new file mode 100644 index 000000000000..263451243252 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/__init__.py @@ -0,0 +1,3 @@ +from ._setup import setup_status_monitor + +__all__: tuple[str, ...] = ("setup_status_monitor",) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py new file mode 100644 index 000000000000..f710204504c2 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py @@ -0,0 +1,85 @@ +import logging +from datetime import timedelta + +from fastapi import FastAPI +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services_service import ( + RunningDynamicServiceDetails, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID +from servicelib.deferred_tasks._base_deferred_handler import DeferredContext + +from .. 
import service_tracker +from ..director_v2 import DirectorV2Client +from ..notifier import notify_service_status_change + +_logger = logging.getLogger(__name__) + + +class DeferredGetStatus(BaseDeferredHandler[NodeGet | DynamicServiceGet | NodeGetIdle]): + @classmethod + async def get_timeout(cls, context: DeferredContext) -> timedelta: + assert context # nosec + return timedelta(seconds=5) + + @classmethod + async def start( # type:ignore[override] # pylint:disable=arguments-differ + cls, node_id: NodeID + ) -> DeferredContext: + _logger.debug("Getting service status for %s", node_id) + return {"node_id": node_id} + + @classmethod + async def on_created(cls, task_uid: TaskUID, context: DeferredContext) -> None: + """called after deferred was scheduled to run""" + app: FastAPI = context["app"] + node_id: NodeID = context["node_id"] + + await service_tracker.set_service_status_task_uid(app, node_id, task_uid) + + @classmethod + async def run( + cls, context: DeferredContext + ) -> NodeGet | DynamicServiceGet | NodeGetIdle: + app: FastAPI = context["app"] + node_id: NodeID = context["node_id"] + + director_v2_client: DirectorV2Client = DirectorV2Client.get_from_app_state(app) + service_status: NodeGet | RunningDynamicServiceDetails | NodeGetIdle = ( + await director_v2_client.get_status(node_id) + ) + _logger.debug( + "Service status type=%s, %s", type(service_status), service_status + ) + return service_status + + @classmethod + async def on_result( + cls, result: NodeGet | DynamicServiceGet | NodeGetIdle, context: DeferredContext + ) -> None: + app: FastAPI = context["app"] + node_id: NodeID = context["node_id"] + + _logger.debug("Received status for service '%s': '%s'", node_id, result) + + status_changed: bool = await service_tracker.set_if_status_changed_for_service( + app, node_id, result + ) + if await service_tracker.should_notify_frontend_for_service( + app, node_id, status_changed=status_changed + ): + user_id: UserID | None = await service_tracker.get_user_id_for_service( + app, node_id + ) + if user_id: + await notify_service_status_change(app, user_id, result) + await service_tracker.set_frontned_notified_for_service(app, node_id) + else: + _logger.info( + "Did not find a user for '%s', skipping status delivery of: %s", + node_id, + result, + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py new file mode 100644 index 000000000000..0d8b5a2723f3 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py @@ -0,0 +1,121 @@ +import logging +from datetime import timedelta +from functools import cached_property +from typing import Final + +import arrow +from fastapi import FastAPI +from models_library.projects_nodes_io import NodeID +from pydantic import NonNegativeFloat, NonNegativeInt +from servicelib.background_task import stop_periodic_task +from servicelib.redis_utils import start_exclusive_periodic_task +from servicelib.utils import limited_gather +from settings_library.redis import RedisDatabase + +from .. 
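# --- Editorial note (illustrative sketch, not part of the patch) ---
# DeferredGetStatus above follows the generic BaseDeferredHandler lifecycle: start()
# builds the context, on_created() receives the TaskUID, run() executes on a worker and
# on_result() consumes the produced value. A stripped-down handler (hypothetical names)
# looks like:
from datetime import timedelta

from servicelib.deferred_tasks import BaseDeferredHandler, DeferredContext, TaskUID


class _EchoDeferred(BaseDeferredHandler[str]):
    @classmethod
    async def get_timeout(cls, context: DeferredContext) -> timedelta:
        return timedelta(seconds=5)

    @classmethod
    async def start(cls, message: str) -> DeferredContext:  # type:ignore[override]
        return {"message": message}

    @classmethod
    async def on_created(cls, task_uid: TaskUID, context: DeferredContext) -> None:
        """e.g. remember task_uid, as DeferredGetStatus does via the service tracker"""

    @classmethod
    async def run(cls, context: DeferredContext) -> str:
        return context["message"]

    @classmethod
    async def on_result(cls, result: str, context: DeferredContext) -> None:
        """consume the value produced by run()"""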
import service_tracker +from ..redis import get_redis_client +from ..service_tracker import NORMAL_RATE_POLL_INTERVAL, TrackedServiceModel +from ..service_tracker._models import SchedulerServiceState, UserRequestedState +from ._deferred_get_status import DeferredGetStatus + +_logger = logging.getLogger(__name__) + +_INTERVAL_BETWEEN_CHECKS: Final[timedelta] = timedelta(seconds=1) +_MAX_CONCURRENCY: Final[NonNegativeInt] = 10 + + +async def _start_get_status_deferred( + app: FastAPI, node_id: NodeID, *, next_check_delay: timedelta +) -> None: + await service_tracker.set_service_scheduled_to_run(app, node_id, next_check_delay) + await DeferredGetStatus.start(node_id=node_id) + + +class Monitor: + def __init__(self, app: FastAPI, status_worker_interval: timedelta) -> None: + self.app = app + self.status_worker_interval = status_worker_interval + + @cached_property + def status_worker_interval_seconds(self) -> NonNegativeFloat: + return self.status_worker_interval.total_seconds() + + async def _worker_start_get_status_requests(self) -> None: + """ + Check if a service requires it's status to be polled. + Note that the interval at which the status is polled can vary. + This is a relatively low resoruce check. + """ + + # NOTE: this worker runs on only once across all instances of the scheduler + + models: dict[ + NodeID, TrackedServiceModel + ] = await service_tracker.get_all_tracked_services(self.app) + + to_remove: list[NodeID] = [] + to_start: list[NodeID] = [] + + current_timestamp = arrow.utcnow().timestamp() + + for node_id, model in models.items(): + # check if service is idle and status polling should stop + if ( + model.current_state == SchedulerServiceState.IDLE + and model.requested_state == UserRequestedState.STOPPED + ): + to_remove.append(node_id) + continue + + job_not_running = not ( + model.scheduled_to_run + and model.service_status_task_uid is not None + and await DeferredGetStatus.is_present(model.service_status_task_uid) + ) + wait_period_finished = current_timestamp > model.check_status_after + if job_not_running and wait_period_finished: + to_start.append(node_id) + else: + _logger.info( + "Skipping status check for %s, because: %s or %s", + node_id, + f"{job_not_running=}", + ( + f"{wait_period_finished=}" + if wait_period_finished + else f"can_start_in={model.check_status_after - current_timestamp}" + ), + ) + + _logger.debug("Removing tracked services: '%s'", to_remove) + await limited_gather( + *( + service_tracker.remove_tracked_service(self.app, node_id) + for node_id in to_remove + ), + limit=_MAX_CONCURRENCY, + ) + + _logger.debug("Poll status for tracked services: '%s'", to_start) + await limited_gather( + *( + _start_get_status_deferred( + self.app, node_id, next_check_delay=NORMAL_RATE_POLL_INTERVAL + ) + for node_id in to_start + ), + limit=_MAX_CONCURRENCY, + ) + + async def setup(self) -> None: + self.app.state.status_monitor_background_task = start_exclusive_periodic_task( + get_redis_client(self.app, RedisDatabase.LOCKS), + self._worker_start_get_status_requests, + task_period=_INTERVAL_BETWEEN_CHECKS, + retry_after=_INTERVAL_BETWEEN_CHECKS, + task_name="periodic_service_status_update", + ) + + async def shutdown(self) -> None: + if getattr(self.app.state, "status_monitor_background_task", None): + await stop_periodic_task(self.app.state.status_monitor_background_task) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_setup.py 
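# --- Editorial note (illustrative sketch, not part of the patch) ---
# Monitor.setup() above relies on start_exclusive_periodic_task: the worker is wrapped in
# a Redis-backed lock (LOCKS database), so even with several dynamic-scheduler replicas
# only one executes the status polling per period. Usage reduces to roughly this
# (the helper names defined below are placeholders):
from datetime import timedelta

from fastapi import FastAPI
from servicelib.background_task import stop_periodic_task
from servicelib.redis_utils import start_exclusive_periodic_task
from settings_library.redis import RedisDatabase
from simcore_service_dynamic_scheduler.services.redis import get_redis_client


async def _example_worker() -> None:
    """placeholder for the real polling coroutine"""


def _start_example(app: FastAPI) -> None:
    app.state.example_task = start_exclusive_periodic_task(
        get_redis_client(app, RedisDatabase.LOCKS),
        _example_worker,
        task_period=timedelta(seconds=1),
        retry_after=timedelta(seconds=1),
        task_name="example_exclusive_periodic_task",
    )


async def _stop_example(app: FastAPI) -> None:
    await stop_periodic_task(app.state.example_task)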
b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_setup.py new file mode 100644 index 000000000000..8f9601464bcb --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_setup.py @@ -0,0 +1,28 @@ +from datetime import timedelta +from typing import Final + +from fastapi import FastAPI + +from ._monitor import Monitor + +_STATUS_WORKER_DEFAULT_INTERVAL: Final[timedelta] = timedelta(seconds=1) + + +def setup_status_monitor(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.status_monitor = monitor = Monitor( + app, status_worker_interval=_STATUS_WORKER_DEFAULT_INTERVAL + ) + await monitor.setup() + + async def on_shutdown() -> None: + monitor: Monitor = app.state.status_monitor + await monitor.shutdown() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_monitor(app: FastAPI) -> Monitor: + monitor: Monitor = app.state.status_monitor + return monitor diff --git a/services/dynamic-scheduler/tests/conftest.py b/services/dynamic-scheduler/tests/conftest.py index ff72140f5ee7..2cb14086b2a2 100644 --- a/services/dynamic-scheduler/tests/conftest.py +++ b/services/dynamic-scheduler/tests/conftest.py @@ -4,6 +4,7 @@ import string from collections.abc import AsyncIterator from pathlib import Path +from typing import Final import pytest import simcore_service_dynamic_scheduler @@ -13,6 +14,9 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.redis import RedisClientsManager, RedisManagerDBConfig +from servicelib.utils import logged_gather +from settings_library.redis import RedisDatabase, RedisSettings from simcore_service_dynamic_scheduler.core.application import create_app pytest_plugins = [ @@ -20,6 +24,7 @@ "pytest_simcore.docker_compose", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", + "pytest_simcore.faker_projects_data", "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", "pytest_simcore.repository_paths", @@ -73,17 +78,38 @@ def app_environment( ) +_PATH_APPLICATION: Final[str] = "simcore_service_dynamic_scheduler.core.application" + + @pytest.fixture def disable_rabbitmq_setup(mocker: MockerFixture) -> None: - base_path = "simcore_service_dynamic_scheduler.core.application" - mocker.patch(f"{base_path}.setup_rabbitmq") - mocker.patch(f"{base_path}.setup_rpc_api_routes") + mocker.patch(f"{_PATH_APPLICATION}.setup_rabbitmq") + mocker.patch(f"{_PATH_APPLICATION}.setup_rpc_api_routes") @pytest.fixture def disable_redis_setup(mocker: MockerFixture) -> None: - base_path = "simcore_service_dynamic_scheduler.core.application" - mocker.patch(f"{base_path}.setup_redis") + mocker.patch(f"{_PATH_APPLICATION}.setup_redis") + + +@pytest.fixture +def disable_service_tracker_setup(mocker: MockerFixture) -> None: + mocker.patch(f"{_PATH_APPLICATION}.setup_service_tracker") + + +@pytest.fixture +def disable_deferred_manager_setup(mocker: MockerFixture) -> None: + mocker.patch(f"{_PATH_APPLICATION}.setup_deferred_manager") + + +@pytest.fixture +def disable_notifier_setup(mocker: MockerFixture) -> None: + mocker.patch(f"{_PATH_APPLICATION}.setup_notifier") + + +@pytest.fixture +def disable_status_monitor_setup(mocker: MockerFixture) -> None: + mocker.patch(f"{_PATH_APPLICATION}.setup_status_monitor") MAX_TIME_FOR_APP_TO_STARTUP = 10 @@ -101,3 +127,13 @@ async def app( 
shutdown_timeout=None if is_pdb_enabled else MAX_TIME_FOR_APP_TO_SHUTDOWN, ): yield test_app + + +@pytest.fixture +async def remove_redis_data(redis_service: RedisSettings) -> None: + async with RedisClientsManager( + {RedisManagerDBConfig(x) for x in RedisDatabase}, redis_service + ) as manager: + await logged_gather( + *[manager.client(d).redis.flushall() for d in RedisDatabase] + ) diff --git a/services/dynamic-scheduler/tests/unit/api_rest/conftest.py b/services/dynamic-scheduler/tests/unit/api_rest/conftest.py index 987ed8c4d851..efef4241d981 100644 --- a/services/dynamic-scheduler/tests/unit/api_rest/conftest.py +++ b/services/dynamic-scheduler/tests/unit/api_rest/conftest.py @@ -1,13 +1,31 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument from collections.abc import AsyncIterator import pytest from fastapi import FastAPI from httpx import AsyncClient from httpx._transports.asgi import ASGITransport +from pytest_simcore.helpers.typing_env import EnvVarsDict @pytest.fixture -async def client(app: FastAPI) -> AsyncIterator[AsyncClient]: +def app_environment( + disable_rabbitmq_setup: None, + disable_redis_setup: None, + disable_service_tracker_setup: None, + disable_deferred_manager_setup: None, + disable_notifier_setup: None, + disable_status_monitor_setup: None, + app_environment: EnvVarsDict, +) -> EnvVarsDict: + return app_environment + + +@pytest.fixture +async def client( + app_environment: EnvVarsDict, app: FastAPI +) -> AsyncIterator[AsyncClient]: # - Needed for app to trigger start/stop event handlers # - Prefer this client instead of fastapi.testclient.TestClient async with AsyncClient( diff --git a/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__health.py b/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__health.py index 8cc1c3279efd..9b5648e12b4e 100644 --- a/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__health.py +++ b/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__health.py @@ -21,7 +21,6 @@ def __init__(self, is_ok: bool) -> None: @pytest.fixture def mock_rabbitmq_clients( - disable_rabbitmq_setup: None, mocker: MockerFixture, rabbit_client_ok: bool, rabbit_rpc_server_ok: bool, @@ -39,11 +38,13 @@ def mock_rabbitmq_clients( @pytest.fixture def mock_redis_client( - disable_redis_setup: None, mocker: MockerFixture, redis_client_ok: bool + mocker: MockerFixture, + redis_client_ok: bool, ) -> None: base_path = "simcore_service_dynamic_scheduler.api.rest._dependencies" mocker.patch( - f"{base_path}.get_redis_client", return_value=MockHealth(redis_client_ok) + f"{base_path}.get_all_redis_clients", + return_value={0: MockHealth(redis_client_ok)}, ) diff --git a/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__meta.py b/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__meta.py index 6e68190bcee9..8d986dfe60ed 100644 --- a/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__meta.py +++ b/services/dynamic-scheduler/tests/unit/api_rest/test_api_rest__meta.py @@ -1,24 +1,11 @@ # pylint:disable=redefined-outer-name # pylint:disable=unused-argument - - -import pytest from fastapi import status from httpx import AsyncClient -from pytest_simcore.helpers.typing_env import EnvVarsDict from simcore_service_dynamic_scheduler._meta import API_VTAG from simcore_service_dynamic_scheduler.models.schemas.meta import Meta -@pytest.fixture -def app_environment( - disable_rabbitmq_setup: None, - disable_redis_setup: None, - app_environment: EnvVarsDict, -) -> EnvVarsDict: - return 
app_environment - - async def test_health(client: AsyncClient): response = await client.get(f"/{API_VTAG}/meta") assert response.status_code == status.HTTP_200_OK diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index 7c8dada1e183..c484f722ff95 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -59,7 +59,7 @@ def service_status_new_style() -> DynamicServiceGet: @pytest.fixture def service_status_legacy() -> NodeGet: - return NodeGet.parse_obj(NodeGet.Config.schema_extra["example"]) + return NodeGet.parse_obj(NodeGet.Config.schema_extra["examples"][1]) @pytest.fixture diff --git a/services/dynamic-scheduler/tests/unit/conftest.py b/services/dynamic-scheduler/tests/unit/conftest.py new file mode 100644 index 000000000000..642ed2170ce1 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/conftest.py @@ -0,0 +1,29 @@ +from collections.abc import Callable +from copy import deepcopy + +import pytest +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.projects_nodes_io import NodeID + + +@pytest.fixture +def get_dynamic_service_start() -> Callable[[NodeID], DynamicServiceStart]: + def _(node_id: NodeID) -> DynamicServiceStart: + dict_data = deepcopy(DynamicServiceStart.Config.schema_extra["example"]) + dict_data["service_uuid"] = f"{node_id}" + return DynamicServiceStart.parse_obj(dict_data) + + return _ + + +@pytest.fixture +def get_dynamic_service_stop() -> Callable[[NodeID], DynamicServiceStop]: + def _(node_id: NodeID) -> DynamicServiceStop: + dict_data = deepcopy(DynamicServiceStop.Config.schema_extra["example"]) + dict_data["node_id"] = f"{node_id}" + return DynamicServiceStop.parse_obj(dict_data) + + return _ diff --git a/services/dynamic-scheduler/tests/unit/service_tracker/test__api.py b/services/dynamic-scheduler/tests/unit/service_tracker/test__api.py new file mode 100644 index 000000000000..0755f7e5d786 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/service_tracker/test__api.py @@ -0,0 +1,325 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from collections.abc import Callable +from datetime import timedelta +from typing import Any, Final, NamedTuple +from uuid import uuid4 + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.projects_nodes_io import NodeID +from models_library.services_enums import ServiceState +from pydantic import NonNegativeInt +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.deferred_tasks import TaskUID +from servicelib.utils import limited_gather +from settings_library.redis import RedisSettings +from simcore_service_dynamic_scheduler.services.service_tracker import ( + get_all_tracked_services, + get_tracked_service, + remove_tracked_service, + set_if_status_changed_for_service, + set_request_as_running, + set_request_as_stopped, + set_service_status_task_uid, +) +from simcore_service_dynamic_scheduler.services.service_tracker._api import ( + 
_LOW_RATE_POLL_INTERVAL, + NORMAL_RATE_POLL_INTERVAL, + _get_current_scheduler_service_state, + _get_poll_interval, +) +from simcore_service_dynamic_scheduler.services.service_tracker._models import ( + SchedulerServiceState, + UserRequestedState, +) + +pytest_simcore_core_services_selection = [ + "redis", +] + + +@pytest.fixture +def app_environment( + disable_rabbitmq_setup: None, + disable_deferred_manager_setup: None, + disable_notifier_setup: None, + app_environment: EnvVarsDict, + redis_service: RedisSettings, + remove_redis_data: None, +) -> EnvVarsDict: + return app_environment + + +async def test_services_tracer_set_as_running_set_as_stopped( + app: FastAPI, + node_id: NodeID, + get_dynamic_service_start: Callable[[NodeID], DynamicServiceStart], + get_dynamic_service_stop: Callable[[NodeID], DynamicServiceStop], +): + async def _remove_service() -> None: + await remove_tracked_service(app, node_id) + assert await get_tracked_service(app, node_id) is None + assert await get_all_tracked_services(app) == {} + + async def _set_as_running() -> None: + await set_request_as_running(app, get_dynamic_service_start(node_id)) + tracked_model = await get_tracked_service(app, node_id) + assert tracked_model + assert tracked_model.requested_state == UserRequestedState.RUNNING + + async def _set_as_stopped() -> None: + await set_request_as_stopped(app, get_dynamic_service_stop(node_id)) + tracked_model = await get_tracked_service(app, node_id) + assert tracked_model + assert tracked_model.requested_state == UserRequestedState.STOPPED + + # request as running then as stopped + await _remove_service() + await _set_as_running() + await _set_as_stopped() + + # request as stopped then as running + await _remove_service() + await _set_as_stopped() + await _set_as_running() + + +@pytest.mark.parametrize("item_count", [100]) +async def test_services_tracer_workflow( + app: FastAPI, + node_id: NodeID, + item_count: NonNegativeInt, + get_dynamic_service_start: Callable[[NodeID], DynamicServiceStart], + get_dynamic_service_stop: Callable[[NodeID], DynamicServiceStop], +): + # ensure more than one service can be tracked + await limited_gather( + *[ + set_request_as_stopped(app, get_dynamic_service_stop(uuid4())) + for _ in range(item_count) + ], + limit=100, + ) + assert len(await get_all_tracked_services(app)) == item_count + + +@pytest.mark.parametrize( + "status", + [ + *[NodeGet.parse_obj(o) for o in NodeGet.Config.schema_extra["examples"]], + *[ + DynamicServiceGet.parse_obj(o) + for o in DynamicServiceGet.Config.schema_extra["examples"] + ], + NodeGetIdle.parse_obj(NodeGetIdle.Config.schema_extra["example"]), + ], +) +async def test_set_if_status_changed( + app: FastAPI, + node_id: NodeID, + status: NodeGet | DynamicServiceGet | NodeGetIdle, + get_dynamic_service_start: Callable[[NodeID], DynamicServiceStart], +): + await set_request_as_running(app, get_dynamic_service_start(node_id)) + + assert await set_if_status_changed_for_service(app, node_id, status) is True + + assert await set_if_status_changed_for_service(app, node_id, status) is False + + model = await get_tracked_service(app, node_id) + assert model + + assert model.service_status == status.json() + + +async def test_set_service_status_task_uid( + app: FastAPI, + node_id: NodeID, + faker: Faker, + get_dynamic_service_start: Callable[[NodeID], DynamicServiceStart], +): + await set_request_as_running(app, get_dynamic_service_start(node_id)) + + task_uid = TaskUID(faker.uuid4()) + await set_service_status_task_uid(app, node_id, task_uid) + + 
model = await get_tracked_service(app, node_id) + assert model + + assert model.service_status_task_uid == task_uid + + +@pytest.mark.parametrize( + "status, expected_poll_interval", + [ + ( + NodeGet.parse_obj(NodeGet.Config.schema_extra["examples"][1]), + _LOW_RATE_POLL_INTERVAL, + ), + *[ + (DynamicServiceGet.parse_obj(o), NORMAL_RATE_POLL_INTERVAL) + for o in DynamicServiceGet.Config.schema_extra["examples"] + ], + ( + NodeGetIdle.parse_obj(NodeGetIdle.Config.schema_extra["example"]), + _LOW_RATE_POLL_INTERVAL, + ), + ], +) +def test__get_poll_interval( + status: NodeGet | DynamicServiceGet | NodeGetIdle, expected_poll_interval: timedelta +): + assert _get_poll_interval(status) == expected_poll_interval + + +def _get_node_get_from(service_state: ServiceState) -> NodeGet: + dict_data = NodeGet.Config.schema_extra["examples"][1] + assert "service_state" in dict_data + dict_data["service_state"] = service_state + return NodeGet.parse_obj(dict_data) + + +def _get_dynamic_service_get_from( + service_state: ServiceState, +) -> DynamicServiceGet: + dict_data = DynamicServiceGet.Config.schema_extra["examples"][1] + assert "state" in dict_data + dict_data["state"] = service_state + return DynamicServiceGet.parse_obj(dict_data) + + +def _get_node_get_idle() -> NodeGetIdle: + return NodeGetIdle.parse_obj(NodeGetIdle.Config.schema_extra["example"]) + + +def __get_flat_list(nested_list: list[list[Any]]) -> list[Any]: + return [item for sublist in nested_list for item in sublist] + + +class ServiceStatusToSchedulerState(NamedTuple): + requested: UserRequestedState + service_status: NodeGet | DynamicServiceGet | NodeGetIdle + expected: SchedulerServiceState + + +_EXPECTED_TEST_CASES: list[list[ServiceStatusToSchedulerState]] = [ + [ + # UserRequestedState.RUNNING + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.PENDING), + SchedulerServiceState.STARTING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.PULLING), + SchedulerServiceState.STARTING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.STARTING), + SchedulerServiceState.STARTING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.RUNNING), + SchedulerServiceState.RUNNING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.COMPLETE), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.FAILED), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + status_generator(ServiceState.STOPPING), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.RUNNING, + _get_node_get_idle(), + SchedulerServiceState.IDLE, + ), + # UserRequestedState.STOPPED + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.PENDING), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.PULLING), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.STARTING), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.RUNNING), + 
SchedulerServiceState.STOPPING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.COMPLETE), + SchedulerServiceState.STOPPING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.FAILED), + SchedulerServiceState.UNEXPECTED_OUTCOME, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + status_generator(ServiceState.STOPPING), + SchedulerServiceState.STOPPING, + ), + ServiceStatusToSchedulerState( + UserRequestedState.STOPPED, + _get_node_get_idle(), + SchedulerServiceState.IDLE, + ), + ] + for status_generator in ( + _get_node_get_from, + _get_dynamic_service_get_from, + ) +] +_FLAT_EXPECTED_TEST_CASES: list[ServiceStatusToSchedulerState] = __get_flat_list( + _EXPECTED_TEST_CASES +) +# ensure enum changes do not break above rules +_NODE_STATUS_FORMATS_COUNT: Final[int] = 2 +assert ( + len(_FLAT_EXPECTED_TEST_CASES) + == len(ServiceState) * len(UserRequestedState) * _NODE_STATUS_FORMATS_COUNT +) + + +@pytest.mark.parametrize("service_status_to_scheduler_state", _FLAT_EXPECTED_TEST_CASES) +def test__get_current_scheduler_service_state( + service_status_to_scheduler_state: ServiceStatusToSchedulerState, +): + assert ( + _get_current_scheduler_service_state( + service_status_to_scheduler_state.requested, + service_status_to_scheduler_state.service_status, + ) + == service_status_to_scheduler_state.expected + ) diff --git a/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py b/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py new file mode 100644 index 000000000000..6b8e31321b38 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py @@ -0,0 +1,57 @@ +from datetime import timedelta + +import arrow +import pytest +from faker import Faker +from servicelib.deferred_tasks import TaskUID +from simcore_service_dynamic_scheduler.services.service_tracker._models import ( + SchedulerServiceState, + TrackedServiceModel, + UserRequestedState, +) + + +@pytest.mark.parametrize("requested_state", UserRequestedState) +@pytest.mark.parametrize("current_state", SchedulerServiceState) +@pytest.mark.parametrize("check_status_after", [1, arrow.utcnow().timestamp()]) +@pytest.mark.parametrize("service_status_task_uid", [None, TaskUID("ok")]) +def test_serialization( + faker: Faker, + requested_state: UserRequestedState, + current_state: SchedulerServiceState, + check_status_after: float, + service_status_task_uid: TaskUID | None, +): + tracked_model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=requested_state, + current_state=current_state, + service_status=faker.pystr(), + check_status_after=check_status_after, + service_status_task_uid=service_status_task_uid, + ) + + as_bytes = tracked_model.to_bytes() + assert as_bytes + assert TrackedServiceModel.from_bytes(as_bytes) == tracked_model + + +async def test_set_check_status_after_to(): + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=UserRequestedState.RUNNING, + ) + assert model.check_status_after < arrow.utcnow().timestamp() + + delay = timedelta(seconds=4) + + before = (arrow.utcnow() + delay).timestamp() + model.set_check_status_after_to(delay) + after = (arrow.utcnow() + delay).timestamp() + + assert model.check_status_after + assert before < model.check_status_after < after diff --git 
a/services/dynamic-scheduler/tests/unit/service_tracker/test__tracker.py b/services/dynamic-scheduler/tests/unit/service_tracker/test__tracker.py new file mode 100644 index 000000000000..59739ddf8f60 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/service_tracker/test__tracker.py @@ -0,0 +1,94 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from uuid import uuid4 + +import pytest +from fastapi import FastAPI +from models_library.projects_nodes_io import NodeID +from pydantic import NonNegativeInt +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.utils import logged_gather +from settings_library.redis import RedisSettings +from simcore_service_dynamic_scheduler.services.service_tracker._models import ( + TrackedServiceModel, + UserRequestedState, +) +from simcore_service_dynamic_scheduler.services.service_tracker._setup import ( + get_tracker, +) +from simcore_service_dynamic_scheduler.services.service_tracker._tracker import Tracker + +pytest_simcore_core_services_selection = [ + "redis", +] + + +@pytest.fixture +def app_environment( + disable_rabbitmq_setup: None, + disable_deferred_manager_setup: None, + disable_notifier_setup: None, + app_environment: EnvVarsDict, + redis_service: RedisSettings, + remove_redis_data: None, +) -> EnvVarsDict: + return app_environment + + +@pytest.fixture +def tracker(app: FastAPI) -> Tracker: + return get_tracker(app) + + +async def test_tracker_workflow(tracker: Tracker): + node_id: NodeID = uuid4() + + # ensure does not already exist + result = await tracker.load(node_id) + assert result is None + + # node creation + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=UserRequestedState.RUNNING, + ) + await tracker.save(node_id, model) + + # check if exists + result = await tracker.load(node_id) + assert result == model + + # remove and check is missing + await tracker.delete(node_id) + result = await tracker.load(node_id) + assert result is None + + +@pytest.mark.parametrize("item_count", [100]) +async def test_tracker_listing(tracker: Tracker, item_count: NonNegativeInt) -> None: + assert await tracker.all() == {} + + model_to_insert = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=UserRequestedState.RUNNING, + ) + + data_to_insert = {uuid4(): model_to_insert for _ in range(item_count)} + + await logged_gather( + *[tracker.save(k, v) for k, v in data_to_insert.items()], max_concurrency=100 + ) + + response = await tracker.all() + for key in response: + assert isinstance(key, NodeID) + assert response == data_to_insert + + +async def test_remove_missing_key_does_not_raise_error(tracker: Tracker): + await tracker.delete(uuid4()) diff --git a/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py b/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py new file mode 100644 index 000000000000..e3d6acffa39c --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py @@ -0,0 +1,415 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +import json +import re +from collections.abc import AsyncIterable, Callable +from copy import deepcopy +from typing import Any +from unittest.mock import AsyncMock +from uuid import uuid4 + +import pytest +import respx +from fastapi import FastAPI, status +from fastapi.encoders import 
jsonable_encoder +from httpx import Request, Response +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.projects_nodes_io import NodeID +from pydantic import NonNegativeInt +from pytest_mock import MockerFixture +from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings +from simcore_service_dynamic_scheduler.services.service_tracker import ( + get_all_tracked_services, + set_request_as_running, + set_request_as_stopped, +) +from simcore_service_dynamic_scheduler.services.status_monitor import _monitor +from simcore_service_dynamic_scheduler.services.status_monitor._deferred_get_status import ( + DeferredGetStatus, +) +from simcore_service_dynamic_scheduler.services.status_monitor._monitor import Monitor +from simcore_service_dynamic_scheduler.services.status_monitor._setup import get_monitor +from tenacity import AsyncRetrying +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "rabbit", + "redis", +] + + +@pytest.fixture +def app_environment( + app_environment: EnvVarsDict, + rabbit_service: RabbitSettings, + redis_service: RedisSettings, + remove_redis_data: None, +) -> EnvVarsDict: + return app_environment + + +_DEFAULT_NODE_ID: NodeID = uuid4() + + +def _add_to_dict(dict_data: dict, entries: list[tuple[str, Any]]) -> None: + for key, data in entries: + assert key in dict_data + dict_data[key] = data + + +def _get_node_get_with(state: str, node_id: NodeID = _DEFAULT_NODE_ID) -> NodeGet: + dict_data = deepcopy(NodeGet.Config.schema_extra["examples"][1]) + _add_to_dict( + dict_data, + [ + ("service_state", state), + ("service_uuid", f"{node_id}"), + ], + ) + return NodeGet.parse_obj(dict_data) + + +def _get_dynamic_service_get_legacy_with( + state: str, node_id: NodeID = _DEFAULT_NODE_ID +) -> DynamicServiceGet: + dict_data = deepcopy(DynamicServiceGet.Config.schema_extra["examples"][0]) + _add_to_dict( + dict_data, + [ + ("state", state), + ("uuid", f"{node_id}"), + ("node_uuid", f"{node_id}"), + ], + ) + return DynamicServiceGet.parse_obj(dict_data) + + +def _get_dynamic_service_get_new_style_with( + state: str, node_id: NodeID = _DEFAULT_NODE_ID +) -> DynamicServiceGet: + dict_data = deepcopy(DynamicServiceGet.Config.schema_extra["examples"][1]) + _add_to_dict( + dict_data, + [ + ("state", state), + ("uuid", f"{node_id}"), + ("node_uuid", f"{node_id}"), + ], + ) + return DynamicServiceGet.parse_obj(dict_data) + + +def _get_node_get_idle(node_id: NodeID = _DEFAULT_NODE_ID) -> NodeGetIdle: + dict_data = NodeGetIdle.Config.schema_extra["example"] + _add_to_dict( + dict_data, + [ + ("service_uuid", f"{node_id}"), + ], + ) + return NodeGetIdle.parse_obj(dict_data) + + +class _ResponseTimeline: + def __init__( + self, timeline: list[NodeGet | DynamicServiceGet | NodeGetIdle] + ) -> None: + self._timeline = timeline + + self._client_access_history: dict[NodeID, NonNegativeInt] = {} + + @property + def entries(self) -> list[NodeGet | DynamicServiceGet | NodeGetIdle]: + return self._timeline + + def __len__(self) -> int: + return len(self._timeline) + + def get_status(self, node_id: NodeID) -> NodeGet | 
DynamicServiceGet | NodeGetIdle: + if node_id not in self._client_access_history: + self._client_access_history[node_id] = 0 + + # always return node idle when timeline finished playing + if self._client_access_history[node_id] >= len(self._timeline): + return _get_node_get_idle() + + status = self._timeline[self._client_access_history[node_id]] + self._client_access_history[node_id] += 1 + return status + + +async def _assert_call_to( + deferred_status_spies: dict[str, AsyncMock], *, method: str, count: NonNegativeInt +) -> None: + async for attempt in AsyncRetrying( + reraise=True, + stop=stop_after_delay(1), + wait=wait_fixed(0.01), + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + call_count = deferred_status_spies[method].call_count + assert ( + call_count == count + ), f"Received calls {call_count} != {count} (expected) to '{method}'" + + +async def _assert_result( + deferred_status_spies: dict[str, AsyncMock], + *, + timeline: list[NodeGet | DynamicServiceGet | NodeGetIdle], +) -> None: + async for attempt in AsyncRetrying( + reraise=True, + stop=stop_after_delay(1), + wait=wait_fixed(0.01), + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + + assert deferred_status_spies["on_result"].call_count == len(timeline) + assert [ + x.args[0] for x in deferred_status_spies["on_result"].call_args_list + ] == timeline + + +async def _assert_notification_count( + mock: AsyncMock, expected_count: NonNegativeInt +) -> None: + async for attempt in AsyncRetrying( + reraise=True, + stop=stop_after_delay(1), + wait=wait_fixed(0.01), + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert mock.call_count == expected_count + + +@pytest.fixture +async def mock_director_v2_status( + app: FastAPI, response_timeline: _ResponseTimeline +) -> AsyncIterable[None]: + def _side_effect_node_status_response(request: Request) -> Response: + node_id = NodeID(f"{request.url}".split("/")[-1]) + + service_status = response_timeline.get_status(node_id) + + if isinstance(service_status, NodeGet): + return Response( + status.HTTP_200_OK, + text=json.dumps(jsonable_encoder({"data": service_status.dict()})), + ) + if isinstance(service_status, DynamicServiceGet): + return Response(status.HTTP_200_OK, text=service_status.json()) + if isinstance(service_status, NodeGetIdle): + return Response(status.HTTP_404_NOT_FOUND) + + raise TypeError + + with respx.mock( + base_url=app.state.settings.DYNAMIC_SCHEDULER_DIRECTOR_V2_SETTINGS.api_base_url, + assert_all_called=False, + assert_all_mocked=True, + ) as mock: + mock.get(re.compile(r"/dynamic_services/([\w-]+)")).mock( + side_effect=_side_effect_node_status_response + ) + yield + + +@pytest.fixture +def monitor(mock_director_v2_status: None, app: FastAPI) -> Monitor: + return get_monitor(app) + + +@pytest.fixture +def deferred_status_spies(mocker: MockerFixture) -> dict[str, AsyncMock]: + results: dict[str, AsyncMock] = {} + for method_name in ( + "start", + "on_result", + "on_created", + "run", + "on_finished_with_error", + ): + mock_method = mocker.AsyncMock(wraps=getattr(DeferredGetStatus, method_name)) + mocker.patch.object(DeferredGetStatus, method_name, mock_method) + results[method_name] = mock_method + + return results + + +@pytest.fixture +def remove_tracked_spy(mocker: MockerFixture) -> AsyncMock: + mock_method = mocker.AsyncMock( + wraps=_monitor.service_tracker.remove_tracked_service + ) + return mocker.patch.object( + _monitor.service_tracker, + 
_monitor.service_tracker.remove_tracked_service.__name__, + mock_method, + ) + + +@pytest.fixture +def node_id() -> NodeID: + return _DEFAULT_NODE_ID + + +@pytest.fixture +def mocked_notify_frontend(mocker: MockerFixture) -> AsyncMock: + return mocker.patch( + "simcore_service_dynamic_scheduler.services.status_monitor._deferred_get_status.notify_service_status_change" + ) + + +@pytest.fixture +def disable_status_monitor_background_task(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_dynamic_scheduler.services.status_monitor._monitor.Monitor.setup" + ) + + +@pytest.mark.parametrize( + "user_requests_running, response_timeline, expected_notification_count, remove_tracked_count", + [ + pytest.param( + True, + _ResponseTimeline([_get_node_get_with("running")]), + 1, + 0, + id="requested_running_state_changes_1_no_task_removal", + ), + pytest.param( + True, + _ResponseTimeline( + [_get_dynamic_service_get_legacy_with("running") for _ in range(10)] + ), + 1, + 0, + id="requested_running_state_changes_1_for_multiple_same_state_no_task_removal", + ), + pytest.param( + True, + _ResponseTimeline([_get_node_get_idle()]), + 1, + 0, + id="requested_running_state_idle_no_removal", + ), + pytest.param( + False, + _ResponseTimeline([_get_node_get_idle()]), + 1, + 1, + id="requested_stopped_state_idle_is_removed", + ), + pytest.param( + True, + _ResponseTimeline( + [ + *[_get_node_get_idle() for _ in range(10)], + _get_dynamic_service_get_new_style_with("pending"), + _get_dynamic_service_get_new_style_with("pulling"), + *[ + _get_dynamic_service_get_new_style_with("starting") + for _ in range(10) + ], + _get_dynamic_service_get_new_style_with("running"), + _get_dynamic_service_get_new_style_with("stopping"), + _get_dynamic_service_get_new_style_with("complete"), + _get_node_get_idle(), + ] + ), + 8, + 0, + id="requested_running_state_changes_8_no_removal", + ), + pytest.param( + False, + _ResponseTimeline( + [ + _get_dynamic_service_get_new_style_with("pending"), + _get_dynamic_service_get_new_style_with("pulling"), + *[ + _get_dynamic_service_get_new_style_with("starting") + for _ in range(10) + ], + _get_dynamic_service_get_new_style_with("running"), + _get_dynamic_service_get_new_style_with("stopping"), + _get_dynamic_service_get_new_style_with("complete"), + _get_node_get_idle(), + ] + ), + 7, + 1, + id="requested_stopped_state_changes_7_is_removed", + ), + ], +) +async def test_expected_calls_to_notify_frontend( # pylint:disable=too-many-arguments + disable_status_monitor_background_task: None, + mocked_notify_frontend: AsyncMock, + deferred_status_spies: dict[str, AsyncMock], + remove_tracked_spy: AsyncMock, + app: FastAPI, + monitor: Monitor, + node_id: NodeID, + user_requests_running: bool, + response_timeline: _ResponseTimeline, + expected_notification_count: NonNegativeInt, + remove_tracked_count: NonNegativeInt, + get_dynamic_service_start: Callable[[NodeID], DynamicServiceStart], + get_dynamic_service_stop: Callable[[NodeID], DynamicServiceStop], +): + assert await get_all_tracked_services(app) == {} + + if user_requests_running: + await set_request_as_running(app, get_dynamic_service_start(node_id)) + else: + await set_request_as_stopped(app, get_dynamic_service_stop(node_id)) + + entries_in_timeline = len(response_timeline) + + for i in range(entries_in_timeline): + async for attempt in AsyncRetrying( + reraise=True, stop=stop_after_delay(10), wait=wait_fixed(0.1) + ): + with attempt: + # pylint:disable=protected-access + await monitor._worker_start_get_status_requests() # 
noqa: SLF001 + for method in ("start", "on_created", "on_result"): + await _assert_call_to( + deferred_status_spies, method=method, count=i + 1 + ) + + await _assert_call_to( + deferred_status_spies, method="run", count=entries_in_timeline + ) + await _assert_call_to( + deferred_status_spies, method="on_finished_with_error", count=0 + ) + + await _assert_result(deferred_status_spies, timeline=response_timeline.entries) + + await _assert_notification_count( + mocked_notify_frontend, expected_notification_count + ) + + async for attempt in AsyncRetrying( + reraise=True, stop=stop_after_delay(1), wait=wait_fixed(0.1) + ): + with attempt: + # pylint:disable=protected-access + await monitor._worker_start_get_status_requests() # noqa: SLF001 + assert remove_tracked_spy.call_count == remove_tracked_count diff --git a/services/dynamic-scheduler/tests/unit/test_services_rabbitmq.py b/services/dynamic-scheduler/tests/unit/test_services_rabbitmq.py index feefc0c1aa4a..eadb7c9ee038 100644 --- a/services/dynamic-scheduler/tests/unit/test_services_rabbitmq.py +++ b/services/dynamic-scheduler/tests/unit/test_services_rabbitmq.py @@ -21,6 +21,10 @@ @pytest.fixture def app_environment( disable_redis_setup: None, + disable_service_tracker_setup: None, + disable_deferred_manager_setup: None, + disable_notifier_setup: None, + disable_status_monitor_setup: None, app_environment: EnvVarsDict, rabbit_service: RabbitSettings, ) -> EnvVarsDict: diff --git a/services/dynamic-scheduler/tests/unit/test_services_redis.py b/services/dynamic-scheduler/tests/unit/test_services_redis.py index 7a7d90063851..059a17aeb0fc 100644 --- a/services/dynamic-scheduler/tests/unit/test_services_redis.py +++ b/services/dynamic-scheduler/tests/unit/test_services_redis.py @@ -6,7 +6,7 @@ from fastapi import FastAPI from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.redis import RedisSettings -from simcore_service_dynamic_scheduler.services.redis import get_redis_client +from simcore_service_dynamic_scheduler.services.redis import get_all_redis_clients pytest_simcore_core_services_selection = [ "redis", @@ -16,6 +16,9 @@ @pytest.fixture def app_environment( disable_rabbitmq_setup: None, + disable_deferred_manager_setup: None, + disable_notifier_setup: None, + disable_status_monitor_setup: None, app_environment: EnvVarsDict, redis_service: RedisSettings, ) -> EnvVarsDict: @@ -23,5 +26,6 @@ def app_environment( async def test_health(app: FastAPI): - redis_client = get_redis_client(app) - assert await redis_client.ping() is True + redis_clients = get_all_redis_clients(app) + for redis_client in redis_clients.values(): + assert await redis_client.ping() is True diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index 58b06af19e4c..c7b1ad4629ab 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -57,6 +57,7 @@ ServiceWaitingForManualInterventionError, ServiceWasNotFoundError, ) +from servicelib.services_utils import get_status_as_dict from simcore_postgres_database.models.users import UserRole from .._meta import API_VTAG as VTAG @@ -208,11 +209,7 @@ async def get_node(request: web.Request) -> web.Response: ) ) - return envelope_json_response( - service_data.dict(by_alias=True) - if isinstance(service_data, DynamicServiceGet) - else service_data.dict() - ) + return 
envelope_json_response(get_status_as_dict(service_data)) @routes.patch( diff --git a/services/web/server/tests/unit/isolated/test_dynamic_scheduler.py b/services/web/server/tests/unit/isolated/test_dynamic_scheduler.py index 0823f52b1b29..6308141d2540 100644 --- a/services/web/server/tests/unit/isolated/test_dynamic_scheduler.py +++ b/services/web/server/tests/unit/isolated/test_dynamic_scheduler.py @@ -55,7 +55,7 @@ def dynamic_service_start() -> DynamicServiceStart: @pytest.mark.parametrize( "expected_response", [ - NodeGet.parse_obj(NodeGet.Config.schema_extra["example"]), + *[NodeGet.parse_obj(x) for x in NodeGet.Config.schema_extra["examples"]], NodeGetIdle.parse_obj(NodeGetIdle.Config.schema_extra["example"]), DynamicServiceGet.parse_obj( DynamicServiceGet.Config.schema_extra["examples"][0] @@ -98,7 +98,7 @@ async def test_get_service_status_raises_rpc_server_error( @pytest.mark.parametrize( "expected_response", [ - NodeGet.parse_obj(NodeGet.Config.schema_extra["example"]), + *[NodeGet.parse_obj(x) for x in NodeGet.Config.schema_extra["examples"]], DynamicServiceGet.parse_obj( DynamicServiceGet.Config.schema_extra["examples"][0] ), diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py b/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py index 8613fbc83193..d34adace8ae1 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_states_handlers.py @@ -1036,7 +1036,7 @@ async def test_project_node_lifetime( # noqa: PLR0915 project_id=user_project["uuid"], node_id=dynamic_node_id ) - node_sample = deepcopy(NodeGet.Config.schema_extra["example"]) + node_sample = deepcopy(NodeGet.Config.schema_extra["examples"][1]) mocked_director_v2_api[ "dynamic_scheduler.api.get_dynamic_service" ].return_value = NodeGet.parse_obj(