Skip to content

Commit 80341a5

Browse files
committed
Port all existing uses of Selector to select
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent ea40812 commit 80341a5

File tree

5 files changed

+60
-66
lines changed

5 files changed

+60
-66
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ body:
4949
- Unit, integration and performance tests (part:tests)
5050
- Build script, CI, dependencies, etc. (part:tooling)
5151
- Channels, `Broadcast`, `Bidirectional`, etc. (part:channels)
52-
- Selector (part:selector)
52+
- Select (part:select)
5353
- Utility receivers, `Merge`, etc. (part:receivers)
5454
validations:
5555
required: true

src/frequenz/channels/_anycast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Anycast(Generic[T]):
2828
thread-safe.
2929
3030
When there are multiple channel receivers, they can be awaited
31-
simultaneously using a [Selector][frequenz.channels.util.Selector],
31+
simultaneously using [select][frequenz.channels.util.select],
3232
[Merge][frequenz.channels.util.Merge] or
3333
[MergeNamed][frequenz.channels.util.MergeNamed].
3434

src/frequenz/channels/_broadcast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Broadcast(Generic[T]):
3838
are thread-safe. Because of this, `Broadcast` channels are thread-safe.
3939
4040
When there are multiple channel receivers, they can be awaited
41-
simultaneously using a [Selector][frequenz.channels.util.Selector],
41+
simultaneously using [select][frequenz.channels.util.select],
4242
[Merge][frequenz.channels.util.Merge] or
4343
[MergeNamed][frequenz.channels.util.MergeNamed].
4444

src/frequenz/channels/util/_timer.py

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,12 @@ class Timer(Receiver[timedelta]):
307307
print(f"The timer has triggered {drift=}")
308308
```
309309
310-
But you can also use a [`Selector`][frequenz.channels.util.Selector] to combine
310+
But you can also use a [`select`][frequenz.channels.util.select] to combine
311311
it with other receivers, and even start it (semi) manually:
312312
313313
```python
314314
import logging
315-
from frequenz.channels.util import Selector, selected_from
315+
from frequenz.channels.util import select, selected_from
316316
from frequenz.channels import Broadcast
317317
318318
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
@@ -322,22 +322,21 @@ class Timer(Receiver[timedelta]):
322322
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
323323
# Do some other initialization, the timer will start automatically if
324324
# a message is awaited (or manually via `reset()`).
325-
async with Selector(battery_data, timer) as selector:
326-
async for selected in selector:
327-
if selected_from(selected, battery_data):
328-
if selected.was_closed():
329-
logging.warning("battery channel closed")
330-
continue
331-
battery_soc = selected.value
332-
elif selected_from(selected, timer):
333-
# Print some regular battery data
334-
print(f"Battery is charged at {battery_soc}%")
325+
async for selected in select(battery_data, timer):
326+
if selected_from(selected, battery_data):
327+
if selected.was_closed():
328+
logging.warning("battery channel closed")
329+
continue
330+
battery_soc = selected.value
331+
elif selected_from(selected, timer):
332+
# Print some regular battery data
333+
print(f"Battery is charged at {battery_soc}%")
335334
```
336335
337336
Example: Timeout example
338337
```python
339338
import logging
340-
from frequenz.channels.util import Selector, selected_from
339+
from frequenz.channels.util import select, selected_from
341340
from frequenz.channels import Broadcast
342341
343342
def process_data(data: int):
@@ -351,21 +350,20 @@ def do_heavy_processing(data: int):
351350
chan2 = Broadcast[int]("input-chan-2")
352351
battery_data = chan1.new_receiver()
353352
heavy_process = chan2.new_receiver()
354-
async with Selector(battery_data, heavy_process, timer) as selector:
355-
async for selected in selector:
356-
if selected_from(selected, battery_data):
357-
if selected.was_closed():
358-
logging.warning("battery channel closed")
359-
continue
360-
process_data(selected.value)
361-
timer.reset()
362-
elif selected_from(selected, heavy_process):
363-
if selected.was_closed():
364-
logging.warning("processing channel closed")
365-
continue
366-
do_heavy_processing(selected.value)
367-
elif selected_from(selected, timer):
368-
logging.warning("No data received in time")
353+
async for selected in select(battery_data, heavy_process, timer):
354+
if selected_from(selected, battery_data):
355+
if selected.was_closed():
356+
logging.warning("battery channel closed")
357+
continue
358+
process_data(selected.value)
359+
timer.reset()
360+
elif selected_from(selected, heavy_process):
361+
if selected.was_closed():
362+
logging.warning("processing channel closed")
363+
continue
364+
do_heavy_processing(selected.value)
365+
elif selected_from(selected, timer):
366+
logging.warning("No data received in time")
369367
```
370368
371369
In this case `do_heavy_processing` might take 2 seconds, and we don't

tests/utils/test_integration.py

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import pytest
1111

12-
from frequenz.channels.util import FileWatcher, Selector, Timer, selected_from
12+
from frequenz.channels.util import FileWatcher, Timer, select, selected_from
1313

1414

1515
@pytest.mark.integration
@@ -27,23 +27,20 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None:
2727
file_watcher = FileWatcher(paths=[str(tmp_path)])
2828
timer = Timer.timeout(timedelta(seconds=0.1))
2929

30-
async with Selector(file_watcher, timer) as selector:
31-
async for selected in selector:
32-
if selected_from(selected, timer):
33-
filename.write_text(f"{selected.value}")
34-
elif selected_from(selected, file_watcher):
35-
event_type = (
36-
FileWatcher.EventType.CREATE
37-
if number_of_writes == 0
38-
else FileWatcher.EventType.MODIFY
39-
)
40-
assert selected.value == FileWatcher.Event(
41-
type=event_type, path=filename
42-
)
43-
number_of_writes += 1
44-
# After receiving a write 3 times, unsubscribe from the writes channel
45-
if number_of_writes == expected_number_of_writes:
46-
break
30+
async for selected in select(file_watcher, timer):
31+
if selected_from(selected, timer):
32+
filename.write_text(f"{selected.value}")
33+
elif selected_from(selected, file_watcher):
34+
event_type = (
35+
FileWatcher.EventType.CREATE
36+
if number_of_writes == 0
37+
else FileWatcher.EventType.MODIFY
38+
)
39+
assert selected.value == FileWatcher.Event(type=event_type, path=filename)
40+
number_of_writes += 1
41+
# After receiving a write 3 times, unsubscribe from the writes channel
42+
if number_of_writes == expected_number_of_writes:
43+
break
4744

4845
assert number_of_writes == expected_number_of_writes
4946

@@ -85,23 +82,22 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
8582
# W: Write
8683
# D: Delete
8784
# E: FileWatcher Event
88-
async with Selector(file_watcher, write_timer, deletion_timer) as selector:
89-
async for selected in selector:
90-
if selected_from(selected, write_timer):
91-
if number_of_write >= 2 and number_of_events == 0:
92-
continue
93-
filename.write_text(f"{selected.value}")
94-
number_of_write += 1
95-
elif selected_from(selected, deletion_timer):
96-
# Avoid removing the file twice
97-
if not pathlib.Path(filename).is_file():
98-
continue
99-
os.remove(filename)
100-
number_of_deletes += 1
101-
elif selected_from(selected, file_watcher):
102-
number_of_events += 1
103-
if number_of_events >= 2:
104-
break
85+
async for selected in select(file_watcher, write_timer, deletion_timer):
86+
if selected_from(selected, write_timer):
87+
if number_of_write >= 2 and number_of_events == 0:
88+
continue
89+
filename.write_text(f"{selected.value}")
90+
number_of_write += 1
91+
elif selected_from(selected, deletion_timer):
92+
# Avoid removing the file twice
93+
if not pathlib.Path(filename).is_file():
94+
continue
95+
os.remove(filename)
96+
number_of_deletes += 1
97+
elif selected_from(selected, file_watcher):
98+
number_of_events += 1
99+
if number_of_events >= 2:
100+
break
105101

106102
assert number_of_deletes == 2
107103
# Can be more because the watcher could take some time to trigger

0 commit comments

Comments
 (0)