|
3 | 3 |
|
4 | 4 | """Select the first among multiple Receivers. |
5 | 5 |
|
6 | | -Expects Receiver class to raise `StopAsyncIteration` |
7 | | -exception once no more messages are expected or the channel |
8 | | -is closed in case of `Receiver` class. |
| 6 | +# Usage |
| 7 | +
|
| 8 | +If you need to receiver different types of messages from different receivers, you need |
| 9 | +to know the source of a particular received value to know the type of the value. |
| 10 | +
|
| 11 | +[`select()`][frequenz.channels.select] allows you to do that. It is an |
| 12 | +[async iterator][typing.AsyncIterator] that will iterate over the values of all |
| 13 | +receivers as they receive new values. |
| 14 | +
|
| 15 | +It yields a [`Selected`][frequenz.channels.Selected] object that will tell you the |
| 16 | +source of the received message. To make sure the received value is *cast* to the |
| 17 | +correct type, you need to use the [`selected_from()`][frequenz.channels.selected_from] |
| 18 | +function to check the source of the message, and the |
| 19 | +[`value`][frequenz.channels.Selected.value] attribute to access the message: |
| 20 | +
|
| 21 | +```python |
| 22 | +from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from |
| 23 | +
|
| 24 | +channel1: Anycast[int] = Anycast(name="channel1") |
| 25 | +channel2: Anycast[str] = Anycast(name="channel2") |
| 26 | +receiver1 = channel1.new_receiver() |
| 27 | +receiver2 = channel2.new_receiver() |
| 28 | +
|
| 29 | +async for selected in select(receiver1, receiver2): |
| 30 | + if selected_from(selected, receiver1): |
| 31 | + print(f"Received from receiver1, next number: {selected.value + 1}") |
| 32 | + elif selected_from(selected, receiver2): |
| 33 | + print(f"Received from receiver2, length: {len(selected.value)}") |
| 34 | + else: |
| 35 | + assert False, "Unknown source, this should never happen" |
| 36 | +``` |
| 37 | +
|
| 38 | +Tip: |
| 39 | + To prevent common bugs, like when a new receiver is added to the select loop but |
| 40 | + the handling code is forgotten, [`select()`][frequenz.channels.select] will check |
| 41 | + that all the selected receivers are handled in the if-chain. |
| 42 | +
|
| 43 | + If this happens, it will raise an |
| 44 | + [`UnhandledSelectedError`][frequenz.channels.UnhandledSelectedError] exception. |
| 45 | +
|
| 46 | + Not handling a receiver is considered a programming error. Because of this, the |
| 47 | + exception is a subclass of [`BaseException`][BaseException] instead of |
| 48 | + [`Exception`][Exception]. This means that it will not be caught by [`except |
| 49 | + Exception`][Exception] blocks. |
| 50 | +
|
| 51 | + If for some reason you want to ignore a received value, just add the receiver to |
| 52 | + the if-chain and do nothing with the value: |
| 53 | +
|
| 54 | + ```python |
| 55 | + from frequenz.channels import Anycast, select, selected_from |
| 56 | +
|
| 57 | + channel1: Anycast[int] = Anycast(name="channel1") |
| 58 | + channel2: Anycast[str] = Anycast(name="channel2") |
| 59 | + receiver1 = channel1.new_receiver() |
| 60 | + receiver2 = channel2.new_receiver() |
| 61 | +
|
| 62 | + async for selected in select(receiver1, receiver2): |
| 63 | + if selected_from(selected, receiver1): |
| 64 | + continue |
| 65 | + if selected_from(selected, receiver2): |
| 66 | + print(f"Received from receiver2, length: {len(selected.value)}") |
| 67 | + ``` |
| 68 | +
|
| 69 | +# Stopping |
| 70 | +
|
| 71 | +The `select()` async iterator will stop as soon as all the receivers are stopped. You |
| 72 | +can also end the iteration early by breaking out of the loop as normal. |
| 73 | +
|
| 74 | +When a single [receiver][frequenz.channels.Receiver] is stopped, it will be reported |
| 75 | +via the [`Selected`][frequenz.channels.Selected] object. You can use the |
| 76 | +[`was_stopped()`][frequenz.channels.Selected.was_stopped] method to check if the |
| 77 | +selected [receiver][frequenz.channels.Receiver] was stopped: |
| 78 | +
|
| 79 | +```python |
| 80 | +from frequenz.channels import Anycast, select, selected_from |
| 81 | +
|
| 82 | +channel1: Anycast[int] = Anycast(name="channel1") |
| 83 | +channel2: Anycast[str] = Anycast(name="channel2") |
| 84 | +receiver1 = channel1.new_receiver() |
| 85 | +receiver2 = channel2.new_receiver() |
| 86 | +
|
| 87 | +async for selected in select(receiver1, receiver2): |
| 88 | + if selected_from(selected, receiver1): |
| 89 | + if selected.was_stopped(): |
| 90 | + print("receiver1 was stopped") |
| 91 | + continue |
| 92 | + print(f"Received from receiver1, the next number is: {selected.value + 1}") |
| 93 | + # ... |
| 94 | +``` |
| 95 | +
|
| 96 | +Tip: |
| 97 | + The [`was_stopped()`][frequenz.channels.Selected.was_stopped] method is a |
| 98 | + convenience method that is equivalent to checking if the |
| 99 | + [`exception`][frequenz.channels.Selected.exception] attribute is an instance of |
| 100 | + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. |
| 101 | +
|
| 102 | +# Error Handling |
| 103 | +
|
| 104 | +Tip: |
| 105 | + For more information about handling errors, please refer to the |
| 106 | + [Error Handling](/user-guide/error-handling/) section of the user guide. |
| 107 | +
|
| 108 | +If a receiver raises an exception while receiving a value, the exception will be |
| 109 | +raised by the [`value`][frequenz.channels.Selected.value] attribute of the |
| 110 | +[`Selected`][frequenz.channels.Selected] object. |
| 111 | +
|
| 112 | +You can use a try-except block to handle exceptions as usual: |
| 113 | +
|
| 114 | +```python |
| 115 | +from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from |
| 116 | +
|
| 117 | +channel1: Anycast[int] = Anycast(name="channel1") |
| 118 | +channel2: Anycast[str] = Anycast(name="channel2") |
| 119 | +receiver1 = channel1.new_receiver() |
| 120 | +receiver2 = channel2.new_receiver() |
| 121 | +
|
| 122 | +async for selected in select(receiver1, receiver2): |
| 123 | + if selected_from(selected, receiver1): |
| 124 | + try: |
| 125 | + print(f"Received from receiver1, next number: {selected.value + 1}") |
| 126 | + except ReceiverStoppedError: |
| 127 | + print("receiver1 was stopped") |
| 128 | + except ValueError as value_error: |
| 129 | + print(f"receiver1 raised a ValueError: {value_error}") |
| 130 | + # ... |
| 131 | + # ... |
| 132 | +``` |
| 133 | +
|
| 134 | +The [`Selected`][frequenz.channels.Selected] object also has a |
| 135 | +[`exception`][frequenz.channels.Selected.exception] attribute that will contain the |
| 136 | +exception that was raised by the receiver. |
9 | 137 | """ |
10 | 138 |
|
11 | 139 | import asyncio |
@@ -297,9 +425,7 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: |
297 | 425 | print(f"timer2: exception={exception}") |
298 | 426 | case None: |
299 | 427 | # All good, no exception, we can use `selected.value` safely |
300 | | - print( |
301 | | - f"timer2: now={datetime.datetime.now()} drift={selected.value}" |
302 | | - ) |
| 428 | + print(f"timer2: now={datetime.datetime.now()} drift={selected.value}") |
303 | 429 | case _ as unhanded: |
304 | 430 | assert_never(unhanded) |
305 | 431 | else: |
|
0 commit comments