Skip to content

Commit bca1a27

Browse files
🎨 Add removal of project documents from the Redis (Garbage Collection background task) (#8177)
1 parent 2c424ad commit bca1a27

File tree

9 files changed

+471
-13
lines changed

9 files changed

+471
-13
lines changed

services/web/server/src/simcore_service_webserver/garbage_collector/_core_orphans.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
88
DynamicServiceStop,
99
)
10-
from models_library.projects import ProjectID
1110
from models_library.projects_nodes_io import NodeID
1211
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
1312
from servicelib.logging_errors import create_troubleshootting_log_kwargs
@@ -21,6 +20,7 @@
2120
)
2221
from ..projects.api import has_user_project_access_rights
2322
from ..resource_manager.registry import RedisResourceRegistry
23+
from ..resource_manager.service import list_opened_project_ids
2424
from ..users import users_service
2525
from ..users.exceptions import UserNotFoundError
2626

@@ -71,16 +71,6 @@ async def _remove_service(
7171
)
7272

7373

74-
async def _list_opened_project_ids(registry: RedisResourceRegistry) -> list[ProjectID]:
75-
opened_projects: list[ProjectID] = []
76-
all_session_alive, _ = await registry.get_all_resource_keys()
77-
for alive_session in all_session_alive:
78-
resources = await registry.get_resources(alive_session)
79-
if "project_id" in resources:
80-
opened_projects.append(ProjectID(resources["project_id"]))
81-
return opened_projects
82-
83-
8474
async def remove_orphaned_services(
8575
registry: RedisResourceRegistry, app: web.Application
8676
) -> None:
@@ -105,7 +95,7 @@ async def remove_orphaned_services(
10595
service.node_uuid: service for service in running_services
10696
}
10797

108-
known_opened_project_ids = await _list_opened_project_ids(registry)
98+
known_opened_project_ids = await list_opened_project_ids(registry)
10999

110100
# NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error.
111101
# Why? If a service is running but the nodes form the correspondign project cannot be listed,
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""
2+
Scheduled tasks addressing users
3+
4+
"""
5+
6+
import logging
7+
from collections.abc import AsyncIterator
8+
from datetime import timedelta
9+
10+
from aiohttp import web
11+
from servicelib.background_task_utils import exclusive_periodic
12+
from servicelib.logging_utils import log_context
13+
14+
from ..projects import projects_documents_service
15+
from ..redis import get_redis_lock_manager_client_sdk
16+
from ._tasks_utils import CleanupContextFunc, periodic_task_lifespan
17+
18+
_logger = logging.getLogger(__name__)
19+
20+
21+
def create_background_task_to_prune_documents(wait_s: float) -> CleanupContextFunc:
22+
23+
async def _cleanup_ctx_fun(app: web.Application) -> AsyncIterator[None]:
24+
interval = timedelta(seconds=wait_s)
25+
26+
@exclusive_periodic(
27+
# Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently
28+
get_redis_lock_manager_client_sdk(app),
29+
task_interval=interval,
30+
retry_after=min(timedelta(seconds=10), interval / 10),
31+
)
32+
async def _prune_documents_periodically() -> None:
33+
with log_context(
34+
_logger,
35+
logging.INFO,
36+
"Deleting project documents in Redis `documents` table started",
37+
):
38+
await projects_documents_service.remove_project_documents_as_admin(app)
39+
40+
async for _ in periodic_task_lifespan(app, _prune_documents_periodically):
41+
yield
42+
43+
return _cleanup_ctx_fun

services/web/server/src/simcore_service_webserver/garbage_collector/plugin.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from ..projects._projects_repository_legacy import setup_projects_db
1111
from ..redis import setup_redis
1212
from ..socketio.plugin import setup_socketio
13-
from . import _tasks_api_keys, _tasks_core, _tasks_trash, _tasks_users
13+
from . import _tasks_api_keys, _tasks_core, _tasks_documents, _tasks_trash, _tasks_users
1414
from .settings import get_plugin_settings
1515

1616
_logger = logging.getLogger(__name__)
@@ -66,3 +66,8 @@ def setup_garbage_collector(app: web.Application) -> None:
6666
app.cleanup_ctx.append(
6767
_tasks_trash.create_background_task_to_prune_trash(wait_period_s)
6868
)
69+
70+
wait_period_s = settings.GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S
71+
app.cleanup_ctx.append(
72+
_tasks_documents.create_background_task_to_prune_documents(wait_period_s)
73+
)

services/web/server/src/simcore_service_webserver/garbage_collector/settings.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ class GarbageCollectorSettings(BaseCustomSettings):
3838
Field(description="Wait time between periodic pruning of expired API keys"),
3939
] = _HOUR
4040

41+
GARBAGE_COLLECTOR_PRUNE_DOCUMENTS_INTERVAL_S: Annotated[
42+
PositiveInt,
43+
Field(description="Wait time between periodic pruning of documents"),
44+
] = (
45+
30 * _MINUTE
46+
)
47+
4148

4249
def get_plugin_settings(app: web.Application) -> GarbageCollectorSettings:
4350
settings = app[APP_SETTINGS_KEY].WEBSERVER_GARBAGE_COLLECTOR

services/web/server/src/simcore_service_webserver/projects/_project_document_service.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@
33
This module contains common utilities for building and versioning project documents.
44
"""
55

6+
import logging
7+
import re
68
from typing import cast
79

810
from aiohttp import web
911
from models_library.api_schemas_webserver.projects import (
1012
ProjectDocument,
1113
ProjectDocumentVersion,
1214
)
15+
from models_library.api_schemas_webserver.socketio import SocketIORoomStr
1316
from models_library.projects import ProjectID, ProjectTemplateType
1417
from models_library.projects import ProjectType as ProjectTypeAPI
18+
from servicelib.logging_errors import create_troubleshootting_log_kwargs
19+
from servicelib.logging_utils import log_context
1520
from servicelib.redis import (
1621
PROJECT_DB_UPDATE_REDIS_LOCK_KEY,
1722
exclusive,
@@ -22,8 +27,13 @@
2227
get_redis_document_manager_client_sdk,
2328
get_redis_lock_manager_client_sdk,
2429
)
30+
from ..resource_manager.registry import get_registry
31+
from ..resource_manager.service import list_opened_project_ids
32+
from ..socketio._utils import get_socket_server
2533
from . import _projects_repository
2634

35+
_logger = logging.getLogger(__name__)
36+
2737

2838
async def create_project_document_and_increment_version(
2939
app: web.Application, project_uuid: ProjectID
@@ -84,3 +94,110 @@ async def _create_project_document_and_increment_version() -> (
8494
return project_document, document_version
8595

8696
return await _create_project_document_and_increment_version()
97+
98+
99+
async def remove_project_documents_as_admin(app: web.Application) -> None:
100+
"""Admin function to clean up project documents for projects with no connected users.
101+
102+
This function scans through all project documents in the Redis DOCUMENTS database,
103+
checks if there are any users currently connected to the project room via socketio,
104+
and removes documents that have no connected users.
105+
"""
106+
with log_context(
107+
_logger,
108+
logging.INFO,
109+
msg="Project document cleanup started",
110+
):
111+
# Get Redis document manager client to access the DOCUMENTS database
112+
redis_client = get_redis_document_manager_client_sdk(app)
113+
114+
# Pattern to match project document keys - looking for keys that contain project UUIDs
115+
project_document_pattern = "projects:*:version"
116+
117+
# Get socketio server instance
118+
sio = get_socket_server(app)
119+
120+
# Get known opened projects ids based on Redis resources table
121+
registry = get_registry(app)
122+
known_opened_project_ids = await list_opened_project_ids(registry)
123+
known_opened_project_ids_set = set(known_opened_project_ids)
124+
125+
projects_removed = 0
126+
127+
# Scan through all project document keys
128+
async for key in redis_client.redis.scan_iter(
129+
match=project_document_pattern, count=1000
130+
):
131+
# Extract project UUID from the key pattern "projects:{project_uuid}:version"
132+
key_str = key.decode("utf-8") if isinstance(key, bytes) else key
133+
match = re.match(r"projects:(?P<project_uuid>[0-9a-f-]+):version", key_str)
134+
135+
if not match:
136+
continue
137+
138+
project_uuid_str = match.group("project_uuid")
139+
project_uuid = ProjectID(project_uuid_str)
140+
project_room = SocketIORoomStr.from_project_id(project_uuid)
141+
142+
# 1. CHECK - Check if the project UUID is in the known opened projects
143+
if project_uuid in known_opened_project_ids_set:
144+
_logger.debug(
145+
"Project %s is in Redis Resources table (which means Project is opened), keeping document",
146+
project_uuid,
147+
)
148+
continue
149+
150+
# 2. CHECK - Check if there are any users connected to this project room
151+
try:
152+
# Get all session IDs (socket IDs) in the project room
153+
room_sessions = list(
154+
sio.manager.get_participants(namespace="/", room=project_room)
155+
)
156+
157+
# If no users are connected to this project room, remove the document
158+
if not room_sessions:
159+
await redis_client.redis.delete(key_str)
160+
projects_removed += 1
161+
_logger.info(
162+
"Removed project document for project %s (no connected users)",
163+
project_uuid,
164+
)
165+
else:
166+
# Create a synthetic exception for this unexpected state
167+
unexpected_state_error = RuntimeError(
168+
f"Project {project_uuid} has {len(room_sessions)} connected users but is not in Redis Resources table"
169+
)
170+
_logger.error(
171+
**create_troubleshootting_log_kwargs(
172+
user_error_msg=f"Project {project_uuid} has {len(room_sessions)} connected users in the socket io room (This is not expected, as project resource is not in the Redis Resources table), keeping document just in case",
173+
error=unexpected_state_error,
174+
error_context={
175+
"project_uuid": str(project_uuid),
176+
"project_room": project_room,
177+
"key_str": key_str,
178+
"connected_users_count": len(room_sessions),
179+
"room_sessions": room_sessions[
180+
:5
181+
], # Limit to first 5 sessions for debugging
182+
},
183+
tip="This indicates a potential race condition or inconsistency between the Redis Resources table and socketio room state. Check if the project was recently closed but users are still connected, or if there's a synchronization issue between services.",
184+
)
185+
)
186+
continue
187+
188+
except (KeyError, AttributeError, ValueError) as exc:
189+
_logger.exception(
190+
**create_troubleshootting_log_kwargs(
191+
user_error_msg=f"Failed to check room participants for project {project_uuid}",
192+
error=exc,
193+
error_context={
194+
"project_uuid": str(project_uuid),
195+
"project_room": project_room,
196+
"key_str": key_str,
197+
},
198+
tip="Check if socketio server is properly initialized and the room exists. This could indicate a socketio manager issue or invalid room format.",
199+
)
200+
)
201+
continue
202+
203+
_logger.info("Completed: removed %d project documents", projects_removed)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from ._project_document_service import (
2+
remove_project_documents_as_admin,
3+
)
4+
5+
__all__: tuple[str, ...] = ("remove_project_documents_as_admin",)
6+
7+
8+
# nopycln: file
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import logging
2+
3+
from models_library.projects import ProjectID
4+
5+
from .registry import RedisResourceRegistry
6+
7+
_logger = logging.getLogger(__name__)
8+
9+
10+
async def list_opened_project_ids(registry: RedisResourceRegistry) -> list[ProjectID]:
11+
"""Lists all project IDs that are currently opened in active sessions."""
12+
opened_projects: list[ProjectID] = []
13+
all_session_alive, _ = await registry.get_all_resource_keys()
14+
for alive_session in all_session_alive:
15+
resources = await registry.get_resources(alive_session)
16+
if projects_id := resources.get("project_id"):
17+
opened_projects.append(ProjectID(projects_id))
18+
return opened_projects
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ._registry_utils import list_opened_project_ids
2+
3+
__all__ = ("list_opened_project_ids",)

0 commit comments

Comments
 (0)