Skip to content

Commit d7556f9

Browse files
committed
adjusting
1 parent ad3ca51 commit d7556f9

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from redis.commands.core import AsyncScript
1717
from tenacity import (
18+
RetryError,
1819
retry,
1920
retry_if_exception_type,
2021
stop_after_delay,
@@ -182,7 +183,7 @@ def validate_timeout(
182183
raise ValueError(msg)
183184
return v
184185

185-
async def _initialize_semaphore(self) -> None:
186+
async def _ensure_semaphore_initialized(self) -> None:
186187
"""Initializes the semaphore in Redis if not already done."""
187188
ttl_seconds = int(self.ttl.total_seconds())
188189
cls = type(self)
@@ -203,6 +204,7 @@ async def acquire(self) -> bool:
203204
Raises:
204205
SemaphoreAcquisitionError: If acquisition fails and blocking=True
205206
"""
207+
await self._ensure_semaphore_initialized()
206208

207209
if await self.is_acquired():
208210
_logger.debug(
@@ -219,19 +221,19 @@ async def acquire(self) -> bool:
219221
self.blocking_timeout.total_seconds() if self.blocking_timeout else 0
220222
)
221223

222-
await self._initialize_semaphore()
223-
224224
try:
225225

226226
@retry(
227-
stop=stop_after_delay(blocking_timeout_seconds),
227+
stop=stop_after_delay( # this is the time after the first attempt
228+
blocking_timeout_seconds
229+
),
228230
wait=wait_random_exponential(min=0.1),
229231
retry=retry_if_exception_type(redis.exceptions.TimeoutError),
230-
reraise=True,
231232
)
232233
async def _try_acquire() -> list[str] | None:
233234
# NOTE: brpop returns None on timeout
234235
# NOTE: the timeout here is for the socket, not for the semaphore itself
236+
235237
tokens_key_token: list[str] | None = (
236238
await handle_redis_returns_union_types(
237239
self.redis_client.redis.brpop(
@@ -243,8 +245,10 @@ async def _try_acquire() -> list[str] | None:
243245
return tokens_key_token
244246

245247
tokens_key_token = await _try_acquire()
246-
except redis.exceptions.TimeoutError as e:
247-
# when this triggers it is the socket timeout of the client
248+
except RetryError as e:
249+
# NOTE: this can happen with either the blocking timeout or the socket timeout
250+
# but the blocking timeout is anyway hit since tenacity did not retry more
251+
# therefore we can safely assume we could not acquire the semaphore in time
248252
_logger.debug(
249253
"Timeout acquiring semaphore '%s' (instance: %s)",
250254
self.key,
@@ -257,6 +261,7 @@ async def _try_acquire() -> list[str] | None:
257261
return False
258262

259263
if tokens_key_token is None:
264+
# NOTE: when BRPOP returns None it means it timed out properly
260265
# when this triggers it is the blocking timeout of the brpop call
261266
_logger.debug(
262267
"Timeout acquiring semaphore '%s' (instance: %s)",
@@ -425,7 +430,7 @@ async def get_current_count(self) -> int:
425430

426431
async def get_available_count(self) -> int:
427432
"""Get the number of available semaphore slots"""
428-
await self._initialize_semaphore()
433+
await self._ensure_semaphore_initialized()
429434
return await handle_redis_returns_union_types(
430435
self.redis_client.redis.llen(self.tokens_key)
431436
)

0 commit comments

Comments
 (0)