Skip to content

Commit 8398261

Browse files
Improve documentation of the frequenz.channels.experimental.Pipe
Documentation shows how to create Pipe inside context manager to cleanup the resources when context is exited. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 8db631a commit 8398261

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313

1414
## New Features
1515

16-
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
16+
- Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
17+
- Improve documentation of the `frequenz.channels.experimental.Pipe`
1718

1819
## Bug Fixes
1920

src/frequenz/channels/experimental/_pipe.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,27 @@ class Pipe(typing.Generic[ChannelMessageT]):
2525
2626
Example:
2727
```python
28-
from frequenz.channels import Broadcast, Pipe
28+
from contextlib import closing, aclosing, AsyncExitStack
29+
from frequenz.channels import Broadcast, Pipe, Receiver
2930
30-
channel1: Broadcast[int] = Broadcast(name="channel1")
31-
channel2: Broadcast[int] = Broadcast(name="channel2")
31+
# Channels, receivers and Pipe are in AsyncExitStack
32+
# to close and stop them at the end.
3233
33-
receiver_chan1 = channel1.new_receiver()
34-
sender_chan2 = channel2.new_sender()
34+
async with AsyncExitStack() as stack:
35+
source_channel = await stack.enter_async_context(
36+
aclosing(Broadcast(name="source channel")))
37+
source_receiver = await stack.enter_async_context(
38+
closing(source_channel.new_receiver()))
3539
36-
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
37-
await sender_chan2.send(10)
38-
assert await receiver_chan1.receive() == 10
40+
forwarding_channel = await stack.enter_async_context(
41+
aclosing(Broadcast(name="forwarding channel")))
42+
await stack.enter_async_context(Pipe(source_receiver, forwarding_channel.new_sender()))
43+
44+
receiver = await stack.enter_async_context(closing(forwarding_channel.new_receiver()))
45+
46+
source_sender = source_channel.new_sender()
47+
await source_sender.send(10)
48+
assert await receiver.receive() == 10
3949
```
4050
"""
4151

0 commit comments

Comments
 (0)