Skip to content

Commit d1c89e7

Browse files
committed
Update existing code to use Selector instead of Select
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 24601fc commit d1c89e7

File tree

4 files changed

+73
-74
lines changed

4 files changed

+73
-74
lines changed

src/frequenz/channels/_anycast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Anycast(Generic[T]):
2828
thread-safe.
2929
3030
When there are multiple channel receivers, they can be awaited
31-
simultaneously using [Select][frequenz.channels.util.Select],
31+
simultaneously using a [Selector][frequenz.channels.util.Selector],
3232
[Merge][frequenz.channels.util.Merge] or
3333
[MergeNamed][frequenz.channels.util.MergeNamed].
3434

src/frequenz/channels/_broadcast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Broadcast(Generic[T]):
3838
are thread-safe. Because of this, `Broadcast` channels are thread-safe.
3939
4040
When there are multiple channel receivers, they can be awaited
41-
simultaneously using [Select][frequenz.channels.util.Select],
41+
simultaneously using a [Selector][frequenz.channels.util.Selector],
4242
[Merge][frequenz.channels.util.Merge] or
4343
[MergeNamed][frequenz.channels.util.MergeNamed].
4444

src/frequenz/channels/util/_timer.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -307,37 +307,37 @@ class Timer(Receiver[timedelta]):
307307
print(f"The timer has triggered {drift=}")
308308
```
309309
310-
But you can also use [`Select`][frequenz.channels.util.Select] to combine it
311-
with other receivers, and even start it (semi) manually:
310+
But you can also use a [`Selector`][frequenz.channels.util.Selector] to combine
311+
it with other receivers, and even start it (semi) manually:
312312
313313
```python
314314
import logging
315-
from frequenz.channels.util import Select
315+
from frequenz.channels.util import Selector, selected_from
316316
from frequenz.channels import Broadcast
317317
318318
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
319319
chan = Broadcast[int]("input-chan")
320-
receiver1 = chan.new_receiver()
320+
battery_data = chan.new_receiver()
321321
322322
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
323323
# Do some other initialization, the timer will start automatically if
324324
# a message is awaited (or manually via `reset()`).
325-
select = Select(bat_1=receiver1, timer=timer)
326-
while await select.ready():
327-
if msg := select.bat_1:
328-
if val := msg.inner:
329-
battery_soc = val
330-
else:
331-
logging.warning("battery channel closed")
332-
elif drift := select.timer:
333-
# Print some regular battery data
334-
print(f"Battery is charged at {battery_soc}%")
325+
async with Selector(battery_data, timer) as selector:
326+
async for selected in selector:
327+
if selected_from(selected, battery_data):
328+
if selected.was_closed():
329+
logging.warning("battery channel closed")
330+
continue
331+
battery_soc = selected.value
332+
elif selected_from(selected, timer):
333+
# Print some regular battery data
334+
print(f"Battery is charged at {battery_soc}%")
335335
```
336336
337337
Example: Timeout example
338338
```python
339339
import logging
340-
from frequenz.channels.util import Select
340+
from frequenz.channels.util import Selector, selected_from
341341
from frequenz.channels import Broadcast
342342
343343
def process_data(data: int):
@@ -349,23 +349,23 @@ def do_heavy_processing(data: int):
349349
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
350350
chan1 = Broadcast[int]("input-chan-1")
351351
chan2 = Broadcast[int]("input-chan-2")
352-
receiver1 = chan1.new_receiver()
353-
receiver2 = chan2.new_receiver()
354-
select = Select(bat_1=receiver1, heavy_process=receiver2, timeout=timer)
355-
while await select.ready():
356-
if msg := select.bat_1:
357-
if val := msg.inner:
358-
process_data(val)
352+
battery_data = chan1.new_receiver()
353+
heavy_process = chan2.new_receiver()
354+
async with Selector(battery_data, heavy_process, timer) as selector:
355+
async for selected in selector:
356+
if selected_from(selected, battery_data):
357+
if selected.was_closed():
358+
logging.warning("battery channel closed")
359+
continue
360+
process_data(selected.value)
359361
timer.reset()
360-
else:
361-
logging.warning("battery channel closed")
362-
if msg := select.heavy_process:
363-
if val := msg.inner:
364-
do_heavy_processing(val)
365-
else:
366-
logging.warning("processing channel closed")
367-
elif drift := select.timeout:
368-
logging.warning("No data received in time")
362+
elif selected_from(selected, heavy_process):
363+
if selected.was_closed():
364+
logging.warning("processing channel closed")
365+
continue
366+
do_heavy_processing(selected.value)
367+
elif selected_from(selected, timer):
368+
logging.warning("No data received in time")
369369
```
370370
371371
In this case `do_heavy_processing` might take 2 seconds, and we don't

tests/utils/test_integration.py

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import pytest
1111

12-
from frequenz.channels.util import FileWatcher, Select, Timer
12+
from frequenz.channels.util import FileWatcher, Selector, Timer, selected_from
1313

1414

1515
@pytest.mark.integration
@@ -20,29 +20,30 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None:
2020
tmp_path: A tmp directory to run the file watcher on. Created by pytest.
2121
"""
2222
filename = tmp_path / "test-file"
23-
file_watcher = FileWatcher(paths=[str(tmp_path)])
2423

2524
number_of_writes = 0
2625
expected_number_of_writes = 3
2726

28-
select = Select(
29-
timer=Timer.timeout(timedelta(seconds=0.1)),
30-
file_watcher=file_watcher,
31-
)
32-
while await select.ready():
33-
if msg := select.timer:
34-
filename.write_text(f"{msg.inner}")
35-
elif msg := select.file_watcher:
36-
event_type = (
37-
FileWatcher.EventType.CREATE
38-
if number_of_writes == 0
39-
else FileWatcher.EventType.MODIFY
40-
)
41-
assert msg.inner == FileWatcher.Event(type=event_type, path=filename)
42-
number_of_writes += 1
43-
# After receiving a write 3 times, unsubscribe from the writes channel
44-
if number_of_writes == expected_number_of_writes:
45-
break
27+
file_watcher = FileWatcher(paths=[str(tmp_path)])
28+
timer = Timer.timeout(timedelta(seconds=0.1))
29+
30+
async with Selector(file_watcher, timer) as selector:
31+
async for selected in selector:
32+
if selected_from(selected, timer):
33+
filename.write_text(f"{selected.value}")
34+
elif selected_from(selected, file_watcher):
35+
event_type = (
36+
FileWatcher.EventType.CREATE
37+
if number_of_writes == 0
38+
else FileWatcher.EventType.MODIFY
39+
)
40+
assert selected.value == FileWatcher.Event(
41+
type=event_type, path=filename
42+
)
43+
number_of_writes += 1
44+
# After receiving a write 3 times, unsubscribe from the writes channel
45+
if number_of_writes == expected_number_of_writes:
46+
break
4647

4748
assert number_of_writes == expected_number_of_writes
4849

@@ -61,12 +62,9 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
6162
file_watcher = FileWatcher(
6263
paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE}
6364
)
65+
write_timer = Timer.timeout(timedelta(seconds=0.1))
66+
deletion_timer = Timer.timeout(timedelta(seconds=0.25))
6467

65-
select = Select(
66-
write_timer=Timer.timeout(timedelta(seconds=0.1)),
67-
deletion_timer=Timer.timeout(timedelta(seconds=0.25)),
68-
watcher=file_watcher,
69-
)
7068
number_of_write = 0
7169
number_of_deletes = 0
7270
number_of_events = 0
@@ -87,22 +85,23 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
8785
# W: Write
8886
# D: Delete
8987
# E: FileWatcher Event
90-
while await select.ready():
91-
if msg := select.write_timer:
92-
if number_of_write >= 2 and number_of_events == 0:
93-
continue
94-
filename.write_text(f"{msg.inner}")
95-
number_of_write += 1
96-
elif _ := select.deletion_timer:
97-
# Avoid removing the file twice
98-
if not pathlib.Path(filename).is_file():
99-
continue
100-
os.remove(filename)
101-
number_of_deletes += 1
102-
elif _ := select.watcher:
103-
number_of_events += 1
104-
if number_of_events >= 2:
105-
break
88+
async with Selector(file_watcher, write_timer, deletion_timer) as selector:
89+
async for selected in selector:
90+
if selected_from(selected, write_timer):
91+
if number_of_write >= 2 and number_of_events == 0:
92+
continue
93+
filename.write_text(f"{selected.value}")
94+
number_of_write += 1
95+
elif selected_from(selected, deletion_timer):
96+
# Avoid removing the file twice
97+
if not pathlib.Path(filename).is_file():
98+
continue
99+
os.remove(filename)
100+
number_of_deletes += 1
101+
elif selected_from(selected, file_watcher):
102+
number_of_events += 1
103+
if number_of_events >= 2:
104+
break
106105

107106
assert number_of_deletes == 2
108107
# Can be more because the watcher could take some time to trigger

0 commit comments

Comments
 (0)