@@ -20,9 +20,7 @@ class NopReceiver(Receiver[ReceiverMessageT_co]):
2020
2121 def __init__ (self ) -> None :
2222 """Initialize this instance."""
23- self ._ready_future : asyncio .Future [bool ] = (
24- asyncio .get_running_loop ().create_future ()
25- )
23+ self ._close_event : asyncio .Event = asyncio .Event ()
2624
2725 @override
2826 async def ready (self ) -> bool :
@@ -31,9 +29,10 @@ async def ready(self) -> bool:
3129 Returns:
3230 Whether the receiver is still active.
3331 """
34- if self ._ready_future . done ():
32+ if self ._close_event . is_set ():
3533 return False
36- return await self ._ready_future
34+ await self ._close_event .wait ()
35+ return False
3736
3837 @override
3938 def consume (self ) -> ReceiverMessageT_co : # noqa: DOC503 (raised indirectly)
@@ -48,12 +47,11 @@ def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
4847 ReceiverStoppedError: If the receiver stopped producing messages.
4948 ReceiverError: If there is some problem with the underlying receiver.
5049 """
51- if self ._ready_future . done ():
50+ if self ._close_event . is_set ():
5251 raise ReceiverStoppedError (self )
5352 raise ReceiverError ("`consume()` must be preceded by a call to `ready()`" , self )
5453
5554 @override
5655 def close (self ) -> None :
5756 """Stop the receiver."""
58- if not self ._ready_future .done ():
59- self ._ready_future .set_result (False )
57+ self ._close_event .set ()
0 commit comments