Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
from typing import Final

from aiohttp import web
from common_library.users_enums import UserRole
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStop,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.logging_errors import create_troubleshotting_log_kwargs
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import logged_gather
from simcore_postgres_database.models.users import UserRole
from servicelib.utils import limited_as_completed, logged_gather

from ..dynamic_scheduler import api as dynamic_scheduler_api
from ..projects.api import has_user_project_access_rights
Expand Down Expand Up @@ -102,16 +103,41 @@ async def remove_orphaned_services(
}

known_opened_project_ids = await _list_opened_project_ids(registry)
potentially_running_service_ids: list[
set[NodeID] | BaseException
] = await logged_gather(
*(list_node_ids_in_project(app, _) for _ in known_opened_project_ids),
log=_logger,
max_concurrency=_MAX_CONCURRENT_CALLS,
reraise=False,
)

# NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
# Why? If a service is running but the nodes form the correspondign project cannot be listed,
# the service will be considered as orphaned and closed.
potentially_running_service_ids: list[set[NodeID]] = []
async for project_nodes_future in limited_as_completed(
(
list_node_ids_in_project(app, project_id)
for project_id in known_opened_project_ids
),
limit=_MAX_CONCURRENT_CALLS,
):
try:
project_nodes = await project_nodes_future
potentially_running_service_ids.append(project_nodes)
except BaseException as e: # pylint:disable=broad-exception-caught
_logger.warning(
create_troubleshotting_log_kwargs(
(
"Skipping orpahn services removal, call to "
"`list_node_ids_in_project` raised"
),
error=e,
error_context={
"running_services": running_services,
"running_services_by_id": running_services_by_id,
"known_opened_project_ids": known_opened_project_ids,
},
),
exc_info=True,
)
return

potentially_running_service_ids_set: set[NodeID] = set().union(
*(_ for _ in potentially_running_service_ids if isinstance(_, set))
*(node_id for node_id in potentially_running_service_ids)
)
_logger.debug(
"Allowed service UUIDs from known opened projects: %s",
Expand Down
Loading