Skip to content

Commit 2d86a13

Browse files
committed
Update channel dependancy to 0.16.0
Signed-off-by: Jack <[email protected]>
1 parent 79fb177 commit 2d86a13

File tree

8 files changed

+274
-244
lines changed

8 files changed

+274
-244
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
1010

11+
- `Channels` has been upgraded to version 0.16.0, for information on how to upgrade visit https://github.com/frequenz-floss/frequenz-channels-python/releases/tag/v0.16.0
12+
1113
## New Features
1214

1315
<!-- Here goes the main new features and examples or instructions on how to use them -->

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ dependencies = [
3030
# Make sure to update the mkdocs.yml file when
3131
# changing the version
3232
# (plugins.mkdocstrings.handlers.python.import)
33-
"frequenz-channels >= 0.14.0, < 0.15.0",
33+
"frequenz-channels >= 0.16.0, < 0.17.0",
3434
"google-api-python-client >= 2.71, < 3",
3535
"grpcio >= 1.54.2, < 2",
3636
"grpcio-tools >= 1.54.2, < 2",

src/frequenz/sdk/actor/_config_managing.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
import os
88
import tomllib
9+
from collections import abc
910
from typing import Any, Dict, Optional, Set
1011

1112
from frequenz.channels import Sender
@@ -31,7 +32,7 @@ def __init__(
3132
self,
3233
conf_file: str,
3334
output: Sender[Config],
34-
event_types: Optional[Set[FileWatcher.EventType]] = None,
35+
event_types: abc.Set[FileWatcher.EventType] = frozenset(FileWatcher.EventType),
3536
) -> None:
3637
"""Read config variables from the file.
3738
@@ -79,8 +80,8 @@ async def run(self) -> None:
7980
"""
8081
await self.send_config()
8182

82-
async for path in self._file_watcher:
83-
if str(path) == self._conf_file:
83+
async for event in self._file_watcher:
84+
if str(event.path) == self._conf_file:
8485
_logger.info(
8586
"Update configs, because file %s was modified.",
8687
self._conf_file,

src/frequenz/sdk/actor/_decorator.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def actor(cls: Callable[_P, _R]) -> Type[_R]:
8282
Example (one actor receiving from two receivers):
8383
```python
8484
from frequenz.channels import Broadcast, Receiver, Sender
85-
from frequenz.channels.util import Select
85+
from frequenz.channels.util import select, selected_from
8686
@actor
8787
class EchoActor:
8888
def __init__(
@@ -99,13 +99,9 @@ def __init__(
9999
self._output = output
100100
101101
async def run(self) -> None:
102-
select = Select(channel_1=self._recv1, channel_2=self._recv2)
103-
while await select.ready():
104-
if msg := select.channel_1:
105-
await self._output.send(msg.inner)
106-
elif msg := select.channel_2:
107-
await self._output.send(msg.inner)
108-
102+
async for selected in select(self._recv1, self._recv2):
103+
if selected_from(selected, self._recv1):
104+
await self._output.send(selected.value)
109105
110106
input_chan_1: Broadcast[bool] = Broadcast("input_chan_1")
111107
input_chan_2: Broadcast[bool] = Broadcast("input_chan_2")

src/frequenz/sdk/actor/power_distributing/_battery_status.py

Lines changed: 102 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from frequenz.api.microgrid.common_pb2 import ErrorLevel
1818
from frequenz.api.microgrid.inverter_pb2 import ComponentState as InverterComponentState
1919
from frequenz.channels import Receiver, Sender
20-
from frequenz.channels.util import Select, Timer
20+
from frequenz.channels.util import Timer, select, selected_from
2121

2222
from frequenz.sdk._internal._asyncio import cancel_and_await
2323
from frequenz.sdk.microgrid import connection_manager
@@ -197,14 +197,15 @@ def __init__( # pylint: disable=too-many-arguments
197197
raise RuntimeError(f"Can't find inverter adjacent to battery: {battery_id}")
198198

199199
self._battery: _ComponentStreamStatus = _ComponentStreamStatus(
200-
battery_id, data_recv_timer=Timer(max_data_age_sec)
200+
battery_id,
201+
data_recv_timer=Timer.timeout(timedelta(max_data_age_sec)),
201202
)
202203
self._inverter: _ComponentStreamStatus = _ComponentStreamStatus(
203-
inverter_id, data_recv_timer=Timer(max_data_age_sec)
204+
inverter_id,
205+
data_recv_timer=Timer.timeout(timedelta(max_data_age_sec)),
204206
)
205207

206208
# Select needs receivers that can be get in async way only.
207-
self._select: Select | None = None
208209

209210
self._task: asyncio.Task[None] = asyncio.create_task(
210211
self._run(status_sender, set_power_result_receiver)
@@ -223,8 +224,70 @@ async def stop(self) -> None:
223224
"""Stop tracking battery status."""
224225
await cancel_and_await(self._task)
225226

226-
if self._select is not None:
227-
await self._select.stop()
227+
def _handle_status_battery(self, bat_data: BatteryData) -> None:
228+
self._battery.last_msg_correct = (
229+
self._is_message_reliable(bat_data)
230+
and self._is_battery_state_correct(bat_data)
231+
and self._no_critical_error(bat_data)
232+
and self._is_capacity_present(bat_data)
233+
)
234+
self._battery.last_msg_timestamp = bat_data.timestamp
235+
self._battery.data_recv_timer.reset()
236+
237+
def _handle_status_inverter(self, inv_data: InverterData) -> None:
238+
self._inverter.last_msg_correct = (
239+
self._is_message_reliable(inv_data)
240+
and self._is_inverter_state_correct(inv_data)
241+
and self._no_critical_error(inv_data)
242+
)
243+
self._inverter.last_msg_timestamp = inv_data.timestamp
244+
self._inverter.data_recv_timer.reset()
245+
246+
def _handle_status_set_power_result(self, result: SetPowerResult) -> None:
247+
if self.battery_id in result.succeed:
248+
self._blocking_status.unblock()
249+
250+
elif (
251+
self.battery_id in result.failed and self._last_status != Status.NOT_WORKING
252+
):
253+
duration = self._blocking_status.block()
254+
255+
if duration > 0:
256+
_logger.warning(
257+
"battery %d failed last response. block it for %f sec",
258+
self.battery_id,
259+
duration,
260+
)
261+
262+
def _handle_status_battery_timer(self) -> None:
263+
if self._battery.last_msg_correct:
264+
self._battery.last_msg_correct = False
265+
_logger.warning(
266+
"Battery %d stopped sending data, last timestamp: %s",
267+
self._battery.component_id,
268+
self._battery.last_msg_timestamp,
269+
)
270+
271+
def _handle_status_inverter_timer(self) -> None:
272+
if self._inverter.last_msg_correct:
273+
self._inverter.last_msg_correct = False
274+
_logger.warning(
275+
"Inverter %d stopped sending data, last timestamp: %s",
276+
self._inverter.component_id,
277+
self._inverter.last_msg_timestamp,
278+
)
279+
280+
def _get_new_status_if_changed(self) -> Optional[Status]:
281+
current_status = self._get_current_status()
282+
if self._last_status != current_status:
283+
self._last_status = current_status
284+
_logger.info(
285+
"battery %d changed status %s",
286+
self.battery_id,
287+
str(self._last_status),
288+
)
289+
return current_status
290+
return None
228291

229292
async def _run(
230293
self,
@@ -245,93 +308,48 @@ async def _run(
245308
battery_receiver = await api_client.battery_data(self._battery.component_id)
246309
inverter_receiver = await api_client.inverter_data(self._inverter.component_id)
247310

248-
self._select = Select(
249-
battery=battery_receiver,
250-
battery_timer=self._battery.data_recv_timer,
251-
inverter_timer=self._inverter.data_recv_timer,
252-
inverter=inverter_receiver,
253-
set_power_result=set_power_result_receiver,
254-
)
311+
battery = battery_receiver
312+
battery_timer = self._battery.data_recv_timer
313+
inverter_timer = self._inverter.data_recv_timer
314+
inverter = inverter_receiver
315+
set_power_result = set_power_result_receiver
255316

256317
while True:
257318
try:
258-
while await self._select.ready():
259-
new_status = self._update_status(self._select)
319+
async for selected in select(
320+
battery,
321+
battery_timer,
322+
inverter_timer,
323+
inverter,
324+
set_power_result,
325+
):
326+
new_status = None
260327

261-
if new_status is not None:
262-
await status_sender.send(new_status)
328+
if selected_from(selected, battery):
329+
self._handle_status_battery(selected.value)
263330

264-
except Exception as err: # pylint: disable=broad-except
265-
_logger.exception("BatteryStatusTracker crashed with error: %s", err)
331+
elif selected_from(selected, inverter):
332+
self._handle_status_inverter(selected.value)
266333

267-
def _update_status(self, select: Select) -> Optional[Status]:
268-
if msg := select.battery:
269-
self._battery.last_msg_correct = (
270-
self._is_message_reliable(msg.inner)
271-
and self._is_battery_state_correct(msg.inner)
272-
and self._no_critical_error(msg.inner)
273-
and self._is_capacity_present(msg.inner)
274-
)
275-
self._battery.last_msg_timestamp = msg.inner.timestamp
276-
self._battery.data_recv_timer.reset()
277-
278-
elif msg := select.inverter:
279-
self._inverter.last_msg_correct = (
280-
self._is_message_reliable(msg.inner)
281-
and self._is_inverter_state_correct(msg.inner)
282-
and self._no_critical_error(msg.inner)
283-
)
284-
self._inverter.last_msg_timestamp = msg.inner.timestamp
285-
self._inverter.data_recv_timer.reset()
286-
287-
elif msg := select.set_power_result:
288-
result: SetPowerResult = msg.inner
289-
if self.battery_id in result.succeed:
290-
self._blocking_status.unblock()
291-
292-
elif (
293-
self.battery_id in result.failed
294-
and self._last_status != Status.NOT_WORKING
295-
):
296-
duration = self._blocking_status.block()
297-
298-
if duration > 0:
299-
_logger.warning(
300-
"battery %d failed last response. block it for %f sec",
301-
self.battery_id,
302-
duration,
303-
)
304-
elif msg := select.battery_timer:
305-
if self._battery.last_msg_correct:
306-
self._battery.last_msg_correct = False
307-
_logger.warning(
308-
"Battery %d stopped sending data, last timestamp: %s",
309-
self._battery.component_id,
310-
self._battery.last_msg_timestamp,
311-
)
312-
elif msg := select.inverter_timer:
313-
if self._inverter.last_msg_correct:
314-
self._inverter.last_msg_correct = False
315-
_logger.warning(
316-
"Inverter %d stopped sending data, last timestamp: %s",
317-
self._inverter.component_id,
318-
self._inverter.last_msg_timestamp,
319-
)
334+
elif selected_from(selected, set_power_result):
335+
self._handle_status_set_power_result(selected.value)
320336

321-
else:
322-
_logger.error("Unknown message returned from select")
337+
elif selected_from(selected, battery_timer):
338+
self._handle_status_battery_timer()
323339

324-
current_status = self._get_current_status()
325-
if self._last_status != current_status:
326-
self._last_status = current_status
327-
_logger.info(
328-
"battery %d changed status %s",
329-
self.battery_id,
330-
str(self._last_status),
331-
)
332-
return current_status
340+
elif selected_from(selected, inverter_timer):
341+
self._handle_status_inverter_timer()
333342

334-
return None
343+
else:
344+
_logger.error("Unknown message returned from select")
345+
346+
new_status = self._get_new_status_if_changed()
347+
348+
if new_status is not None:
349+
await status_sender.send(new_status)
350+
351+
except Exception as err: # pylint: disable=broad-except
352+
_logger.exception("BatteryStatusTracker crashed with error: %s", err)
335353

336354
def _get_current_status(self) -> Status:
337355
"""Get current battery status.

src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from typing import Dict
1111

1212
from frequenz.channels import Broadcast, Sender
13-
from frequenz.channels.util import Select, Timer
13+
from frequenz.channels.util import Timer, select, selected_from
1414

1515
from ..._internal._asyncio import cancel_and_await
1616
from ...microgrid import connection_manager
@@ -95,17 +95,16 @@ async def _run(self) -> None:
9595
).into_peekable()
9696
latest_bound: Dict[int, ComponentCurrentLimit] = {}
9797

98-
select = Select(
99-
bound_chan=self._bounds_rx,
100-
timer=Timer(self._repeat_interval.total_seconds()),
101-
)
102-
while await select.ready():
98+
bound_chan = self._bounds_rx
99+
timer = Timer.timeout(timedelta(self._repeat_interval.total_seconds()))
100+
101+
async for selected in select(bound_chan, timer):
103102
meter = meter_data.peek()
104103
if meter is None:
105104
raise ValueError("Meter channel closed.")
106105

107-
if msg := select.bound_chan:
108-
bound: ComponentCurrentLimit = msg.inner
106+
if selected_from(selected, bound_chan):
107+
bound: ComponentCurrentLimit = selected.value
109108
if (
110109
bound.component_id in latest_bound
111110
and latest_bound[bound.component_id] == bound
@@ -119,7 +118,7 @@ async def _run(self) -> None:
119118
0,
120119
bound.max_amps * min_voltage * self._NUM_PHASES,
121120
)
122-
elif msg := select.timer:
121+
elif selected_from(selected, timer):
123122
for bound in latest_bound.values():
124123
min_voltage = min(meter.voltage_per_phase)
125124
logging.debug("resending bounds: %s", bound)

0 commit comments

Comments
 (0)