Skip to content

Commit 6fcc42c

Browse files
authored
Replace Select with select(), a type-safe async iterator (#114)
This new function acts as an async iterator, and makes sure no dangling tasks are left behind after a select loop is done. This class is designed to replace the current `Select` implementation with the following improvements: * Proper type hinting by using the new helper type guard `selected_from()`. * Fixes potential starvation issues. * Simplifies the interface by providing values one-by-one. * Simplifies the implementation, so it is easier to maintain. * Guarantees there are no dangling tasks left behind. Here is an usage example: ```python import datetime from frequenz.channels import ReceiverStoppedError from frequenz.channels.util import select, selected_from, Timer timer1 = Timer.periodic(datetime.timedelta(seconds=1)) timer2 = Timer.timeout(datetime.timedelta(seconds=0.5)) async for selected in selector(timer1, timer2): if selected_from(selected, timer1): if selected.was_stopped(): print("timer1 was stopped") continue print(f"timer1: now={datetime.datetime.now()} drift={selected.value}") timer2.stop() elif selected_from(selected, timer2): if selected.was_stopped(): print("timer2 was stopped") continue if exception := selected.exception is not None: print("timer2 was had an error") continue print(f"timer2: now={datetime.datetime.now()} drift={selected.value}") else: # This is not necessary, as select() will check for exhaustiveness, but # it is good practice to have it in case you forgot to handle a new # receiver added to `select()` at a later point in time. assert False ```
2 parents 1a67588 + e776d1d commit 6fcc42c

File tree

12 files changed

+1258
-297
lines changed

12 files changed

+1258
-297
lines changed

RELEASE_NOTES.md

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,90 @@
22

33
## Summary
44

5-
<!-- Here goes a general summary of what this release is about -->
5+
The minimum Python supported version was bumped to 3.11 and the `Select` class replaced by the new `select()` function.
66

77
## Upgrading
88

99
* The minimum supported Python version was bumped to 3.11, downstream projects will need to upgrade too to use this version.
1010

11+
* The `Select` class was replaced by a new `select()` function, with the following improvements:
12+
13+
* Type-safe: proper type hinting by using the new helper type guard `selected_from()`.
14+
* Fixes potential starvation issues.
15+
* Simplifies the interface by providing values one-by-one.
16+
* Guarantees there are no dangling tasks left behind when used as an async context manager.
17+
18+
This new function is an [async iterator](https://docs.python.org/3.11/library/collections.abc.html#collections.abc.AsyncIterator), and makes sure no dangling tasks are left behind after a select loop is done.
19+
20+
Example:
21+
```python
22+
timer1 = Timer.periodic(datetime.timedelta(seconds=1))
23+
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))
24+
25+
async for selected in selector(timer1, timer2):
26+
if selected_from(selected, timer1):
27+
# Beware: `selected.value` might raise an exception, you can always
28+
# check for exceptions with `selected.exception` first or use
29+
# a try-except block. You can also quickly check if the receiver was
30+
# stopped and let any other unexpected exceptions bubble up.
31+
if selected.was_stopped():
32+
print("timer1 was stopped")
33+
continue
34+
print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
35+
timer2.stop()
36+
elif selected_from(selected, timer2):
37+
# Explicitly handling of exceptions
38+
match selected.exception:
39+
case ReceiverStoppedError():
40+
print("timer2 was stopped")
41+
case Exception() as exception:
42+
print(f"timer2: exception={exception}")
43+
case None:
44+
# All good, no exception, we can use `selected.value` safely
45+
print(
46+
f"timer2: now={datetime.datetime.now()} "
47+
f"drift={selected.value}"
48+
)
49+
case _ as unhanded:
50+
assert_never(unhanded)
51+
else:
52+
# This is not necessary, as select() will check for exhaustiveness, but
53+
# it is good practice to have it in case you forgot to handle a new
54+
# receiver added to `select()` at a later point in time.
55+
assert False
56+
```
57+
1158
## New Features
1259

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
60+
* A new `select()` function was added, please look at the *Upgrading* section for details.
61+
62+
* A new `Event` utility receiver was added.
63+
64+
This receiver can be made ready manually. It is mainly useful for testing but can also become handy in scenarios where a simple, on-off signal needs to be sent to a select loop for example.
65+
66+
Example:
67+
68+
```python
69+
import asyncio
70+
from frequenz.channels import Receiver
71+
from frequenz.channels.util import Event, select, selected_from
72+
73+
other_receiver: Receiver[int] = ...
74+
exit_event = Event()
75+
76+
async def exit_after_10_seconds() -> None:
77+
asyncio.sleep(10)
78+
exit_event.set()
79+
80+
asyncio.ensure_future(exit_after_10_seconds())
1481

15-
## Bug Fixes
82+
async for selected in selector(exit_event, other_receiver):
83+
if selected_from(selected, exit_event):
84+
break
85+
if selected_from(selected, other_receiver):
86+
print(selected.value)
87+
else:
88+
assert False, "Unknow receiver selected"
89+
```
1690

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
91+
* The `Timer` class now has more descriptive `__str__` and `__repr__` methods.

src/frequenz/channels/_anycast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Anycast(Generic[T]):
2828
thread-safe.
2929
3030
When there are multiple channel receivers, they can be awaited
31-
simultaneously using [Select][frequenz.channels.util.Select],
31+
simultaneously using [select][frequenz.channels.util.select],
3232
[Merge][frequenz.channels.util.Merge] or
3333
[MergeNamed][frequenz.channels.util.MergeNamed].
3434

src/frequenz/channels/_broadcast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Broadcast(Generic[T]):
3838
are thread-safe. Because of this, `Broadcast` channels are thread-safe.
3939
4040
When there are multiple channel receivers, they can be awaited
41-
simultaneously using [Select][frequenz.channels.util.Select],
41+
simultaneously using [select][frequenz.channels.util.select],
4242
[Merge][frequenz.channels.util.Merge] or
4343
[MergeNamed][frequenz.channels.util.MergeNamed].
4444

src/frequenz/channels/util/__init__.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
66
A module with several utilities to work with channels:
77
8+
* [Event][frequenz.channels.util.Event]:
9+
A [receiver][frequenz.channels.Receiver] that can be made ready through an event.
10+
811
* [FileWatcher][frequenz.channels.util.FileWatcher]:
912
A [receiver][frequenz.channels.Receiver] that watches for file events.
1013
@@ -20,15 +23,22 @@
2023
* [Timer][frequenz.channels.util.Timer]:
2124
A [receiver][frequenz.channels.Receiver] that ticks at certain intervals.
2225
23-
* [Select][frequenz.channels.util.Select]: A helper to select the next
24-
available message for each [receiver][frequenz.channels.Receiver] in a group
25-
of receivers.
26+
* [select][frequenz.channels.util.select]: Iterate over the values of all
27+
[receivers][frequenz.channels.Receiver] as new values become available.
2628
"""
2729

30+
from ._event import Event
2831
from ._file_watcher import FileWatcher
2932
from ._merge import Merge
3033
from ._merge_named import MergeNamed
31-
from ._select import Select
34+
from ._select import (
35+
Selected,
36+
SelectError,
37+
SelectErrorGroup,
38+
UnhandledSelectedError,
39+
select,
40+
selected_from,
41+
)
3242
from ._timer import (
3343
MissedTickPolicy,
3444
SkipMissedAndDrift,
@@ -38,13 +48,19 @@
3848
)
3949

4050
__all__ = [
51+
"Event",
4152
"FileWatcher",
4253
"Merge",
4354
"MergeNamed",
4455
"MissedTickPolicy",
45-
"Timer",
46-
"Select",
56+
"SelectError",
57+
"SelectErrorGroup",
58+
"Selected",
4759
"SkipMissedAndDrift",
4860
"SkipMissedAndResync",
61+
"Timer",
4962
"TriggerAllMissed",
63+
"UnhandledSelectedError",
64+
"select",
65+
"selected_from",
5066
]
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that can be made ready through an event."""
5+
6+
7+
import asyncio as _asyncio
8+
9+
from frequenz.channels import _base_classes, _exceptions
10+
11+
12+
class Event(_base_classes.Receiver[None]):
13+
"""A receiver that can be made ready through an event.
14+
15+
The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait
16+
until [`set()`][frequenz.channels.util.Event.set] is called. At that point the
17+
receiver will wait again after the event is
18+
[`consume()`][frequenz.channels.Receiver.consume]d.
19+
20+
The receiver can be completely stopped by calling
21+
[`stop()`][frequenz.channels.Receiver.stop].
22+
23+
Example:
24+
```python
25+
import asyncio
26+
from frequenz.channels import Receiver
27+
from frequenz.channels.util import Event, select, selected_from
28+
29+
other_receiver: Receiver[int] = ...
30+
exit_event = Event()
31+
32+
async def exit_after_10_seconds() -> None:
33+
asyncio.sleep(10)
34+
exit_event.set()
35+
36+
asyncio.ensure_future(exit_after_10_seconds())
37+
38+
async for selected in select(exit_event, other_receiver):
39+
if selected_from(selected, exit_event):
40+
break
41+
if selected_from(selected, other_receiver):
42+
print(selected.value)
43+
else:
44+
assert False, "Unknow receiver selected"
45+
```
46+
"""
47+
48+
def __init__(self, name: str | None = None) -> None:
49+
"""Create a new instance.
50+
51+
Args:
52+
name: The name of the receiver. If `None` the `id(self)` will be used as
53+
the name. This is only for debugging purposes, it will be shown in the
54+
string representation of the receiver.
55+
"""
56+
self._event: _asyncio.Event = _asyncio.Event()
57+
"""The event that is set when the receiver is ready."""
58+
59+
self._name: str = name or str(id(self))
60+
"""The name of the receiver.
61+
62+
This is for debugging purposes, it will be shown in the string representation
63+
of the receiver.
64+
"""
65+
66+
self._is_set: bool = False
67+
"""Whether the receiver is ready to be consumed.
68+
69+
This is used to differentiate between when the receiver was stopped (the event
70+
is triggered too) but still there is an event to be consumed and when it was
71+
stopped but was not explicitly set().
72+
"""
73+
74+
self._is_stopped: bool = False
75+
"""Whether the receiver is stopped."""
76+
77+
@property
78+
def name(self) -> str:
79+
"""The name of this receiver.
80+
81+
This is for debugging purposes, it will be shown in the string representation
82+
of this receiver.
83+
84+
Returns:
85+
The name of this receiver.
86+
"""
87+
return self._name
88+
89+
@property
90+
def is_set(self) -> bool:
91+
"""Whether this receiver is set (ready).
92+
93+
Returns:
94+
Whether this receiver is set (ready).
95+
"""
96+
return self._is_set
97+
98+
@property
99+
def is_stopped(self) -> bool:
100+
"""Whether this receiver is stopped.
101+
102+
Returns:
103+
Whether this receiver is stopped.
104+
"""
105+
return self._is_stopped
106+
107+
def stop(self) -> None:
108+
"""Stop this receiver."""
109+
self._is_stopped = True
110+
self._event.set()
111+
112+
def set(self) -> None:
113+
"""Trigger the event (make the receiver ready)."""
114+
self._is_set = True
115+
self._event.set()
116+
117+
async def ready(self) -> bool:
118+
"""Wait until this receiver is ready.
119+
120+
Returns:
121+
Whether this receiver is still running.
122+
"""
123+
if self._is_stopped:
124+
return False
125+
await self._event.wait()
126+
return not self._is_stopped
127+
128+
def consume(self) -> None:
129+
"""Consume the event.
130+
131+
This makes this receiver wait again until the event is set again.
132+
133+
Raises:
134+
ReceiverStoppedError: If this receiver is stopped.
135+
"""
136+
if not self._is_set and self._is_stopped:
137+
raise _exceptions.ReceiverStoppedError(self)
138+
139+
assert self._is_set, "calls to `consume()` must be follow a call to `ready()`"
140+
141+
self._is_set = False
142+
self._event.clear()
143+
144+
def __str__(self) -> str:
145+
"""Return a string representation of this receiver.
146+
147+
Returns:
148+
A string representation of this receiver.
149+
"""
150+
return f"{type(self).__name__}({self._name!r})"
151+
152+
def __repr__(self) -> str:
153+
"""Return a string representation of this receiver.
154+
155+
Returns:
156+
A string representation of this receiver.
157+
"""
158+
return (
159+
f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
160+
f"is_stopped={self.is_stopped!r}>"
161+
)

0 commit comments

Comments
 (0)