diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py index a30fe7f1d748..d94ade770498 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py @@ -7,7 +7,6 @@ from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStop, ) -from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE from servicelib.logging_errors import create_troubleshootting_log_kwargs @@ -21,6 +20,7 @@ ) from ..projects.api import has_user_project_access_rights from ..resource_manager.registry import RedisResourceRegistry +from ..resource_manager.service import list_opened_project_ids from ..users import users_service from ..users.exceptions import UserNotFoundError @@ -71,16 +71,6 @@ async def _remove_service( ) -async def _list_opened_project_ids(registry: RedisResourceRegistry) -> list[ProjectID]: - opened_projects: list[ProjectID] = [] - all_session_alive, _ = await registry.get_all_resource_keys() - for alive_session in all_session_alive: - resources = await registry.get_resources(alive_session) - if "project_id" in resources: - opened_projects.append(ProjectID(resources["project_id"])) - return opened_projects - - async def remove_orphaned_services( registry: RedisResourceRegistry, app: web.Application ) -> None: @@ -105,7 +95,7 @@ async def remove_orphaned_services( service.node_uuid: service for service in running_services } - known_opened_project_ids = await _list_opened_project_ids(registry) + known_opened_project_ids = await list_opened_project_ids(registry) # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error. # Why? 
def create_background_task_to_prune_documents(wait_s: float) -> CleanupContextFunc:
    """Create the cleanup-context function that periodically prunes project documents.

    Args:
        wait_s: Seconds between two consecutive pruning runs.

    Returns:
        An async-generator function that takes the aiohttp application, hands the
        periodic pruning coroutine to ``periodic_task_lifespan`` and yields once
        for every item that helper produces.
    """

    async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
        interval = timedelta(seconds=wait_s)

        # Function-exclusiveness is required so that several tasks like this one
        # never run concurrently; the Redis lock-manager client is what
        # `exclusive_periodic` receives for that purpose.
        lock_manager_client = get_redis_lock_manager_client_sdk(app)

        # A tenth of the interval, but never more than 10 seconds.
        retry_period = min(timedelta(seconds=10), interval / 10)

        # NOTE(review): the inner function name is kept untouched on purpose; the
        # decorator may derive identifiers from it -- confirm before renaming.
        @exclusive_periodic(
            lock_manager_client,
            task_interval=interval,
            retry_after=retry_period,
        )
        async def _prune_documents_periodically() -> None:
            started_msg = "Deleting project documents in Redis `documents` table started"
            with log_context(_logger, logging.INFO, started_msg):
                await projects_documents_service.remove_project_documents_as_admin(app)

        lifespan = periodic_task_lifespan(app, _prune_documents_periodically)
        async for _ in lifespan:
            yield

    return _cleanup_ctx_fun
a/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py @@ -10,7 +10,7 @@ from ..projects._projects_repository_legacy import setup_projects_db from ..redis import setup_redis from ..socketio.plugin import setup_socketio -from . import _tasks_api_keys, _tasks_core, _tasks_trash, _tasks_users +from . import _tasks_api_keys, _tasks_core, _tasks_documents, _tasks_trash, _tasks_users from .settings import get_plugin_settings _logger = logging.getLogger(__name__) @@ -66,3 +66,8 @@ def setup_garbage_collector(app: web.Application) -> None: app.cleanup_ctx.append( _tasks_trash.create_background_task_to_prune_trash(wait_period_s) ) + + wait_period_s = settings.GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S + app.cleanup_ctx.append( + _tasks_documents.create_background_task_to_prune_documents(wait_period_s) + ) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py b/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py index 0f72f1dd2cbd..054d37a508bf 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/settings.py @@ -38,6 +38,13 @@ class GarbageCollectorSettings(BaseCustomSettings): Field(description="Wait time between periodic pruning of expired API keys"), ] = _HOUR + GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S: Annotated[ + PositiveInt, + Field(description="Wait time between periodic pruning of documents"), + ] = ( + 30 * _MINUTE + ) + def get_plugin_settings(app: web.Application) -> GarbageCollectorSettings: settings = app[APP_SETTINGS_KEY].WEBSERVER_GARBAGE_COLLECTOR diff --git a/services/web/server/src/simcore_service_webserver/projects/_project_document_service.py b/services/web/server/src/simcore_service_webserver/projects/_project_document_service.py index 540e791a2cc0..dacc6833dc57 
# Redis keys that hold a project document version look like
# "projects:{project_uuid}:version".
_PROJECT_DOCUMENT_KEYS_SCAN_PATTERN = "projects:*:version"

# Compiled once at import time instead of on every scanned key. The named group
# is what `match.group("project_uuid")` below relies on.
_PROJECT_DOCUMENT_KEY_RE = re.compile(
    r"projects:(?P<project_uuid>[0-9a-f-]+):version"
)


async def remove_project_documents_as_admin(app: web.Application) -> None:
    """Remove project documents of projects that nobody has opened.

    Scans every key matching ``projects:*:version`` in the Redis database behind
    the document manager client and, for each one:

    1. keeps the document when the project id is returned by
       ``list_opened_project_ids`` (Redis resources registry);
    2. otherwise asks the socketio manager for the participants of the project
       room and deletes the key only when the room is empty.

    A project that is absent from the registry but still has participants in its
    room is treated as an unexpected state: it is logged as an error and its
    document is kept. Keys that do not fully match the expected format, or whose
    UUID part cannot be parsed, are skipped without being deleted.

    Args:
        app: The aiohttp application the Redis clients, the resources registry
            and the socketio server are looked up from.
    """
    with log_context(
        _logger,
        logging.INFO,
        msg="Project document cleanup started",
    ):
        redis_client = get_redis_document_manager_client_sdk(app)
        sio = get_socket_server(app)

        # Snapshot of opened projects, taken once before scanning; a set gives
        # O(1) membership checks inside the loop.
        registry = get_registry(app)
        known_opened_project_ids = set(await list_opened_project_ids(registry))

        projects_removed = 0

        async for key in redis_client.redis.scan_iter(
            match=_PROJECT_DOCUMENT_KEYS_SCAN_PATTERN, count=1000
        ):
            # The client may return bytes or str depending on its configuration
            key_str = key.decode("utf-8") if isinstance(key, bytes) else key

            # fullmatch: the scan glob is looser than the key format, so keys
            # with extra segments must not be mistaken for a document key
            match = _PROJECT_DOCUMENT_KEY_RE.fullmatch(key_str)
            if not match:
                continue

            try:
                project_uuid = ProjectID(match.group("project_uuid"))
            except ValueError:
                # NOTE(review): assumes ProjectID raises ValueError on malformed
                # input (UUID semantics) -- confirm. One bad key must not abort
                # the whole cleanup run.
                _logger.warning(
                    "Skipping Redis key %s: it does not contain a valid project UUID",
                    key_str,
                )
                continue

            # 1. CHECK - project is listed as opened in the Redis resources table
            if project_uuid in known_opened_project_ids:
                _logger.debug(
                    "Project %s is in Redis Resources table (which means Project is opened), keeping document",
                    project_uuid,
                )
                continue

            # 2. CHECK - any socketio session still in the project room?
            project_room = SocketIORoomStr.from_project_id(project_uuid)
            try:
                room_sessions = list(
                    sio.manager.get_participants(namespace="/", room=project_room)
                )
            except (KeyError, AttributeError, ValueError) as exc:
                # Only the participants lookup is guarded; the document is kept
                _logger.exception(
                    **create_troubleshootting_log_kwargs(
                        user_error_msg=f"Failed to check room participants for project {project_uuid}",
                        error=exc,
                        error_context={
                            "project_uuid": str(project_uuid),
                            "project_room": project_room,
                            "key_str": key_str,
                        },
                        tip="Check if socketio server is properly initialized and the room exists. This could indicate a socketio manager issue or invalid room format.",
                    )
                )
                continue

            if room_sessions:
                # Registry says "closed" but the room is not empty: keep the
                # document and report the inconsistency with a synthetic error.
                unexpected_state_error = RuntimeError(
                    f"Project {project_uuid} has {len(room_sessions)} connected users but is not in Redis Resources table"
                )
                _logger.error(
                    **create_troubleshootting_log_kwargs(
                        user_error_msg=f"Project {project_uuid} has {len(room_sessions)} connected users in the socket io room (This is not expected, as project resource is not in the Redis Resources table), keeping document just in case",
                        error=unexpected_state_error,
                        error_context={
                            "project_uuid": str(project_uuid),
                            "project_room": project_room,
                            "key_str": key_str,
                            "connected_users_count": len(room_sessions),
                            # Limit to first 5 sessions for debugging
                            "room_sessions": room_sessions[:5],
                        },
                        tip="This indicates a potential race condition or inconsistency between the Redis Resources table and socketio room state. Check if the project was recently closed but users are still connected, or if there's a synchronization issue between services.",
                    )
                )
                continue

            # Not opened anywhere and nobody in the room: drop the document
            await redis_client.redis.delete(key_str)
            projects_removed += 1
            _logger.info(
                "Removed project document for project %s (no connected users)",
                project_uuid,
            )

        _logger.info("Completed: removed %d project documents", projects_removed)
@pytest.fixture
def sample_project_uuids() -> list[ProjectID]:
    """Return three freshly generated random project ids for a test."""
    how_many = 3
    return [ProjectID(str(uuid.uuid4())) for _ in range(how_many)]
@pytest.fixture
async def project_documents_setup(
    redis_document_client: Any, sample_project_uuids: list[ProjectID]
) -> AsyncGenerator[list[str], None]:
    """Create a version key in Redis for every sample project; delete them afterwards.

    Yields:
        The Redis keys (``projects:{project_id}:version``) that were created,
        in the same order as ``sample_project_uuids``.
    """
    version_bumps_per_project = 2  # calling twice increments to version 2
    created_keys = []

    # Setup: create the project document versions in Redis
    for project_uuid in sample_project_uuids:
        created_keys.append(f"projects:{project_uuid}:version")
        for _ in range(version_bumps_per_project):
            await increment_and_return_project_document_version(
                redis_client=redis_document_client, project_uuid=project_uuid
            )

    # Every key must be present before the test body starts
    for created_key in created_keys:
        key_count = await redis_document_client.redis.exists(created_key)
        assert key_count == 1

    yield created_keys

    # Cleanup: remove whatever the test left behind
    for created_key in created_keys:
        await redis_document_client.redis.delete(created_key)
@pytest.mark.parametrize(
    "user_role",
    [
        UserRole.USER,
    ],
)
async def test_remove_project_documents_as_admin_with_real_connections(
    client: TestClient,
    logged_user: UserInfoDict,
    redis_document_client: Any,
    sample_project_uuids: list[ProjectID],
    project_documents_setup: list[str],
    create_project_socketio_connections,
):
    """Check document pruning against real Redis and SocketIO connections.

    Scenario:
    - Project 0: session joined its project room -> document is kept
    - Project 1: session joined its project room -> document is kept
    - Project 2: nobody in its project room -> document is removed
    """
    # Projects 0 and 1 get a SocketIO session inside their project room
    await create_project_socketio_connections(
        project_uuids=sample_project_uuids,
        connected_project_indices=[0, 1],
    )

    # Run the function under test
    await _project_document_service.remove_project_documents_as_admin(client.app)

    # (project index, expected number of keys, message shown on failure)
    expectations = [
        (
            0,
            1,
            "Project 0 should be preserved because it has active SocketIO connection",
        ),
        (
            1,
            1,
            "Project 1 should be preserved because it has active SocketIO connection",
        ),
        (
            2,
            0,
            "Project 2 should be removed because it has no active SocketIO connections",
        ),
    ]
    for project_index, expected_key_count, failure_msg in expectations:
        version_key = f"projects:{sample_project_uuids[project_index]}:version"
        found = await redis_document_client.redis.exists(version_key)
        assert found == expected_key_count, failure_msg
@pytest.mark.parametrize(
    "user_role",
    [
        UserRole.USER,
    ],
)
async def test_remove_project_documents_as_admin_mixed_state(
    client: TestClient,
    logged_user: UserInfoDict,
    redis_document_client: Any,
    sample_project_uuids: list[ProjectID],
    create_project_socketio_connections,
):
    """Test mixed state: some projects have documents, some have connections without documents.

    Scenario:
    - Project 0: has a document but no SocketIO connection -> document is removed
    - Project 1: has a SocketIO connection in its room but no document
    """
    # Setup: create a document only for the first project
    test_key = f"projects:{sample_project_uuids[0]}:version"
    await increment_and_return_project_document_version(
        redis_client=redis_document_client, project_uuid=sample_project_uuids[0]
    )

    try:
        # Create a SocketIO connection for the second project (no document)
        await create_project_socketio_connections(
            project_uuids=sample_project_uuids[1:2],  # Only second project
            connected_project_indices=[0],  # Connect it to room
        )

        # Execute the function
        await _project_document_service.remove_project_documents_as_admin(client.app)

        # Verify: first project document should be removed (no connections)
        assert (
            await redis_document_client.redis.exists(test_key) == 0
        ), "Project 0 document should be removed (no active connections)"
    finally:
        # Cleanup must also run when the test fails, otherwise the key stays
        # in Redis and can affect later tests
        await redis_document_client.redis.delete(test_key)