Skip to content

Commit 9e0b3ab

Browse files
committed
refactor
1 parent 06b6d84 commit 9e0b3ab

File tree

7 files changed

+193
-164
lines changed

7 files changed

+193
-164
lines changed

services/web/server/src/simcore_service_webserver/garbage_collector/_core_disconnected.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def remove_disconnected_user_resources(
3030

3131
# clean up all resources of expired keys
3232
for dead_session in dead_user_sessions:
33-
user_id = dead_session["user_id"]
33+
user_id = dead_session.user_id
3434

3535
# (0) If key has no resources => remove from registry and continue
3636
resources = await registry.get_resources(dead_session)
@@ -63,7 +63,7 @@ async def remove_disconnected_user_resources(
6363
await _projects_service.close_project_for_user(
6464
user_id=user_id,
6565
project_uuid=project_id,
66-
client_session_id=dead_session["client_session_id"],
66+
client_session_id=dead_session.client_session_id,
6767
app=app,
6868
simcore_user_agent=UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
6969
wait_for_service_closed=True,

services/web/server/src/simcore_service_webserver/garbage_collector/_core_guests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ async def remove_users_manually_marked_as_guests(
192192
) = await registry.get_all_resource_keys()
193193

194194
skip_users = {
195-
int(user_session["user_id"])
195+
user_session.user_id
196196
for user_session in itertools.chain(
197197
all_user_session_alive, all_user_sessions_dead
198198
)

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@
110110
get_redis_document_manager_client_sdk,
111111
get_redis_lock_manager_client_sdk,
112112
)
113+
from ..resource_manager.models import UserSession
113114
from ..resource_manager.user_sessions import (
114115
PROJECT_ID_KEY,
115-
UserSessionID,
116116
managed_resource,
117117
)
118118
from ..resource_usage import service as rut_api
@@ -1441,7 +1441,7 @@ async def post_trigger_connected_service_retrieve(
14411441

14421442

14431443
async def _user_has_another_active_session(
1444-
users_sessions_ids: list[UserSessionID], app: web.Application
1444+
users_sessions_ids: list[UserSession], app: web.Application
14451445
) -> bool:
14461446
# NOTE if there is an active socket in use, that means the client is active
14471447
for u in users_sessions_ids:
@@ -1452,7 +1452,7 @@ async def _user_has_another_active_session(
14521452

14531453

14541454
async def _clean_user_disconnected_clients(
1455-
users_sessions_ids: list[UserSessionID], app: web.Application
1455+
users_sessions_ids: list[UserSession], app: web.Application
14561456
):
14571457
for u in users_sessions_ids:
14581458
with managed_resource(u.user_id, u.client_session_id, app) as user_session:
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from typing import Final, Self, TypeAlias, TypedDict
2+
3+
from models_library.basic_types import UUIDStr
4+
from models_library.users import UserID
5+
from pydantic import BaseModel, ConfigDict
6+
from pydantic.config import JsonDict
7+
8+
ALIVE_SUFFIX: Final[str] = "alive" # points to a string type
9+
RESOURCE_SUFFIX: Final[str] = "resources" # points to a hash (like a dict) type
10+
RedisHashKey: TypeAlias = str
11+
12+
13+
class UserSession(BaseModel):
14+
"""Parts of the key used in redis for a user-session"""
15+
16+
user_id: UserID
17+
client_session_id: str
18+
19+
def to_redis_hash_key(self) -> RedisHashKey:
20+
return ":".join(f"{k}={v}" for k, v in self.model_dump().items())
21+
22+
@classmethod
23+
def from_redis_hash_key(cls, hash_key: RedisHashKey) -> Self:
24+
key = dict(x.split("=") for x in hash_key.split(":") if "=" in x)
25+
return cls.model_validate(key)
26+
27+
@staticmethod
28+
def _update_json_schema_extra(schema: JsonDict) -> None:
29+
schema.update(
30+
{
31+
"exampels": [
32+
{
33+
"user_id": 7,
34+
"client_session_id": "c7fc4985-f96a-4be3-a8ed-5a43b1aa15e2",
35+
},
36+
{
37+
"user_id": 666,
38+
"client_session_id": "*",
39+
},
40+
]
41+
}
42+
)
43+
44+
model_config = ConfigDict(
45+
frozen=True,
46+
json_schema_extra=_update_json_schema_extra,
47+
)
48+
49+
50+
class ResourcesDict(TypedDict, total=False):
51+
"""Field-value pairs of {user_id}:{client_session_id}:resources key"""
52+
53+
project_id: UUIDStr
54+
socket_id: str
55+
56+
57+
AliveSessions: TypeAlias = list[UserSession]
58+
DeadSessions: TypeAlias = list[UserSession]

services/web/server/src/simcore_service_webserver/resource_manager/registry.py

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,21 @@
1414
"""
1515

1616
import logging
17-
from typing import TypeAlias
1817

1918
import redis.asyncio as aioredis
2019
from aiohttp import web
21-
from models_library.basic_types import UUIDStr
22-
from models_library.users import UserID
2320
from servicelib.redis import handle_redis_returns_union_types
24-
from typing_extensions import ( # https://docs.pydantic.dev/latest/api/standard_library_types/#typeddict
25-
TypedDict,
26-
)
2721

2822
from ..redis import get_redis_resources_client
2923
from ._constants import APP_CLIENT_SOCKET_REGISTRY_KEY
24+
from .models import (
25+
ALIVE_SUFFIX,
26+
RESOURCE_SUFFIX,
27+
AliveSessions,
28+
DeadSessions,
29+
ResourcesDict,
30+
UserSession,
31+
)
3032

3133
_logger = logging.getLogger(__name__)
3234

@@ -35,26 +37,6 @@
3537
# Key: user_id=1:client_session_id=7f40353b-db02-4474-a44d-23ce6a6e428c:alive = 1
3638
# Key: user_id=1:client_session_id=7f40353b-db02-4474-a44d-23ce6a6e428c:resources = {project_id: ... , socket_id: ...}
3739
#
38-
_ALIVE_SUFFIX = "alive" # points to a string type
39-
_RESOURCE_SUFFIX = "resources" # points to a hash (like a dict) type
40-
41-
42-
class UserSession(TypedDict):
43-
"""Parts of the key used in redis for a user-session"""
44-
45-
user_id: UserID
46-
client_session_id: str
47-
48-
49-
class ResourcesDict(TypedDict, total=False):
50-
"""Field-value pairs of {user_id}:{client_session_id}:resources key"""
51-
52-
project_id: UUIDStr
53-
socket_id: str
54-
55-
56-
AliveSessions: TypeAlias = list[UserSession]
57-
DeadSessions: TypeAlias = list[UserSession]
5840

5941

6042
class RedisResourceRegistry:
@@ -75,17 +57,12 @@ def __init__(self, app: web.Application):
7557
def app(self) -> web.Application:
7658
return self._app
7759

78-
@classmethod
79-
def _hash_key(cls, key: UserSession) -> str:
80-
hash_key: str = ":".join(f"{k}={v}" for k, v in key.items())
81-
return hash_key
82-
8360
@classmethod
8461
def _decode_hash_key(cls, hash_key: str) -> UserSession:
8562
tmp_key = (
86-
hash_key[: -len(f":{_RESOURCE_SUFFIX}")]
87-
if hash_key.endswith(f":{_RESOURCE_SUFFIX}")
88-
else hash_key[: -len(f":{_ALIVE_SUFFIX}")]
63+
hash_key[: -len(f":{RESOURCE_SUFFIX}")]
64+
if hash_key.endswith(f":{RESOURCE_SUFFIX}")
65+
else hash_key[: -len(f":{ALIVE_SUFFIX}")]
8966
)
9067
key = dict(x.split("=") for x in tmp_key.split(":"))
9168
return UserSession(**key) # type: ignore
@@ -96,27 +73,27 @@ def client(self) -> aioredis.Redis:
9673
return client
9774

9875
async def set_resource(self, key: UserSession, resource: tuple[str, str]) -> None:
99-
hash_key = f"{self._hash_key(key)}:{_RESOURCE_SUFFIX}"
76+
hash_key = f"{key.to_redis_hash_key()}:{RESOURCE_SUFFIX}"
10077
field, value = resource
10178
await handle_redis_returns_union_types(
10279
self.client.hset(hash_key, mapping={field: value})
10380
)
10481

10582
async def get_resources(self, key: UserSession) -> ResourcesDict:
106-
hash_key = f"{self._hash_key(key)}:{_RESOURCE_SUFFIX}"
83+
hash_key = f"{key.to_redis_hash_key()}:{RESOURCE_SUFFIX}"
10784
fields = await handle_redis_returns_union_types(self.client.hgetall(hash_key))
10885
return ResourcesDict(**fields)
10986

11087
async def remove_resource(self, key: UserSession, resource_name: str) -> None:
111-
hash_key = f"{self._hash_key(key)}:{_RESOURCE_SUFFIX}"
88+
hash_key = f"{key.to_redis_hash_key()}:{RESOURCE_SUFFIX}"
11289
await handle_redis_returns_union_types(
11390
self.client.hdel(hash_key, resource_name)
11491
)
11592

11693
async def find_resources(self, key: UserSession, resource_name: str) -> list[str]:
11794
resources: list[str] = []
11895
# the key might only be partialy complete
119-
partial_hash_key = f"{self._hash_key(key)}:{_RESOURCE_SUFFIX}"
96+
partial_hash_key = f"{key.to_redis_hash_key()}:{RESOURCE_SUFFIX}"
12097
async for scanned_key in self.client.scan_iter(match=partial_hash_key):
12198
if await handle_redis_returns_union_types(
12299
self.client.hexists(scanned_key, resource_name)
@@ -135,35 +112,35 @@ async def find_keys(self, resource: tuple[str, str]) -> list[UserSession]:
135112
field, value = resource
136113
return [
137114
self._decode_hash_key(hash_key)
138-
async for hash_key in self.client.scan_iter(match=f"*:{_RESOURCE_SUFFIX}")
115+
async for hash_key in self.client.scan_iter(match=f"*:{RESOURCE_SUFFIX}")
139116
if value
140117
== await handle_redis_returns_union_types(self.client.hget(hash_key, field))
141118
]
142119

143120
async def set_key_alive(self, key: UserSession, timeout: int) -> None:
144121
# setting the timeout to always expire, timeout > 0
145122
timeout = int(max(1, timeout))
146-
hash_key = f"{self._hash_key(key)}:{_ALIVE_SUFFIX}"
123+
hash_key = f"{key.to_redis_hash_key()}:{ALIVE_SUFFIX}"
147124
await self.client.set(hash_key, 1, ex=timeout)
148125

149126
async def is_key_alive(self, key: UserSession) -> bool:
150-
hash_key = f"{self._hash_key(key)}:{_ALIVE_SUFFIX}"
127+
hash_key = f"{key.to_redis_hash_key()}:{ALIVE_SUFFIX}"
151128
return bool(await self.client.exists(hash_key) > 0)
152129

153130
async def remove_key(self, key: UserSession) -> None:
154131
await self.client.delete(
155-
f"{self._hash_key(key)}:{_RESOURCE_SUFFIX}",
156-
f"{self._hash_key(key)}:{_ALIVE_SUFFIX}",
132+
f"{key.to_redis_hash_key()}:{RESOURCE_SUFFIX}",
133+
f"{key.to_redis_hash_key()}:{ALIVE_SUFFIX}",
157134
)
158135

159136
async def get_all_resource_keys(self) -> tuple[AliveSessions, DeadSessions]:
160137
alive_keys = [
161138
self._decode_hash_key(hash_key)
162-
async for hash_key in self.client.scan_iter(match=f"*:{_ALIVE_SUFFIX}")
139+
async for hash_key in self.client.scan_iter(match=f"*:{ALIVE_SUFFIX}")
163140
]
164141
dead_keys = [
165142
self._decode_hash_key(hash_key)
166-
async for hash_key in self.client.scan_iter(match=f"*:{_RESOURCE_SUFFIX}")
143+
async for hash_key in self.client.scan_iter(match=f"*:{RESOURCE_SUFFIX}")
167144
if self._decode_hash_key(hash_key) not in alive_keys
168145
]
169146

services/web/server/src/simcore_service_webserver/resource_manager/user_sessions.py

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
from models_library.users import UserID
99
from servicelib.logging_utils import get_log_record_extra, log_context
1010

11+
from .models import ResourcesDict, UserSession
1112
from .registry import (
1213
RedisResourceRegistry,
13-
ResourcesDict,
14-
UserSession,
1514
get_registry,
1615
)
1716
from .settings import ResourceManagerSettings, get_plugin_settings
@@ -31,12 +30,6 @@ def _get_service_deletion_timeout(app: web.Application) -> int:
3130
return settings.RESOURCE_MANAGER_RESOURCE_TTL_S
3231

3332

34-
@dataclass(order=True, frozen=True)
35-
class UserSessionID:
36-
user_id: UserID
37-
client_session_id: str
38-
39-
4033
@dataclass
4134
class UserSessionResourcesRegistry:
4235
"""
@@ -61,7 +54,7 @@ class UserSessionResourcesRegistry:
6154
6255
"""
6356

64-
user_id: int
57+
user_id: UserID
6558
client_session_id: str | None # Every tab that a user opens
6659
app: web.Application
6760

@@ -71,7 +64,7 @@ def _registry(self) -> RedisResourceRegistry:
7164

7265
def _resource_key(self) -> UserSession:
7366
return UserSession(
74-
user_id=f"{self.user_id}",
67+
user_id=self.user_id,
7568
client_session_id=self.client_session_id or "*",
7669
)
7770

@@ -138,11 +131,10 @@ async def find_socket_ids(self) -> list[str]:
138131
extra=get_log_record_extra(user_id=self.user_id),
139132
)
140133

141-
user_sockets: list[str] = await self._registry.find_resources(
142-
{"user_id": f"{self.user_id}", "client_session_id": "*"},
134+
return await self._registry.find_resources(
135+
UserSession(user_id=self.user_id, client_session_id="*"),
143136
_SOCKET_ID_FIELDNAME,
144137
)
145-
return user_sockets
146138

147139
async def find_all_resources_of_user(self, key: str) -> list[str]:
148140
with log_context(
@@ -151,10 +143,9 @@ async def find_all_resources_of_user(self, key: str) -> list[str]:
151143
msg=f"{self.user_id=} finding all {key} from registry",
152144
extra=get_log_record_extra(user_id=self.user_id),
153145
):
154-
resources: list[str] = await get_registry(self.app).find_resources(
155-
{"user_id": f"{self.user_id}", "client_session_id": "*"}, key
146+
return await get_registry(self.app).find_resources(
147+
UserSession(user_id=self.user_id, client_session_id="*"), key
156148
)
157-
return resources
158149

159150
async def find(self, resource_name: str) -> list[str]:
160151
_logger.debug(
@@ -165,10 +156,7 @@ async def find(self, resource_name: str) -> list[str]:
165156
extra=get_log_record_extra(user_id=self.user_id),
166157
)
167158

168-
resource_values: list[str] = await self._registry.find_resources(
169-
self._resource_key(), resource_name
170-
)
171-
return resource_values
159+
return await self._registry.find_resources(self._resource_key(), resource_name)
172160

173161
async def add(self, key: str, value: str) -> None:
174162
_logger.debug(
@@ -196,25 +184,15 @@ async def remove(self, key: str) -> None:
196184
@staticmethod
197185
async def find_users_of_resource(
198186
app: web.Application, key: str, value: str
199-
) -> list[UserSessionID]:
187+
) -> list[UserSession]:
200188
registry = get_registry(app)
201-
registry_keys: list[UserSession] = await registry.find_keys(
202-
resource=(key, value)
203-
)
204-
users_sessions_ids: list[UserSessionID] = [
205-
UserSessionID(
206-
user_id=int(r["user_id"]),
207-
client_session_id=r["client_session_id"],
208-
)
209-
for r in registry_keys
210-
]
211-
return users_sessions_ids
189+
return await registry.find_keys(resource=(key, value))
212190

213-
def get_id(self) -> UserSessionID:
191+
def get_id(self) -> UserSession:
214192
if self.client_session_id is None:
215193
msg = f"Cannot build UserSessionID with missing {self.client_session_id=}"
216194
raise ValueError(msg)
217-
return UserSessionID(
195+
return UserSession(
218196
user_id=self.user_id, client_session_id=self.client_session_id
219197
)
220198

0 commit comments

Comments
 (0)