Skip to content

Commit 2abde0c

Browse files
committed
fix
1 parent a210b2d commit 2abde0c

File tree

4 files changed

+25
-10
lines changed

4 files changed

+25
-10
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,15 @@ def validate_timeout(
194194
async def _ensure_semaphore_initialized(self) -> None:
195195
"""Initializes the semaphore in Redis if not already done."""
196196
assert self.register_semaphore is not None # nosec
197-
await self.register_semaphore( # pylint: disable=not-callable
197+
result = await self.register_semaphore( # pylint: disable=not-callable
198198
keys=[self.tokens_key, self.holders_set],
199199
args=[self.capacity, self.holders_set_ttl.total_seconds()],
200200
client=self.redis_client.redis,
201201
)
202+
assert isinstance(result, list) # nosec
203+
exit_code, status = result
204+
assert exit_code == SCRIPT_OK_EXIT_CODE # nosec
205+
_logger.debug("Semaphore '%s' init status: %s", self.key, status)
202206

203207
async def _blocking_acquire(self) -> str | None:
204208
@retry(
@@ -462,9 +466,9 @@ async def _periodic_reacquisition(
462466
) -> None:
463467
if cancellation_event.is_set():
464468
raise asyncio.CancelledError
469+
await semaphore.reacquire()
465470
if not started.is_set():
466471
started.set()
467-
await semaphore.reacquire()
468472

469473
lock_acquisition_time = None
470474
try:

packages/service-library/src/servicelib/redis/lua/register_semaphore_tokens.lua

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,25 @@ local holders_key = KEYS[2]
1414
local capacity = tonumber(ARGV[1])
1515
local ttl_seconds = tonumber(ARGV[2])
1616

17-
-- Step 1: Initialize token pool if needed (first time setup)
18-
local tokens_exist = redis.call('EXISTS', tokens_key)
19-
local holders_exist = redis.call('EXISTS', holders_key)
20-
if tokens_exist == 0 and holders_exist == 0 then
17+
-- Use a persistent marker to track if semaphore was ever initialized
18+
local init_marker_key = tokens_key .. ':initialized'
19+
20+
-- Check if we've ever initialized this semaphore
21+
local was_initialized = redis.call('EXISTS', init_marker_key)
22+
23+
if was_initialized == 0 then
24+
-- First time initialization - set the permanent marker
25+
redis.call('SET', init_marker_key, '1')
26+
redis.call('EXPIRE', init_marker_key, ttl_seconds)
27+
2128
-- Initialize with capacity number of tokens
2229
for i = 1, capacity do
2330
redis.call('LPUSH', tokens_key, 'token_' .. i)
2431
end
25-
-- Set expiry on tokens list to prevent infinite growth
32+
-- Set expiry on tokens list
2633
redis.call('EXPIRE', tokens_key, ttl_seconds)
34+
return {0, 'initialized'}
2735
end
2836

29-
return 0
37+
38+
return {0, 'already_initialized'}

packages/service-library/src/servicelib/redis/lua/renew_semaphore.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,7 @@ redis.call('SETEX', holder_key, ttl_seconds, token)
4141
-- Step 4: Renew the holders set and tokens list TTLs to prevent infinite growth
4242
redis.call('EXPIRE', holders_key, holders_ttl_seconds)
4343
redis.call('EXPIRE', tokens_key, tokens_ttl_seconds)
44+
local init_marker_tokens_key = tokens_key .. ':initialized'
45+
redis.call('EXPIRE', init_marker_tokens_key, tokens_ttl_seconds)
4446

4547
return {0, 'renewed', redis.call('SCARD', holders_key)}

packages/service-library/tests/redis/test_semaphore_decorator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def test_decorator_with_callable_parameters(
153153
):
154154
executed_keys = []
155155

156-
def get_redis_client(*args, **kwargs):
156+
def get_redis_client(*args, **kwargs) -> RedisClientSDK:
157157
return redis_client_sdk
158158

159159
def get_key(user_id: str, resource: str) -> str:
@@ -196,7 +196,7 @@ async def test_decorator_capacity_enforcement(
196196
key=semaphore_name,
197197
capacity=2,
198198
)
199-
async def limited_function():
199+
async def limited_function() -> None:
200200
nonlocal concurrent_count, max_concurrent
201201
concurrent_count += 1
202202
max_concurrent = max(max_concurrent, concurrent_count)

0 commit comments

Comments
 (0)