Skip to content

Commit c7e6096

Browse files
authored
Add abstractions for sharing messages between channels (#301)
This PR implements two `experimental` features: - `RelaySender`, which is a `Sender` that, forwards the messages sent to it, to multiple senders. - `Pipe`, which provides a pipe between two channels, by connecting a `Receiver` to a `Sender`. Closes #300
2 parents bfcee7b + 3a314df commit c7e6096

File tree

7 files changed

+261
-0
lines changed

7 files changed

+261
-0
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212

1313
- The `LatestValueCache` class, which used to be internal to the Frequenz SDK, is now available through the channels package.
1414

15+
- **Experimental**: `RelaySender`, which is a `Sender` that forwards the messages sent to it, to multiple senders.
16+
17+
- **Experimental**: `Pipe`, which provides a pipe between two channels, by connecting a `Receiver` to a `Sender`.
18+
1519
## Bug Fixes
1620

1721
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Experimental channel primitives.
5+
6+
Warning:
7+
This package contains experimental channel primitives that are not yet
8+
considered stable. They are subject to change without notice, including
9+
removal, even in minor updates.
10+
"""
11+
12+
from ._pipe import Pipe
13+
from ._relay_sender import RelaySender
14+
15+
__all__ = [
16+
"Pipe",
17+
"RelaySender",
18+
]
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Pipe between a receiver and a sender.
5+
6+
The `Pipe` class takes a receiver and a sender and creates a pipe between them by
7+
forwarding all the messages received by the receiver to the sender.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import asyncio
13+
import typing
14+
15+
from .._generic import ChannelMessageT
16+
from .._receiver import Receiver
17+
from .._sender import Sender
18+
19+
20+
class Pipe(typing.Generic[ChannelMessageT]):
21+
"""A pipe between two channels.
22+
23+
The `Pipe` class takes a receiver and a sender and creates a pipe between them
24+
by forwarding all the messages received by the receiver to the sender.
25+
26+
Example:
27+
```python
28+
from frequenz.channels import Broadcast, Pipe
29+
30+
channel1: Broadcast[int] = Broadcast(name="channel1")
31+
channel2: Broadcast[int] = Broadcast(name="channel2")
32+
33+
receiver_chan1 = channel1.new_receiver()
34+
sender_chan2 = channel2.new_sender()
35+
36+
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
37+
await sender_chan2.send(10)
38+
assert await receiver_chan1.receive() == 10
39+
```
40+
"""
41+
42+
def __init__(
43+
self, receiver: Receiver[ChannelMessageT], sender: Sender[ChannelMessageT]
44+
) -> None:
45+
"""Create a new pipe between two channels.
46+
47+
Args:
48+
receiver: The receiver channel.
49+
sender: The sender channel.
50+
"""
51+
self._sender = sender
52+
self._receiver = receiver
53+
self._task: asyncio.Task[None] | None = None
54+
55+
async def __aenter__(self) -> Pipe[ChannelMessageT]:
56+
"""Enter the runtime context."""
57+
await self.start()
58+
return self
59+
60+
async def __aexit__(
61+
self,
62+
_exc_type: typing.Type[BaseException],
63+
_exc: BaseException,
64+
_tb: typing.Any,
65+
) -> None:
66+
"""Exit the runtime context."""
67+
await self.stop()
68+
69+
async def start(self) -> None:
70+
"""Start this pipe if it is not already running."""
71+
if not self._task or self._task.done():
72+
self._task = asyncio.create_task(self._run())
73+
74+
async def stop(self) -> None:
75+
"""Stop this pipe."""
76+
if self._task and not self._task.done():
77+
self._task.cancel()
78+
try:
79+
await self._task
80+
except asyncio.CancelledError:
81+
pass
82+
83+
async def _run(self) -> None:
84+
async for value in self._receiver:
85+
await self._sender.send(value)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A Sender for sending messages to multiple senders.
5+
6+
The `RelaySender` class takes multiple senders and forwards all the messages sent to it,
7+
to the senders it was created with.
8+
"""
9+
10+
import typing
11+
12+
from .._generic import SenderMessageT_contra
13+
from .._sender import Sender
14+
15+
16+
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
17+
"""A Sender for sending messages to multiple senders.
18+
19+
The `RelaySender` class takes multiple senders and forwards all the messages sent to
20+
it, to the senders it was created with.
21+
22+
Example:
23+
```python
24+
from frequenz.channels import Broadcast
25+
from frequenz.channels.experimental import RelaySender
26+
27+
channel1: Broadcast[int] = Broadcast(name="channel1")
28+
channel2: Broadcast[int] = Broadcast(name="channel2")
29+
30+
receiver1 = channel1.new_receiver()
31+
receiver2 = channel2.new_receiver()
32+
33+
tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())
34+
35+
await tee_sender.send(5)
36+
assert await receiver1.receive() == 5
37+
assert await receiver2.receive() == 5
38+
```
39+
"""
40+
41+
def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
42+
"""Create a new RelaySender.
43+
44+
Args:
45+
*senders: The senders to send messages to.
46+
"""
47+
self._senders = senders
48+
49+
async def send(self, message: SenderMessageT_contra, /) -> None:
50+
"""Send a message.
51+
52+
Args:
53+
message: The message to be sent.
54+
"""
55+
for sender in self._senders:
56+
await sender.send(message)

tests/experimental/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for experimental channel primitives."""

tests/experimental/test_pipe.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the Pipe class."""
5+
6+
7+
import asyncio
8+
import typing
9+
10+
from frequenz.channels import Broadcast, Receiver
11+
from frequenz.channels.experimental import Pipe
12+
13+
T = typing.TypeVar("T")
14+
15+
16+
class Timeout:
17+
"""Sentinel for timeout."""
18+
19+
20+
async def receive_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[Timeout]:
21+
"""Receive message from receiver with timeout."""
22+
try:
23+
return await asyncio.wait_for(recv.receive(), timeout=timeout)
24+
except asyncio.TimeoutError:
25+
return Timeout
26+
27+
28+
async def test_pipe() -> None:
29+
"""Test pipe."""
30+
channel1: Broadcast[int] = Broadcast(name="channel1")
31+
channel2: Broadcast[int] = Broadcast(name="channel2")
32+
33+
sender_chan1 = channel1.new_sender()
34+
sender_chan2 = channel2.new_sender()
35+
receiver_chan1 = channel1.new_receiver()
36+
receiver_chan2 = channel2.new_receiver()
37+
38+
async with Pipe(channel2.new_receiver(), channel1.new_sender()):
39+
await sender_chan2.send(42)
40+
assert await receive_timeout(receiver_chan1) == 42
41+
assert await receive_timeout(receiver_chan2) == 42
42+
43+
await sender_chan2.send(-2)
44+
assert await receive_timeout(receiver_chan1) == -2
45+
assert await receive_timeout(receiver_chan2) == -2
46+
47+
await sender_chan1.send(43)
48+
assert await receive_timeout(receiver_chan1) == 43
49+
assert await receive_timeout(receiver_chan2) is Timeout
50+
51+
await sender_chan2.send(5)
52+
assert await receive_timeout(receiver_chan1) == 5
53+
assert await receive_timeout(receiver_chan2) == 5
54+
55+
await sender_chan2.send(5)
56+
assert await receive_timeout(receiver_chan1) is Timeout
57+
assert await receive_timeout(receiver_chan2) == 5
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the RelaySender class."""
5+
6+
from frequenz.channels import Broadcast
7+
from frequenz.channels.experimental import RelaySender
8+
9+
10+
async def test_tee_sender() -> None:
11+
"""Test tee sender."""
12+
channel1: Broadcast[int] = Broadcast(name="channel1")
13+
channel2: Broadcast[int] = Broadcast(name="channel2")
14+
channel3: Broadcast[int] = Broadcast(name="channel3")
15+
16+
sender = RelaySender(
17+
channel1.new_sender(), channel2.new_sender(), channel3.new_sender()
18+
)
19+
receiver1 = channel1.new_receiver()
20+
receiver2 = channel2.new_receiver()
21+
receiver3 = channel3.new_receiver()
22+
23+
await sender.send(42)
24+
assert (
25+
await receiver1.receive()
26+
== await receiver2.receive()
27+
== await receiver3.receive()
28+
== 42
29+
)
30+
31+
await sender.send(-2)
32+
assert (
33+
await receiver1.receive()
34+
== await receiver2.receive()
35+
== await receiver3.receive()
36+
== -2
37+
)

0 commit comments

Comments
 (0)