Skip to content

Commit 4ab783f

Browse files
committed
this now works
1 parent d612b3d commit 4ab783f

File tree

4 files changed

+25
-16
lines changed

4 files changed

+25
-16
lines changed

packages/common-library/src/common_library/async_tools.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,6 @@ async def cancel_wait_task(
111111
# owner function is being cancelled -> propagate cancellation
112112
raise
113113

114-
# else: task cancellation is complete, we can safely ignore it
115-
116114

117115
def delayed_start(
118116
delay: datetime.timedelta,

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ async def acquire(self) -> bool:
177177
else stop_never
178178
),
179179
retry=retry_if_not_result(lambda acquired: acquired),
180-
before_sleep=before_sleep_log(_logger, logging.INFO),
180+
before_sleep=before_sleep_log(_logger, logging.DEBUG),
181181
)
182182
async def _blocking_acquire() -> bool:
183183
return await self._try_acquire()
@@ -287,23 +287,31 @@ async def reacquire(self) -> None:
287287

288288
# Lua script returns: 'renewed' or status message
289289
if status == "renewed":
290+
assert success == 1 # nosec
290291
_logger.debug(
291-
"Renewed semaphore '%s' (instance: %s)",
292+
"Renewed semaphore '%s' (instance: %s, count: %s, expired: %s)",
292293
self.key,
293294
self.instance_id,
295+
current_count,
296+
expired_count,
294297
)
295298
else:
299+
assert success == 0 # nosec
296300
if status == "expired":
297301
_logger.warning(
298-
"Semaphore '%s' holder key expired for instance %s",
302+
"Semaphore '%s' holder key expired (instance: %s, count: %s, expired: %s)",
299303
self.key,
300304
self.instance_id,
305+
current_count,
306+
expired_count,
301307
)
302308
elif status == "not_held":
303309
_logger.warning(
304-
"Semaphore '%s' not held by instance %s",
310+
"Semaphore '%s' not held (instance: %s, count: %s, expired: %s)",
305311
self.key,
306312
self.instance_id,
313+
current_count,
314+
expired_count,
307315
)
308316

309317
raise SemaphoreLostError(name=self.key, instance_id=self.instance_id)

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from collections.abc import Callable, Coroutine
77
from typing import Any, ParamSpec, TypeVar
88

9+
from common_library.async_tools import cancel_wait_task
10+
911
from ..background_task import periodic
1012
from ..logging_errors import create_troubleshootting_log_kwargs
1113
from ._client import RedisClientSDK
@@ -113,7 +115,8 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
113115
@periodic(interval=ttl / 3, raise_on_error=True)
114116
async def _periodic_renewer() -> None:
115117
await semaphore.reacquire()
116-
started_event.set()
118+
if not started_event.is_set():
119+
started_event.set()
117120

118121
# Start the renewal task
119122
renewal_task = tg.create_task(
@@ -129,11 +132,10 @@ async def _periodic_renewer() -> None:
129132
coro(*args, **kwargs),
130133
name=f"semaphore/work_{coro.__module__}.{coro.__name__}",
131134
)
132-
133135
result = await work_task
134136

135137
# Cancel renewal task (work is done)
136-
renewal_task.cancel()
138+
await cancel_wait_task(renewal_task, max_delay=None)
137139

138140
return result
139141

packages/service-library/tests/redis/test_semaphore_decorator.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,16 +351,15 @@ async def function_raising_cancelled_error():
351351
await function_raising_cancelled_error()
352352

353353

354-
# @pytest.mark.skip
355354
async def test_with_large_capacity(
356355
redis_client_sdk: RedisClientSDK,
357356
semaphore_name: str,
358357
):
359358
large_capacity = 100
360359
concurrent_count = 0
361360
max_concurrent = 0
362-
sleep_time_s = 10
363-
num_tasks = 400
361+
sleep_time_s = 5
362+
num_tasks = 1000
364363

365364
@with_limited_concurrency(
366365
redis_client_sdk,
@@ -380,10 +379,12 @@ async def limited_function() -> None:
380379

381380
# Start tasks equal to the large capacity
382381
tasks = [asyncio.create_task(limited_function()) for _ in range(num_tasks)]
383-
async with asyncio.timeout(
384-
float(num_tasks) / float(large_capacity) * 10.0 * float(sleep_time_s)
385-
):
386-
await asyncio.gather(*tasks)
382+
done, pending = await asyncio.wait(
383+
tasks,
384+
timeout=float(num_tasks) / float(large_capacity) * 10.0 * float(sleep_time_s),
385+
)
386+
assert not pending, f"Some tasks did not complete: {len(pending)} pending"
387+
assert len(done) == num_tasks
387388

388389
# Should never exceed the large capacity
389390
assert max_concurrent <= large_capacity

0 commit comments

Comments
 (0)