Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b2d3396
Adds client session ID propagation for user actions
matusdrobuliak66 Jul 29, 2025
92cb19f
Merge branch 'master' into is1647/collaboration-features-2
matusdrobuliak66 Jul 29, 2025
8c5b95d
Removes unused frontend outputs update logic
matusdrobuliak66 Jul 29, 2025
a68c91e
fix tests
matusdrobuliak66 Jul 30, 2025
4d3d978
Refactors project document update for clarity and atomicity
matusdrobuliak66 Jul 30, 2025
503ef0f
fix tests
matusdrobuliak66 Jul 30, 2025
f58843c
Merge branch 'master' into is1647/collaboration-features-2
matusdrobuliak66 Jul 30, 2025
ae5be6f
review @sanderegg
matusdrobuliak66 Jul 30, 2025
0d5a6c3
review @sanderegg @pcrespov
matusdrobuliak66 Jul 30, 2025
89a9394
review @pcrespov
matusdrobuliak66 Jul 30, 2025
ac80fed
fix
matusdrobuliak66 Jul 30, 2025
26ac23b
fix
matusdrobuliak66 Jul 30, 2025
0b8c698
fix
matusdrobuliak66 Jul 30, 2025
e4b4a0b
additional tests
matusdrobuliak66 Jul 30, 2025
22cae1b
Adds periodic cleanup task for project documents
matusdrobuliak66 Jul 30, 2025
04110d4
Removes project documents with no active users
matusdrobuliak66 Jul 30, 2025
ed21eb0
Refactors project document cleanup to use shared registry utility
matusdrobuliak66 Jul 30, 2025
b07870a
additional tests
matusdrobuliak66 Jul 30, 2025
ff0062f
Refactors project document removal unit tests with fixtures
matusdrobuliak66 Jul 30, 2025
998ff14
additional tests
matusdrobuliak66 Jul 30, 2025
76d70e0
fix mypy
matusdrobuliak66 Jul 30, 2025
32b0db0
Merge branch 'master' into is1647/collaboration-features-3
matusdrobuliak66 Jul 30, 2025
4653951
add GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S
matusdrobuliak66 Jul 30, 2025
a8ee4d9
Merge branch 'master' into is1647/collaboration-features-3
matusdrobuliak66 Jul 31, 2025
05bfe3e
review @sanderegg @pcrespov
matusdrobuliak66 Jul 31, 2025
88402f5
review @pcrespov
matusdrobuliak66 Jul 31, 2025
b3d1213
review @pcrespov
matusdrobuliak66 Jul 31, 2025
94c1cdc
review @sanderegg
matusdrobuliak66 Jul 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStop,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.logging_errors import create_troubleshootting_log_kwargs
Expand All @@ -21,6 +20,7 @@
)
from ..projects.api import has_user_project_access_rights
from ..resource_manager.registry import RedisResourceRegistry
from ..resource_manager.registry_utils import list_opened_project_ids
from ..users import users_service
from ..users.exceptions import UserNotFoundError

Expand Down Expand Up @@ -71,16 +71,6 @@ async def _remove_service(
)


async def _list_opened_project_ids(registry: RedisResourceRegistry) -> list[ProjectID]:
    """Collect the project IDs referenced by the registry's alive sessions.

    Each alive session key reported by ``registry.get_all_resource_keys()`` is
    looked up with ``registry.get_resources``; every session whose resources
    contain a ``"project_id"`` entry contributes one ``ProjectID`` to the result.
    A project referenced by several sessions appears once per session.
    """
    alive_session_keys, _ = await registry.get_all_resource_keys()
    project_ids: list[ProjectID] = []
    for session_key in alive_session_keys:
        session_resources = await registry.get_resources(session_key)
        if "project_id" not in session_resources:
            # This session has no project attached
            continue
        project_ids.append(ProjectID(session_resources["project_id"]))
    return project_ids


async def remove_orphaned_services(
registry: RedisResourceRegistry, app: web.Application
) -> None:
Expand All @@ -105,7 +95,7 @@ async def remove_orphaned_services(
service.node_uuid: service for service in running_services
}

known_opened_project_ids = await _list_opened_project_ids(registry)
known_opened_project_ids = await list_opened_project_ids(registry)

# NOTE: Always skip orphan removal when `list_node_ids_in_project` raises an error.
# Why? If a service is running but the nodes from the corresponding project cannot be listed,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
Scheduled tasks addressing project documents

"""

import logging
from collections.abc import AsyncIterator
from datetime import timedelta

from aiohttp import web
from servicelib.background_task_utils import exclusive_periodic
from servicelib.logging_utils import log_context

from ..projects import projects_documents_service
from ..redis import get_redis_lock_manager_client_sdk
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan

_logger = logging.getLogger(__name__)


def create_background_task_to_prune_documents(wait_s: float) -> CleanupContextFunc:
    """Build the cleanup-context function that periodically prunes project documents.

    Args:
        wait_s: Number of seconds used as the interval of the periodic task.

    Returns:
        A function to be appended to ``app.cleanup_ctx``. While its context is
        active, ``projects_documents_service.remove_project_documents_as_admin``
        is run periodically under ``exclusive_periodic`` using the Redis
        lock-manager client of the application.
    """

    async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
        task_interval = timedelta(seconds=wait_s)
        # A tenth of the task interval, capped at 10 seconds
        retry_after = min(timedelta(seconds=10), task_interval / 10)

        # Function-exclusiveness is required to avoid multiple tasks like this one
        # running concurrently
        @exclusive_periodic(
            get_redis_lock_manager_client_sdk(app),
            task_interval=task_interval,
            retry_after=retry_after,
        )
        async def _prune_documents_periodically() -> None:
            with log_context(
                _logger,
                logging.INFO,
                "Deleting project documents in Redis `documents` table started",
            ):
                await projects_documents_service.remove_project_documents_as_admin(app)

        # Hand control back to the application for as long as the periodic task lives
        async for _ in periodic_task_lifespan(app, _prune_documents_periodically):
            yield

    return _cleanup_ctx_fun
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..projects._projects_repository_legacy import setup_projects_db
from ..redis import setup_redis
from ..socketio.plugin import setup_socketio
from . import _tasks_api_keys, _tasks_core, _tasks_trash, _tasks_users
from . import _tasks_api_keys, _tasks_core, _tasks_documents, _tasks_trash, _tasks_users
from .settings import get_plugin_settings

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,3 +66,8 @@ def setup_garbage_collector(app: web.Application) -> None:
app.cleanup_ctx.append(
_tasks_trash.create_background_task_to_prune_trash(wait_period_s)
)

wait_period_s = settings.GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S
app.cleanup_ctx.append(
_tasks_documents.create_background_task_to_prune_documents(wait_period_s)
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class GarbageCollectorSettings(BaseCustomSettings):
Field(description="Wait time between periodic pruning of expired API keys"),
] = _HOUR

GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S: Annotated[
PositiveInt,
Field(description="Wait time between periodic pruning of documents"),
] = (
30 * _MINUTE
)


def get_plugin_settings(app: web.Application) -> GarbageCollectorSettings:
settings = app[APP_SETTINGS_KEY].WEBSERVER_GARBAGE_COLLECTOR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
This module contains common utilities for building and versioning project documents.
"""

import logging
import re
from typing import cast

from aiohttp import web
from models_library.api_schemas_webserver.projects import (
ProjectDocument,
ProjectDocumentVersion,
)
from models_library.api_schemas_webserver.socketio import SocketIORoomStr
from models_library.projects import ProjectID, ProjectTemplateType
from models_library.projects import ProjectType as ProjectTypeAPI
from servicelib.redis import (
Expand All @@ -22,8 +25,13 @@
get_redis_document_manager_client_sdk,
get_redis_lock_manager_client_sdk,
)
from ..resource_manager.registry import get_registry
from ..resource_manager.registry_utils import list_opened_project_ids
from ..socketio._utils import get_socket_server
from . import _projects_repository

_logger = logging.getLogger(__name__)


async def create_project_document_and_increment_version(
app: web.Application, project_uuid: ProjectID
Expand Down Expand Up @@ -84,3 +92,84 @@ async def _create_project_document_and_increment_version() -> (
return project_document, document_version

return await _create_project_document_and_increment_version()


async def remove_project_documents_as_admin(app: web.Application) -> None:
    """Admin function to clean up project documents for projects with no connected users.

    Scans every key matching ``projects:*:version`` in the Redis DOCUMENTS database
    and deletes the key when BOTH of the following hold:

    1. the project is not listed as opened in the Redis resources registry, and
    2. the project's socket.io room (namespace ``/``) has no participants.

    Keys whose project part is not a valid project ID are logged and skipped, so a
    single malformed key cannot abort the whole cleanup run.

    NOTE: the set of opened projects is read once, before the scan starts; it is
    not refreshed while keys are being processed.

    Args:
        app: The web application providing the Redis clients, the resource
            registry and the socket.io server.
    """
    # Redis document manager client gives access to the DOCUMENTS database
    redis_client = get_redis_document_manager_client_sdk(app)

    # Socket.io server instance, used to inspect project rooms
    sio = get_socket_server(app)

    # Known opened project ids based on the Redis resources table
    registry = get_registry(app)
    known_opened_project_ids_set = set(await list_opened_project_ids(registry))

    # Compiled once, outside the scan loop. Matched with `fullmatch` so that only
    # keys of the exact form "projects:{project_uuid}:version" are considered.
    document_key_regex = re.compile(r"projects:([0-9a-f-]+):version")

    projects_removed = 0

    async for key in redis_client.redis.scan_iter(
        match="projects:*:version", count=1000
    ):
        key_str = key.decode("utf-8") if isinstance(key, bytes) else key
        match = document_key_regex.fullmatch(key_str)
        if not match:
            continue

        # The regex only restricts the alphabet; the captured text may still not
        # be a well-formed project ID, hence the explicit guard.
        try:
            project_uuid = ProjectID(match.group(1))
        except ValueError:
            _logger.warning(
                "Skipping project document key %s: it does not contain a valid project id",
                key_str,
            )
            continue

        # 1. CHECK - Check if the project UUID is in the known opened projects
        if project_uuid in known_opened_project_ids_set:
            _logger.debug(
                "Project %s is in Redis Resources table (which means Project is opened), keeping document",
                project_uuid,
            )
            continue

        # 2. CHECK - Check if there are any users connected to this project room
        project_room = SocketIORoomStr.from_project_id(project_uuid)
        try:
            # Get all session IDs (socket IDs) in the project room
            room_sessions = list(
                sio.manager.get_participants(namespace="/", room=project_room)
            )

            # If no users are connected to this project room, remove the document
            if not room_sessions:
                await redis_client.redis.delete(key_str)
                projects_removed += 1
                _logger.info(
                    "Removed project document for project %s (no connected users)",
                    project_uuid,
                )
            else:
                _logger.error(
                    "Project %s has %d connected users in the socket io room (This is not expected, as project resource is not in the Redis Resources table), keeping document just in case",
                    project_uuid,
                    len(room_sessions),
                )

        except (KeyError, AttributeError, ValueError):
            # Best effort: a failure on one project must not stop the others
            _logger.exception(
                "Failed to check room participants for project %s",
                project_uuid,
            )

    _logger.info(
        "Project document cleanup completed: removed %d project documents",
        projects_removed,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from ._project_document_service import (
remove_project_documents_as_admin,
)

__all__: tuple[str, ...] = ("remove_project_documents_as_admin",)


# nopycln: file
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from models_library.projects import ProjectID

from .registry import RedisResourceRegistry

_logger = logging.getLogger(__name__)


async def list_opened_project_ids(registry: RedisResourceRegistry) -> list[ProjectID]:
    """Lists all project IDs that are currently opened in active sessions.

    The alive session keys come from ``registry.get_all_resource_keys()``. The
    resources of each one are fetched in turn, and a ``ProjectID`` is produced
    for every session whose resources contain a ``"project_id"`` entry. No
    de-duplication is done: a project opened in several sessions is returned
    once per session.
    """
    alive_sessions, _ = await registry.get_all_resource_keys()
    return [
        ProjectID(session_resources["project_id"])
        for session in alive_sessions
        # Fetch the resources of each session once and keep them for the element expression
        if "project_id" in (session_resources := await registry.get_resources(session))
    ]
Loading
Loading