Skip to content

Commit b25e711

Browse files
author
Andrei Neagu
committed
added cleanup utilities
1 parent 5ca9f04 commit b25e711

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import logging
2+
3+
import redis.asyncio as aioredis
4+
from models_library.projects_nodes_io import NodeID
5+
from settings_library.redis import RedisDatabase, RedisSettings
6+
7+
from ..redis._client import RedisClientSDK
8+
from .models import LRTNamespace
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
13+
class ClientLongRunningManager:
14+
def __init__(self, redis_settings: RedisSettings):
15+
self.redis_settings = redis_settings
16+
17+
self._client: RedisClientSDK | None = None
18+
19+
async def setup(self) -> None:
20+
self._client = RedisClientSDK(
21+
self.redis_settings.build_redis_dsn(RedisDatabase.LONG_RUNNING_TASKS),
22+
client_name="long_running_tasks_cleanup_client",
23+
)
24+
await self._client.setup()
25+
26+
async def shutdown(self) -> None:
27+
if self._client:
28+
await self._client.shutdown()
29+
30+
@property
31+
def _redis(self) -> aioredis.Redis:
32+
assert self._client # nosec
33+
return self._client.redis
34+
35+
@classmethod
36+
def get_sidecar_namespace(cls, node_id: NodeID) -> LRTNamespace:
37+
return f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}"
38+
39+
async def cleanup_store(self, lrt_namespace: LRTNamespace) -> None:
40+
"""Cleanups all Redis keys for the given LRTNamespace"""
41+
keys_to_remove: list[str] = [
42+
x async for x in self._redis.scan_iter(f"{lrt_namespace}*")
43+
]
44+
_logger.info(
45+
"Removing keys='%s' from Redis for namespace '%s'",
46+
keys_to_remove,
47+
lrt_namespace,
48+
)
49+
await self._redis.delete(*keys_to_remove)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# pylint:disable=redefined-outer-name
2+
3+
from collections.abc import AsyncIterable, Callable
4+
from contextlib import AbstractAsyncContextManager
5+
6+
import pytest
7+
from pydantic import TypeAdapter
8+
from servicelib.long_running_tasks._redis_store import RedisStore
9+
from servicelib.long_running_tasks.client_long_running_manager import (
10+
ClientLongRunningManager,
11+
)
12+
from servicelib.long_running_tasks.models import LRTNamespace, TaskData
13+
from servicelib.redis._client import RedisClientSDK
14+
from settings_library.redis import RedisDatabase, RedisSettings
15+
16+
17+
@pytest.fixture
18+
def task_data() -> TaskData:
19+
return TypeAdapter(TaskData).validate_python(
20+
TaskData.model_json_schema()["examples"][0]
21+
)
22+
23+
24+
@pytest.fixture
25+
def lrt_namespace() -> LRTNamespace:
26+
return "TEST-NAMESPACE"
27+
28+
29+
@pytest.fixture
30+
async def store(
31+
use_in_memory_redis: RedisSettings,
32+
get_redis_client_sdk: Callable[
33+
[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
34+
],
35+
lrt_namespace: LRTNamespace,
36+
) -> AsyncIterable[RedisStore]:
37+
store = RedisStore(redis_settings=use_in_memory_redis, namespace=lrt_namespace)
38+
39+
await store.setup()
40+
yield store
41+
await store.shutdown()
42+
43+
# triggers cleanup of all redis data
44+
async with get_redis_client_sdk(RedisDatabase.LONG_RUNNING_TASKS):
45+
pass
46+
47+
48+
@pytest.fixture
49+
async def client_long_running_manager(
50+
use_in_memory_redis: RedisSettings, lrt_namespace: LRTNamespace
51+
) -> AsyncIterable[ClientLongRunningManager]:
52+
manager = ClientLongRunningManager(redis_settings=use_in_memory_redis)
53+
54+
await manager.setup()
55+
yield manager
56+
await manager.shutdown()
57+
58+
59+
async def test_cleanup_namespace(
60+
store: RedisStore,
61+
task_data: TaskData,
62+
client_long_running_manager: ClientLongRunningManager,
63+
lrt_namespace: LRTNamespace,
64+
) -> None:
65+
# create entries in both sides
66+
await store.add_task_data(task_data.task_id, task_data)
67+
await store.mark_task_for_removal(task_data.task_id, task_data.task_context)
68+
69+
# entries exit
70+
assert await store.list_tasks_data() == [task_data]
71+
assert await store.list_tasks_to_remove() == {
72+
task_data.task_id: task_data.task_context
73+
}
74+
75+
# removes
76+
await client_long_running_manager.cleanup_store(lrt_namespace)
77+
78+
# entris were removed
79+
assert await store.list_tasks_data() == []
80+
assert await store.list_tasks_to_remove() == {}

0 commit comments

Comments
 (0)