|
2 | 2 | from typing import Final |
3 | 3 |
|
4 | 4 | from aiohttp import web |
| 5 | +from common_library.users_enums import UserRole |
5 | 6 | from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet |
6 | 7 | from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( |
7 | 8 | DynamicServiceStop, |
8 | 9 | ) |
9 | 10 | from models_library.projects import ProjectID |
10 | 11 | from models_library.projects_nodes_io import NodeID |
11 | 12 | from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE |
| 13 | +from servicelib.logging_errors import create_troubleshotting_log_kwargs |
12 | 14 | from servicelib.logging_utils import log_catch, log_context |
13 | | -from servicelib.utils import logged_gather |
14 | | -from simcore_postgres_database.models.users import UserRole |
| 15 | +from servicelib.utils import limited_as_completed, logged_gather |
15 | 16 |
|
16 | 17 | from ..dynamic_scheduler import api as dynamic_scheduler_api |
17 | 18 | from ..projects.api import has_user_project_access_rights |
@@ -102,16 +103,41 @@ async def remove_orphaned_services( |
102 | 103 | } |
103 | 104 |
|
104 | 105 | known_opened_project_ids = await _list_opened_project_ids(registry) |
105 | | - potentially_running_service_ids: list[ |
106 | | - set[NodeID] | BaseException |
107 | | - ] = await logged_gather( |
108 | | - *(list_node_ids_in_project(app, _) for _ in known_opened_project_ids), |
109 | | - log=_logger, |
110 | | - max_concurrency=_MAX_CONCURRENT_CALLS, |
111 | | - reraise=False, |
112 | | - ) |
| 106 | + |
| 107 | + # NOTE: Always skip orphan removal when `list_node_ids_in_project` raises an error. |
| 108 | + # Why? If a service is running but the nodes from the corresponding project cannot be listed, |
| 109 | + # the service will be considered as orphaned and closed. |
| 110 | + potentially_running_service_ids: list[set[NodeID]] = [] |
| 111 | + async for project_nodes_future in limited_as_completed( |
| 112 | + ( |
| 113 | + list_node_ids_in_project(app, project_id) |
| 114 | + for project_id in known_opened_project_ids |
| 115 | + ), |
| 116 | + limit=_MAX_CONCURRENT_CALLS, |
| 117 | + ): |
| 118 | + try: |
| 119 | + project_nodes = await project_nodes_future |
| 120 | + potentially_running_service_ids.append(project_nodes) |
| 121 | + except BaseException as e: # pylint:disable=broad-exception-caught |
| 122 | + _logger.warning( |
| 123 | + create_troubleshotting_log_kwargs( |
| 124 | + ( |
| 125 | + "Skipping orpahn services removal, call to " |
| 126 | + "`list_node_ids_in_project` raised" |
| 127 | + ), |
| 128 | + error=e, |
| 129 | + error_context={ |
| 130 | + "running_services": running_services, |
| 131 | + "running_services_by_id": running_services_by_id, |
| 132 | + "known_opened_project_ids": known_opened_project_ids, |
| 133 | + }, |
| 134 | + ), |
| 135 | + exc_info=True, |
| 136 | + ) |
| 137 | + return |
| 138 | + |
113 | 139 | potentially_running_service_ids_set: set[NodeID] = set().union( |
114 | | - *(_ for _ in potentially_running_service_ids if isinstance(_, set)) |
| 140 | + *(node_id for node_id in potentially_running_service_ids) |
115 | 141 | ) |
116 | 142 | _logger.debug( |
117 | 143 | "Allowed service UUIDs from known opened projects: %s", |
|
0 commit comments