Skip to content

Commit 7fa7b70

Browse files
Allow optionally forcing power requests
A power request might need to be forced to implement safety mechanisms, even when some components might be seemingly failing (i.e. when there is not proper consumption information, the user wants to slowly discharge batteries to prevent potential peak breaches). Signed-off-by: Daniel Zullo <[email protected]>
1 parent 0540381 commit 7fa7b70

File tree

5 files changed

+345
-25
lines changed

5 files changed

+345
-25
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ This release drops support for Python versions older than 3.11.
2424
soc_rx = battery_pool.soc.new_receiver() # new
2525
```
2626

27+
* A power request can now be forced by setting the `include_broken` attribute. This is especially helpful as a safety measure when components appear to be failing, such as when battery metrics are unavailable. Note that applications previously relying on automatic fallback to all batteries when none of them was working will now require the `include_broken` attribute to be explicitly set in the request.
28+
2729
## New Features
2830

2931
<!-- Here goes the main new features and examples or instructions on how to use them -->

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 117 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,12 @@
1515

1616
import asyncio
1717
import logging
18+
import time
1819
from asyncio.tasks import ALL_COMPLETED
1920
from dataclasses import dataclass, replace
21+
from datetime import timedelta
2022
from math import isnan
21-
from typing import ( # pylint: disable=unused-import
22-
Any,
23-
Dict,
24-
Iterable,
25-
List,
26-
Optional,
27-
Set,
28-
Tuple,
29-
)
23+
from typing import Any, Dict, Iterable, List, Optional, Self, Set, Tuple
3024

3125
import grpc
3226
from frequenz.channels import Bidirectional, Peekable, Receiver, Sender
@@ -62,6 +56,40 @@ class _User:
6256
"""The bidirectional channel to communicate with the user."""
6357

6458

59+
@dataclass
60+
class _CacheEntry:
61+
"""Represents an entry in the cache with expiry time."""
62+
63+
inv_bat_pair: InvBatPair
64+
"""The inverter and adjacent battery data pair."""
65+
66+
expiry_time: int
67+
"""The expiration time (taken from the monotonic clock) of the cache entry."""
68+
69+
@classmethod
70+
def from_ttl(
71+
cls, inv_bat_pair: InvBatPair, ttl: timedelta = timedelta(hours=2.5)
72+
) -> Self:
73+
"""Initialize a CacheEntry instance from a TTL (Time-To-Live).
74+
75+
Args:
76+
inv_bat_pair: the inverter and adjacent battery data pair to cache.
77+
ttl: the time a cache entry is kept alive.
78+
79+
Returns:
80+
this class instance.
81+
"""
82+
return cls(inv_bat_pair, time.monotonic_ns() + int(ttl.total_seconds() * 1e9))
83+
84+
def has_expired(self) -> bool:
85+
"""Check whether the cache entry has expired.
86+
87+
Returns:
88+
whether the cache entry has expired.
89+
"""
90+
return time.monotonic_ns() >= self.expiry_time
91+
92+
6593
@actor
6694
class PowerDistributingActor:
6795
# pylint: disable=too-many-instance-attributes
@@ -211,6 +239,10 @@ def __init__(
211239
max_data_age_sec=10.0,
212240
)
213241

242+
self._cached_metrics: dict[int, _CacheEntry | None] = {
243+
bat_id: None for bat_id, _ in self._bat_inv_map.items()
244+
}
245+
214246
def _create_users_tasks(self) -> List[asyncio.Task[None]]:
215247
"""For each user create a task to wait for request.
216248
@@ -224,37 +256,45 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]:
224256
)
225257
return tasks
226258

227-
def _get_upper_bound(self, batteries: Set[int]) -> float:
259+
def _get_upper_bound(self, batteries: Set[int], include_broken: bool) -> float:
228260
"""Get total upper bound of power to be set for given batteries.
229261
230262
Note, output of that function doesn't guarantee that this bound will be
231263
the same when the request is processed.
232264
233265
Args:
234266
batteries: List of batteries
267+
include_broken: whether all batteries in the batteries set in the
268+
request must be used regardless the status.
235269
236270
Returns:
237271
Upper bound for `set_power` operation.
238272
"""
239-
pairs_data: List[InvBatPair] = self._get_components_data(batteries)
273+
pairs_data: List[InvBatPair] = self._get_components_data(
274+
batteries, include_broken
275+
)
240276
return sum(
241277
min(battery.power_upper_bound, inverter.active_power_upper_bound)
242278
for battery, inverter in pairs_data
243279
)
244280

245-
def _get_lower_bound(self, batteries: Set[int]) -> float:
281+
def _get_lower_bound(self, batteries: Set[int], include_broken: bool) -> float:
246282
"""Get total lower bound of power to be set for given batteries.
247283
248284
Note, output of that function doesn't guarantee that this bound will be
249285
the same when the request is processed.
250286
251287
Args:
252288
batteries: List of batteries
289+
include_broken: whether all batteries in the batteries set in the
290+
request must be used regardless the status.
253291
254292
Returns:
255293
Lower bound for `set_power` operation.
256294
"""
257-
pairs_data: List[InvBatPair] = self._get_components_data(batteries)
295+
pairs_data: List[InvBatPair] = self._get_components_data(
296+
batteries, include_broken
297+
)
258298
return sum(
259299
max(battery.power_lower_bound, inverter.active_power_lower_bound)
260300
for battery, inverter in pairs_data
@@ -282,21 +322,19 @@ async def run(self) -> None:
282322

283323
try:
284324
pairs_data: List[InvBatPair] = self._get_components_data(
285-
request.batteries
325+
request.batteries, request.include_broken
286326
)
287327
except KeyError as err:
288328
await user.channel.send(Error(request=request, msg=str(err)))
289329
continue
290330

291-
if len(pairs_data) == 0:
331+
if not pairs_data and not request.include_broken:
292332
error_msg = f"No data for the given batteries {str(request.batteries)}"
293333
await user.channel.send(Error(request=request, msg=str(error_msg)))
294334
continue
295335

296336
try:
297-
distribution = self.distribution_algorithm.distribute_power(
298-
request.power, pairs_data
299-
)
337+
distribution = self._get_power_distribution(request, pairs_data)
300338
except ValueError as err:
301339
error_msg = f"Couldn't distribute power, error: {str(err)}"
302340
await user.channel.send(Error(request=request, msg=str(error_msg)))
@@ -379,6 +417,44 @@ async def _set_distributed_power(
379417

380418
return self._parse_result(tasks, distribution.distribution, timeout_sec)
381419

420+
def _get_power_distribution(
421+
self, request: Request, inv_bat_pairs: List[InvBatPair]
422+
) -> DistributionResult:
423+
"""Get power distribution result for the batteries in the request.
424+
425+
Args:
426+
request: the power request to process.
427+
inv_bat_pairs: the battery and adjacent inverter data pairs.
428+
429+
Returns:
430+
the power distribution result.
431+
"""
432+
available_bat_ids = {battery.component_id for battery, _ in inv_bat_pairs}
433+
unavailable_bat_ids = request.batteries - available_bat_ids
434+
unavailable_inv_ids = {
435+
self._bat_inv_map[battery_id] for battery_id in unavailable_bat_ids
436+
}
437+
438+
if request.include_broken and not available_bat_ids:
439+
return self.distribution_algorithm.distribute_power_equally(
440+
request.power, unavailable_inv_ids
441+
)
442+
443+
result = self.distribution_algorithm.distribute_power(
444+
request.power, inv_bat_pairs
445+
)
446+
447+
if request.include_broken and unavailable_inv_ids:
448+
additional_result = self.distribution_algorithm.distribute_power_equally(
449+
result.remaining_power, unavailable_inv_ids
450+
)
451+
452+
for inv_id, power in additional_result.distribution.items():
453+
result.distribution[inv_id] = power
454+
result.remaining_power = 0.0
455+
456+
return result
457+
382458
def _check_request(self, request: Request) -> Optional[Result]:
383459
"""Check whether the given request if correct.
384460
@@ -388,7 +464,7 @@ def _check_request(self, request: Request) -> Optional[Result]:
388464
Returns:
389465
Result for the user if the request is wrong, None otherwise.
390466
"""
391-
if len(request.batteries) == 0:
467+
if not request.batteries:
392468
return Error(request=request, msg="Empty battery IDs in the request")
393469

394470
for battery in request.batteries:
@@ -401,11 +477,11 @@ def _check_request(self, request: Request) -> Optional[Result]:
401477

402478
if not request.adjust_power:
403479
if request.power < 0:
404-
bound = self._get_lower_bound(request.batteries)
480+
bound = self._get_lower_bound(request.batteries, request.include_broken)
405481
if request.power < bound:
406482
return OutOfBound(request=request, bound=bound)
407483
else:
408-
bound = self._get_upper_bound(request.batteries)
484+
bound = self._get_upper_bound(request.batteries, request.include_broken)
409485
if request.power > bound:
410486
return OutOfBound(request=request, bound=bound)
411487

@@ -554,11 +630,15 @@ def _get_components_pairs(
554630

555631
return bat_inv_map, inv_bat_map
556632

557-
def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
633+
def _get_components_data(
634+
self, batteries: Set[int], include_broken: bool
635+
) -> List[InvBatPair]:
558636
"""Get data for the given batteries and adjacent inverters.
559637
560638
Args:
561639
batteries: Batteries that needs data.
640+
include_broken: whether all batteries in the batteries set in the
641+
request must be used regardless the status.
562642
563643
Raises:
564644
KeyError: If any battery in the given list doesn't exists in microgrid.
@@ -568,7 +648,9 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
568648
"""
569649
pairs_data: List[InvBatPair] = []
570650
working_batteries = (
571-
self._all_battery_status.get_working_batteries(batteries) or batteries
651+
batteries
652+
if include_broken
653+
else self._all_battery_status.get_working_batteries(batteries)
572654
)
573655

574656
for battery_id in working_batteries:
@@ -581,6 +663,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
581663
inverter_id: int = self._bat_inv_map[battery_id]
582664

583665
data = self._get_battery_inverter_data(battery_id, inverter_id)
666+
if not data and include_broken:
667+
cached_entry = self._cached_metrics[battery_id]
668+
if cached_entry and not cached_entry.has_expired():
669+
data = cached_entry.inv_bat_pair
670+
else:
671+
data = None
584672
if data is None:
585673
_logger.warning(
586674
"Skipping battery %d because its message isn't correct.",
@@ -648,7 +736,9 @@ def _get_battery_inverter_data(
648736

649737
# If all values are ok then return them.
650738
if not any(map(isnan, replaceable_metrics)):
651-
return InvBatPair(battery_data, inverter_data)
739+
inv_bat_pair = InvBatPair(battery_data, inverter_data)
740+
self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
741+
return inv_bat_pair
652742

653743
# Replace NaN with the corresponding value in the adjacent component.
654744
# If both metrics are None, return None to ignore this battery.
@@ -670,10 +760,12 @@ def _get_battery_inverter_data(
670760
elif isnan(inv_bound):
671761
inverter_new_metrics[inv_attr] = bat_bound
672762

673-
return InvBatPair(
763+
inv_bat_pair = InvBatPair(
674764
replace(battery_data, **battery_new_metrics),
675765
replace(inverter_data, **inverter_new_metrics),
676766
)
767+
self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
768+
return inv_bat_pair
677769

678770
async def _create_channels(self) -> None:
679771
"""Create channels to get data of components in microgrid."""

src/frequenz/sdk/actor/power_distributing/request.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,14 @@ class Request:
2929
If `False` and the power is outside the batteries' bounds, the request will
3030
fail and be replied to with an `OutOfBound` result.
3131
"""
32+
33+
include_broken: bool = False
34+
"""Whether to use all batteries included in the batteries set regardless the status.
35+
36+
if `True`, the remaining power after distributing between working batteries
37+
will be distributed equally between broken batteries. Also if all batteries
38+
in the batteries set are broken then the power is distributed equally between
39+
broken batteries.
40+
41+
if `False`, the power will be only distributed between the working batteries.
42+
"""

src/frequenz/sdk/power/_distribution_algorithm.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,27 @@ def _greedy_distribute_remaining_power(
423423

424424
return DistributionResult(new_distribution, remaining_power)
425425

426+
def distribute_power_equally(
427+
self, power: float, inverters: set[int]
428+
) -> DistributionResult:
429+
"""Distribute the power equally between the inverters in the set.
430+
431+
This function is mainly useful to set the power for components that are
432+
broken or have no metrics available.
433+
434+
Args:
435+
power: the power to distribute.
436+
inverters: the inverters to set the power to.
437+
438+
Returns:
439+
the power distribution result.
440+
"""
441+
power_per_inverter = power / len(inverters)
442+
return DistributionResult(
443+
distribution={id: power_per_inverter for id in inverters},
444+
remaining_power=0.0,
445+
)
446+
426447
def distribute_power(
427448
self, power: float, components: List[InvBatPair]
428449
) -> DistributionResult:

0 commit comments

Comments
 (0)