Skip to content

Commit 66559d7

Browse files
Improve documentation of the frequenz.channels.experimental.Pipe
Documentation shows how to create Pipe inside context manager to cleanup the resources when context is exited. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent c4f8acd commit 66559d7

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
## Upgrading
88

99
- Some minimal dependencies have been bumped, so you might need to adjust your dependencies accordingly.
10+
- Improve documentation of the frequenz.channels.experimental.Pipe
1011

1112
## New Features
1213

src/frequenz/channels/experimental/_pipe.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,34 @@ class Pipe(typing.Generic[ChannelMessageT]):
2525
2626
Example:
2727
```python
28+
from contextlib import asynccontextmanager
29+
from typing import AsyncIterator
2830
from frequenz.channels import Broadcast, Pipe
2931
30-
channel1: Broadcast[int] = Broadcast(name="channel1")
31-
channel2: Broadcast[int] = Broadcast(name="channel2")
32-
33-
receiver_chan1 = channel1.new_receiver()
34-
sender_chan2 = channel2.new_sender()
35-
36-
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
37-
await sender_chan2.send(10)
38-
assert await receiver_chan1.receive() == 10
32+
@asynccontextmanager
33+
async def create_forwarding_pipe(
34+
receiver: Receiver[int],
35+
) -> AsyncIterator[Broadcast[int]]:
36+
forwarding_channel: Broadcast[int] = Broadcast(name="forwarded channel")
37+
forwarding_sender = forwarding_channel.new_sender()
38+
try:
39+
async with Pipe(receiver, forwarding_sender):
40+
yield forwarding_channel
41+
finally:
42+
receiver.close()
43+
await forwarding_channel.close()
44+
45+
source_channel: Broadcast[int] = Broadcast(name="source channel")
46+
source_sender = source_channel.new_sender()
47+
48+
# Create a pipe in a context manager to ensure the
49+
# resources are cleaned up when the context is exited.
50+
async with create_forwarding_pipe(
51+
source_channel.new_receiver()
52+
) as forwarding_channel:
53+
receiver = forwarding_channel.new_receiver()
54+
await source_sender.send(10)
55+
assert await receiver.receive() == 10
3956
```
4057
"""
4158

0 commit comments

Comments
 (0)