Skip to content

Commit d8b520b

Browse files
author
Andrei Neagu
committed
added base scheduler store
1 parent 3b15a9a commit d8b520b

File tree

3 files changed

+330
-0
lines changed

3 files changed

+330
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from typing import TypeAlias

# String aliases used to label the parts of the generic scheduler's Redis keys
# (see _store.py for how they are composed into hash keys).
ScheduleId: TypeAlias = str
OperationName: TypeAlias = str
StepGroup: TypeAlias = str
StepName: TypeAlias = str
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import base64
2+
import pickle
3+
from typing import Any, Final, Literal, NotRequired, TypedDict, overload
4+
5+
import redis.asyncio as aioredis
6+
from pydantic import NonNegativeInt
7+
from servicelib.redis._client import RedisClientSDK
8+
from servicelib.redis._utils import handle_redis_returns_union_types
9+
from settings_library.redis import RedisDatabase, RedisSettings
10+
11+
from ._models import OperationName, ScheduleId, StepGroup, StepName
12+
13+
# prefix shared by every key written by this module
_SCHEDULE_NAMESPACE: Final[str] = "SCH"
# key segment that marks per-step hashes
_STEPS_KEY: Final[str] = "STEPS"
# TODO: figure out how to store data in Redis as flat as possible
16+
17+
18+
def _get_scheduler_data_hash_key(*, schedule_id: ScheduleId) -> str:
    """Returns the Redis hash key holding schedule-level data.

    Layout: ``SCHEDULE_NAMESPACE:SCHEDULE_ID``
    Example: ``SCH:00000000-0000-0000-0000-000000000000``
    """
    return ":".join((_SCHEDULE_NAMESPACE, schedule_id))
23+
24+
25+
def _get_step_hash_key(
    *,
    schedule_id: ScheduleId,
    operation_name: OperationName,
    group: StepGroup,
    step_name: StepName,
) -> str:
    # SCHEDULE_NAMESPACE:SCHEDULE_ID:STEPS:OPERATION_NAME:GROUP_INDEX:STEP_NAME:KEY
    # - SCHEDULE_NAMESPACE: something short to identify this namespace
    # - SCHEDULE_ID: the unique schedule_id assigned
    # - STEPS: the constant "STEPS"
    # - OPERATION_NAME: from the variable's name during registration
    # - GROUP_INDEX
    #   -> "{index}(S|P)[R]": S=single or P=parallel and optionally, "R" if steps should be repeated forever
    # - STEP_NAME: from its class
    # Example:
    # - SCH:00000000-0000-0000-0000-000000000000:STEPS:START_SERVICE:0S:BS1
    return f"{_SCHEDULE_NAMESPACE}:{schedule_id}:{_STEPS_KEY}:{operation_name}:{group}:{step_name}"
43+
44+
45+
def _dumps(obj: Any) -> str:
46+
return base64.b85encode(pickle.dumps(obj)).decode("utf-8")
47+
48+
49+
def _loads(obj_str: str) -> Any:
50+
return pickle.loads(base64.b85decode(obj_str)) # noqa: S301
51+
52+
53+
class Store:
    """Thin async facade over Redis hashes used by the generic scheduler.

    Values are pickled and base85-encoded (see ``_dumps``/``_loads``) so any
    picklable Python object can be stored as a hash field. ``setup()`` must be
    awaited before use and ``shutdown()`` when done.
    """

    def __init__(self, redis_settings: RedisSettings) -> None:
        self.redis_settings = redis_settings
        # created in setup(), released in shutdown()
        self._client: RedisClientSDK | None = None

    async def setup(self) -> None:
        """creates and connects the underlying Redis client"""
        self._client = RedisClientSDK(
            self.redis_settings.build_redis_dsn(RedisDatabase.DYNAMIC_SERVICES),
            client_name=__name__,
        )
        await self._client.setup()

    async def shutdown(self) -> None:
        """closes the Redis client if it was ever set up"""
        if self._client:
            await self._client.shutdown()
            # fail fast (via the assert in _redis) if used after shutdown
            self._client = None

    @property
    def _redis(self) -> aioredis.Redis:
        assert self._client  # nosec
        return self._client.redis

    async def set_multiple(self, hash_key: str, updates: dict[str, Any]) -> None:
        """saves multiple key-value pairs in a hash"""
        if not updates:
            # redis-py raises DataError on HSET with an empty mapping
            return
        await handle_redis_returns_union_types(
            self._redis.hset(
                hash_key, mapping={k: _dumps(v) for k, v in updates.items()}
            )
        )

    async def set(self, hash_key: str, key: str, value: Any) -> None:
        """saves a single key-value pair in a hash"""
        await self.set_multiple(hash_key, {key: value})

    async def get(self, hash_key: str, *keys: str) -> tuple[Any, ...]:
        """retrieves one or more keys from a hash; missing keys yield None"""
        result: list[str | None] = await handle_redis_returns_union_types(
            self._redis.hmget(hash_key, list(keys))
        )
        return tuple(_loads(x) if x else None for x in result)

    async def delete(self, hash_key: str, *keys: str) -> None:
        """removes one or more keys from a hash (missing keys are ignored)"""
        if not keys:
            # HDEL requires at least one field; nothing to do
            return
        await handle_redis_returns_union_types(self._redis.hdel(hash_key, *keys))

    async def remove(self, hash_key: str) -> None:
        """removes the entire hash"""
        await handle_redis_returns_union_types(self._redis.delete(hash_key))
101+
102+
103+
class _UpdateScheduleDataDict(TypedDict):
    """Partial update payload for a schedule's hash (every field optional)."""

    operation_name: NotRequired[OperationName]
    group_index: NotRequired[NonNegativeInt]
    is_creating: NotRequired[bool]


# field names that may be removed from a schedule's hash
_DeleteScheduleDataKeys = Literal["operation_name", "group_index", "is_creating"]
110+
111+
112+
class ScheduleDataStoreProxy:
    """Typed accessor for the schedule-level hash of one schedule.

    Wraps a ``Store`` and pins the hash key derived from ``schedule_id``;
    the ``@overload``s narrow the value type per known field name.
    """

    def __init__(self, *, store: Store, schedule_id: ScheduleId) -> None:
        self._store = store
        self._schedule_id = schedule_id

    def _get_hash_key(self) -> str:
        # e.g. "SCH:<schedule_id>"
        return _get_scheduler_data_hash_key(schedule_id=self._schedule_id)

    @overload
    async def get(self, key: Literal["operation_name"]) -> str: ...
    @overload
    async def get(self, key: Literal["group_index"]) -> int: ...
    @overload
    async def get(self, key: Literal["is_creating"]) -> bool: ...
    async def get(self, key: str) -> Any:
        """returns the stored value for ``key`` (None when missing)"""
        (result,) = await self._store.get(self._get_hash_key(), key)
        return result

    @overload
    async def set(self, key: Literal["operation_name"], value: str) -> None: ...
    @overload
    async def set(self, key: Literal["group_index"], value: int) -> None: ...
    @overload
    async def set(self, key: Literal["is_creating"], *, value: bool) -> None: ...
    async def set(self, key: str, value: Any) -> None:
        """stores ``value`` under ``key`` in this schedule's hash"""
        await self._store.set(self._get_hash_key(), key, value)

    async def set_multiple(self, values: _UpdateScheduleDataDict) -> None:
        """stores several fields of this schedule's hash at once"""
        await self._store.set_multiple(self._get_hash_key(), updates=values)

    async def delete(self, *keys: _DeleteScheduleDataKeys) -> None:
        """removes the given fields from this schedule's hash"""
        await self._store.delete(self._get_hash_key(), *keys)
144+
145+
146+
class StepStoreProxy:
    """Accessor for the hash of one step inside a schedule's operation.

    The hash key encodes schedule, operation, step group and step name
    (see ``_get_step_hash_key``).
    """

    def __init__(
        self,
        *,
        store: Store,
        schedule_id: ScheduleId,
        operation_name: OperationName,
        group: StepGroup,
        step_name: StepName,
    ) -> None:
        self._store = store
        self._schedule_id = schedule_id
        self._operation_name = operation_name
        self._group = group
        self._step_name = step_name

    def _get_hash_key(self) -> str:
        # e.g. "SCH:<schedule_id>:STEPS:<operation>:<group>:<step>"
        return _get_step_hash_key(
            schedule_id=self._schedule_id,
            operation_name=self._operation_name,
            group=self._group,
            step_name=self._step_name,
        )
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# pylint: disable=protected-access
2+
# pylint: disable=redefined-outer-name
3+
# pylint: disable=unused-argument
4+
5+
from collections.abc import AsyncIterable
6+
from enum import Enum
7+
from typing import Any, Self
8+
9+
import pytest
10+
from faker import Faker
11+
from servicelib.redis._utils import handle_redis_returns_union_types
12+
from settings_library.redis import RedisSettings
13+
from simcore_service_dynamic_scheduler.services.generic_scheduler._models import (
14+
ScheduleId,
15+
)
16+
from simcore_service_dynamic_scheduler.services.generic_scheduler._store import (
17+
ScheduleDataStoreProxy,
18+
Store,
19+
)
20+
21+
22+
@pytest.fixture
async def store(use_in_memory_redis: RedisSettings) -> AsyncIterable[Store]:
    """Yields a ready-to-use Store; flushes every redis key on teardown."""
    store = Store(use_in_memory_redis)
    await store.setup()
    yield store
    await store._redis.client().flushall()  # noqa: SLF001
    await store.shutdown()
29+
30+
31+
async def _assert_keys(store: Store, expected_keys: set[str]) -> None:
    """checks that the full set of top-level redis keys matches exactly"""
    found_keys = await store._redis.keys()  # noqa: SLF001
    assert set(found_keys) == expected_keys
34+
35+
36+
async def _assert_keys_in_hash(
    store: Store, hash_key: str, expected_keys: set[str]
) -> None:
    """checks that the fields present in ``hash_key`` match exactly"""
    raw_keys = await handle_redis_returns_union_types(
        store._redis.hkeys(hash_key)  # noqa: SLF001
    )
    assert set(raw_keys) == expected_keys
45+
46+
47+
async def test_store_workflow(store: Store):
    """exercises single/multi set, get, delete and full-hash removal"""
    # save single value
    await store.set("hash1", "key1", "value1")
    await _assert_keys(store, {"hash1"})
    await _assert_keys_in_hash(store, "hash1", {"key1"})
    assert await store.get("hash1", "key1") == ("value1",)
    assert await store.get("hash1", "key1", "key1") == ("value1", "value1")
    assert await store.get("hash1", "missing1", "missing2") == (None, None)

    # remove last key in hash (redis drops the now-empty hash)
    await store.delete("hash1", "key1")
    await _assert_keys(store, set())
    await _assert_keys_in_hash(store, "hash1", set())
    assert await store.get("hash1", "key1") == (None,)

    # save multiple values
    await store.set_multiple("hash2", {"key1": "value1", "key2": 2, "key3": True})
    await _assert_keys(store, {"hash2"})
    await _assert_keys_in_hash(store, "hash2", {"key1", "key2", "key3"})
    assert await store.get("hash2", "key1", "key2", "key3") == ("value1", 2, True)

    # delete a few keys from hash (missing keys are ignored)
    await store.delete("hash2", "key1", "key3", "missing1", "missing2", "missing3")
    await _assert_keys(store, {"hash2"})
    await _assert_keys_in_hash(store, "hash2", {"key2"})
    assert await store.get("hash2", "key1", "key2", "key3") == (None, 2, None)

    # remove hash completely
    await store.remove("hash2")
    await _assert_keys(store, set())
    await _assert_keys_in_hash(store, "hash2", set())
    assert await store.get("hash2", "key1", "key2", "key3") == (None, None, None)
79+
80+
81+
class _BS(str, Enum):
    # sample str-enum used to check enum members survive the pickle round trip
    A = "A"
    B = "B"
    C = "C"
85+
86+
87+
class _CustomObj:
88+
def __init__(self, a: int, b: str) -> None:
89+
self.a = a
90+
self.b = b
91+
92+
def __eq__(self, other: Self) -> bool:
93+
if not isinstance(other, _CustomObj):
94+
return False
95+
return self.a == other.a and self.b == other.b
96+
97+
98+
@pytest.mark.parametrize(
    "value",
    [
        1,
        "a",
        b"some_bytes",
        3.14,
        True,
        None,
        {"dict": "with_data"},
        {"a", "set"},
        {"some": _BS.A, "enum": _BS.B},
        _CustomObj(1, "a"),
    ],
)
async def test_store_supports_multiple_python_base_types(store: Store, value: Any):
    # NOTE: renamed from "supporse" (typo); still discovered by pytest
    # every picklable python value must survive a set/get round trip
    await store.set("hash1", "key1", value)
    assert (await store.get("hash1", "key1")) == (value,)
116+
117+
118+
@pytest.fixture
def schedule_id(faker: Faker) -> ScheduleId:
    # schedule ids are UUID strings
    return faker.uuid4()
121+
122+
123+
async def test_schedule_data_store_proxy_workflow(
    store: Store, schedule_id: ScheduleId
):
    """exercises the typed proxy's set/get/delete/set_multiple operations"""
    proxy = ScheduleDataStoreProxy(store=store, schedule_id=schedule_id)
    # must match the layout produced by _get_scheduler_data_hash_key()
    hash_key = f"SCH:{schedule_id}"

    # set
    await proxy.set("operation_name", "op1")
    await proxy.set("group_index", 1)
    await proxy.set("is_creating", value=True)
    await _assert_keys(store, {hash_key})
    await _assert_keys_in_hash(
        store, hash_key, {"operation_name", "group_index", "is_creating"}
    )

    # get
    assert await proxy.get("operation_name") == "op1"
    assert await proxy.get("group_index") == 1
    assert await proxy.get("is_creating") is True

    # remove
    await proxy.delete("operation_name", "is_creating", "group_index")
    await _assert_keys(store, set())
    await _assert_keys_in_hash(store, hash_key, set())

    # set multiple
    await proxy.set_multiple({"group_index": 2, "is_creating": False})
    await _assert_keys(store, {hash_key})
    await _assert_keys_in_hash(store, hash_key, {"group_index", "is_creating"})

    # remove all keys and even missing ones
    await proxy.delete("operation_name", "is_creating", "group_index")
    await _assert_keys(store, set())
    await _assert_keys_in_hash(store, hash_key, set())

0 commit comments

Comments
 (0)