diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 2da763f8..d6e6b99b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,7 +13,8 @@ ## New Features -* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. +- Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. +- Improve documentation of the `frequenz.channels.experimental.Pipe` ## Bug Fixes diff --git a/src/frequenz/channels/experimental/_pipe.py b/src/frequenz/channels/experimental/_pipe.py index 715d6272..45fa40eb 100644 --- a/src/frequenz/channels/experimental/_pipe.py +++ b/src/frequenz/channels/experimental/_pipe.py @@ -25,17 +25,34 @@ class Pipe(typing.Generic[ChannelMessageT]): Example: ```python - from frequenz.channels import Broadcast, Pipe - - channel1: Broadcast[int] = Broadcast(name="channel1") - channel2: Broadcast[int] = Broadcast(name="channel2") - - receiver_chan1 = channel1.new_receiver() - sender_chan2 = channel2.new_sender() - - async with Pipe(channel2.new_receiver(), channel1.new_sender()): - await sender_chan2.send(10) - assert await receiver_chan1.receive() == 10 + import asyncio + from contextlib import closing, aclosing, AsyncExitStack + + from frequenz.channels import Broadcast, Pipe, Receiver + + async def main() -> None: + # Channels, receivers and Pipe are in AsyncExitStack + # to close and stop them at the end. + async with AsyncExitStack() as stack: + source_channel = await stack.enter_async_context( + aclosing(Broadcast[int](name="source channel")) + ) + source_receiver = stack.enter_context(closing(source_channel.new_receiver())) + + forwarding_channel = await stack.enter_async_context( + aclosing(Broadcast[int](name="forwarding channel")) + ) + await stack.enter_async_context( + Pipe(source_receiver, forwarding_channel.new_sender()) + ) + + receiver = stack.enter_context(closing(forwarding_channel.new_receiver())) + + source_sender = source_channel.new_sender() + await source_sender.send(10) + assert await receiver.receive() == 11 + + asyncio.run(main()) ``` """ @@ -59,8 +76,8 @@ async def __aenter__(self) -> Pipe[ChannelMessageT]: async def __aexit__( self, - _exc_type: typing.Type[BaseException], - _exc: BaseException, + _exc_type: typing.Type[BaseException] | None, + _exc: BaseException | None, _tb: typing.Any, ) -> None: """Exit the runtime context.""" diff --git a/tests/experimental/test_pipe.py b/tests/experimental/test_pipe.py index 9da27926..6775a4d8 100644 --- a/tests/experimental/test_pipe.py +++ b/tests/experimental/test_pipe.py @@ -6,6 +6,7 @@ import asyncio import typing +from contextlib import AsyncExitStack, aclosing, closing from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import Pipe @@ -55,3 +56,27 @@ async def test_pipe() -> None: await sender_chan2.send(5) assert await receive_timeout(receiver_chan1) is Timeout assert await receive_timeout(receiver_chan2) == 5 + + +async def test_pipe_with_async_exit_stack() -> None: + """Test how pipe should work with async exit stack.""" + # Channels, receivers and Pipe are in AsyncExitStack + # to close and stop them at the end. + async with AsyncExitStack() as stack: + source_channel = await stack.enter_async_context( + aclosing(Broadcast[int](name="source channel")) + ) + source_receiver = stack.enter_context(closing(source_channel.new_receiver())) + + forwarding_channel = await stack.enter_async_context( + aclosing(Broadcast[int](name="forwarding channel")) + ) + await stack.enter_async_context( + Pipe(source_receiver, forwarding_channel.new_sender()) + ) + + receiver = stack.enter_context(closing(forwarding_channel.new_receiver())) + + source_sender = source_channel.new_sender() + await source_sender.send(10) + assert await receiver.receive() == 10