diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index edf4a480c1fc..ae52440b2b61 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -90,13 +90,11 @@ async def stop_dynamic_service( rabbitmq_rpc_client: RabbitMQRPCClient, *, dynamic_service_stop: DynamicServiceStop, - timeout_s: NonNegativeInt, ) -> None: result = await rabbitmq_rpc_client.request( DYNAMIC_SCHEDULER_RPC_NAMESPACE, _RPC_METHOD_NAME_ADAPTER.validate_python("stop_dynamic_service"), dynamic_service_stop=dynamic_service_stop, - timeout_s=timeout_s, ) assert result is None # nosec diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py index c7562ccf5cdd..039a9a06f1f3 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py @@ -3,7 +3,6 @@ from typing import Annotated, Final import httpx -from common_library.json_serialization import json_dumps from fastapi import APIRouter, Depends, Header, HTTPException, Request from fastapi.responses import RedirectResponse from models_library.api_schemas_directorv2.dynamic_services import ( @@ -25,11 +24,6 @@ from servicelib.utils import logged_gather from starlette import status from starlette.datastructures import URL -from tenacity import RetryCallState, TryAgain -from tenacity.asyncio import AsyncRetrying -from tenacity.before_sleep import before_sleep_log -from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed from ...api.dependencies.catalog import get_catalog_client from ...api.dependencies.database import get_repository @@ -186,9 +180,6 @@ async def stop_dynamic_service( node_uuid: NodeID, director_v0_client: Annotated[DirectorV0Client, Depends(get_director_v0_client)], scheduler: Annotated[DynamicSidecarsScheduler, Depends(get_scheduler)], - dynamic_services_settings: Annotated[ - DynamicServicesSettings, Depends(get_dynamic_services_settings) - ], *, can_save: bool | None = True, ) -> NoContentResponse | RedirectResponse: @@ -209,34 +200,6 @@ async def stop_dynamic_service( if await scheduler.is_service_awaiting_manual_intervention(node_uuid): raise HTTPException(status.HTTP_409_CONFLICT, detail="waiting_for_intervention") - # Service was marked for removal, the scheduler will - # take care of stopping cleaning up all allocated resources: - # services, containers, volumes and networks. - # Once the service is no longer being tracked this can return - dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings = ( - dynamic_services_settings.DYNAMIC_SCHEDULER - ) - - def _log_error(retry_state: RetryCallState): - logger.error( - "Service with %s could not be untracked after %s", - f"{node_uuid=}", - f"{json_dumps(retry_state.retry_object.statistics)}", - ) - - async for attempt in AsyncRetrying( - wait=wait_fixed(1.0), - stop=stop_after_delay( - dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_WAIT_FOR_SERVICE_TO_STOP - ), - before_sleep=before_sleep_log(logger=logger, log_level=logging.INFO), - reraise=False, - retry_error_callback=_log_error, - ): - with attempt: - if scheduler.is_service_tracked(node_uuid): - raise TryAgain - return NoContentResponse() diff --git a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py index 5072a365af6f..3acd8a3c2bb1 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py @@ -137,9 +137,9 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings): DYNAMIC_SIDECAR_WAIT_FOR_SERVICE_TO_STOP: PositiveFloat = Field( 60.0 * _MINUTE, description=( - "When stopping a service, depending on the amount of data to store, " + "When stopping a LEGACY service, depending on the amount of data to store, " "the operation might be very long. Also all relative created resources: " - "services, containsers, volumes and networks need to be removed. " + "services, containers, volumes and networks need to be removed. " ), ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/frontend/routes_external_scheduler/_service.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/frontend/routes_external_scheduler/_service.py index c0a734f5baba..e90f4a825ac0 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/frontend/routes_external_scheduler/_service.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/frontend/routes_external_scheduler/_service.py @@ -10,7 +10,6 @@ ) from settings_library.utils_service import DEFAULT_FASTAPI_PORT -from ....core.settings import ApplicationSettings from ....services.rabbitmq import get_rabbitmq_rpc_client from ....services.service_tracker import get_tracked_service, remove_tracked_service from .._utils import get_parent_app, get_settings @@ -112,11 +111,9 @@ async def service_stop(node_id: NodeID): service_model = await get_tracked_service(parent_app, node_id) if not service_model: - ui.notify(f"Could not stop service {node_id}. Was not abel to find it") + ui.notify(f"Could not stop service {node_id}. Was not able to find it") return - settings: ApplicationSettings = parent_app.state.settings - assert service_model.user_id # nosec assert service_model.project_id # nosec @@ -129,7 +126,6 @@ async def service_stop(node_id: NodeID): simcore_user_agent="", save_state=True, ), - timeout_s=int(settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT.total_seconds()), ) @@ -139,7 +135,7 @@ async def remove_service_from_tracking(node_id: NodeID): service_model = await get_tracked_service(parent_app, node_id) if not service_model: - ui.notify(f"Could not remove service {node_id}. Was not abel to find it") + ui.notify(f"Could not remove service {node_id}. Was not able to find it") return await remove_tracked_service(parent_app, node_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py index b7afd1092240..e2895ab01f17 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py @@ -24,6 +24,7 @@ from ..services.deferred_manager import deferred_manager_lifespan from ..services.director_v0 import director_v0_lifespan from ..services.director_v2 import director_v2_lifespan +from ..services.fire_and_forget import fire_and_forget_lifespan from ..services.generic_scheduler import generic_scheduler_lifespan from ..services.notifier import get_notifier_lifespans from ..services.rabbitmq import rabbitmq_lifespan @@ -71,6 +72,7 @@ def create_app_lifespan( ) app_lifespan.include(repository_lifespan_manager) + app_lifespan.add(fire_and_forget_lifespan) app_lifespan.add(director_v2_lifespan) app_lifespan.add(director_v0_lifespan) app_lifespan.add(catalog_lifespan) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_public_client.py index fbe160b261a2..cfb41131ad9d 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_public_client.py @@ -17,12 +17,12 @@ class CatalogPublicClient(SingletonInAppStateMixin): def __init__(self, app: FastAPI) -> None: self.app = app - async def get_services_labels( + async def get_docker_image_labels( self, service_key: ServiceKey, service_version: ServiceVersion ) -> SimcoreServiceLabels: response = await CatalogThinClient.get_from_app_state( self.app - ).get_services_labels(service_key, service_version) + ).get_docker_image_labels(service_key, service_version) return TypeAdapter(SimcoreServiceLabels).validate_python(response.json()) async def get_services_specifications( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_thin_client.py index 3d845d26946c..a9eb03b56dd8 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/catalog/_thin_client.py @@ -40,7 +40,7 @@ def __init__(self, app: FastAPI) -> None: @retry_on_errors() @expect_status(status.HTTP_200_OK) - async def get_services_labels( + async def get_docker_image_labels( self, service_key: ServiceKey, service_version: ServiceVersion ) -> Response: return await self.client.get( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/common_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/common_interface.py index 7510fbb60d6f..be9d1a6c8d66 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/common_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/common_interface.py @@ -1,3 +1,6 @@ +from datetime import timedelta +from typing import Final + from fastapi import FastAPI from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, @@ -14,10 +17,19 @@ from models_library.services_types import ServicePortKey from models_library.users import UserID from pydantic import NonNegativeInt +from servicelib.utils import fire_and_forget_task from ..core.settings import ApplicationSettings +from .catalog._public_client import CatalogPublicClient from .director_v2 import DirectorV2Client -from .service_tracker import set_request_as_running, set_request_as_stopped +from .fire_and_forget import FireAndForgetCollection +from .service_tracker import ( + get_tracked_service, + set_request_as_running, + set_request_as_stopped, +) + +_NEW_STYLE_SERVICES_STOP_TIMEOUT: Final[timedelta] = timedelta(minutes=5) async def list_tracked_dynamic_services( @@ -74,11 +86,39 @@ async def stop_dynamic_service( raise NotImplementedError director_v2_client = DirectorV2Client.get_from_app_state(app) + + tracked_service = await get_tracked_service(app, dynamic_service_stop.node_id) + + if tracked_service and tracked_service.dynamic_service_start: + service_labels = await CatalogPublicClient.get_from_app_state( + app + ).get_docker_image_labels( + tracked_service.dynamic_service_start.key, + tracked_service.dynamic_service_start.version, + ) + if not service_labels.needs_dynamic_sidecar: + # LEGACY services + fire_and_forget_task( + director_v2_client.stop_dynamic_service( + node_id=dynamic_service_stop.node_id, + simcore_user_agent=dynamic_service_stop.simcore_user_agent, + save_state=dynamic_service_stop.save_state, + timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT, + ), + task_suffix_name=( + f"stop_dynamic_service_node_{dynamic_service_stop.node_id}" + ), + fire_and_forget_tasks_collection=FireAndForgetCollection.get_from_app_state( + app + ).tasks_collection, + ) + return + await director_v2_client.stop_dynamic_service( node_id=dynamic_service_stop.node_id, simcore_user_agent=dynamic_service_stop.simcore_user_agent, save_state=dynamic_service_stop.save_state, - timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT, + timeout=_NEW_STYLE_SERVICES_STOP_TIMEOUT, ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index e45b0ae1725f..b1ab61a67895 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -149,3 +149,5 @@ async def director_v2_lifespan(app: FastAPI) -> AsyncIterator[State]: public_client.set_to_app_state(app) yield {} + + public_client.pop_from_app_state(app) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/fire_and_forget.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/fire_and_forget.py new file mode 100644 index 000000000000..12de8407a0f1 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/fire_and_forget.py @@ -0,0 +1,27 @@ +from asyncio import Task +from collections.abc import AsyncIterator + +from fastapi import FastAPI +from fastapi_lifespan_manager import State +from servicelib.fastapi.app_state import SingletonInAppStateMixin + + +class FireAndForgetCollection(SingletonInAppStateMixin): + app_state_name: str = "fire_and_forget_collection" + + def __init__(self, app: FastAPI) -> None: + self.app = app + self._tasks_collection: set[Task] = set() + + @property + def tasks_collection(self) -> set[Task]: + return self._tasks_collection + + +async def fire_and_forget_lifespan(app: FastAPI) -> AsyncIterator[State]: + public_client = FireAndForgetCollection(app) + public_client.set_to_app_state(app) + + yield {} + + public_client.pop_from_app_state(app) diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index c9b974e44546..efaaa4e6c787 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -434,7 +434,6 @@ def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop: result = await services.stop_dynamic_service( rpc_client, dynamic_service_stop=_get_rpc_stop(node_id), - timeout_s=5, ) assert result is None @@ -443,7 +442,6 @@ def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop: await services.stop_dynamic_service( rpc_client, dynamic_service_stop=_get_rpc_stop(node_id_not_found), - timeout_s=5, ) # service awaits for manual intervention @@ -451,7 +449,6 @@ def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop: await services.stop_dynamic_service( rpc_client, dynamic_service_stop=_get_rpc_stop(node_id_manual_intervention), - timeout_s=5, ) @@ -491,7 +488,6 @@ async def test_stop_dynamic_service_serializes_generic_errors( simcore_user_agent=simcore_user_agent, save_state=save_state, ), - timeout_s=5, ) diff --git a/services/dynamic-scheduler/tests/unit/services/test_catalog.py b/services/dynamic-scheduler/tests/unit/services/test_catalog.py index 865ca16005cd..827f1a5bbb1b 100644 --- a/services/dynamic-scheduler/tests/unit/services/test_catalog.py +++ b/services/dynamic-scheduler/tests/unit/services/test_catalog.py @@ -89,7 +89,7 @@ def mock_catalog( yield -async def test_get_services_labels( +async def test_get_service_labels( mock_catalog: None, app: FastAPI, service_key: ServiceKey, @@ -97,7 +97,7 @@ async def test_get_services_labels( simcore_service_labels: SimcoreServiceLabels, ): client = CatalogPublicClient.get_from_app_state(app) - result = await client.get_services_labels(service_key, service_version) + result = await client.get_docker_image_labels(service_key, service_version) assert result.model_dump(mode="json") == simcore_service_labels.model_dump( mode="json" ) diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index c1bfb3ebe445..f2c0592638d3 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -3,6 +3,7 @@ from functools import partial from aiohttp import web +from common_library.json_serialization import json_dumps from models_library.api_schemas_directorv2.dynamic_services import ( DynamicServiceGet, GetProjectInactivityResponse, @@ -28,6 +29,13 @@ from servicelib.rabbitmq import RabbitMQClient, RPCServerError from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services from servicelib.utils import logged_gather +from tenacity import ( + AsyncRetrying, + RetryCallState, + TryAgain, + stop_after_delay, + wait_fixed, +) from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client from .settings import DynamicSchedulerSettings, get_plugin_settings @@ -71,6 +79,22 @@ async def run_dynamic_service( ) +def _wait_for_idle_retry_error(node_id: NodeID, retry_state: RetryCallState): + _logger.info( + "Service '%s' is still being tracked after %s", + node_id, + json_dumps(retry_state.retry_object.statistics), + ) + + +def _wait_for_idle_before_sleep(node_id: NodeID, retry_state: RetryCallState): + _logger.debug( + "Waiting for service %s to become idle: %s", + node_id, + json_dumps(retry_state.retry_object.statistics), + ) + + async def stop_dynamic_service( app: web.Application, *, @@ -81,14 +105,32 @@ async def stop_dynamic_service( if progress: await stack.enter_async_context(progress) - settings: DynamicSchedulerSettings = get_plugin_settings(app) + rpc_client = get_rabbitmq_rpc_client(app) await services.stop_dynamic_service( - get_rabbitmq_rpc_client(app), - dynamic_service_stop=dynamic_service_stop, - timeout_s=int( + rpc_client, dynamic_service_stop=dynamic_service_stop + ) + + # wait for the service to be stopped, until it becomes idle + settings: DynamicSchedulerSettings = get_plugin_settings(app) + async for attempt in AsyncRetrying( + wait=wait_fixed(1.0), + stop=stop_after_delay( settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT.total_seconds() ), - ) + before_sleep=partial( + _wait_for_idle_before_sleep, dynamic_service_stop.node_id + ), + reraise=False, + retry_error_callback=partial( + _wait_for_idle_retry_error, dynamic_service_stop.node_id + ), + ): + with attempt: + service_status = await services.get_service_status( + rpc_client, node_id=dynamic_service_stop.node_id + ) + if not isinstance(service_status, NodeGetIdle): + raise TryAgain async def _post_progress_message(