Skip to content

Commit ad3ca51

Browse files
committed
fixes handling
1 parent 4e254f5 commit ad3ca51

File tree

2 files changed

+38
-6
lines changed

2 files changed

+38
-6
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
field_validator,
1515
)
1616
from redis.commands.core import AsyncScript
17+
from tenacity import (
18+
retry,
19+
retry_if_exception_type,
20+
stop_after_delay,
21+
wait_random_exponential,
22+
)
1723

1824
from ._client import RedisClientSDK
1925
from ._constants import (
@@ -216,13 +222,29 @@ async def acquire(self) -> bool:
216222
await self._initialize_semaphore()
217223

218224
try:
219-
# this is blocking pop with timeout
220-
tokens_key_token: list[str] = await handle_redis_returns_union_types(
221-
self.redis_client.redis.brpop(
222-
[self.tokens_key], timeout=blocking_timeout_seconds
223-
)
225+
226+
@retry(
227+
stop=stop_after_delay(blocking_timeout_seconds),
228+
wait=wait_random_exponential(min=0.1),
229+
retry=retry_if_exception_type(redis.exceptions.TimeoutError),
230+
reraise=True,
224231
)
232+
async def _try_acquire() -> list[str] | None:
233+
# NOTE: brpop returns None on timeout
234+
# NOTE: the timeout here is for the socket, not for the semaphore itself
235+
tokens_key_token: list[str] | None = (
236+
await handle_redis_returns_union_types(
237+
self.redis_client.redis.brpop(
238+
[self.tokens_key],
239+
timeout=blocking_timeout_seconds,
240+
)
241+
)
242+
)
243+
return tokens_key_token
244+
245+
tokens_key_token = await _try_acquire()
225246
except redis.exceptions.TimeoutError as e:
247+
# when this triggers it is the socket timeout of the client
226248
_logger.debug(
227249
"Timeout acquiring semaphore '%s' (instance: %s)",
228250
self.key,
@@ -234,6 +256,17 @@ async def acquire(self) -> bool:
234256
) from e
235257
return False
236258

259+
if tokens_key_token is None:
260+
# when this triggers it is the blocking timeout of the brpop call
261+
_logger.debug(
262+
"Timeout acquiring semaphore '%s' (instance: %s)",
263+
self.key,
264+
self.instance_id,
265+
)
266+
if self.blocking:
267+
raise SemaphoreAcquisitionError(name=self.key, capacity=self.capacity)
268+
return False
269+
237270
assert len(tokens_key_token) == 2 # nosec
238271
assert tokens_key_token[0] == self.tokens_key # nosec
239272
token = tokens_key_token[1]

packages/service-library/tests/redis/test_semaphore.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ async def test_semaphore_blocking_timeout(
224224
redis_client=redis_client_sdk,
225225
key=semaphore_name,
226226
capacity=capacity,
227-
ttl=datetime.timedelta(seconds=60),
228227
):
229228
# Second semaphore should timeout
230229
semaphore2 = DistributedSemaphore(

0 commit comments

Comments
 (0)