Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
""" Core implementation of garbage collector


"""
"""Core implementation of garbage collector"""

import logging

from aiohttp import web
from servicelib.logging_utils import log_context
from servicelib.logging_utils import log_catch, log_context

from ..resource_manager.registry import RedisResourceRegistry, get_registry
from ._core_disconnected import remove_disconnected_user_resources
Expand Down Expand Up @@ -39,21 +36,23 @@ async def collect_garbage(app: web.Application):
"""
registry: RedisResourceRegistry = get_registry(app)

with log_context(
with log_catch(_logger, reraise=False), log_context(
_logger, logging.INFO, "Step 1: Removes disconnected user sessions"
):
# Triggers signal to close possible pending opened projects
# Removes disconnected GUEST users after they finished their sessions
await remove_disconnected_user_resources(registry, app)

with log_context(
with log_catch(_logger, reraise=False), log_context(
_logger, logging.INFO, "Step 2: Removes users manually marked for removal"
):
# if a user was manually marked as GUEST it needs to be
# removed together with all the associated projects
await remove_users_manually_marked_as_guests(registry, app)

with log_context(_logger, logging.INFO, "Step 3: Removes orphaned services"):
with log_catch(_logger, reraise=False), log_context(
_logger, logging.INFO, "Step 3: Removes orphaned services"
):
# For various reasons, some services remain pending after
# the projects are closed or the user was disconencted.
# This will close and remove all these services from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.logging_errors import create_troubleshotting_log_kwargs
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_as_completed, logged_gather
from servicelib.utils import limited_as_completed, limited_gather

from ..dynamic_scheduler import api as dynamic_scheduler_service
from ..projects._projects_service import (
Expand Down Expand Up @@ -50,10 +50,10 @@ async def _remove_service(
except (UserNotFoundError, ValueError):
save_service_state = False

with log_context(
with log_catch(_logger, reraise=False), log_context(
_logger,
logging.INFO,
msg=f"removing {(service.node_uuid, service.host)} with {save_service_state=}",
f"removing {(service.node_uuid, service.host)} with {save_service_state=}",
):
await dynamic_scheduler_service.stop_dynamic_service(
app,
Expand Down Expand Up @@ -89,73 +89,72 @@ async def remove_orphaned_services(
# otherwise if some time goes it could very well be that there are new projects opened
# in between and the GC would remove services that actually should be running.

with log_catch(_logger, reraise=False):
running_services = await dynamic_scheduler_service.list_dynamic_services(app)
if not running_services:
# nothing to do
return
_logger.debug(
"Actual running dynamic services: %s",
[(x.node_uuid, x.host) for x in running_services],
)
running_services_by_id: dict[NodeID, DynamicServiceGet] = {
service.node_uuid: service for service in running_services
}

known_opened_project_ids = await _list_opened_project_ids(registry)

# NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
# Why? If a service is running but the nodes form the correspondign project cannot be listed,
# the service will be considered as orphaned and closed.
potentially_running_service_ids: list[set[NodeID]] = []
async for project_nodes_future in limited_as_completed(
(
list_node_ids_in_project(app, project_id)
for project_id in known_opened_project_ids
),
limit=_MAX_CONCURRENT_CALLS,
):
try:
project_nodes = await project_nodes_future
potentially_running_service_ids.append(project_nodes)
except BaseException as e: # pylint:disable=broad-exception-caught
_logger.warning(
create_troubleshotting_log_kwargs(
(
"Skipping orpahn services removal, call to "
"`list_node_ids_in_project` raised"
),
error=e,
error_context={
"running_services": running_services,
"running_services_by_id": running_services_by_id,
"known_opened_project_ids": known_opened_project_ids,
},
running_services = await dynamic_scheduler_service.list_dynamic_services(app)
if not running_services:
# nothing to do
return
_logger.debug(
"Actual running dynamic services: %s",
[(x.node_uuid, x.host) for x in running_services],
)
running_services_by_id: dict[NodeID, DynamicServiceGet] = {
service.node_uuid: service for service in running_services
}

known_opened_project_ids = await _list_opened_project_ids(registry)

# NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
# Why? If a service is running but the nodes form the correspondign project cannot be listed,
# the service will be considered as orphaned and closed.
potentially_running_service_ids: list[set[NodeID]] = []
async for project_nodes_future in limited_as_completed(
(
list_node_ids_in_project(app, project_id)
for project_id in known_opened_project_ids
),
limit=_MAX_CONCURRENT_CALLS,
):
try:
project_nodes = await project_nodes_future
potentially_running_service_ids.append(project_nodes)
except BaseException as e: # pylint:disable=broad-exception-caught
_logger.warning(
create_troubleshotting_log_kwargs(
(
"Skipping orpahn services removal, call to "
"`list_node_ids_in_project` raised"
),
exc_info=True,
)
return

potentially_running_service_ids_set: set[NodeID] = set().union(
*(node_id for node_id in potentially_running_service_ids)
)
_logger.debug(
"Allowed service UUIDs from known opened projects: %s",
potentially_running_service_ids_set,
)

# compute the difference to find the orphaned services
orphaned_running_service_ids = (
set(running_services_by_id) - potentially_running_service_ids_set
)
_logger.debug("Found orphaned services: %s", orphaned_running_service_ids)
# NOTE: no need to not reraise here, since we catch everything above
# and logged_gather first runs everything
await logged_gather(
*(
_remove_service(app, node_id, running_services_by_id[node_id])
for node_id in orphaned_running_service_ids
),
log=_logger,
max_concurrency=_MAX_CONCURRENT_CALLS,
)
error=e,
error_context={
"running_services": running_services,
"running_services_by_id": running_services_by_id,
"known_opened_project_ids": known_opened_project_ids,
},
),
exc_info=True,
)
continue

potentially_running_service_ids_set: set[NodeID] = set().union(
*(node_id for node_id in potentially_running_service_ids)
)
_logger.debug(
"Allowed service UUIDs from known opened projects: %s",
potentially_running_service_ids_set,
)

# compute the difference to find the orphaned services
orphaned_running_service_ids = (
set(running_services_by_id) - potentially_running_service_ids_set
)
_logger.debug("Found orphaned services: %s", orphaned_running_service_ids)
# NOTE: no need to not reraise here, since we catch everything above
# and logged_gather first runs everything
await limited_gather(
*(
_remove_service(app, node_id, running_services_by_id[node_id])
for node_id in orphaned_running_service_ids
),
log=_logger,
limit=_MAX_CONCURRENT_CALLS,
)
Loading