Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

## New Features

* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
- Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
- Improve documentation of the `frequenz.channels.experimental.Pipe`

## Bug Fixes

Expand Down
43 changes: 30 additions & 13 deletions src/frequenz/channels/experimental/_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,34 @@ class Pipe(typing.Generic[ChannelMessageT]):

Example:
```python
from frequenz.channels import Broadcast, Pipe

channel1: Broadcast[int] = Broadcast(name="channel1")
channel2: Broadcast[int] = Broadcast(name="channel2")

receiver_chan1 = channel1.new_receiver()
sender_chan2 = channel2.new_sender()

async with Pipe(channel2.new_receiver(), channel1.new_sender()):
await sender_chan2.send(10)
assert await receiver_chan1.receive() == 10
import asyncio
from contextlib import closing, aclosing, AsyncExitStack

from frequenz.channels import Broadcast, Pipe, Receiver

async def main() -> None:
# Channels, receivers and Pipe are in AsyncExitStack
# to close and stop them at the end.
async with AsyncExitStack() as stack:
source_channel = await stack.enter_async_context(
aclosing(Broadcast[int](name="source channel"))
)
source_receiver = stack.enter_context(closing(source_channel.new_receiver()))

forwarding_channel = await stack.enter_async_context(
aclosing(Broadcast[int](name="forwarding channel"))
)
await stack.enter_async_context(
Pipe(source_receiver, forwarding_channel.new_sender())
)

receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))

source_sender = source_channel.new_sender()
await source_sender.send(10)
assert await receiver.receive() == 11

asyncio.run(main())
```
"""

Expand All @@ -59,8 +76,8 @@ async def __aenter__(self) -> Pipe[ChannelMessageT]:

async def __aexit__(
self,
_exc_type: typing.Type[BaseException],
_exc: BaseException,
_exc_type: typing.Type[BaseException] | None,
_exc: BaseException | None,
_tb: typing.Any,
) -> None:
"""Exit the runtime context."""
Expand Down
25 changes: 25 additions & 0 deletions tests/experimental/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncio
import typing
from contextlib import AsyncExitStack, aclosing, closing

from frequenz.channels import Broadcast, Receiver
from frequenz.channels.experimental import Pipe
Expand Down Expand Up @@ -55,3 +56,27 @@ async def test_pipe() -> None:
await sender_chan2.send(5)
assert await receive_timeout(receiver_chan1) is Timeout
assert await receive_timeout(receiver_chan2) == 5


async def test_pipe_with_async_exit_stack() -> None:
"""Test how pipe should work with async exit stack."""
# Channels, receivers and Pipe are in AsyncExitStack
# to close and stop them at the end.
async with AsyncExitStack() as stack:
source_channel = await stack.enter_async_context(
aclosing(Broadcast[int](name="source channel"))
)
source_receiver = stack.enter_context(closing(source_channel.new_receiver()))

forwarding_channel = await stack.enter_async_context(
aclosing(Broadcast[int](name="forwarding channel"))
)
await stack.enter_async_context(
Pipe(source_receiver, forwarding_channel.new_sender())
)

receiver = stack.enter_context(closing(forwarding_channel.new_receiver()))

source_sender = source_channel.new_sender()
await source_sender.send(10)
assert await receiver.receive() == 10
Loading