From 50b516d595ce9f72ad2861376e573035c0a4b4cf Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 31 Mar 2025 14:53:14 +0200 Subject: [PATCH 1/3] fixed issue --- .../garbage_collector/_core_orphans.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py index 2bdc3552e405..5d812ad2cc9b 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py @@ -50,7 +50,7 @@ async def _remove_service( except (UserNotFoundError, ValueError): save_service_state = False - with log_context( + with log_catch(_logger, reraise=False), log_context( _logger, logging.INFO, msg=f"removing {(service.node_uuid, service.host)} with {save_service_state=}", @@ -134,7 +134,7 @@ async def remove_orphaned_services( ), exc_info=True, ) - return + continue potentially_running_service_ids_set: set[NodeID] = set().union( *(node_id for node_id in potentially_running_service_ids) From 2e607232de1663be1d75f48ff518e6ebbde5310b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 31 Mar 2025 15:36:43 +0200 Subject: [PATCH 2/3] reduce error propagation as much as possible --- .../garbage_collector/_core.py | 15 +- .../garbage_collector/_core_orphans.py | 137 +++++++++--------- 2 files changed, 75 insertions(+), 77 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core.py index d8fb7e0a70cd..df192a272d90 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core.py @@ -1,12 +1,9 @@ -""" Core implementation of garbage collector - - -""" +"""Core implementation 
of garbage collector""" import logging from aiohttp import web -from servicelib.logging_utils import log_context +from servicelib.logging_utils import log_catch, log_context from ..resource_manager.registry import RedisResourceRegistry, get_registry from ._core_disconnected import remove_disconnected_user_resources @@ -39,21 +36,23 @@ async def collect_garbage(app: web.Application): """ registry: RedisResourceRegistry = get_registry(app) - with log_context( + with log_catch(_logger, reraise=False), log_context( _logger, logging.INFO, "Step 1: Removes disconnected user sessions" ): # Triggers signal to close possible pending opened projects # Removes disconnected GUEST users after they finished their sessions await remove_disconnected_user_resources(registry, app) - with log_context( + with log_catch(_logger, reraise=False), log_context( _logger, logging.INFO, "Step 2: Removes users manually marked for removal" ): # if a user was manually marked as GUEST it needs to be # removed together with all the associated projects await remove_users_manually_marked_as_guests(registry, app) - with log_context(_logger, logging.INFO, "Step 3: Removes orphaned services"): + with log_catch(_logger, reraise=False), log_context( + _logger, logging.INFO, "Step 3: Removes orphaned services" + ): # For various reasons, some services remain pending after # the projects are closed or the user was disconencted. 
# This will close and remove all these services from diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py index 5d812ad2cc9b..d604644681fa 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py @@ -89,73 +89,72 @@ async def remove_orphaned_services( # otherwise if some time goes it could very well be that there are new projects opened # in between and the GC would remove services that actually should be running. - with log_catch(_logger, reraise=False): - running_services = await dynamic_scheduler_service.list_dynamic_services(app) - if not running_services: - # nothing to do - return - _logger.debug( - "Actual running dynamic services: %s", - [(x.node_uuid, x.host) for x in running_services], - ) - running_services_by_id: dict[NodeID, DynamicServiceGet] = { - service.node_uuid: service for service in running_services - } - - known_opened_project_ids = await _list_opened_project_ids(registry) - - # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error. - # Why? If a service is running but the nodes form the correspondign project cannot be listed, - # the service will be considered as orphaned and closed. 
- potentially_running_service_ids: list[set[NodeID]] = [] - async for project_nodes_future in limited_as_completed( - ( - list_node_ids_in_project(app, project_id) - for project_id in known_opened_project_ids - ), - limit=_MAX_CONCURRENT_CALLS, - ): - try: - project_nodes = await project_nodes_future - potentially_running_service_ids.append(project_nodes) - except BaseException as e: # pylint:disable=broad-exception-caught - _logger.warning( - create_troubleshotting_log_kwargs( - ( - "Skipping orpahn services removal, call to " - "`list_node_ids_in_project` raised" - ), - error=e, - error_context={ - "running_services": running_services, - "running_services_by_id": running_services_by_id, - "known_opened_project_ids": known_opened_project_ids, - }, + running_services = await dynamic_scheduler_service.list_dynamic_services(app) + if not running_services: + # nothing to do + return + _logger.debug( + "Actual running dynamic services: %s", + [(x.node_uuid, x.host) for x in running_services], + ) + running_services_by_id: dict[NodeID, DynamicServiceGet] = { + service.node_uuid: service for service in running_services + } + + known_opened_project_ids = await _list_opened_project_ids(registry) + + # NOTE: Always skip orphan removal when `list_node_ids_in_project` raises an error. + # Why? If a service is running but the nodes from the corresponding project cannot be listed, + # the service will be considered as orphaned and closed. 
+ potentially_running_service_ids: list[set[NodeID]] = [] + async for project_nodes_future in limited_as_completed( + ( + list_node_ids_in_project(app, project_id) + for project_id in known_opened_project_ids + ), + limit=_MAX_CONCURRENT_CALLS, + ): + try: + project_nodes = await project_nodes_future + potentially_running_service_ids.append(project_nodes) + except BaseException as e: # pylint:disable=broad-exception-caught + _logger.warning( + create_troubleshotting_log_kwargs( + ( + "Skipping orpahn services removal, call to " + "`list_node_ids_in_project` raised" ), - exc_info=True, - ) - continue - - potentially_running_service_ids_set: set[NodeID] = set().union( - *(node_id for node_id in potentially_running_service_ids) - ) - _logger.debug( - "Allowed service UUIDs from known opened projects: %s", - potentially_running_service_ids_set, - ) - - # compute the difference to find the orphaned services - orphaned_running_service_ids = ( - set(running_services_by_id) - potentially_running_service_ids_set - ) - _logger.debug("Found orphaned services: %s", orphaned_running_service_ids) - # NOTE: no need to not reraise here, since we catch everything above - # and logged_gather first runs everything - await logged_gather( - *( - _remove_service(app, node_id, running_services_by_id[node_id]) - for node_id in orphaned_running_service_ids - ), - log=_logger, - max_concurrency=_MAX_CONCURRENT_CALLS, - ) + error=e, + error_context={ + "running_services": running_services, + "running_services_by_id": running_services_by_id, + "known_opened_project_ids": known_opened_project_ids, + }, + ), + exc_info=True, + ) + continue + + potentially_running_service_ids_set: set[NodeID] = set().union( + *(node_id for node_id in potentially_running_service_ids) + ) + _logger.debug( + "Allowed service UUIDs from known opened projects: %s", + potentially_running_service_ids_set, + ) + + # compute the difference to find the orphaned services + orphaned_running_service_ids = ( + 
set(running_services_by_id) - potentially_running_service_ids_set + ) + _logger.debug("Found orphaned services: %s", orphaned_running_service_ids) + # NOTE: no need to not reraise here, since we catch everything above + # and logged_gather first runs everything + await logged_gather( + *( + _remove_service(app, node_id, running_services_by_id[node_id]) + for node_id in orphaned_running_service_ids + ), + log=_logger, + max_concurrency=_MAX_CONCURRENT_CALLS, + ) From 4d76e29ee9ba9ec6dd6401371531317f7a9398f1 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 1 Apr 2025 09:02:51 +0200 Subject: [PATCH 3/3] feedback --- .../garbage_collector/_core_orphans.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py index d604644681fa..1020f1dcb1d5 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py @@ -12,7 +12,7 @@ from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE from servicelib.logging_errors import create_troubleshotting_log_kwargs from servicelib.logging_utils import log_catch, log_context -from servicelib.utils import limited_as_completed, logged_gather +from servicelib.utils import limited_as_completed, limited_gather from ..dynamic_scheduler import api as dynamic_scheduler_service from ..projects._projects_service import ( @@ -53,7 +53,7 @@ async def _remove_service( with log_catch(_logger, reraise=False), log_context( _logger, logging.INFO, - msg=f"removing {(service.node_uuid, service.host)} with {save_service_state=}", + f"removing {(service.node_uuid, service.host)} with {save_service_state=}", ): await dynamic_scheduler_service.stop_dynamic_service( app, @@ -150,11 +150,11 @@ async def 
remove_orphaned_services( _logger.debug("Found orphaned services: %s", orphaned_running_service_ids) # NOTE: no need to not reraise here, since we catch everything above # and logged_gather first runs everything - await logged_gather( + await limited_gather( *( _remove_service(app, node_id, running_services_by_id[node_id]) for node_id in orphaned_running_service_ids ), log=_logger, - max_concurrency=_MAX_CONCURRENT_CALLS, + limit=_MAX_CONCURRENT_CALLS, )