|
21 | 21 |
|
22 | 22 |
|
23 | 23 | class Broadcast(Generic[_T]): |
24 | | - """A channel to broadcast messages to multiple receivers. |
| 24 | + """A channel that deliver all messages to all receivers. |
25 | 25 |
|
26 | | - `Broadcast` channels can have multiple senders and multiple receivers. Each |
27 | | - message sent through any of the senders is received by all of the |
28 | | - receivers. |
| 26 | + # Description |
29 | 27 |
|
30 | | - Internally, a broadcast receiver's buffer is implemented with just |
31 | | - append/pop operations on either side of a [deque][collections.deque], which |
32 | | - are thread-safe. Because of this, `Broadcast` channels are thread-safe. |
| 28 | + [Broadcast][frequenz.channels.Broadcast] channels can have multiple |
| 29 | + [senders][frequenz.channels.Sender] and multiple |
| 30 | + [receivers][frequenz.channels.Receiver]. Each message sent through any of the |
| 31 | + senders will be received by all receivers. |
33 | 32 |
|
34 | | - When there are multiple channel receivers, they can be awaited |
35 | | - simultaneously using [select][frequenz.channels.select] or |
36 | | - [merge][frequenz.channels.merge]. |
| 33 | + <center> |
| 34 | + ```bob |
| 35 | + .---------. msg1 msg1,msg2 .-----------. |
| 36 | + | Sender +------. .---------->| Receiver | |
| 37 | + '---------' | .----------. | '-----------' |
| 38 | + +----->| Channel +-----+ |
| 39 | + .---------. | '----------' | .-----------. |
| 40 | + | Sender +------' '----------->| Receiver | |
| 41 | + '---------' msg2 msg1,msg2 '-----------' |
| 42 | + ``` |
| 43 | + </center> |
37 | 44 |
|
38 | | - Example: |
39 | | - ``` python |
40 | | - async def send(sender: channel.Sender) -> None: |
41 | | - while True: |
42 | | - next = random.randint(3, 17) |
43 | | - print(f"sending: {next}") |
44 | | - await sender.send(next) |
| 45 | + !!! Note inline end "Characteristics" |
45 | 46 |
|
| 47 | + * **Buffered:** Yes, with one buffer per receiver |
| 48 | + * **Buffer full policy:** Drop oldest message |
| 49 | + * **Multiple receivers:** Yes |
| 50 | + * **Multiple senders:** Yes |
| 51 | + * **Thread-safe:** No |
46 | 52 |
|
47 | | - async def recv(id: int, receiver: channel.Receiver) -> None: |
48 | | - while True: |
49 | | - next = await receiver.receive() |
50 | | - print(f"receiver_{id} received {next}") |
51 | | - await asyncio.sleep(0.1) # sleep (or work) with the data |
| 53 | + This channel is buffered, and when messages are not being consumed fast |
| 54 | + enough and the buffer fills up, old messages will get dropped. |
52 | 55 |
|
| 56 | + Each receiver has its own buffer, so messages will only be dropped for |
| 57 | + receivers that can't keep up with the senders, and not for the whole |
| 58 | + channel. |
53 | 59 |
|
54 | | - bcast = channel.Broadcast() |
| 60 | + To create a new [senders][frequenz.channels.Sender] and |
| 61 | + [receivers][frequenz.channels.Receiver] you can use the |
| 62 | + [`new_sender()`][frequenz.channels.Broadcast.new_sender] and |
| 63 | + [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods |
| 64 | + respectively. |
55 | 65 |
|
56 | | - sender = bcast.new_sender() |
57 | | - receiver_1 = bcast.new_receiver() |
| 66 | + When a channel is not needed anymore, it should be closed with |
| 67 | + [`close()`][frequenz.channels.Broadcast.close]. This will prevent further |
| 68 | + attempts to [`send()`][frequenz.channels.Sender.send] data, and will allow |
| 69 | + receivers to drain the pending items on their queues, but after that, |
| 70 | + subsequent [receive()][frequenz.channels.Receiver.receive] calls will |
| 71 | + raise a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. |
58 | 72 |
|
59 | | - asyncio.create_task(send(sender)) |
| 73 | + This channel is useful, for example, to implement a pub/sub pattern, where |
| 74 | + multiple receivers can subscribe to a channel to receive all messages. |
60 | 75 |
|
61 | | - await recv(1, receiver_1) |
| 76 | + In cases where each message needs to be delivered only to one receiver, an |
| 77 | + [anycast][frequenz.channels.Anycast] channel may be used. |
| 78 | +
|
| 79 | + # Examples |
| 80 | +
|
| 81 | + Example: Send a few numbers to a receiver |
| 82 | + This is a very simple example that sends a few numbers from a single sender to |
| 83 | + a single receiver. |
| 84 | +
|
| 85 | + ```python |
| 86 | + import asyncio |
| 87 | +
|
| 88 | + from frequenz.channels import Broadcast, Sender |
| 89 | +
|
| 90 | +
|
| 91 | + async def send(sender: Sender[int]) -> None: |
| 92 | + for msg in range(3): |
| 93 | + print(f"sending {msg}") |
| 94 | + await sender.send(msg) |
| 95 | +
|
| 96 | +
|
| 97 | + async def main() -> None: |
| 98 | + channel = Broadcast[int](name="numbers") |
| 99 | +
|
| 100 | + sender = channel.new_sender() |
| 101 | + receiver = channel.new_receiver() |
| 102 | +
|
| 103 | + async with asyncio.TaskGroup() as task_group: |
| 104 | + task_group.create_task(send(sender)) |
| 105 | + for _ in range(3): |
| 106 | + msg = await receiver.receive() |
| 107 | + print(f"received {msg}") |
| 108 | + await asyncio.sleep(0.1) # sleep (or work) with the data |
| 109 | +
|
| 110 | +
|
| 111 | + asyncio.run(main()) |
| 112 | + ``` |
| 113 | +
|
| 114 | + The output should look something like (although the sending and received might |
| 115 | + appear more interleaved): |
| 116 | +
|
| 117 | + ``` |
| 118 | + sending 0 |
| 119 | + sending 1 |
| 120 | + sending 2 |
| 121 | + received 0 |
| 122 | + received 1 |
| 123 | + received 2 |
62 | 124 | ``` |
63 | 125 |
|
64 | | - Check the `tests` and `benchmarks` directories for more examples. |
| 126 | + Example: Send a few number from multiple senders to multiple receivers |
| 127 | + This is a more complex example that sends a few numbers from multiple senders to |
| 128 | + multiple receivers, using a small buffer to force the senders to block. |
| 129 | +
|
| 130 | + ```python |
| 131 | + import asyncio |
| 132 | +
|
| 133 | + from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender |
| 134 | +
|
| 135 | +
|
| 136 | + async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: |
| 137 | + for msg in range(start, stop): |
| 138 | + print(f"{name} sending {msg}") |
| 139 | + await sender.send(msg) |
| 140 | +
|
| 141 | +
|
| 142 | + async def recv(name: str, receiver: Receiver[int]) -> None: |
| 143 | + try: |
| 144 | + async for msg in receiver: |
| 145 | + print(f"{name} received {msg}") |
| 146 | + await asyncio.sleep(0.1) # sleep (or work) with the data |
| 147 | + except ReceiverStoppedError: |
| 148 | + pass |
| 149 | +
|
| 150 | +
|
| 151 | + async def main() -> None: |
| 152 | + acast = Broadcast[int](name="numbers") |
| 153 | +
|
| 154 | + async with asyncio.TaskGroup() as task_group: |
| 155 | + task_group.create_task(send("sender_1", acast.new_sender(), 10, 13)) |
| 156 | + task_group.create_task(send("sender_2", acast.new_sender(), 20, 22)) |
| 157 | + task_group.create_task(recv("receiver_1", acast.new_receiver())) |
| 158 | + task_group.create_task(recv("receiver_2", acast.new_receiver())) |
| 159 | +
|
| 160 | +
|
| 161 | + asyncio.run(main()) |
| 162 | + ``` |
| 163 | +
|
| 164 | + The output should look something like this(although the sending and received |
| 165 | + might appear interleaved in a different way): |
| 166 | +
|
| 167 | + ``` |
| 168 | + sender_1 sending 10 |
| 169 | + sender_1 sending 11 |
| 170 | + sender_1 sending 12 |
| 171 | + sender_2 sending 20 |
| 172 | + sender_2 sending 21 |
| 173 | + receiver_1 received 10 |
| 174 | + receiver_1 received 11 |
| 175 | + receiver_1 received 12 |
| 176 | + receiver_1 received 20 |
| 177 | + receiver_1 received 21 |
| 178 | + receiver_2 received 10 |
| 179 | + receiver_2 received 11 |
| 180 | + receiver_2 received 12 |
| 181 | + receiver_2 received 20 |
| 182 | + receiver_2 received 21 |
| 183 | + ``` |
65 | 184 | """ |
66 | 185 |
|
67 | 186 | def __init__(self, *, name: str, resend_latest: bool = False) -> None: |
|
0 commit comments