Skip to content

Commit e7069da

Browse files
Improve documentation of the frequenz.channels.experimental.Pipe
Documentation shows how to create Pipe inside context manager to cleanup the resources when context is exited. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 8db631a commit e7069da

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111

1212
The name `close()` is still available for backwards compatibility, but it will be removed in the next major release, so it is recommended to switch to `aclose()`.
1313

14+
1415
## New Features
1516

16-
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
17+
- Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
18+
- Improve documentation of the `frequenz.channels.experimental.Pipe`
1719

1820
## Bug Fixes
1921

src/frequenz/channels/experimental/_pipe.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,24 @@ class Pipe(typing.Generic[ChannelMessageT]):
2525
2626
Example:
2727
```python
28-
from frequenz.channels import Broadcast, Pipe
28+
from contextlib import closing, aclosing, AsyncExitStack
29+
from frequenz.channels import Broadcast, Pipe, Receiver
2930
30-
channel1: Broadcast[int] = Broadcast(name="channel1")
31-
channel2: Broadcast[int] = Broadcast(name="channel2")
31+
async with AsyncExitStack() as stack:
32+
source_channel = await stack.enter_async_context(
33+
aclosing(Broadcast(name="source channel")))
34+
source_receiver = await stack.enter_async_context(
35+
closing(source_channel.new_receiver()))
3236
33-
receiver_chan1 = channel1.new_receiver()
34-
sender_chan2 = channel2.new_sender()
37+
forwarding_channel = await stack.enter_async_context(
38+
aclosing(Broadcast(name="forwarding channel")))
39+
await stack.enter_async_context(Pipe(source_receiver, forwarding_channel.new_sender()))
3540
36-
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
37-
await sender_chan2.send(10)
38-
assert await receiver_chan1.receive() == 10
41+
receiver = await stack.enter_async_context(closing(forwarding_channel.new_receiver()))
42+
43+
source_sender = source_channel.new_sender()
44+
await source_sender.send(10)
45+
assert await receiver.receive() == 10
3946
```
4047
"""
4148

0 commit comments

Comments
 (0)