Skip to content

Commit 50b58b9

Browse files
committed
implementing fair queuing
1 parent 3c0d3ac commit 50b58b9

File tree

2 files changed

+9
-44
lines changed

2 files changed

+9
-44
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
field_validator,
1515
)
1616
from redis.commands.core import AsyncScript
17-
from servicelib.redis._utils import handle_redis_returns_union_types
1817

1918
from ._client import RedisClientSDK
2019
from ._constants import (
@@ -37,6 +36,7 @@
3736
SCRIPT_BAD_EXIT_CODE,
3837
SCRIPT_OK_EXIT_CODE,
3938
)
39+
from ._utils import handle_redis_returns_union_types
4040

4141
_logger = logging.getLogger(__name__)
4242

@@ -150,7 +150,7 @@ def holders_key(self) -> str:
150150
@property
151151
def holder_key(self) -> str:
152152
"""Redis key for this instance's holder entry."""
153-
return f"{SEMAPHORE_KEY_PREFIX}{self.key}:holders:{self.instance_id}"
153+
return f"{SEMAPHORE_HOLDER_KEY_PREFIX}{self.key}:{self.instance_id}"
154154

155155
@computed_field
156156
@property
@@ -200,11 +200,11 @@ async def acquire(self) -> bool:
200200
# Execute the Lua scripts atomically
201201
cls = type(self)
202202
assert cls.register_semaphore is not None # nosec
203-
await cls.register_semaphore(
203+
await cls.register_semaphore( # pylint: disable=not-callable
204204
keys=[self.tokens_key, self.holders_key],
205205
args=[self.capacity, ttl_seconds],
206206
client=self.redis_client.redis,
207-
) # pylint: disable=not-callable
207+
)
208208

209209
try:
210210
# this is blocking pop with timeout
@@ -219,9 +219,11 @@ async def acquire(self) -> bool:
219219
self.key,
220220
self.instance_id,
221221
)
222-
raise SemaphoreAcquisitionError(
223-
name=self.key, capacity=self.capacity
224-
) from e
222+
if self.blocking:
223+
raise SemaphoreAcquisitionError(
224+
name=self.key, capacity=self.capacity
225+
) from e
226+
return False
225227

226228
assert len(tokens_key_token) == 2 # nosec
227229
assert tokens_key_token[0] == self.tokens_key # nosec

packages/service-library/tests/redis/test_semaphore.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -290,43 +290,6 @@ async def _raising_context():
290290
assert await captured_semaphore.get_current_count() == 0
291291

292292

293-
async def test_semaphore_ttl_cleanup(
294-
redis_client_sdk: RedisClientSDK,
295-
semaphore_name: str,
296-
semaphore_capacity: int,
297-
short_ttl: datetime.timedelta,
298-
):
299-
# Create semaphore with explicit short TTL
300-
semaphore = DistributedSemaphore(
301-
redis_client=redis_client_sdk,
302-
key=semaphore_name,
303-
capacity=semaphore_capacity,
304-
ttl=short_ttl,
305-
)
306-
307-
# Manually add an expired entry
308-
expired_instance_id = "expired-instance"
309-
current_time = asyncio.get_event_loop().time()
310-
# Make sure it's definitely expired by using the short TTL
311-
expired_time = current_time - short_ttl.total_seconds() - 1
312-
313-
await redis_client_sdk.redis.zadd(
314-
semaphore.semaphore_key, {expired_instance_id: expired_time}
315-
)
316-
317-
# Verify the entry was added
318-
initial_count = await redis_client_sdk.redis.zcard(semaphore.semaphore_key)
319-
assert initial_count == 1
320-
321-
# Current count should clean up expired entries
322-
count = await semaphore.get_current_count()
323-
assert count == 0
324-
325-
# Verify expired entry was removed
326-
remaining = await redis_client_sdk.redis.zcard(semaphore.semaphore_key)
327-
assert remaining == 0
328-
329-
330293
async def test_multiple_semaphores_different_keys(
331294
redis_client_sdk: RedisClientSDK,
332295
faker: Faker,

0 commit comments

Comments
 (0)