|
11 | 11 | from datetime import timedelta |
12 | 12 |
|
13 | 13 | import grpc |
14 | | -from frequenz.channels import Peekable, Receiver, Sender |
| 14 | +from frequenz.channels import Receiver, Sender |
15 | 15 |
|
16 | 16 | from .... import microgrid |
| 17 | +from ...._internal._channels import LatestValueCache |
17 | 18 | from ...._internal._math import is_close_to_zero |
18 | 19 | from ....microgrid import connection_manager |
19 | 20 | from ....microgrid.component import BatteryData, ComponentCategory, InverterData |
@@ -139,8 +140,8 @@ def __init__( |
139 | 140 | self._bat_bats_map = maps["bat_bats"] |
140 | 141 | self._inv_invs_map = maps["inv_invs"] |
141 | 142 |
|
142 | | - self._battery_receivers: dict[int, Peekable[BatteryData]] = {} |
143 | | - self._inverter_receivers: dict[int, Peekable[InverterData]] = {} |
| 143 | + self._battery_caches: dict[int, LatestValueCache[BatteryData]] = {} |
| 144 | + self._inverter_caches: dict[int, LatestValueCache[InverterData]] = {} |
144 | 145 |
|
145 | 146 | self._component_pool_status_tracker = ComponentPoolStatusTracker( |
146 | 147 | component_ids=set(self._battery_ids), |
@@ -294,11 +295,11 @@ async def _create_channels(self) -> None: |
294 | 295 | api = connection_manager.get().api_client |
295 | 296 | for battery_id, inverter_ids in self._bat_invs_map.items(): |
296 | 297 | bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id) |
297 | | - self._battery_receivers[battery_id] = bat_recv.into_peekable() |
| 298 | + self._battery_caches[battery_id] = LatestValueCache(bat_recv) |
298 | 299 |
|
299 | 300 | for inverter_id in inverter_ids: |
300 | 301 | inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id) |
301 | | - self._inverter_receivers[inverter_id] = inv_recv.into_peekable() |
| 302 | + self._inverter_caches[inverter_id] = LatestValueCache(inv_recv) |
302 | 303 |
|
303 | 304 | def _get_bounds( |
304 | 305 | self, |
@@ -370,10 +371,10 @@ def _check_request( |
370 | 371 |
|
371 | 372 | for battery in request.component_ids: |
372 | 373 | _logger.debug("Checking battery %d", battery) |
373 | | - if battery not in self._battery_receivers: |
| 374 | + if battery not in self._battery_caches: |
374 | 375 | msg = ( |
375 | 376 | f"No battery {battery}, available batteries: " |
376 | | - f"{list(self._battery_receivers.keys())}" |
| 377 | + f"{list(self._battery_caches.keys())}" |
377 | 378 | ) |
378 | 379 | return Error(request=request, msg=msg) |
379 | 380 |
|
@@ -420,10 +421,11 @@ def _get_battery_inverter_data( |
420 | 421 | Return None if we could not replace NaN values. |
421 | 422 | """ |
422 | 423 | battery_data_none = [ |
423 | | - self._battery_receivers[battery_id].peek() for battery_id in battery_ids |
| 424 | + self._battery_caches[battery_id].latest_value for battery_id in battery_ids |
424 | 425 | ] |
425 | 426 | inverter_data_none = [ |
426 | | - self._inverter_receivers[inverter_id].peek() for inverter_id in inverter_ids |
| 427 | + self._inverter_caches[inverter_id].latest_value |
| 428 | + for inverter_id in inverter_ids |
427 | 429 | ] |
428 | 430 |
|
429 | 431 | # It means that nothing has been send on this channels, yet. |
@@ -498,10 +500,10 @@ def _get_components_data( |
498 | 500 | ) |
499 | 501 |
|
500 | 502 | for battery_id in working_batteries: |
501 | | - if battery_id not in self._battery_receivers: |
| 503 | + if battery_id not in self._battery_caches: |
502 | 504 | raise KeyError( |
503 | 505 | f"No battery {battery_id}, " |
504 | | - f"available batteries: {list(self._battery_receivers.keys())}" |
| 506 | + f"available batteries: {list(self._battery_caches.keys())}" |
505 | 507 | ) |
506 | 508 |
|
507 | 509 | connected_inverters = _get_all_from_map(self._bat_invs_map, batteries) |
|
0 commit comments