Skip to content

Commit 9e67b90

Browse files
Improve documentation of the frequenz.channels.experimental.Pipe (#407)
Documentation shows how to create Pipe inside context manager to cleanup the resources when context is exited.
2 parents 8db631a + 9b4b7d7 commit 9e67b90

File tree

3 files changed

+57
-14
lines changed

3 files changed

+57
-14
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313

1414
## New Features
1515

16-
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
16+
- Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
17+
- Improve documentation of the `frequenz.channels.experimental.Pipe`
1718

1819
## Bug Fixes
1920

src/frequenz/channels/experimental/_pipe.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,34 @@ class Pipe(typing.Generic[ChannelMessageT]):
2525
2626
Example:
2727
```python
28-
from frequenz.channels import Broadcast, Pipe
29-
30-
channel1: Broadcast[int] = Broadcast(name="channel1")
31-
channel2: Broadcast[int] = Broadcast(name="channel2")
32-
33-
receiver_chan1 = channel1.new_receiver()
34-
sender_chan2 = channel2.new_sender()
35-
36-
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
37-
await sender_chan2.send(10)
38-
assert await receiver_chan1.receive() == 10
28+
import asyncio
29+
from contextlib import closing, aclosing, AsyncExitStack
30+
31+
from frequenz.channels import Broadcast, Pipe, Receiver
32+
33+
async def main() -> None:
34+
# Channels, receivers and Pipe are in AsyncExitStack
35+
# to close and stop them at the end.
36+
async with AsyncExitStack() as stack:
37+
source_channel = await stack.enter_async_context(
38+
aclosing(Broadcast[int](name="source channel"))
39+
)
40+
source_receiver = stack.enter_context(closing(source_channel.new_receiver()))
41+
42+
forwarding_channel = await stack.enter_async_context(
43+
aclosing(Broadcast[int](name="forwarding channel"))
44+
)
45+
await stack.enter_async_context(
46+
Pipe(source_receiver, forwarding_channel.new_sender())
47+
)
48+
49+
receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))
50+
51+
source_sender = source_channel.new_sender()
52+
await source_sender.send(10)
53+
assert await receiver.receive() == 11
54+
55+
asyncio.run(main())
3956
```
4057
"""
4158

@@ -59,8 +76,8 @@ async def __aenter__(self) -> Pipe[ChannelMessageT]:
5976

6077
async def __aexit__(
6178
self,
62-
_exc_type: typing.Type[BaseException],
63-
_exc: BaseException,
79+
_exc_type: typing.Type[BaseException] | None,
80+
_exc: BaseException | None,
6481
_tb: typing.Any,
6582
) -> None:
6683
"""Exit the runtime context."""

tests/experimental/test_pipe.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import asyncio
88
import typing
9+
from contextlib import AsyncExitStack, aclosing, closing
910

1011
from frequenz.channels import Broadcast, Receiver
1112
from frequenz.channels.experimental import Pipe
@@ -55,3 +56,27 @@ async def test_pipe() -> None:
5556
await sender_chan2.send(5)
5657
assert await receive_timeout(receiver_chan1) is Timeout
5758
assert await receive_timeout(receiver_chan2) == 5
59+
60+
61+
async def test_pipe_with_async_exit_stack() -> None:
62+
"""Test how pipe should work with async exit stack."""
63+
# Channels, receivers and Pipe are in AsyncExitStack
64+
# to close and stop them at the end.
65+
async with AsyncExitStack() as stack:
66+
source_channel = await stack.enter_async_context(
67+
aclosing(Broadcast[int](name="source channel"))
68+
)
69+
source_receiver = stack.enter_context(closing(source_channel.new_receiver()))
70+
71+
forwarding_channel = await stack.enter_async_context(
72+
aclosing(Broadcast[int](name="forwarding channel"))
73+
)
74+
await stack.enter_async_context(
75+
Pipe(source_receiver, forwarding_channel.new_sender())
76+
)
77+
78+
receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))
79+
80+
source_sender = source_channel.new_sender()
81+
await source_sender.send(10)
82+
assert await receiver.receive() == 10

0 commit comments

Comments
 (0)