Skip to content

Commit 8515444

Browse files
committed
minor
1 parent 5a34e0c commit 8515444

File tree

1 file changed

+33
-30
lines changed

1 file changed

+33
-30
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@
1313
computed_field,
1414
field_validator,
1515
)
16+
from tenacity import (
17+
RetryError,
18+
retry,
19+
retry_if_not_result,
20+
stop_after_delay,
21+
stop_never,
22+
wait_fixed,
23+
)
1624

1725
from ..logging_utils import log_catch
1826
from ._client import RedisClientSDK
@@ -129,37 +137,32 @@ async def acquire(self) -> bool:
129137
if self._acquired:
130138
return True
131139

132-
start_time = asyncio.get_event_loop().time()
133-
timeout_seconds = (
134-
self.blocking_timeout.total_seconds() if self.blocking_timeout else None
140+
if not self.blocking:
141+
# Non-blocking: try once
142+
self._acquired = await self._try_acquire()
143+
return self._acquired
144+
145+
# Blocking
146+
@retry(
147+
wait=wait_fixed(0.1),
148+
reraise=True,
149+
stop=(
150+
stop_after_delay(self.blocking_timeout.total_seconds())
151+
if self.blocking_timeout
152+
else stop_never
153+
),
154+
retry=retry_if_not_result(lambda acquired: acquired),
135155
)
136-
137-
while True:
138-
# Try to acquire using Redis sorted set for atomic operations
139-
acquired = await self._try_acquire()
140-
141-
if acquired:
142-
self._acquired = True
143-
_logger.debug(
144-
"Acquired semaphore '%s' (instance: %s)",
145-
self.key,
146-
self.instance_id,
147-
)
148-
return True
149-
150-
if not self.blocking:
151-
return False
152-
153-
# Check timeout
154-
if timeout_seconds is not None:
155-
elapsed = asyncio.get_event_loop().time() - start_time
156-
if elapsed >= timeout_seconds:
157-
raise SemaphoreAcquisitionError(
158-
name=self.key, capacity=self.capacity
159-
)
160-
161-
# Wait a bit before retrying
162-
await asyncio.sleep(0.1)
156+
async def _blocking_acquire() -> bool:
157+
return await self._try_acquire()
158+
159+
try:
160+
self._acquired = await _blocking_acquire()
161+
return self._acquired
162+
except RetryError as exc:
163+
raise SemaphoreAcquisitionError(
164+
name=self.key, capacity=self.capacity
165+
) from exc
163166

164167
async def release(self) -> None:
165168
"""

0 commit comments

Comments
 (0)