File tree Expand file tree Collapse file tree 3 files changed +34
-7
lines changed
src/frequenz/channels/experimental Expand file tree Collapse file tree 3 files changed +34
-7
lines changed Original file line number Diff line number Diff line change 1414
1515## Bug Fixes
1616
17- <!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+ - Fix ` NopReceiver.ready() ` to properly terminate when receiver is closed.
Original file line number Diff line number Diff line change @@ -20,7 +20,9 @@ class NopReceiver(Receiver[ReceiverMessageT_co]):
2020
2121 def __init__ (self ) -> None :
2222 """Initialize this instance."""
23- self ._closed : bool = False
23+ self ._ready_future : asyncio .Future [bool ] = (
24+ asyncio .get_running_loop ().create_future ()
25+ )
2426
2527 @override
2628 async def ready (self ) -> bool :
@@ -29,10 +31,9 @@ async def ready(self) -> bool:
2931 Returns:
3032 Whether the receiver is still active.
3133 """
32- if self ._closed :
34+ if self ._ready_future . done () :
3335 return False
34- await asyncio .Future ()
35- return False
36+ return await self ._ready_future
3637
3738 @override
3839 def consume (self ) -> ReceiverMessageT_co : # noqa: DOC503 (raised indirectly)
@@ -47,11 +48,12 @@ def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
4748 ReceiverStoppedError: If the receiver stopped producing messages.
4849 ReceiverError: If there is some problem with the underlying receiver.
4950 """
50- if self ._closed :
51+ if self ._ready_future . done () :
5152 raise ReceiverStoppedError (self )
5253 raise ReceiverError ("`consume()` must be preceded by a call to `ready()`" , self )
5354
5455 @override
5556 def close (self ) -> None :
5657 """Stop the receiver."""
57- self ._closed = True
58+ if not self ._ready_future .done ():
59+ self ._ready_future .set_result (False )
Original file line number Diff line number Diff line change @@ -38,3 +38,28 @@ async def test_consuming_raises() -> None:
3838 receiver .close ()
3939 with pytest .raises (ReceiverStoppedError ):
4040 receiver .consume ()
41+
42+
43+ async def test_close_method_effect_on_ready () -> None :
44+ """Test `ready()` terminates when the receiver is closed.
45+
46+ When the receiver is closed, `ready()` should return False.
47+ This test verifies that:
48+ 1. `ready()` returns False immediately after `close()` is called
49+ 2. `ready()` and `close()` can be called multiple times
50+ """
51+ receiver = NopReceiver [int ]()
52+
53+ # Create a task that waits for the receiver to be ready.
54+ task = asyncio .create_task (receiver .ready ())
55+
56+ # Wait for the task to start.
57+ await asyncio .sleep (0.1 )
58+
59+ # Close the receiver and wait for the task to complete.
60+ receiver .close ()
61+ assert await asyncio .wait_for (task , timeout = 0.1 ) is False
62+
63+ # Second call to `close()` or `ready()` should not raise an error.
64+ receiver .close ()
65+ assert await asyncio .wait_for (receiver .ready (), timeout = 0.1 ) is False
You can’t perform that action at this time.
0 commit comments