Skip to content

Commit fbe7ab8

Browse files
committed
Improve Anycast class documentation
Add a "Characteristics" summary with the most important properties of this channel type, a diagram to show how messages are routed from multiple senders to multiple receivers, explain a bit more about what happens when buffers are full and improve and add a new example. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent a2b4462 commit fbe7ab8

File tree

1 file changed

+154
-28
lines changed

1 file changed

+154
-28
lines changed

src/frequenz/channels/_anycast.py

Lines changed: 154 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,54 +20,180 @@
2020

2121

2222
class Anycast(Generic[_T]):
23-
"""A channel for sending data across async tasks.
23+
"""A channel that delivers each message to exactly one receiver.
2424
25-
Anycast channels support multiple senders and multiple receivers. A message sent
26-
through a sender will be received by exactly one receiver.
25+
# Description
26+
27+
!!! Tip inline end
28+
29+
[Anycast][frequenz.channels.Anycast] channels behave like the
30+
[Golang](https://golang.org/) [channels](https://go.dev/ref/spec#Channel_types).
31+
32+
[Anycast][frequenz.channels.Anycast] channels support multiple
33+
[senders][frequenz.channels.Sender] and multiple
34+
[receivers][frequenz.channels.Receiver]. Each message sent through any of the
35+
senders will be received by exactly one receiver (but **any** receiver).
36+
37+
<center>
38+
```bob
39+
.---------. msg1 msg1 .-----------.
40+
| Sender +------. .------>| Receiver |
41+
'---------' | .----------. | '-----------'
42+
+----->| Channel +-----+
43+
.---------. | '----------' | .-----------.
44+
| Sender +------' '------>| Receiver |
45+
'---------' msg2 msg2 '-----------'
46+
```
47+
</center>
48+
49+
!!! Note inline end "Characteristics"
50+
51+
* **Buffered:** Yes, with a global channel buffer
52+
* **Buffer full policy:** Block senders
53+
* **Multiple receivers:** Yes
54+
* **Multiple senders:** Yes
55+
* **Thread-safe:** No
2756
2857
This channel is buffered, and if the senders are faster than the receivers, then the
2958
channel's buffer will fill up. In that case, the senders will block at the
30-
[send()][frequenz.channels.Sender.send] method until the receivers consume the
59+
[`send()`][frequenz.channels.Sender.send] method until the receivers consume the
3160
messages in the channel's buffer. The channel's buffer size can be configured at
3261
creation time via the `limit` argument.
3362
63+
The first receiver that is awaited will get the next message. When multiple
64+
receivers are waiting, the [asyncio][] loop scheduler picks a receiver for each next
65+
massage.
66+
67+
This means that, in practice, there might be only one receiver receiving all the
68+
messages, depending on how tasks are schduled.
69+
70+
If you need to ensure some delivery policy (like round-robin or uniformly random),
71+
then you will have to implement it yourself.
72+
73+
To create a new [senders][frequenz.channels.Sender] and
74+
[receivers][frequenz.channels.Receiver] you can use the
75+
[`new_sender()`][frequenz.channels.Broadcast.new_sender] and
76+
[`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
77+
respectively.
78+
79+
When the channel is not needed anymore, it should be closed with the
80+
[`close()`][frequenz.channels.Anycast.close] method. This will prevent further
81+
attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
82+
able to drain the pending values on the channel, but after that, subsequent
83+
[`receive()`][frequenz.channels.Receiver.receive] calls will raise a
84+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.
85+
86+
This channel is useful, for example, to distribute work across multiple workers.
87+
3488
In cases where each message need to be received by every receiver, a
35-
[Broadcast][frequenz.channels.Broadcast] channel may be used.
89+
[broadcast][frequenz.channels.Broadcast] channel may be used.
90+
91+
# Examples
92+
93+
Example: Send a few numbers to a receiver
94+
This is a very simple example that sends a few numbers from a single sender to
95+
a single receiver.
96+
97+
```python
98+
import asyncio
99+
100+
from frequenz.channels import Anycast, Sender
101+
36102
37-
Uses an [deque][collections.deque] internally, so Anycast channels are not
38-
thread-safe.
103+
async def send(sender: Sender[int]) -> None:
104+
for msg in range(3):
105+
print(f"sending {msg}")
106+
await sender.send(msg)
39107
40-
When there are multiple channel receivers, they can be awaited
41-
simultaneously using [select][frequenz.channels.select] or
42-
[merge][frequenz.channels.merge].
43108
44-
Example:
45-
``` python
46-
async def send(sender: channel.Sender) -> None:
47-
while True:
48-
next = random.randint(3, 17)
49-
print(f"sending: {next}")
50-
await sender.send(next)
109+
async def main() -> None:
110+
channel = Anycast[int](name="numbers")
51111
112+
sender = channel.new_sender()
113+
receiver = channel.new_receiver()
52114
53-
async def recv(id: int, receiver: channel.Receiver) -> None:
54-
while True:
55-
next = await receiver.receive()
56-
print(f"receiver_{id} received {next}")
57-
await asyncio.sleep(0.1) # sleep (or work) with the data
115+
async with asyncio.TaskGroup() as task_group:
116+
task_group.create_task(send(sender))
117+
for _ in range(3):
118+
msg = await receiver.receive()
119+
print(f"received {msg}")
120+
await asyncio.sleep(0.1) # sleep (or work) with the data
58121
59122
60-
acast = channel.Anycast()
123+
asyncio.run(main())
124+
```
125+
126+
The output should look something like (although the sending and received might
127+
appear more interleaved):
128+
129+
```
130+
sending 0
131+
sending 1
132+
sending 2
133+
received 0
134+
received 1
135+
received 2
136+
```
137+
138+
Example: Send a few number from multiple senders to multiple receivers
139+
This is a more complex example that sends a few numbers from multiple senders to
140+
multiple receivers, using a small buffer to force the senders to block.
141+
142+
```python
143+
import asyncio
144+
145+
from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender
146+
61147
62-
sender = acast.new_sender()
63-
receiver_1 = acast.new_receiver()
148+
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
149+
for msg in range(start, stop):
150+
print(f"{name} sending {msg}")
151+
await sender.send(msg)
64152
65-
asyncio.create_task(send(sender))
66153
67-
await recv(1, receiver_1)
154+
async def recv(name: str, receiver: Receiver[int]) -> None:
155+
try:
156+
async for msg in receiver:
157+
print(f"{name} received {msg}")
158+
await asyncio.sleep(0.1) # sleep (or work) with the data
159+
except ReceiverStoppedError:
160+
pass
161+
162+
163+
async def main() -> None:
164+
acast = Anycast[int](name="numbers", limit=2)
165+
166+
async with asyncio.TaskGroup() as task_group:
167+
task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
168+
task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
169+
task_group.create_task(recv("receiver_1", acast.new_receiver()))
170+
task_group.create_task(recv("receiver_2", acast.new_receiver()))
171+
172+
173+
asyncio.run(main())
68174
```
69175
70-
Check the `tests` and `benchmarks` directories for more examples.
176+
The output should look something like this(although the sending and received
177+
might appear interleaved in a different way):
178+
179+
```
180+
sender_1 sending 10
181+
sender_1 sending 11
182+
sender_1 sending 12
183+
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
184+
consumes a value
185+
sender_2 sending 20
186+
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
187+
consumes a value
188+
receiver_1 received 10
189+
receiver_1 received 11
190+
sender_2 sending 21
191+
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
192+
consumes a value
193+
receiver_1 received 12
194+
receiver_1 received 20
195+
receiver_1 received 21
196+
```
71197
"""
72198

73199
def __init__(self, *, name: str, limit: int = 10) -> None:

0 commit comments

Comments
 (0)