Skip to content

Commit a751531

Browse files
committed
refactored redis client structure
1 parent 3982a20 commit a751531

File tree

9 files changed

+118
-102
lines changed

9 files changed

+118
-102
lines changed

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import tenacity
1111
from pytest_mock import MockerFixture
1212
from redis.asyncio import Redis, from_url
13+
from servicelib.redis import _constants as redis_constants
1314
from settings_library.basic_types import PortInt
1415
from settings_library.redis import RedisDatabase, RedisSettings
1516
from tenacity.before_sleep import before_sleep_log
@@ -118,6 +119,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
118119
@pytest.fixture
119120
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
120121
# lowered to allow CI to properly shutdown RedisClientSDK instances
121-
from servicelib import redis
122-
123-
mocker.patch.object(redis, "_DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
122+
mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1-
from ._client import RabbitMQClient
1+
from ._client import RedisClientSDK
2+
from ._clients_manager import RedisClientsManager
3+
from ._decorators import exclusive
4+
from ._distributed_locks_utils import start_exclusive_periodic_task
25
from ._errors import (
36
CouldNotAcquireLockError,
47
CouldNotConnectToRedisError,
58
LockLostError,
69
)
10+
from ._models import RedisManagerDBConfig
711

812
__all__: tuple[str, ...] = (
913
"CouldNotAcquireLockError",
1014
"CouldNotConnectToRedisError",
15+
"exclusive",
1116
"LockLostError",
17+
"RedisClientSDK",
18+
"RedisClientsManager",
19+
"RedisManagerDBConfig",
20+
"start_exclusive_periodic_task",
1221
)
1322

1423
# nopycln: file

packages/service-library/src/servicelib/redis/_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class RedisClientSDK:
4747
def redis(self) -> aioredis.Redis:
4848
return self._client
4949

50-
def __post_init__(self):
50+
def __post_init__(self) -> None:
5151
self._client = aioredis.from_url(
5252
self.redis_dsn,
5353
# Run 3 retries with exponential backoff strategy source: https://redis.readthedocs.io/en/stable/backoff.html
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import asyncio
2+
import datetime
3+
import logging
4+
from collections.abc import Awaitable, Callable
5+
6+
import arrow
7+
from servicelib.background_task import start_periodic_task
8+
9+
from ._client import RedisClientSDK
10+
from ._decorators import exclusive
11+
from ._errors import CouldNotAcquireLockError
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
async def _exclusive_task_starter(
17+
client: RedisClientSDK,
18+
usr_tsk_task: Callable[..., Awaitable[None]],
19+
*,
20+
usr_tsk_interval: datetime.timedelta,
21+
usr_tsk_task_name: str,
22+
**kwargs,
23+
) -> None:
24+
lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}"
25+
lock_value = f"locked since {arrow.utcnow().format()}"
26+
27+
try:
28+
await exclusive(client, lock_key=lock_key, lock_value=lock_value)(
29+
start_periodic_task
30+
)(
31+
usr_tsk_task,
32+
interval=usr_tsk_interval,
33+
task_name=usr_tsk_task_name,
34+
**kwargs,
35+
)
36+
except CouldNotAcquireLockError:
37+
_logger.debug(
38+
"Could not acquire lock '%s' with value '%s'", lock_key, lock_value
39+
)
40+
except Exception as e:
41+
_logger.exception(e) # noqa: TRY401
42+
raise
43+
44+
45+
def start_exclusive_periodic_task(
46+
client: RedisClientSDK,
47+
task: Callable[..., Awaitable[None]],
48+
*,
49+
task_period: datetime.timedelta,
50+
retry_after: datetime.timedelta = datetime.timedelta(seconds=1),
51+
task_name: str,
52+
**kwargs,
53+
) -> asyncio.Task:
54+
"""
55+
Ensures that only 1 process periodically ever runs ``task`` at all times.
56+
If one process dies, another process will run the ``task``.
57+
58+
Creates a background task that periodically tries to start the user ``task``.
59+
Before the ``task`` is scheduled for periodic background execution, it acquires a lock.
60+
Subsequent calls to ``start_exclusive_periodic_task`` will not allow the same ``task``
61+
to start since the lock will prevent the scheduling.
62+
63+
Q&A:
64+
- Why is `_exclusive_task_starter` run as a task?
65+
This is usually used at setup time and cannot block the setup process forever
66+
- Why is `_exclusive_task_starter` task a periodic task?
67+
If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is
68+
reacquired
69+
"""
70+
return start_periodic_task(
71+
_exclusive_task_starter,
72+
interval=retry_after,
73+
task_name=f"exclusive_task_starter_{task_name}",
74+
client=client,
75+
usr_tsk_task=task,
76+
usr_tsk_interval=task_period,
77+
usr_tsk_task_name=task_name,
78+
**kwargs,
79+
)

packages/service-library/src/servicelib/redis/_utils.py

Lines changed: 3 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
import asyncio
2-
import datetime
32
import logging
4-
from collections.abc import Awaitable, Callable
3+
from collections.abc import Awaitable
54
from typing import Any
65

7-
import arrow
86
import redis.exceptions
97
from redis.asyncio.lock import Lock
10-
from servicelib.background_task import start_periodic_task
118

129
from ..logging_utils import log_context
13-
from ._client import RedisClientSDK
1410
from ._constants import SHUTDOWN_TIMEOUT_S
15-
from ._decorators import exclusive
16-
from ._errors import CouldNotAcquireLockError, LockLostError
11+
from ._errors import LockLostError
1712

1813
_logger = logging.getLogger(__name__)
1914

@@ -29,78 +24,12 @@ async def cancel_or_warn(task: asyncio.Task) -> None:
2924

3025
async def auto_extend_lock(lock: Lock) -> None:
3126
try:
32-
with log_context(_logger, logging.DEBUG, f"Autoextend lock {lock.name}"):
27+
with log_context(_logger, logging.DEBUG, f"Autoextend lock {lock.name!r}"):
3328
await lock.reacquire()
3429
except redis.exceptions.LockNotOwnedError as exc:
3530
raise LockLostError(lock=lock) from exc
3631

3732

38-
async def _exclusive_task_starter(
39-
client: RedisClientSDK,
40-
usr_tsk_task: Callable[..., Awaitable[None]],
41-
*,
42-
usr_tsk_interval: datetime.timedelta,
43-
usr_tsk_task_name: str,
44-
**kwargs,
45-
) -> None:
46-
lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}"
47-
lock_value = f"locked since {arrow.utcnow().format()}"
48-
49-
try:
50-
await exclusive(client, lock_key=lock_key, lock_value=lock_value)(
51-
start_periodic_task
52-
)(
53-
usr_tsk_task,
54-
interval=usr_tsk_interval,
55-
task_name=usr_tsk_task_name,
56-
**kwargs,
57-
)
58-
except CouldNotAcquireLockError:
59-
_logger.debug(
60-
"Could not acquire lock '%s' with value '%s'", lock_key, lock_value
61-
)
62-
except Exception as e:
63-
_logger.exception(e) # noqa: TRY401
64-
raise
65-
66-
67-
def start_exclusive_periodic_task(
68-
client: RedisClientSDK,
69-
task: Callable[..., Awaitable[None]],
70-
*,
71-
task_period: datetime.timedelta,
72-
retry_after: datetime.timedelta = datetime.timedelta(seconds=1),
73-
task_name: str,
74-
**kwargs,
75-
) -> asyncio.Task:
76-
"""
77-
Ensures that only 1 process periodically ever runs ``task`` at all times.
78-
If one process dies, another process will run the ``task``.
79-
80-
Creates a background task that periodically tries to start the user ``task``.
81-
Before the ``task`` is scheduled for periodic background execution, it acquires a lock.
82-
Subsequent calls to ``start_exclusive_periodic_task`` will not allow the same ``task``
83-
to start since the lock will prevent the scheduling.
84-
85-
Q&A:
86-
- Why is `_exclusive_task_starter` run as a task?
87-
This is usually used at setup time and cannot block the setup process forever
88-
- Why is `_exclusive_task_starter` task a periodic task?
89-
If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is
90-
reacquired
91-
"""
92-
return start_periodic_task(
93-
_exclusive_task_starter,
94-
interval=retry_after,
95-
task_name=f"exclusive_task_starter_{task_name}",
96-
client=client,
97-
usr_tsk_task=task,
98-
usr_tsk_interval=task_period,
99-
usr_tsk_task_name=task_name,
100-
**kwargs,
101-
)
102-
103-
10433
async def handle_redis_returns_union_types(result: Any | Awaitable[Any]) -> Any:
10534
"""Used to handle mypy issues with redis 5.x return types"""
10635
if isinstance(result, Awaitable):

packages/service-library/tests/conftest.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ async def get_redis_client_sdk(
7777
]:
7878
@asynccontextmanager
7979
async def _(
80-
database: RedisDatabase, decode_response: bool = True # noqa: FBT002
80+
database: RedisDatabase,
81+
decode_response: bool = True, # noqa: FBT002
8182
) -> AsyncIterator[RedisClientSDK]:
8283
redis_resources_dns = redis_service.build_redis_dsn(database)
8384
client = RedisClientSDK(
@@ -97,7 +98,7 @@ async def _cleanup_redis_data(clients_manager: RedisClientsManager) -> None:
9798
await clients_manager.client(db).redis.flushall()
9899

99100
async with RedisClientsManager(
100-
{RedisManagerDBConfig(db) for db in RedisDatabase},
101+
{RedisManagerDBConfig(database=db) for db in RedisDatabase},
101102
redis_service,
102103
client_name="pytest",
103104
) as clients_manager:

packages/service-library/tests/deferred_tasks/test_deferred_tasks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
from common_library.serialization import model_dump_with_secrets
2121
from pydantic import NonNegativeFloat, NonNegativeInt
2222
from pytest_mock import MockerFixture
23-
from servicelib import redis as servicelib_redis
2423
from servicelib.rabbitmq import RabbitMQClient
2524
from servicelib.redis import RedisClientSDK
25+
from servicelib.redis import _constants as redis_client_constants
2626
from servicelib.sequences_utils import partition_gen
2727
from settings_library.rabbit import RabbitSettings
2828
from settings_library.redis import RedisSettings
@@ -353,7 +353,6 @@ def __init__(
353353
async def _pause_container(
354354
self, container_name: str, client: ClientWithPingProtocol
355355
) -> AsyncIterator[None]:
356-
357356
async with self.paused_container(container_name):
358357
async for attempt in AsyncRetrying(
359358
wait=wait_fixed(0.1),
@@ -391,7 +390,9 @@ async def pause_redis(self) -> AsyncIterator[None]:
391390
@pytest.fixture
392391
def mock_default_socket_timeout(mocker: MockerFixture) -> None:
393392
mocker.patch.object(
394-
servicelib_redis, "_DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25)
393+
redis_client_constants,
394+
"DEFAULT_SOCKET_TIMEOUT",
395+
datetime.timedelta(seconds=0.25),
395396
)
396397

397398

@@ -420,7 +421,6 @@ async def test_workflow_with_third_party_services_outages(
420421
redis_service,
421422
max_workers,
422423
) as manager:
423-
424424
# start all in parallel
425425
await asyncio.gather(
426426
*[manager.start_task(0.1, i) for i in range(deferred_tasks_to_start)]

packages/service-library/tests/test_redis.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
from faker import Faker
1515
from pytest_mock import MockerFixture
1616
from redis.exceptions import LockError, LockNotOwnedError
17-
from servicelib import redis as servicelib_redis
1817
from servicelib.redis import (
1918
CouldNotAcquireLockError,
2019
LockLostError,
2120
RedisClientSDK,
2221
RedisClientsManager,
2322
RedisManagerDBConfig,
2423
)
24+
from servicelib.redis import _constants as redis_constants
2525
from servicelib.utils import limited_gather
2626
from settings_library.redis import RedisDatabase, RedisSettings
2727
from tenacity import (
@@ -63,7 +63,7 @@ def lock_timeout() -> datetime.timedelta:
6363
@pytest.fixture
6464
def mock_default_lock_ttl(mocker: MockerFixture) -> None:
6565
mocker.patch.object(
66-
servicelib_redis, "_DEFAULT_LOCK_TTL", datetime.timedelta(seconds=0.25)
66+
redis_constants, "DEFAULT_LOCK_TTL", datetime.timedelta(seconds=0.25)
6767
)
6868

6969

@@ -166,6 +166,7 @@ async def test_lock_context(
166166
assert await ttl_lock.owned() is False
167167

168168

169+
@pytest.mark.xfail(reason="This test shows an issue, that will be fixed in the next PR")
169170
async def test_lock_context_raises_if_lock_is_lost(
170171
redis_client_sdk: RedisClientSDK, faker: Faker
171172
):
@@ -180,7 +181,6 @@ async def test_lock_context_raises_if_lock_is_lost(
180181
await asyncio.sleep(20)
181182

182183

183-
@pytest.mark.xfail(reason="This test shows an issue, that will be fixed in the next PR")
184184
async def test_lock_context_with_already_locked_lock_raises(
185185
redis_client_sdk: RedisClientSDK, faker: Faker
186186
):
@@ -213,7 +213,7 @@ async def test_lock_context_with_data(redis_client_sdk: RedisClientSDK, faker: F
213213
lock_name = faker.pystr()
214214
assert await _is_locked(redis_client_sdk, lock_name) is False
215215
assert await redis_client_sdk.lock_value(lock_name) is None
216-
async with redis_client_sdk.lock_context(lock_name, lock_value=lock_data) as lock:
216+
async with redis_client_sdk.lock_context(lock_name, lock_value=lock_data):
217217
assert await _is_locked(redis_client_sdk, lock_name) is True
218218
assert await redis_client_sdk.lock_value(lock_name) == lock_data
219219
assert await _is_locked(redis_client_sdk, lock_name) is False
@@ -254,18 +254,14 @@ async def race_condition_increase(self, by: int) -> None:
254254
current_value = self.value
255255
current_value += by
256256
# most likely situation which creates issues
257-
await asyncio.sleep(
258-
servicelib_redis._DEFAULT_LOCK_TTL.total_seconds() / 2 # noqa: SLF001
259-
)
257+
await asyncio.sleep(redis_constants.DEFAULT_LOCK_TTL.total_seconds() / 2)
260258
self.value = current_value
261259

262260
counter = RaceConditionCounter()
263261
lock_name: str = faker.pystr()
264262
# ensures it does nto time out before acquiring the lock
265263
time_for_all_inc_counter_calls_to_finish_s: float = (
266-
servicelib_redis._DEFAULT_LOCK_TTL.total_seconds() # noqa: SLF001
267-
* INCREASE_OPERATIONS
268-
* 10
264+
redis_constants.DEFAULT_LOCK_TTL.total_seconds() * INCREASE_OPERATIONS * 10
269265
)
270266

271267
async def _inc_counter() -> None:
@@ -289,7 +285,7 @@ async def test_redis_client_sdks_manager(
289285
mock_redis_socket_timeout: None, redis_service: RedisSettings
290286
):
291287
all_redis_configs: set[RedisManagerDBConfig] = {
292-
RedisManagerDBConfig(db) for db in RedisDatabase
288+
RedisManagerDBConfig(database=db) for db in RedisDatabase
293289
}
294290
manager = RedisClientsManager(
295291
databases_configs=all_redis_configs,
@@ -335,7 +331,7 @@ async def test_redis_client_sdk_setup_shutdown(
335331
@pytest.fixture
336332
def mock_default_socket_timeout(mocker: MockerFixture) -> None:
337333
mocker.patch.object(
338-
servicelib_redis, "_DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25)
334+
redis_constants, "DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25)
339335
)
340336

341337

0 commit comments

Comments
 (0)