Skip to content

Commit ba2fd03

Browse files
committed
ensure we have auto expiry on keys
1 parent 06c0ac6 commit ba2fd03

File tree

5 files changed

+37
-10
lines changed

5 files changed

+37
-10
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,18 @@ def holders_set(self) -> str:
155155
"""Redis key for the holders SET."""
156156
return f"{SEMAPHORE_KEY_PREFIX}{self.key}:holders_set"
157157

158+
@computed_field # type: ignore[prop-decorator]
159+
@property
160+
def holders_set_ttl(self) -> datetime.timedelta:
161+
"""TTL for the holders SET"""
162+
return self.ttl * 5
163+
164+
@computed_field # type: ignore[prop-decorator]
165+
@property
166+
def tokens_set_ttl(self) -> datetime.timedelta:
167+
"""TTL for the tokens SET"""
168+
return self.ttl * 5
169+
158170
@computed_field # type: ignore[prop-decorator]
159171
@property
160172
def holder_key(self) -> str:
@@ -181,12 +193,11 @@ def validate_timeout(
181193

182194
async def _ensure_semaphore_initialized(self) -> None:
183195
"""Initializes the semaphore in Redis if not already done."""
184-
ttl_seconds = self.ttl.total_seconds()
185196
cls = type(self)
186197
assert cls.register_semaphore is not None # nosec
187198
await cls.register_semaphore( # pylint: disable=not-callable
188199
keys=[self.tokens_key, self.holders_set],
189-
args=[self.capacity, ttl_seconds],
200+
args=[self.capacity, self.holders_set_ttl.total_seconds()],
190201
client=self.redis_client.redis,
191202
)
192203

@@ -259,8 +270,6 @@ async def acquire(self) -> bool:
259270
)
260271
return True
261272

262-
ttl_seconds = self.ttl.total_seconds()
263-
264273
if self.blocking is False:
265274
self._token = await self._non_blocking_acquire()
266275
if not self._token:
@@ -277,7 +286,8 @@ async def acquire(self) -> bool:
277286
args=[
278287
self._token,
279288
self.instance_id,
280-
ttl_seconds,
289+
self.ttl.total_seconds(),
290+
self.holders_set_ttl.total_seconds(),
281291
],
282292
client=self.redis_client.redis,
283293
)
@@ -361,8 +371,13 @@ async def reacquire(self) -> None:
361371
cls = type(self)
362372
assert cls.renew_script is not None # nosec
363373
result = await cls.renew_script( # pylint: disable=not-callable
364-
keys=[self.holders_set, self.holder_key],
365-
args=[self.instance_id, ttl_seconds],
374+
keys=[self.holders_set, self.holder_key, self.tokens_key],
375+
args=[
376+
self.instance_id,
377+
ttl_seconds,
378+
self.holders_set_ttl.total_seconds(),
379+
self.tokens_set_ttl.total_seconds(),
380+
],
366381
client=self.redis_client.redis,
367382
)
368383

packages/service-library/src/servicelib/redis/lua/acquire_fair_semaphore_v2.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
-- ARGV[1]: token (the token received from BRPOP)
66
-- ARGV[2]: instance_id (the instance trying to acquire the semaphore)
77
-- ARGV[3]: ttl_seconds (for the holder_key)
8+
-- ARGV[4]: holders_set_ttl_seconds (to set expiry on holders set)
89
--
910
-- Returns: {exit_code, status, token, current_count}
1011
-- exit_code: 0 if acquired
@@ -16,6 +17,7 @@ local holder_key = KEYS[2]
1617
local token = ARGV[1]
1718
local instance_id = ARGV[2]
1819
local ttl_seconds = tonumber(ARGV[3])
20+
local holders_set_ttl_seconds = tonumber(ARGV[4])
1921

2022

2123

@@ -24,7 +26,7 @@ redis.call('SADD', holders_key, instance_id)
2426
redis.call('SETEX', holder_key, ttl_seconds, token)
2527

2628
-- Step 2: Set expiry on holders set to prevent infinite growth
27-
redis.call('EXPIRE', holders_key, ttl_seconds * 10)
29+
redis.call('EXPIRE', holders_key, holders_set_ttl_seconds)
2830

2931
local current_count = redis.call('SCARD', holders_key)
3032

packages/service-library/src/servicelib/redis/lua/register_semaphore_holder.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ if tokens_exist == 0 and holders_exist == 0 then
2323
redis.call('LPUSH', tokens_key, 'token_' .. i)
2424
end
2525
-- Set expiry on tokens list to prevent infinite growth
26-
-- redis.call('EXPIRE', tokens_key, ttl_seconds)
26+
redis.call('EXPIRE', tokens_key, ttl_seconds)
2727
end
2828

2929
return 0

packages/service-library/src/servicelib/redis/lua/renew_fair_semaphore_v2.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
-- Renew semaphore holder TTL (simplified for token pool design)
22
-- KEYS[1]: holders_key (SET of current holders)
33
-- KEYS[2]: holder_key (individual holder TTL key for this instance)
4+
-- KEYS[3]: tokens_key (LIST of available tokens)
45
-- ARGV[1]: instance_id
56
-- ARGV[2]: ttl_seconds
7+
-- ARGV[3]: holders_ttl_seconds (to renew holders set)
8+
-- ARGV[4]: tokens_ttl_seconds (to renew tokens list)
69
--
710
-- Returns: {exit_code, status, current_count}
811
-- exit_code: 0 if renewed, 255 if failed
912
-- status: 'renewed', 'not_held', or 'expired'
1013

1114
local holders_key = KEYS[1]
1215
local holder_key = KEYS[2]
16+
local tokens_key = KEYS[3]
1317

1418
local instance_id = ARGV[1]
1519
local ttl_seconds = tonumber(ARGV[2])
20+
local holders_ttl_seconds = tonumber(ARGV[3])
21+
local tokens_ttl_seconds = tonumber(ARGV[4])
1622

1723
-- Step 1: Check if this instance is currently a holder
1824
local is_holder = redis.call('SISMEMBER', holders_key, instance_id)
@@ -32,4 +38,8 @@ end
3238
local token = redis.call('GET', holder_key)
3339
redis.call('SETEX', holder_key, ttl_seconds, token)
3440

41+
-- Step 4: Renew the holders set and tokens list TTLs to prevent infinite growth
42+
redis.call('EXPIRE', holders_key, holders_ttl_seconds)
43+
redis.call('EXPIRE', tokens_key, tokens_ttl_seconds)
44+
3545
return {0, 'renewed', redis.call('SCARD', holders_key)}

packages/service-library/tests/redis/test_semaphore.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
def with_short_default_semaphore_ttl(
3939
mocker: MockerFixture,
4040
) -> datetime.timedelta:
41-
short_ttl = datetime.timedelta(seconds=2)
41+
short_ttl = datetime.timedelta(seconds=5)
4242
mocker.patch(
4343
"servicelib.redis._semaphore.DEFAULT_SEMAPHORE_TTL",
4444
short_ttl,

0 commit comments

Comments
 (0)