@@ -353,9 +353,75 @@ async def test_semaphore_multiple_instances_capacity_limit(
353353 await semaphores [2 ].release ()
354354
355355
356+ async def test_semaphore_with_timeout (
357+ redis_client_sdk : RedisClientSDK ,
358+ semaphore_name : str ,
359+ ):
360+ timeout = datetime .timedelta (seconds = 0.5 )
361+ semaphore1 = DistributedSemaphore (
362+ redis_client = redis_client_sdk ,
363+ key = semaphore_name ,
364+ capacity = 1 ,
365+ blocking_timeout = timeout ,
366+ )
367+ assert await semaphore1 .acquire () is True
368+ assert await semaphore1 .is_acquired () is True
369+ await _assert_semaphore_redis_state (
370+ redis_client_sdk ,
371+ semaphore1 ,
372+ expected_count = 1 ,
373+ expected_free_tokens = 0 ,
374+ )
375+ semaphore2 = DistributedSemaphore (
376+ redis_client = redis_client_sdk ,
377+ key = semaphore_name ,
378+ capacity = 1 ,
379+ blocking_timeout = timeout ,
380+ )
381+ # Second should timeout
382+ with pytest .raises (SemaphoreAcquisitionError ):
383+ await semaphore2 .acquire ()
384+ assert await semaphore2 .is_acquired () is False
385+ await _assert_semaphore_redis_state (
386+ redis_client_sdk ,
387+ semaphore1 ,
388+ expected_count = 1 ,
389+ expected_free_tokens = 0 ,
390+ )
391+
392+
356393async def test_semaphore_context_manager (
357394 redis_client_sdk : RedisClientSDK ,
358395 semaphore_name : str ,
396+ ):
397+ async with distributed_semaphore (
398+ redis_client = redis_client_sdk ,
399+ key = semaphore_name ,
400+ capacity = 1 ,
401+ ) as semaphore1 :
402+ assert await semaphore1 .is_acquired () is True
403+ assert await semaphore1 .current_count () == 1
404+ assert await semaphore1 .available_tokens () == 0
405+ await _assert_semaphore_redis_state (
406+ redis_client_sdk ,
407+ semaphore1 ,
408+ expected_count = 1 ,
409+ expected_free_tokens = 0 ,
410+ )
411+ assert await semaphore1 .is_acquired () is False
412+ assert await semaphore1 .current_count () == 0
413+ assert await semaphore1 .available_tokens () == 1
414+ await _assert_semaphore_redis_state (
415+ redis_client_sdk ,
416+ semaphore1 ,
417+ expected_count = 0 ,
418+ expected_free_tokens = 1 ,
419+ )
420+
421+
422+ async def test_semaphore_context_manager_with_timeout (
423+ redis_client_sdk : RedisClientSDK ,
424+ semaphore_name : str ,
359425):
360426 capacity = 1
361427 timeout = datetime .timedelta (seconds = 0.1 )
@@ -375,20 +441,34 @@ async def test_semaphore_context_manager(
375441 expected_count = 1 ,
376442 expected_free_tokens = 0 ,
377443 )
378-
379- # Second semaphore should timeout
444+ # Second semaphore should raise on timeout
445+ with pytest .raises (SemaphoreAcquisitionError ):
446+ async with distributed_semaphore (
447+ redis_client = redis_client_sdk ,
448+ key = semaphore_name ,
449+ capacity = capacity ,
450+ blocking = True ,
451+ blocking_timeout = timeout ,
452+ ):
453+ ...
454+
455+ # non-blocking should also raise when used with context manager
456+ with pytest .raises (SemaphoreAcquisitionError ):
457+ async with distributed_semaphore (
458+ redis_client = redis_client_sdk ,
459+ key = semaphore_name ,
460+ capacity = capacity ,
461+ blocking = False ,
462+ ):
463+ ...
464+ # using the semaphore directly should in non-blocking mode should return False
380465 semaphore2 = DistributedSemaphore (
381466 redis_client = redis_client_sdk ,
382467 key = semaphore_name ,
383468 capacity = capacity ,
384- blocking_timeout = timeout ,
469+ blocking = False ,
385470 )
386-
387- with pytest .raises (
388- SemaphoreAcquisitionError ,
389- match = f"Could not acquire semaphore '{ semaphore_name } ' by this instance" ,
390- ):
391- await semaphore2 .acquire ()
471+ assert await semaphore2 .acquire () is False
392472
393473 # now try infinite timeout
394474 semaphore3 = DistributedSemaphore (
@@ -402,36 +482,6 @@ async def test_semaphore_context_manager(
402482 assert not acquire_task .done ()
403483
404484
405- async def test_semaphore_blocking_acquire_waits (
406- redis_client_sdk : RedisClientSDK ,
407- semaphore_name : str ,
408- ):
409- capacity = 1
410- semaphore1 = DistributedSemaphore (
411- redis_client = redis_client_sdk , key = semaphore_name , capacity = capacity
412- )
413- semaphore2 = DistributedSemaphore (
414- redis_client = redis_client_sdk , key = semaphore_name , capacity = capacity
415- )
416-
417- # First acquires immediately
418- await semaphore1 .acquire ()
419-
420- # Second will wait
421- async def delayed_release () -> None :
422- await asyncio .sleep (0.1 )
423- await semaphore1 .release ()
424-
425- acquire_task = asyncio .create_task (semaphore2 .acquire ())
426- release_task = asyncio .create_task (delayed_release ())
427-
428- # Both should complete successfully
429- results = await asyncio .gather (acquire_task , release_task )
430- assert results [0 ] is True # acquire succeeded
431-
432- await semaphore2 .release ()
433-
434-
435485@pytest .mark .parametrize (
436486 "exception" ,
437487 [RuntimeError , asyncio .CancelledError ],
0 commit comments