Skip to content

Commit af38f88

Browse files
committed
handle blocking timeouts
1 parent d7556f9 commit af38f88

File tree

1 file changed

+10
-24
lines changed

1 file changed

+10
-24
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -215,40 +215,37 @@ async def acquire(self) -> bool:
215215
return True
216216

217217
ttl_seconds = int(self.ttl.total_seconds())
218-
blocking_timeout_seconds = 0.001
219-
if self.blocking:
220-
blocking_timeout_seconds = (
221-
self.blocking_timeout.total_seconds() if self.blocking_timeout else 0
222-
)
223218

224219
try:
225220

226221
@retry(
227222
stop=stop_after_delay( # this is the time after the first attempt
228-
blocking_timeout_seconds
223+
(self.blocking_timeout or 0) if self.blocking else 0
229224
),
230225
wait=wait_random_exponential(min=0.1),
231226
retry=retry_if_exception_type(redis.exceptions.TimeoutError),
232227
)
233228
async def _try_acquire() -> list[str] | None:
234229
# NOTE: brpop returns None on timeout
235-
# NOTE: the timeout here is for the socket, not for the semaphore itself
230+
# NOTE: redis-py library timeouts when the socket times out which is defined
231+
# elsewhere on the client (e.g. DEFAULT_SOCKET_TIMEOUT)
232+
# we always block forever since tenacity takes care of timing out
233+
# therefore we can distinguish between a proper timeout (returns None) and a socket
234+
# timeout (raises an exception)
236235

237236
tokens_key_token: list[str] | None = (
238237
await handle_redis_returns_union_types(
239238
self.redis_client.redis.brpop(
240239
[self.tokens_key],
241-
timeout=blocking_timeout_seconds,
240+
timeout=None, # NOTE: we always block forever since tenacity takes care of timing out
242241
)
243242
)
244243
)
245244
return tokens_key_token
246245

247246
tokens_key_token = await _try_acquire()
248247
except RetryError as e:
249-
# NOTE: this can happen with either the blocking timeout or the socket timeout
250-
# but the blocking timeout is anyway hit since tenacity did not retry more
251-
# therefore we can safely assume we could not acquire the semaphore in time
248+
# NOTE: if we end up here that means we could not acquire the semaphore
252249
_logger.debug(
253250
"Timeout acquiring semaphore '%s' (instance: %s)",
254251
self.key,
@@ -260,19 +257,8 @@ async def _try_acquire() -> list[str] | None:
260257
) from e
261258
return False
262259

263-
if tokens_key_token is None:
264-
# NOTE: when BRPOP returns None it means it timed out properly
265-
# when this triggers it is the blocking timeout of the brpop call
266-
_logger.debug(
267-
"Timeout acquiring semaphore '%s' (instance: %s)",
268-
self.key,
269-
self.instance_id,
270-
)
271-
if self.blocking:
272-
raise SemaphoreAcquisitionError(name=self.key, capacity=self.capacity)
273-
return False
274-
275-
assert len(tokens_key_token) == 2 # nosec
260+
assert tokens_key_token is not None # nosec
261+
assert len(tokens_key_token) == 2 # nosec # noqa: PLR2004
276262
assert tokens_key_token[0] == self.tokens_key # nosec
277263
token = tokens_key_token[1]
278264

0 commit comments

Comments
 (0)