Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,11 @@ async def stop_dynamic_service(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
dynamic_service_stop: DynamicServiceStop,
timeout_s: NonNegativeInt,
) -> None:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
_RPC_METHOD_NAME_ADAPTER.validate_python("stop_dynamic_service"),
dynamic_service_stop=dynamic_service_stop,
timeout_s=timeout_s,
)
assert result is None # nosec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Annotated, Final

import httpx
from common_library.json_serialization import json_dumps
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from fastapi.responses import RedirectResponse
from models_library.api_schemas_directorv2.dynamic_services import (
Expand All @@ -25,11 +24,6 @@
from servicelib.utils import logged_gather
from starlette import status
from starlette.datastructures import URL
from tenacity import RetryCallState, TryAgain
from tenacity.asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

from ...api.dependencies.catalog import get_catalog_client
from ...api.dependencies.database import get_repository
Expand Down Expand Up @@ -186,9 +180,6 @@ async def stop_dynamic_service(
node_uuid: NodeID,
director_v0_client: Annotated[DirectorV0Client, Depends(get_director_v0_client)],
scheduler: Annotated[DynamicSidecarsScheduler, Depends(get_scheduler)],
dynamic_services_settings: Annotated[
DynamicServicesSettings, Depends(get_dynamic_services_settings)
],
*,
can_save: bool | None = True,
) -> NoContentResponse | RedirectResponse:
Expand All @@ -209,34 +200,6 @@ async def stop_dynamic_service(
if await scheduler.is_service_awaiting_manual_intervention(node_uuid):
raise HTTPException(status.HTTP_409_CONFLICT, detail="waiting_for_intervention")

# Service was marked for removal, the scheduler will
# take care of stopping cleaning up all allocated resources:
# services, containers, volumes and networks.
# Once the service is no longer being tracked this can return
dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings = (
dynamic_services_settings.DYNAMIC_SCHEDULER
)

def _log_error(retry_state: RetryCallState):
logger.error(
"Service with %s could not be untracked after %s",
f"{node_uuid=}",
f"{json_dumps(retry_state.retry_object.statistics)}",
)

async for attempt in AsyncRetrying(
wait=wait_fixed(1.0),
stop=stop_after_delay(
dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_WAIT_FOR_SERVICE_TO_STOP
),
before_sleep=before_sleep_log(logger=logger, log_level=logging.INFO),
reraise=False,
retry_error_callback=_log_error,
):
with attempt:
if scheduler.is_service_tracked(node_uuid):
raise TryAgain

return NoContentResponse()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings):
DYNAMIC_SIDECAR_WAIT_FOR_SERVICE_TO_STOP: PositiveFloat = Field(
60.0 * _MINUTE,
description=(
"When stopping a service, depending on the amount of data to store, "
"When stopping a LEGACY service, depending on the amount of data to store, "
"the operation might be very long. Also all relative created resources: "
"services, containsers, volumes and networks need to be removed. "
"services, containers, volumes and networks need to be removed. "
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
)
from settings_library.utils_service import DEFAULT_FASTAPI_PORT

from ....core.settings import ApplicationSettings
from ....services.rabbitmq import get_rabbitmq_rpc_client
from ....services.service_tracker import get_tracked_service, remove_tracked_service
from .._utils import get_parent_app, get_settings
Expand Down Expand Up @@ -112,11 +111,9 @@ async def service_stop(node_id: NodeID):

service_model = await get_tracked_service(parent_app, node_id)
if not service_model:
ui.notify(f"Could not stop service {node_id}. Was not abel to find it")
ui.notify(f"Could not stop service {node_id}. Was not able to find it")
return

settings: ApplicationSettings = parent_app.state.settings

assert service_model.user_id # nosec
assert service_model.project_id # nosec

Expand All @@ -129,7 +126,6 @@ async def service_stop(node_id: NodeID):
simcore_user_agent="",
save_state=True,
),
timeout_s=int(settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT.total_seconds()),
)


Expand All @@ -139,7 +135,7 @@ async def remove_service_from_tracking(node_id: NodeID):

service_model = await get_tracked_service(parent_app, node_id)
if not service_model:
ui.notify(f"Could not remove service {node_id}. Was not abel to find it")
ui.notify(f"Could not remove service {node_id}. Was not able to find it")
return

await remove_tracked_service(parent_app, node_id)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ..services.deferred_manager import deferred_manager_lifespan
from ..services.director_v0 import director_v0_lifespan
from ..services.director_v2 import director_v2_lifespan
from ..services.fire_and_forget import fire_and_forget_lifespan
from ..services.generic_scheduler import generic_scheduler_lifespan
from ..services.notifier import get_notifier_lifespans
from ..services.rabbitmq import rabbitmq_lifespan
Expand Down Expand Up @@ -71,6 +72,7 @@ def create_app_lifespan(
)

app_lifespan.include(repository_lifespan_manager)
app_lifespan.add(fire_and_forget_lifespan)
app_lifespan.add(director_v2_lifespan)
app_lifespan.add(director_v0_lifespan)
app_lifespan.add(catalog_lifespan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ class CatalogPublicClient(SingletonInAppStateMixin):
def __init__(self, app: FastAPI) -> None:
self.app = app

async def get_services_labels(
async def get_docker_image_labels(
self, service_key: ServiceKey, service_version: ServiceVersion
) -> SimcoreServiceLabels:
response = await CatalogThinClient.get_from_app_state(
self.app
).get_services_labels(service_key, service_version)
).get_docker_image_labels(service_key, service_version)
return TypeAdapter(SimcoreServiceLabels).validate_python(response.json())

async def get_services_specifications(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, app: FastAPI) -> None:

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_services_labels(
async def get_docker_image_labels(
self, service_key: ServiceKey, service_version: ServiceVersion
) -> Response:
return await self.client.get(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from datetime import timedelta
from typing import Final

from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
Expand All @@ -14,10 +17,19 @@
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import NonNegativeInt
from servicelib.utils import fire_and_forget_task

from ..core.settings import ApplicationSettings
from .catalog._public_client import CatalogPublicClient
from .director_v2 import DirectorV2Client
from .service_tracker import set_request_as_running, set_request_as_stopped
from .fire_and_forget import FireAndForgetCollection
from .service_tracker import (
get_tracked_service,
set_request_as_running,
set_request_as_stopped,
)

_NEW_STYLE_SERVICES_STOP_TIMEOUT: Final[timedelta] = timedelta(minutes=5)


async def list_tracked_dynamic_services(
Expand Down Expand Up @@ -74,11 +86,39 @@ async def stop_dynamic_service(
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)

tracked_service = await get_tracked_service(app, dynamic_service_stop.node_id)

if tracked_service and tracked_service.dynamic_service_start:
service_labels = await CatalogPublicClient.get_from_app_state(
app
).get_docker_image_labels(
tracked_service.dynamic_service_start.key,
tracked_service.dynamic_service_start.version,
)
if not service_labels.needs_dynamic_sidecar:
# LEGACY services
fire_and_forget_task(
director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
),
task_suffix_name=(
f"stop_dynamic_service_node_{dynamic_service_stop.node_id}"
),
fire_and_forget_tasks_collection=FireAndForgetCollection.get_from_app_state(
app
).tasks_collection,
)
return

await director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
timeout=_NEW_STYLE_SERVICES_STOP_TIMEOUT,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,5 @@ async def director_v2_lifespan(app: FastAPI) -> AsyncIterator[State]:
public_client.set_to_app_state(app)

yield {}

public_client.pop_from_app_state(app)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from asyncio import Task
from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi_lifespan_manager import State
from servicelib.fastapi.app_state import SingletonInAppStateMixin


class FireAndForgetCollection(SingletonInAppStateMixin):
app_state_name: str = "fire_and_forget_collection"

def __init__(self, app: FastAPI) -> None:
self.app = app
self._tasks_collection: set[Task] = set()

@property
def tasks_collection(self) -> set[Task]:
return self._tasks_collection


async def fire_and_forget_lifespan(app: FastAPI) -> AsyncIterator[State]:
public_client = FireAndForgetCollection(app)
public_client.set_to_app_state(app)

yield {}

public_client.pop_from_app_state(app)
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop:
result = await services.stop_dynamic_service(
rpc_client,
dynamic_service_stop=_get_rpc_stop(node_id),
timeout_s=5,
)
assert result is None

Expand All @@ -443,15 +442,13 @@ def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop:
await services.stop_dynamic_service(
rpc_client,
dynamic_service_stop=_get_rpc_stop(node_id_not_found),
timeout_s=5,
)

# service awaits for manual intervention
with pytest.raises(ServiceWaitingForManualInterventionError):
await services.stop_dynamic_service(
rpc_client,
dynamic_service_stop=_get_rpc_stop(node_id_manual_intervention),
timeout_s=5,
)


Expand Down Expand Up @@ -491,7 +488,6 @@ async def test_stop_dynamic_service_serializes_generic_errors(
simcore_user_agent=simcore_user_agent,
save_state=save_state,
),
timeout_s=5,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ def mock_catalog(
yield


async def test_get_services_labels(
async def test_get_service_labels(
mock_catalog: None,
app: FastAPI,
service_key: ServiceKey,
service_version: ServiceVersion,
simcore_service_labels: SimcoreServiceLabels,
):
client = CatalogPublicClient.get_from_app_state(app)
result = await client.get_services_labels(service_key, service_version)
result = await client.get_docker_image_labels(service_key, service_version)
assert result.model_dump(mode="json") == simcore_service_labels.model_dump(
mode="json"
)
Expand Down
Loading
Loading