1515
1616import asyncio
1717import logging
18+ import time
1819from asyncio .tasks import ALL_COMPLETED
1920from dataclasses import dataclass , replace
21+ from datetime import timedelta
2022from math import isnan
21- from typing import ( # pylint: disable=unused-import
22- Any ,
23- Dict ,
24- Iterable ,
25- List ,
26- Optional ,
27- Set ,
28- Tuple ,
29- )
23+ from typing import Any , Dict , Iterable , List , Optional , Self , Set , Tuple
3024
3125import grpc
3226from frequenz .channels import Bidirectional , Peekable , Receiver , Sender
@@ -62,6 +56,40 @@ class _User:
6256 """The bidirectional channel to communicate with the user."""
6357
6458
59+ @dataclass
60+ class _CacheEntry :
61+ """Represents an entry in the cache with expiry time."""
62+
63+ inv_bat_pair : InvBatPair
64+ """The inverter and adjacent battery data pair."""
65+
66+ expiry_time : int
67+ """The expiration time (taken from the monotonic clock) of the cache entry."""
68+
69+ @classmethod
70+ def from_ttl (
71+ cls , inv_bat_pair : InvBatPair , ttl : timedelta = timedelta (hours = 2.5 )
72+ ) -> Self :
73+ """Initialize a CacheEntry instance from a TTL (Time-To-Live).
74+
75+ Args:
76+ inv_bat_pair: the inverter and adjacent battery data pair to cache.
77+ ttl: the time a cache entry is kept alive.
78+
79+ Returns:
80+ this class instance.
81+ """
82+ return cls (inv_bat_pair , time .monotonic_ns () + int (ttl .total_seconds () * 1e9 ))
83+
84+ def has_expired (self ) -> bool :
85+ """Check whether the cache entry has expired.
86+
87+ Returns:
88+ whether the cache entry has expired.
89+ """
90+ return time .monotonic_ns () >= self .expiry_time
91+
92+
6593@actor
6694class PowerDistributingActor :
6795 # pylint: disable=too-many-instance-attributes
@@ -211,6 +239,10 @@ def __init__(
211239 max_data_age_sec = 10.0 ,
212240 )
213241
242+ self ._cached_metrics : dict [int , _CacheEntry | None ] = {
243+ bat_id : None for bat_id , _ in self ._bat_inv_map .items ()
244+ }
245+
214246 def _create_users_tasks (self ) -> List [asyncio .Task [None ]]:
215247 """For each user create a task to wait for request.
216248
@@ -224,37 +256,45 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]:
224256 )
225257 return tasks
226258
227- def _get_upper_bound (self , batteries : Set [int ]) -> float :
259+ def _get_upper_bound (self , batteries : Set [int ], include_broken : bool ) -> float :
228260 """Get total upper bound of power to be set for given batteries.
229261
230262 Note, output of that function doesn't guarantee that this bound will be
231263 the same when the request is processed.
232264
233265 Args:
234266 batteries: List of batteries
267+ include_broken: whether all batteries in the batteries set in the
268+ request must be used regardless the status.
235269
236270 Returns:
237271 Upper bound for `set_power` operation.
238272 """
239- pairs_data : List [InvBatPair ] = self ._get_components_data (batteries )
273+ pairs_data : List [InvBatPair ] = self ._get_components_data (
274+ batteries , include_broken
275+ )
240276 return sum (
241277 min (battery .power_upper_bound , inverter .active_power_upper_bound )
242278 for battery , inverter in pairs_data
243279 )
244280
245- def _get_lower_bound (self , batteries : Set [int ]) -> float :
281+ def _get_lower_bound (self , batteries : Set [int ], include_broken : bool ) -> float :
246282 """Get total lower bound of power to be set for given batteries.
247283
248284 Note, output of that function doesn't guarantee that this bound will be
249285 the same when the request is processed.
250286
251287 Args:
252288 batteries: List of batteries
289+ include_broken: whether all batteries in the batteries set in the
290+ request must be used regardless the status.
253291
254292 Returns:
255293 Lower bound for `set_power` operation.
256294 """
257- pairs_data : List [InvBatPair ] = self ._get_components_data (batteries )
295+ pairs_data : List [InvBatPair ] = self ._get_components_data (
296+ batteries , include_broken
297+ )
258298 return sum (
259299 max (battery .power_lower_bound , inverter .active_power_lower_bound )
260300 for battery , inverter in pairs_data
@@ -282,21 +322,19 @@ async def run(self) -> None:
282322
283323 try :
284324 pairs_data : List [InvBatPair ] = self ._get_components_data (
285- request .batteries
325+ request .batteries , request . include_broken
286326 )
287327 except KeyError as err :
288328 await user .channel .send (Error (request = request , msg = str (err )))
289329 continue
290330
291- if len ( pairs_data ) == 0 :
331+ if not pairs_data and not request . include_broken :
292332 error_msg = f"No data for the given batteries { str (request .batteries )} "
293333 await user .channel .send (Error (request = request , msg = str (error_msg )))
294334 continue
295335
296336 try :
297- distribution = self .distribution_algorithm .distribute_power (
298- request .power , pairs_data
299- )
337+ distribution = self ._get_power_distribution (request , pairs_data )
300338 except ValueError as err :
301339 error_msg = f"Couldn't distribute power, error: { str (err )} "
302340 await user .channel .send (Error (request = request , msg = str (error_msg )))
@@ -379,6 +417,44 @@ async def _set_distributed_power(
379417
380418 return self ._parse_result (tasks , distribution .distribution , timeout_sec )
381419
420+ def _get_power_distribution (
421+ self , request : Request , inv_bat_pairs : List [InvBatPair ]
422+ ) -> DistributionResult :
423+ """Get power distribution result for the batteries in the request.
424+
425+ Args:
426+ request: the power request to process.
427+ inv_bat_pairs: the battery and adjacent inverter data pairs.
428+
429+ Returns:
430+ the power distribution result.
431+ """
432+ available_bat_ids = {battery .component_id for battery , _ in inv_bat_pairs }
433+ unavailable_bat_ids = request .batteries - available_bat_ids
434+ unavailable_inv_ids = {
435+ self ._bat_inv_map [battery_id ] for battery_id in unavailable_bat_ids
436+ }
437+
438+ if request .include_broken and not available_bat_ids :
439+ return self .distribution_algorithm .distribute_power_equally (
440+ request .power , unavailable_inv_ids
441+ )
442+
443+ result = self .distribution_algorithm .distribute_power (
444+ request .power , inv_bat_pairs
445+ )
446+
447+ if request .include_broken and unavailable_inv_ids :
448+ additional_result = self .distribution_algorithm .distribute_power_equally (
449+ result .remaining_power , unavailable_inv_ids
450+ )
451+
452+ for inv_id , power in additional_result .distribution .items ():
453+ result .distribution [inv_id ] = power
454+ result .remaining_power = 0.0
455+
456+ return result
457+
382458 def _check_request (self , request : Request ) -> Optional [Result ]:
383459 """Check whether the given request if correct.
384460
@@ -388,6 +464,9 @@ def _check_request(self, request: Request) -> Optional[Result]:
388464 Returns:
389465 Result for the user if the request is wrong, None otherwise.
390466 """
467+ if not request .batteries :
468+ return Error (request = request , msg = "Empty battery IDs in the request" )
469+
391470 for battery in request .batteries :
392471 if battery not in self ._battery_receivers :
393472 msg = (
@@ -398,11 +477,11 @@ def _check_request(self, request: Request) -> Optional[Result]:
398477
399478 if not request .adjust_power :
400479 if request .power < 0 :
401- bound = self ._get_lower_bound (request .batteries )
480+ bound = self ._get_lower_bound (request .batteries , request . include_broken )
402481 if request .power < bound :
403482 return OutOfBound (request = request , bound = bound )
404483 else :
405- bound = self ._get_upper_bound (request .batteries )
484+ bound = self ._get_upper_bound (request .batteries , request . include_broken )
406485 if request .power > bound :
407486 return OutOfBound (request = request , bound = bound )
408487
@@ -551,29 +630,15 @@ def _get_components_pairs(
551630
552631 return bat_inv_map , inv_bat_map
553632
554- def _get_working_batteries (self , batteries : Set [int ]) -> Set [int ]:
555- """Get subset with working batteries.
556-
557- If none of the given batteries are working, then treat all of them
558- as working.
559-
560- Args:
561- batteries: requested batteries
562-
563- Returns:
564- Subset with working batteries or input set if none of the given batteries
565- are working.
566- """
567- working_batteries = self ._all_battery_status .get_working_batteries (batteries )
568- if len (working_batteries ) == 0 :
569- return batteries
570- return working_batteries
571-
572- def _get_components_data (self , batteries : Set [int ]) -> List [InvBatPair ]:
633+ def _get_components_data (
634+ self , batteries : Set [int ], include_broken : bool
635+ ) -> List [InvBatPair ]:
573636 """Get data for the given batteries and adjacent inverters.
574637
575638 Args:
576639 batteries: Batteries that needs data.
640+ include_broken: whether all batteries in the batteries set in the
641+ request must be used regardless the status.
577642
578643 Raises:
579644 KeyError: If any battery in the given list doesn't exists in microgrid.
@@ -582,7 +647,11 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
582647 Pairs of battery and adjacent inverter data.
583648 """
584649 pairs_data : List [InvBatPair ] = []
585- working_batteries = self ._get_working_batteries (batteries )
650+ working_batteries = (
651+ batteries
652+ if include_broken
653+ else self ._all_battery_status .get_working_batteries (batteries )
654+ )
586655
587656 for battery_id in working_batteries :
588657 if battery_id not in self ._battery_receivers :
@@ -594,6 +663,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
594663 inverter_id : int = self ._bat_inv_map [battery_id ]
595664
596665 data = self ._get_battery_inverter_data (battery_id , inverter_id )
666+ if not data and include_broken :
667+ cached_entry = self ._cached_metrics [battery_id ]
668+ if cached_entry and not cached_entry .has_expired ():
669+ data = cached_entry .inv_bat_pair
670+ else :
671+ data = None
597672 if data is None :
598673 _logger .warning (
599674 "Skipping battery %d because its message isn't correct." ,
@@ -661,7 +736,9 @@ def _get_battery_inverter_data(
661736
662737 # If all values are ok then return them.
663738 if not any (map (isnan , replaceable_metrics )):
664- return InvBatPair (battery_data , inverter_data )
739+ inv_bat_pair = InvBatPair (battery_data , inverter_data )
740+ self ._cached_metrics [battery_id ] = _CacheEntry .from_ttl (inv_bat_pair )
741+ return inv_bat_pair
665742
666743 # Replace NaN with the corresponding value in the adjacent component.
667744 # If both metrics are None, return None to ignore this battery.
@@ -683,10 +760,12 @@ def _get_battery_inverter_data(
683760 elif isnan (inv_bound ):
684761 inverter_new_metrics [inv_attr ] = bat_bound
685762
686- return InvBatPair (
763+ inv_bat_pair = InvBatPair (
687764 replace (battery_data , ** battery_new_metrics ),
688765 replace (inverter_data , ** inverter_new_metrics ),
689766 )
767+ self ._cached_metrics [battery_id ] = _CacheEntry .from_ttl (inv_bat_pair )
768+ return inv_bat_pair
690769
691770 async def _create_channels (self ) -> None :
692771 """Create channels to get data of components in microgrid."""
0 commit comments