|
1 | 1 | import logging |
2 | | -import traceback |
3 | 2 | from typing import Final |
4 | 3 |
|
5 | 4 | from aiohttp import web |
| 5 | +from common_library.users_enums import UserRole |
6 | 6 | from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet |
7 | 7 | from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( |
8 | 8 | DynamicServiceStop, |
9 | 9 | ) |
10 | 10 | from models_library.projects import ProjectID |
11 | 11 | from models_library.projects_nodes_io import NodeID |
12 | 12 | from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE |
| 13 | +from servicelib.logging_errors import create_troubleshotting_log_kwargs |
13 | 14 | from servicelib.logging_utils import log_catch, log_context |
14 | | -from servicelib.utils import logged_gather |
15 | | -from simcore_postgres_database.models.users import UserRole |
| 15 | +from servicelib.utils import limited_as_completed, logged_gather |
16 | 16 |
|
17 | 17 | from ..dynamic_scheduler import api as dynamic_scheduler_api |
18 | 18 | from ..projects.api import has_user_project_access_rights |
@@ -103,37 +103,41 @@ async def remove_orphaned_services( |
103 | 103 | } |
104 | 104 |
|
105 | 105 | known_opened_project_ids = await _list_opened_project_ids(registry) |
106 | | - potentially_running_service_ids: list[set[NodeID] | BaseException] = ( |
107 | | - await logged_gather( |
108 | | - *(list_node_ids_in_project(app, r) for r in known_opened_project_ids), |
109 | | - log=_logger, |
110 | | - max_concurrency=_MAX_CONCURRENT_CALLS, |
111 | | - reraise=False, |
112 | | - ) |
113 | | - ) |
114 | 106 |
|
115 | 107 | # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error. |
116 | 108 | # Why? If a service is running but the nodes form the correspondign project cannot be listed, |
117 | 109 | # the service will be considered as orphaned and closed. |
118 | | - node_list_errors: set[BaseException] = { |
119 | | - x for x in potentially_running_service_ids if not isinstance(x, set) |
120 | | - } |
121 | | - if node_list_errors: |
122 | | - formatted_errors = [] |
123 | | - for error in node_list_errors: |
124 | | - formatted_error = "".join( |
125 | | - traceback.format_exception(type(error), error, error.__traceback__) |
| 110 | + potentially_running_service_ids: list[set[NodeID]] = [] |
| 111 | + async for project_nodes_future in limited_as_completed( |
| 112 | + ( |
| 113 | + list_node_ids_in_project(app, project_id) |
| 114 | + for project_id in known_opened_project_ids |
| 115 | + ), |
| 116 | + limit=_MAX_CONCURRENT_CALLS, |
| 117 | + ): |
| 118 | + try: |
| 119 | + project_nodes = await project_nodes_future |
| 120 | + potentially_running_service_ids.append(project_nodes) |
| 121 | + except BaseException as e: # pylint:disable=broad-exception-caught |
| 122 | + _logger.warning( |
| 123 | + create_troubleshotting_log_kwargs( |
| 124 | + ( |
| 125 | + "Skipping orpahn services removal, call to " |
| 126 | + "`list_node_ids_in_project` raised" |
| 127 | + ), |
| 128 | + error=e, |
| 129 | + error_context={ |
| 130 | + "running_services": running_services, |
| 131 | + "running_services_by_id": running_services_by_id, |
| 132 | + "known_opened_project_ids": known_opened_project_ids, |
| 133 | + }, |
| 134 | + ), |
| 135 | + exc_info=True, |
126 | 136 | ) |
127 | | - formatted_errors.append(formatted_error) |
128 | | - _logger.warning( |
129 | | - "Skipping orpahn services removal, '%s' calls to `list_node_ids_in_project` raised the following:\n%s", |
130 | | - len(formatted_errors), |
131 | | - "\n".join(formatted_errors), |
132 | | - ) |
133 | | - return |
| 137 | + return |
134 | 138 |
|
135 | 139 | potentially_running_service_ids_set: set[NodeID] = set().union( |
136 | | - *(_ for _ in potentially_running_service_ids if isinstance(_, set)) |
| 140 | + *(_ for _ in potentially_running_service_ids) |
137 | 141 | ) |
138 | 142 | _logger.debug( |
139 | 143 | "Allowed service UUIDs from known opened projects: %s", |
|
0 commit comments