Skip to content

Commit d93561b

Browse files
committed
Add an example showing how to receive from multiple channels
This illustrates the use of select to be able to manage multiple channels in one actor. It also showcases a bit more the `run()` function and more advance uses of channels. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 1013a72 commit d93561b

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed

src/frequenz/sdk/actor/__init__.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,147 @@ async def main() -> None: # (2)!
319319
Actor2 forwarding: "Actor1 forwarding: 'Hello'"
320320
```
321321
322+
#### Receiving from multiple channels
323+
324+
This example shows how to create an actor that receives messages from multiple
325+
[broadcast][frequenz.channels.Broadcast] channels using
326+
[`select()`][frequenz.channels.util.select].
327+
328+
```python title="select.py"
329+
import asyncio
330+
331+
from frequenz.channels import Broadcast, Receiver, Sender
332+
from frequenz.channels.util import select, selected_from
333+
from frequenz.sdk.actor import Actor, run
334+
335+
336+
class EchoActor(Actor): # (1)!
337+
def __init__(
338+
self,
339+
receiver_1: Receiver[bool],
340+
receiver_2: Receiver[bool],
341+
output: Sender[bool],
342+
) -> None:
343+
super().__init__()
344+
self._receiver_1 = receiver_1
345+
self._receiver_2 = receiver_2
346+
self._output = output
347+
348+
async def _run(self) -> None: # (2)!
349+
async for selected in select(self._receiver_1, self._receiver_2): # (10)!
350+
if selected_from(selected, self._receiver_1): # (11)!
351+
print(f"Received from receiver_1: {selected.value}")
352+
await self._output.send(selected.value)
353+
if not selected.value: # (12)!
354+
break
355+
elif selected_from(selected, self._receiver_2): # (13)!
356+
print(f"Received from receiver_2: {selected.value}")
357+
await self._output.send(selected.value)
358+
if not selected.value: # (14)!
359+
break
360+
else:
361+
assert False, "Unknown selected channel"
362+
print("EchoActor finished")
363+
# (15)!
364+
365+
366+
# (3)!
367+
input_channel_1 = Broadcast[bool]("input_channel_1")
368+
input_channel_2 = Broadcast[bool]("input_channel_2")
369+
echo_channel = Broadcast[bool]("echo_channel")
370+
371+
echo_actor = EchoActor( # (4)!
372+
input_channel_1.new_receiver(),
373+
input_channel_2.new_receiver(),
374+
echo_channel.new_sender(),
375+
)
376+
377+
echo_receiver = echo_channel.new_receiver() # (5)!
378+
379+
async def main() -> None: # (6)!
380+
# (8)!
381+
await input_channel_1.new_sender().send(True)
382+
await input_channel_2.new_sender().send(False)
383+
384+
await run(echo_actor) # (9)!
385+
386+
await echo_channel.close() # (16)!
387+
388+
async for message in echo_receiver: # (17)!
389+
print(f"Received {message=}")
390+
391+
392+
if __name__ == "__main__": # (7)!
393+
asyncio.run(main())
394+
```
395+
396+
1. We define an `EchoActor` that receives messages from two channels and sends
397+
them to another channel.
398+
399+
2. We implement the `_run()` method that will receive messages from the two channels
400+
using and send them to the output channel. The `run()` method will stop if a `False`
401+
message is received.
402+
403+
3. We create the channels that will be used with the actor.
404+
405+
4. We create the actor and connect it to the channels by creating new receivers and
406+
senders from the channels.
407+
408+
5. We create a receiver for the `echo_channel` to eventually receive the messages sent
409+
by the actor.
410+
411+
6. We define the `main()` function that will run the actor.
412+
413+
7. We start the `main()` function in the async loop using [`asyncio.run()`][asyncio.run].
414+
415+
8. We send a message to each of the input channels. These messages will be queued in
416+
the channels until they are consumed by the actor.
417+
418+
9. We start the actor and wait for it to finish using the
419+
[`run()`][frequenz.sdk.actor.run] function.
420+
421+
10. The [`select()`][frequenz.channels.util.select] function will get the first message
422+
available from the two channels. The order in which they will be handled is
423+
unknown, but in this example we assume that the first message will be from
424+
`input_channel_1` (`True`) and the second from `input_channel_1` (`False`).
425+
426+
11. The [`selected_from()`][frequenz.channels.util.selected_from] function will return
427+
`True` for the `input_channel_1` receiver. `selected.value` holds the received
428+
message, so `"Received from receiver_1: True"` will be printed and `True` will be
429+
sent to the `output` channel.
430+
431+
12. Since `selected.value` is `True`, the loop will continue, going back to the and the
432+
the loop will continue, going back to the
433+
[`select()`][frequenz.channels.util.select] function.
434+
435+
13. The [`selected_from()`][frequenz.channels.util.selected_from] function will return
436+
`False` for the `input_channel_1` receiver and `True` for the `input_channel_2`
437+
receiver. The message stored in `selected.value` will now be `False`, so
438+
`"Received from receiver_2: False"` will be printed and `False` will be sent to the
439+
`output` channel.
440+
441+
14. Since `selected.value` is `False`, the loop will break.
442+
443+
15. The `_run()` method will finish normally and the actor will be stopped, so the
444+
[`run()`][frequenz.sdk.actor.run] function will return.
445+
446+
16. We close the `echo_channel` to make sure the `echo_receiver` will stop receiving
447+
messages after all the queued messages are consumed (otherwise the step 17 will
448+
never end!).
449+
450+
17. We receive the messages sent by the actor to the `echo_channel` one by one and print
451+
them, it should print first `Received message=True` and then `Received
452+
message=False`.
453+
454+
The expected output is:
455+
456+
```
457+
Received from receiver_1: True
458+
Received from receiver_2: False
459+
Received message=True
460+
Received message=False
461+
```
462+
322463
[async context manager]: https://docs.python.org/3/reference/datamodel.html#async-context-managers
323464
"""
324465

0 commit comments

Comments
 (0)