@@ -25,17 +25,34 @@ class Pipe(typing.Generic[ChannelMessageT]):
2525
2626    Example: 
2727        ```python 
28+         from contextlib import asynccontextmanager 
29+         from typing import AsyncIterator 
2830        from frequenz.channels import Broadcast, Pipe 
2931
30-         channel1: Broadcast[int] = Broadcast(name="channel1") 
31-         channel2: Broadcast[int] = Broadcast(name="channel2") 
32- 
33-         receiver_chan1 = channel1.new_receiver() 
34-         sender_chan2 = channel2.new_sender() 
35- 
36-         async with Pipe(channel2.new_receiver(), channel1.new_sender()): 
37-             await sender_chan2.send(10) 
38-             assert await receiver_chan1.receive() == 10 
32+         @asynccontextmanager 
33+         async def create_forwarding_pipe( 
34+             receiver: Receiver[int], 
35+         ) -> AsyncIterator[Broadcast[int]]: 
36+             forwarding_channel: Broadcast[int] = Broadcast(name="forwarded channel") 
37+             forwarding_sender = forwarding_channel.new_sender() 
38+             try: 
39+                 async with Pipe(receiver, forwarding_sender): 
40+                     yield forwarding_channel 
41+             finally: 
42+                 receiver.close() 
43+                 await forwarding_channel.close() 
44+ 
45+         source_channel: Broadcast[int] = Broadcast(name="source channel") 
46+         source_sender = source_channel.new_sender() 
47+ 
48+         # Create a pipe in a context manager to ensure the 
49+         # resources are cleaned up when the context is exited. 
50+         async with create_forwarding_pipe( 
51+             source_channel.new_receiver() 
52+         ) as forwarding_channel: 
53+             receiver = forwarding_channel.new_receiver() 
54+             await source_sender.send(10) 
55+             assert await receiver.receive() == 10 
3956        ``` 
4057    """ 
4158
0 commit comments