Skip to content

Commit 0465a03

Browse files
committed
Add a RelaySender implementation
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 6f775c4 commit 0465a03

File tree

3 files changed

+99
-0
lines changed

3 files changed

+99
-0
lines changed

src/frequenz/channels/experimental/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,9 @@
1010
"""
1111

1212
from ._pipe import Pipe
13+
from ._relay_sender import RelaySender
14+
15+
__all__ = [
16+
"Pipe",
17+
"RelaySender",
18+
]
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A Sender for sending messages to multiple senders.
5+
6+
The `RelaySender` class takes multiple senders and forwards all the messages sent to it,
7+
to the senders it was created with.
8+
"""
9+
10+
import typing
11+
12+
from .._generic import SenderMessageT_contra
13+
from .._sender import Sender
14+
15+
16+
class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
17+
"""A Sender for sending messages to multiple senders.
18+
19+
The `RelaySender` class takes multiple senders and forwards all the messages sent to
20+
it, to the senders it was created with.
21+
22+
Example:
23+
```python
24+
from frequenz.channels import Broadcast
25+
from frequenz.channels.experimental import RelaySender
26+
27+
channel1: Broadcast[int] = Broadcast(name="channel1")
28+
channel2: Broadcast[int] = Broadcast(name="channel2")
29+
30+
receiver1 = channel1.new_receiver()
31+
receiver2 = channel2.new_receiver()
32+
33+
tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender())
34+
35+
await tee_sender.send(5)
36+
assert await receiver1.receive() == 5
37+
assert await receiver2.receive() == 5
38+
```
39+
"""
40+
41+
def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
42+
"""Create a new RelaySender.
43+
44+
Args:
45+
*senders: The senders to send messages to.
46+
"""
47+
self._senders = senders
48+
49+
async def send(self, message: SenderMessageT_contra, /) -> None:
50+
"""Send a message.
51+
52+
Args:
53+
message: The message to be sent.
54+
"""
55+
for sender in self._senders:
56+
await sender.send(message)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the RelaySender class."""
5+
6+
from frequenz.channels import Broadcast
7+
from frequenz.channels.experimental import RelaySender
8+
9+
10+
async def test_tee_sender() -> None:
11+
"""Test tee sender."""
12+
channel1: Broadcast[int] = Broadcast(name="channel1")
13+
channel2: Broadcast[int] = Broadcast(name="channel2")
14+
channel3: Broadcast[int] = Broadcast(name="channel3")
15+
16+
sender = RelaySender(
17+
channel1.new_sender(), channel2.new_sender(), channel3.new_sender()
18+
)
19+
receiver1 = channel1.new_receiver()
20+
receiver2 = channel2.new_receiver()
21+
receiver3 = channel3.new_receiver()
22+
23+
await sender.send(42)
24+
assert (
25+
await receiver1.receive()
26+
== await receiver2.receive()
27+
== await receiver3.receive()
28+
== 42
29+
)
30+
31+
await sender.send(-2)
32+
assert (
33+
await receiver1.receive()
34+
== await receiver2.receive()
35+
== await receiver3.receive()
36+
== -2
37+
)

0 commit comments

Comments
 (0)