Skip to content

Commit d9dcba9

Browse files
committed
test renewal fail
1 parent 3195f6c commit d9dcba9

File tree

3 files changed

+38
-34
lines changed

3 files changed

+38
-34
lines changed

packages/service-library/src/servicelib/redis/_errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,7 @@ class SemaphoreAcquisitionError(BaseRedisError):
3232

3333
class SemaphoreNotAcquiredError(BaseRedisError):
3434
msg_template: str = "Semaphore '{name}' was not acquired by this instance"
35+
36+
37+
class SemaphoreLostError(BaseRedisError):
38+
msg_template: str = "Semaphore '{name}' was lost by this instance `{instance_id}`"

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
DEFAULT_SEMAPHORE_TTL,
1414
DEFAULT_SOCKET_TIMEOUT,
1515
)
16-
from ._errors import SemaphoreAcquisitionError, SemaphoreNotAcquiredError
16+
from ._errors import (
17+
SemaphoreAcquisitionError,
18+
SemaphoreLostError,
19+
SemaphoreNotAcquiredError,
20+
)
1721
from ._semaphore import DistributedSemaphore
1822

1923
_logger = logging.getLogger(__name__)
@@ -39,7 +43,14 @@ async def _renew_semaphore_entry(semaphore: DistributedSemaphore) -> None:
3943
async with semaphore.redis_client.redis.pipeline(transaction=True) as pipe:
4044
await pipe.zadd(semaphore.semaphore_key, {semaphore.instance_id: current_time})
4145
await pipe.expire(semaphore.holder_key, int(ttl_seconds))
42-
await pipe.execute()
46+
results = await pipe.execute()
47+
48+
# Check if EXPIRE succeeded (returns 1 if key existed, 0 if not)
49+
expire_result = results[1]
50+
if expire_result == 0:
51+
raise SemaphoreLostError(
52+
name=semaphore.key, instance_id=semaphore.instance_id
53+
)
4354

4455

4556
P = ParamSpec("P")
@@ -164,9 +175,7 @@ async def _periodic_renewer() -> None:
164175
# and cancel the work task automatically
165176

166177
# Re-raise the first exception in the group
167-
if eg.exceptions:
168-
raise eg.exceptions[0] from eg
169-
raise
178+
raise eg.exceptions[0] from eg
170179

171180
finally:
172181
# Always release the semaphore

packages/service-library/tests/redis/test_semaphore.py

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import asyncio
99
import datetime
10+
from typing import Literal
1011
from unittest import mock
1112

1213
import pytest
@@ -18,12 +19,13 @@
1819
SEMAPHORE_HOLDER_KEY_PREFIX,
1920
SEMAPHORE_KEY_PREFIX,
2021
)
22+
from servicelib.redis._errors import SemaphoreLostError
2123
from servicelib.redis._semaphore import (
2224
DistributedSemaphore,
2325
SemaphoreAcquisitionError,
2426
SemaphoreNotAcquiredError,
25-
with_limited_concurrency,
2627
)
28+
from servicelib.redis._semaphore_decorator import with_limited_concurrency
2729

2830
pytest_simcore_core_services_selection = [
2931
"redis",
@@ -318,7 +320,7 @@ async def test_semaphore_auto_renewal_via_decorator(
318320
capacity=semaphore_capacity,
319321
ttl=short_ttl,
320322
)
321-
async def long_running_work():
323+
async def long_running_work() -> Literal["success"]:
322324
work_started.set()
323325
# Wait longer than TTL to ensure renewal works
324326
await asyncio.sleep(short_ttl.total_seconds() * 2)
@@ -354,52 +356,41 @@ async def test_decorator_auto_renewal_failure_propagation(
354356
semaphore_name: str,
355357
semaphore_capacity: int,
356358
short_ttl: datetime.timedelta,
357-
monkeypatch: pytest.MonkeyPatch,
358359
):
359360
"""Test that auto-renewal failures properly propagate as exceptions in the decorator"""
360-
from servicelib.redis._semaphore_decorator import _renew_semaphore_entry
361-
362-
class RenewalFailureError(Exception):
363-
"""Custom exception for testing renewal failures"""
364361

365362
work_started = asyncio.Event()
366363

367-
# Mock the renewal function to fail after first call
368-
call_count = 0
369-
original_renew = _renew_semaphore_entry
370-
371-
async def failing_renew_semaphore_entry(semaphore):
372-
nonlocal call_count
373-
call_count += 1
374-
if call_count <= 1:
375-
# First call succeeds
376-
await original_renew(semaphore)
377-
else:
378-
# Subsequent calls fail
379-
raise RenewalFailureError("Simulated renewal failure")
380-
381-
monkeypatch.setattr(
382-
"servicelib.redis._semaphore_decorator._renew_semaphore_entry",
383-
failing_renew_semaphore_entry,
384-
)
385-
386364
@with_limited_concurrency(
387365
redis_client_sdk,
388366
key=semaphore_name,
389367
capacity=semaphore_capacity,
390368
ttl=short_ttl,
391369
)
392-
async def work_that_should_fail():
370+
async def work_that_should_fail() -> Literal["should not reach here"]:
393371
work_started.set()
394372
# Wait long enough for renewal to be attempted multiple times
395-
await asyncio.sleep(short_ttl.total_seconds() * 1.5)
373+
await asyncio.sleep(short_ttl.total_seconds() * 4)
396374
return "should not reach here"
397375

398376
# The decorator should propagate the renewal failure
399377
task = asyncio.create_task(work_that_should_fail())
400378
await work_started.wait() # Wait for work to start
401379

402-
with pytest.raises(RenewalFailureError, match="Simulated renewal failure"):
380+
# Wait for the first renewal interval to pass
381+
renewal_interval = short_ttl / 3
382+
await asyncio.sleep(
383+
renewal_interval.total_seconds() + 1
384+
) # Wait for renewal to happen
385+
386+
# Find and delete all holder keys for this semaphore
387+
holder_keys = await redis_client_sdk.redis.keys(
388+
f"{SEMAPHORE_HOLDER_KEY_PREFIX}{semaphore_name}:*"
389+
)
390+
assert holder_keys, "Holder keys should exist before deletion"
391+
await redis_client_sdk.redis.delete(*holder_keys)
392+
393+
with pytest.raises(SemaphoreLostError):
403394
await task # This should raise the renewal failure exception
404395

405396

0 commit comments

Comments
 (0)