Skip to content

Commit 44f90bf

Browse files
committed
ensure cancellation happens for real
1 parent 93df0b0 commit 44f90bf

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -446,29 +446,37 @@ async def distributed_semaphore(
446446

447447
@periodic(interval=semaphore.ttl / 3, raise_on_error=True)
448448
async def _periodic_reacquisition(
449-
semaphore: DistributedSemaphore, started: asyncio.Event
449+
semaphore: DistributedSemaphore,
450+
started: asyncio.Event,
451+
cancellation_event: asyncio.Event,
450452
) -> None:
451-
await semaphore.reacquire()
453+
if cancellation_event.is_set():
454+
raise asyncio.CancelledError
452455
if not started.is_set():
453456
started.set()
457+
await semaphore.reacquire()
454458

455-
lock_acquisition_time = arrow.utcnow()
456459
try:
457460
if not await semaphore.acquire():
458461
raise SemaphoreAcquisitionError(name=key, instance_id=semaphore.instance_id)
459462

463+
lock_acquisition_time = arrow.utcnow()
464+
460465
async with (
461466
asyncio.TaskGroup() as tg
462467
): # NOTE: using task group ensures proper cancellation propagation of parent task
463468
auto_reacquisition_started = asyncio.Event()
469+
cancellation_event = asyncio.Event()
464470
auto_reacquisition_task = tg.create_task(
465-
_periodic_reacquisition(semaphore, auto_reacquisition_started),
471+
_periodic_reacquisition(
472+
semaphore, auto_reacquisition_started, cancellation_event
473+
),
466474
name=f"semaphore/auto_reacquisition_task_{semaphore.key}_{semaphore.instance_id}",
467475
)
468476
await auto_reacquisition_started.wait()
469477

470478
yield semaphore
471-
479+
cancellation_event.set() # NOTE: this ensure cancellation is effective
472480
await cancel_wait_task(auto_reacquisition_task)
473481
except BaseExceptionGroup as eg:
474482
semaphore_errors, other_errors = eg.split(SemaphoreError)
@@ -508,16 +516,15 @@ async def _periodic_reacquisition(
508516
"Look for synchronouse code or the loop is very busy and cannot schedule the reacquisition task.",
509517
)
510518
)
511-
finally:
512-
lock_release_time = arrow.utcnow()
513-
locking_time = lock_release_time - lock_acquisition_time
514-
if locking_time > expected_lock_overall_time:
515-
_logger.warning(
516-
"Semaphore '%s' was held for %s by %s which is longer than expected (%s). "
517-
"TIP: consider reducing the locking time by optimizing the code inside "
518-
"the critical section or increasing the default locking time",
519-
semaphore.key,
520-
locking_time,
521-
semaphore.instance_id,
522-
expected_lock_overall_time,
523-
)
519+
lock_release_time = arrow.utcnow()
520+
locking_time = lock_release_time - lock_acquisition_time
521+
if locking_time > expected_lock_overall_time:
522+
_logger.warning(
523+
"Semaphore '%s' was held for %s by %s which is longer than expected (%s). "
524+
"TIP: consider reducing the locking time by optimizing the code inside "
525+
"the critical section or increasing the default locking time",
526+
semaphore.key,
527+
locking_time,
528+
semaphore.instance_id,
529+
expected_lock_overall_time,
530+
)

0 commit comments

Comments
 (0)