Skip to content

Commit 471adc6

Browse files
authored
Un-Peekable-ification of the PowerDistributingActor (#800)
This PR replaces `Peekable`s with `LatestValueCache`. And updates tests to not mock time, because that somehow makes a difference when using actual receivers.
2 parents 80db721 + 8df3de3 commit 471adc6

File tree

7 files changed

+309
-342
lines changed

7 files changed

+309
-342
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""General purpose classes for use with channels."""
55

66
import abc
7+
import asyncio
78
import typing
89

910
from frequenz.channels import Receiver
@@ -24,3 +25,26 @@ def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
2425
Returns:
2526
A receiver instance.
2627
"""
28+
29+
30+
class LatestValueCache(typing.Generic[T]):
31+
"""A cache that stores the latest value in a receiver."""
32+
33+
def __init__(self, receiver: Receiver[T]) -> None:
34+
"""Create a new cache.
35+
36+
Args:
37+
receiver: The receiver to cache.
38+
"""
39+
self._receiver = receiver
40+
self._latest_value: T | None = None
41+
self._task = asyncio.create_task(self._run())
42+
43+
@property
44+
def latest_value(self) -> T | None:
45+
"""Get the latest value in the cache."""
46+
return self._latest_value
47+
48+
async def _run(self) -> None:
49+
async for value in self._receiver:
50+
self._latest_value = value

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
from datetime import timedelta
1212

1313
import grpc
14-
from frequenz.channels import Peekable, Receiver, Sender
14+
from frequenz.channels import Receiver, Sender
1515

1616
from .... import microgrid
17+
from ...._internal._channels import LatestValueCache
1718
from ...._internal._math import is_close_to_zero
1819
from ....microgrid import connection_manager
1920
from ....microgrid.component import BatteryData, ComponentCategory, InverterData
@@ -139,8 +140,8 @@ def __init__(
139140
self._bat_bats_map = maps["bat_bats"]
140141
self._inv_invs_map = maps["inv_invs"]
141142

142-
self._battery_receivers: dict[int, Peekable[BatteryData]] = {}
143-
self._inverter_receivers: dict[int, Peekable[InverterData]] = {}
143+
self._battery_caches: dict[int, LatestValueCache[BatteryData]] = {}
144+
self._inverter_caches: dict[int, LatestValueCache[InverterData]] = {}
144145

145146
self._component_pool_status_tracker = ComponentPoolStatusTracker(
146147
component_ids=set(self._battery_ids),
@@ -294,11 +295,11 @@ async def _create_channels(self) -> None:
294295
api = connection_manager.get().api_client
295296
for battery_id, inverter_ids in self._bat_invs_map.items():
296297
bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
297-
self._battery_receivers[battery_id] = bat_recv.into_peekable()
298+
self._battery_caches[battery_id] = LatestValueCache(bat_recv)
298299

299300
for inverter_id in inverter_ids:
300301
inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id)
301-
self._inverter_receivers[inverter_id] = inv_recv.into_peekable()
302+
self._inverter_caches[inverter_id] = LatestValueCache(inv_recv)
302303

303304
def _get_bounds(
304305
self,
@@ -370,10 +371,10 @@ def _check_request(
370371

371372
for battery in request.component_ids:
372373
_logger.debug("Checking battery %d", battery)
373-
if battery not in self._battery_receivers:
374+
if battery not in self._battery_caches:
374375
msg = (
375376
f"No battery {battery}, available batteries: "
376-
f"{list(self._battery_receivers.keys())}"
377+
f"{list(self._battery_caches.keys())}"
377378
)
378379
return Error(request=request, msg=msg)
379380

@@ -420,10 +421,11 @@ def _get_battery_inverter_data(
420421
Return None if we could not replace NaN values.
421422
"""
422423
battery_data_none = [
423-
self._battery_receivers[battery_id].peek() for battery_id in battery_ids
424+
self._battery_caches[battery_id].latest_value for battery_id in battery_ids
424425
]
425426
inverter_data_none = [
426-
self._inverter_receivers[inverter_id].peek() for inverter_id in inverter_ids
427+
self._inverter_caches[inverter_id].latest_value
428+
for inverter_id in inverter_ids
427429
]
428430

429431
# It means that nothing has been send on this channels, yet.
@@ -498,10 +500,10 @@ def _get_components_data(
498500
)
499501

500502
for battery_id in working_batteries:
501-
if battery_id not in self._battery_receivers:
503+
if battery_id not in self._battery_caches:
502504
raise KeyError(
503505
f"No battery {battery_id}, "
504-
f"available batteries: {list(self._battery_receivers.keys())}"
506+
f"available batteries: {list(self._battery_caches.keys())}"
505507
)
506508

507509
connected_inverters = _get_all_from_map(self._bat_invs_map, batteries)

src/frequenz/sdk/microgrid/client/_client.py

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ async def meter_data(
9898
) -> Receiver[MeterData]:
9999
"""Return a channel receiver that provides a `MeterData` stream.
100100
101-
If only the latest value is required, the `Receiver` returned by this
102-
method can be converted into a `Peekable` with the `into_peekable`
103-
method on the `Receiver.`
104-
105101
Args:
106102
component_id: id of the meter to get data for.
107103
maxsize: Size of the receiver's buffer.
@@ -118,10 +114,6 @@ async def battery_data(
118114
) -> Receiver[BatteryData]:
119115
"""Return a channel receiver that provides a `BatteryData` stream.
120116
121-
If only the latest value is required, the `Receiver` returned by this
122-
method can be converted into a `Peekable` with the `into_peekable`
123-
method on the `Receiver.`
124-
125117
Args:
126118
component_id: id of the battery to get data for.
127119
maxsize: Size of the receiver's buffer.
@@ -138,10 +130,6 @@ async def inverter_data(
138130
) -> Receiver[InverterData]:
139131
"""Return a channel receiver that provides an `InverterData` stream.
140132
141-
If only the latest value is required, the `Receiver` returned by this
142-
method can be converted into a `Peekable` with the `into_peekable`
143-
method on the `Receiver.`
144-
145133
Args:
146134
component_id: id of the inverter to get data for.
147135
maxsize: Size of the receiver's buffer.
@@ -158,10 +146,6 @@ async def ev_charger_data(
158146
) -> Receiver[EVChargerData]:
159147
"""Return a channel receiver that provides an `EvChargeData` stream.
160148
161-
If only the latest value is required, the `Receiver` returned by this
162-
method can be converted into a `Peekable` with the `into_peekable`
163-
method on the `Receiver.`
164-
165149
Args:
166150
component_id: id of the ev charger to get data for.
167151
maxsize: Size of the receiver's buffer.
@@ -441,7 +425,7 @@ def _get_component_data_channel(
441425
if component_id in self._component_streams:
442426
return self._component_streams[component_id]
443427
task_name = f"raw-component-data-{component_id}"
444-
chan = Broadcast[_GenericComponentData](task_name)
428+
chan = Broadcast[_GenericComponentData](task_name, resend_latest=True)
445429
self._component_streams[component_id] = chan
446430

447431
self._streaming_tasks[component_id] = asyncio.create_task(
@@ -493,10 +477,6 @@ async def meter_data( # noqa: DOC502 (ValueError is raised indirectly by _expec
493477
) -> Receiver[MeterData]:
494478
"""Return a channel receiver that provides a `MeterData` stream.
495479
496-
If only the latest value is required, the `Receiver` returned by this
497-
method can be converted into a `Peekable` with the `into_peekable`
498-
method on the `Receiver.`
499-
500480
Raises:
501481
ValueError: if the given id is unknown or has a different type.
502482
@@ -523,10 +503,6 @@ async def battery_data( # noqa: DOC502 (ValueError is raised indirectly by _exp
523503
) -> Receiver[BatteryData]:
524504
"""Return a channel receiver that provides a `BatteryData` stream.
525505
526-
If only the latest value is required, the `Receiver` returned by this
527-
method can be converted into a `Peekable` with the `into_peekable`
528-
method on the `Receiver.`
529-
530506
Raises:
531507
ValueError: if the given id is unknown or has a different type.
532508
@@ -553,10 +529,6 @@ async def inverter_data( # noqa: DOC502 (ValueError is raised indirectly by _ex
553529
) -> Receiver[InverterData]:
554530
"""Return a channel receiver that provides an `InverterData` stream.
555531
556-
If only the latest value is required, the `Receiver` returned by this
557-
method can be converted into a `Peekable` with the `into_peekable`
558-
method on the `Receiver.`
559-
560532
Raises:
561533
ValueError: if the given id is unknown or has a different type.
562534
@@ -583,10 +555,6 @@ async def ev_charger_data( # noqa: DOC502 (ValueError is raised indirectly by _
583555
) -> Receiver[EVChargerData]:
584556
"""Return a channel receiver that provides an `EvChargeData` stream.
585557
586-
If only the latest value is required, the `Receiver` returned by this
587-
method can be converted into a `Peekable` with the `into_peekable`
588-
method on the `Receiver.`
589-
590558
Raises:
591559
ValueError: if the given id is unknown or has a different type.
592560

src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from frequenz.channels.util import Timer, select, selected_from
1313

1414
from ..._internal._asyncio import cancel_and_await
15+
from ..._internal._channels import LatestValueCache
1516
from ...microgrid import connection_manager
1617
from ...microgrid.component import ComponentCategory
1718

@@ -93,16 +94,16 @@ async def _run(self) -> None:
9394
_logger.error(err)
9495
raise RuntimeError(err)
9596

96-
meter_data = (
97+
meter_data = LatestValueCache(
9798
await api_client.meter_data(next(iter(meters)).component_id)
98-
).into_peekable()
99+
)
99100
latest_bound: dict[int, ComponentCurrentLimit] = {}
100101

101102
bound_chan = self._bounds_rx
102103
timer = Timer.timeout(timedelta(self._repeat_interval.total_seconds()))
103104

104105
async for selected in select(bound_chan, timer):
105-
meter = meter_data.peek()
106+
meter = meter_data.latest_value
106107
if meter is None:
107108
raise ValueError("Meter channel closed.")
108109

0 commit comments

Comments
 (0)