Skip to content

Commit a2290e1

Browse files
committed
added the blocking behavior
1 parent 2aa2f4e commit a2290e1

File tree

2 files changed

+50
-45
lines changed

2 files changed

+50
-45
lines changed

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import functools
44
import logging
55
from collections.abc import Callable, Coroutine
6+
from datetime import timedelta
67
from typing import Any, ParamSpec, TypeVar
78

89
import redis.exceptions
@@ -25,6 +26,8 @@ def exclusive(
2526
*,
2627
lock_key: str | Callable[..., str],
2728
lock_value: bytes | str | None = None,
29+
blocking: bool = False,
30+
blocking_timeout: timedelta | None = None,
2831
) -> Callable[
2932
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
3033
]:
@@ -64,7 +67,13 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
6467
assert isinstance(client, RedisClientSDK) # nosec
6568

6669
lock = client.create_lock(redis_lock_key, ttl=DEFAULT_LOCK_TTL)
67-
if not await lock.acquire(token=lock_value):
70+
if not await lock.acquire(
71+
token=lock_value,
72+
blocking=blocking,
73+
blocking_timeout=blocking_timeout.total_seconds()
74+
if blocking_timeout
75+
else None,
76+
):
6877
raise CouldNotAcquireLockError(lock=lock)
6978

7079
try:

packages/service-library/tests/redis/test_decorators.py

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from collections.abc import Awaitable, Callable
1010
from datetime import timedelta
1111
from itertools import chain
12+
from typing import Final
1213
from unittest.mock import Mock
1314

1415
import arrow
@@ -22,7 +23,7 @@
2223
start_exclusive_periodic_task,
2324
)
2425
from servicelib.redis._errors import LockLostError
25-
from servicelib.utils import logged_gather
26+
from servicelib.utils import limited_gather, logged_gather
2627
from tenacity.asyncio import AsyncRetrying
2728
from tenacity.retry import retry_if_exception_type
2829
from tenacity.stop import stop_after_delay
@@ -332,46 +333,41 @@ async def _() -> None:
332333
assert await redis_client_sdk.lock_value(lock_name) is None
333334

334335

335-
# async def test_lock_acquired_in_parallel_to_update_same_resource(
336-
# with_short_default_redis_lock_ttl: None,
337-
# get_redis_client_sdk: Callable[
338-
# [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
339-
# ],
340-
# faker: Faker,
341-
# ):
342-
# INCREASE_OPERATIONS: Final[int] = 250
343-
# INCREASE_BY: Final[int] = 10
344-
345-
# class RaceConditionCounter:
346-
# def __init__(self):
347-
# self.value: int = 0
348-
349-
# async def race_condition_increase(self, by: int) -> None:
350-
# current_value = self.value
351-
# current_value += by
352-
# # most likely situation which creates issues
353-
# await asyncio.sleep(redis_constants.DEFAULT_LOCK_TTL.total_seconds() / 2)
354-
# self.value = current_value
355-
356-
# counter = RaceConditionCounter()
357-
# lock_name: str = faker.pystr()
358-
# # ensures it does nto time out before acquiring the lock
359-
# time_for_all_inc_counter_calls_to_finish_s: float = (
360-
# redis_constants.DEFAULT_LOCK_TTL.total_seconds() * INCREASE_OPERATIONS * 10
361-
# )
362-
363-
# async def _inc_counter() -> None:
364-
# async with get_redis_client_sdk(
365-
# RedisDatabase.RESOURCES
366-
# ) as redis_client_sdk:
367-
# async with redis_client_sdk.lock_context(
368-
# lock_key=lock_name,
369-
# blocking=True,
370-
# blocking_timeout_s=time_for_all_inc_counter_calls_to_finish_s,
371-
# ):
372-
# await counter.race_condition_increase(INCREASE_BY)
373-
374-
# await limited_gather(
375-
# *(_inc_counter() for _ in range(INCREASE_OPERATIONS)), limit=15
376-
# )
377-
# assert counter.value == INCREASE_BY * INCREASE_OPERATIONS
336+
async def test_lock_acquired_in_parallel_to_update_same_resource(
337+
with_short_default_redis_lock_ttl: datetime.timedelta,
338+
redis_client_sdk: RedisClientSDK,
339+
lock_name: str,
340+
):
341+
INCREASE_OPERATIONS: Final[int] = 250
342+
INCREASE_BY: Final[int] = 10
343+
344+
class RaceConditionCounter:
345+
def __init__(self) -> None:
346+
self.value: int = 0
347+
348+
async def race_condition_increase(self, by: int) -> None:
349+
current_value = self.value
350+
current_value += by
351+
# most likely situation which creates issues
352+
await asyncio.sleep(with_short_default_redis_lock_ttl.total_seconds() / 2)
353+
self.value = current_value
354+
355+
counter = RaceConditionCounter()
356+
# ensures it does nto time out before acquiring the lock
357+
time_for_all_inc_counter_calls_to_finish = (
358+
with_short_default_redis_lock_ttl * INCREASE_OPERATIONS * 10
359+
)
360+
361+
@exclusive(
362+
redis_client_sdk,
363+
lock_key=lock_name,
364+
blocking=True,
365+
blocking_timeout=time_for_all_inc_counter_calls_to_finish,
366+
)
367+
async def _inc_counter() -> None:
368+
await counter.race_condition_increase(INCREASE_BY)
369+
370+
await limited_gather(
371+
*(_inc_counter() for _ in range(INCREASE_OPERATIONS)), limit=15
372+
)
373+
assert counter.value == INCREASE_BY * INCREASE_OPERATIONS

0 commit comments

Comments
 (0)