Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- There is a new `Receiver.triggered` method that can be used instead of `selected_from`:

```python
async for selected in select(recv1, recv2):
if recv1.triggered(selected):
print('Received from recv1:', selected.message)
if recv2.triggered(selected):
print('Received from recv2:', selected.message)
```

## Bug Fixes

Expand Down
33 changes: 29 additions & 4 deletions src/frequenz/channels/_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,14 @@

from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Generic, Self
from typing import TYPE_CHECKING, Any, Generic, Self, TypeGuard

from ._exceptions import Error
from ._generic import MappedMessageT_co, ReceiverMessageT_co

if TYPE_CHECKING:
from ._select import Selected


class Receiver(ABC, Generic[ReceiverMessageT_co]):
"""An endpoint to receive messages."""
Expand Down Expand Up @@ -284,6 +287,30 @@ def filter(
"""
return _Filter(receiver=self, filter_function=filter_function)

def triggered(
self, selected: Selected[Any]
) -> TypeGuard[Selected[ReceiverMessageT_co]]:
"""Check whether this receiver was selected by [`select()`][frequenz.channels.select].

This method is used in conjunction with the
[`Selected`][frequenz.channels.Selected] class to determine which receiver was
selected in `select()` iteration.

It also works as a [type guard][typing.TypeGuard] to narrow the type of the
`Selected` instance to the type of the receiver.

Please see [`select()`][frequenz.channels.select] for an example.

Args:
selected: The result of a `select()` iteration.

Returns:
Whether this receiver was selected.
"""
if handled := selected._recv is self: # pylint: disable=protected-access
selected._handled = True # pylint: disable=protected-access
return handled


class ReceiverError(Error, Generic[ReceiverMessageT_co]):
"""An error that originated in a [Receiver][frequenz.channels.Receiver].
Expand Down Expand Up @@ -373,9 +400,7 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is a problem with the receiver.
"""
return self._mapping_function(
self._receiver.consume()
) # pylint: disable=protected-access
return self._mapping_function(self._receiver.consume())

def __str__(self) -> str:
"""Return a string representation of the mapper."""
Expand Down
10 changes: 4 additions & 6 deletions src/frequenz/channels/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ def selected_from(
Returns:
Whether the given receiver was selected.
"""
if handled := selected._recv is receiver: # pylint: disable=protected-access
selected._handled = True # pylint: disable=protected-access
return handled
return receiver.triggered(selected)


class SelectError(Error):
Expand Down Expand Up @@ -378,14 +376,14 @@ async def select( # noqa: DOC503
import datetime
from typing import assert_never

from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels import ReceiverStoppedError, select
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed

timer1 = Timer(datetime.timedelta(seconds=1), TriggerAllMissed())
timer2 = Timer(datetime.timedelta(seconds=0.5), SkipMissedAndDrift())

async for selected in select(timer1, timer2):
if selected_from(selected, timer1):
if timer1.triggered(selected):
# Beware: `selected.message` might raise an exception, you can always
# check for exceptions with `selected.exception` first or use
# a try-except block. You can also quickly check if the receiver was
Expand All @@ -395,7 +393,7 @@ async def select( # noqa: DOC503
continue
print(f"timer1: now={datetime.datetime.now()} drift={selected.message}")
timer2.stop()
elif selected_from(selected, timer2):
elif timer2.triggered(selected):
# Explicitly handling of exceptions
match selected.exception:
case ReceiverStoppedError():
Expand Down
Loading