Skip to content

Commit fbd67cc

Browse files
Use BatteryPoolStatus in PowerDistribution
Replace old tool to track broken batteries with new BatteryPoolStatus. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 7ed29a8 commit fbd67cc

File tree

2 files changed

+90
-284
lines changed

2 files changed

+90
-284
lines changed

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 35 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import logging
1818
from asyncio.tasks import ALL_COMPLETED
1919
from dataclasses import dataclass
20-
from datetime import datetime, timezone
2120
from math import ceil, floor
2221
from typing import ( # pylint: disable=unused-import
2322
Any,
@@ -27,7 +26,6 @@
2726
Optional,
2827
Set,
2928
Tuple,
30-
Union,
3129
)
3230

3331
import grpc
@@ -44,6 +42,7 @@
4442
InverterData,
4543
)
4644
from ...power import DistributionAlgorithm, InvBatPair
45+
from ._battery_pool_status import BatteryPoolStatus
4746
from .request import Request
4847
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
4948

@@ -62,85 +61,6 @@ class _User:
6261
"""The bidirectional channel to communicate with the user."""
6362

6463

65-
class _BrokenComponents:
66-
"""Store components marked as broken."""
67-
68-
def __init__(self, timeout_sec: float) -> None:
69-
"""Create object instance.
70-
71-
Args:
72-
timeout_sec: How long the component should be marked as broken.
73-
"""
74-
self._broken: Dict[int, datetime] = {}
75-
self._timeout_sec = timeout_sec
76-
77-
def mark_as_broken(self, component_id: int) -> None:
78-
"""Mark component as broken.
79-
80-
After marking component as broken it would be considered as broken for
81-
self._timeout_sec.
82-
83-
Args:
84-
component_id: component id
85-
"""
86-
self._broken[component_id] = datetime.now(timezone.utc)
87-
88-
def update_retry(self, timeout_sec: float) -> None:
89-
"""Change how long the component should be marked as broken.
90-
91-
Args:
92-
timeout_sec: New retry time after sec.
93-
"""
94-
self._timeout_sec = timeout_sec
95-
96-
def get_working_subset(self, components_ids: Set[int]) -> Set[int]:
97-
"""Get subset of batteries that are not marked as broken.
98-
99-
If all given batteries are broken, then mark them as working and return them.
100-
This is temporary workaround to not block user command.
101-
102-
Args:
103-
components_ids: set of component ids
104-
105-
Returns:
106-
Subset of given components_ids with working components.
107-
"""
108-
working = set(filter(lambda cid: not self.is_broken(cid), components_ids))
109-
110-
if len(working) == 0:
111-
_logger.warning(
112-
"All requested components: %s are marked as broken. "
113-
"Marking them as working to not block command.",
114-
str(components_ids),
115-
)
116-
117-
for cid in components_ids:
118-
self._broken.pop(cid, None)
119-
120-
working = components_ids
121-
122-
return working
123-
124-
def is_broken(self, component_id: int) -> bool:
125-
"""Check if component is marked as broken.
126-
127-
Args:
128-
component_id: component id
129-
130-
Returns:
131-
True if component is broken, False otherwise.
132-
"""
133-
if component_id in self._broken:
134-
last_broken = self._broken[component_id]
135-
if (
136-
datetime.now(timezone.utc) - last_broken
137-
).total_seconds() < self._timeout_sec:
138-
return True
139-
140-
del self._broken[component_id]
141-
return False
142-
143-
14464
@actor
14565
class PowerDistributingActor:
14666
# pylint: disable=too-many-instance-attributes
@@ -238,19 +158,21 @@ def __init__(
238158
self._api = microgrid_api
239159
self._wait_for_data_sec = wait_for_data_sec
240160

241-
# Max permitted time when the component should send any information.
242-
# After that timeout the component will be treated as not existing.
243-
# Formulas will put 0 in place of data from this components.
244-
# This will happen until component starts sending data.
245-
self.component_data_timeout_sec: float = 60.0
246-
self.broken_component_timeout_sec: float = 30.0
161+
# NOTE: power_distributor_exponent should be received from ConfigManager
247162
self.power_distributor_exponent: float = 1.0
248-
249-
# distributor_exponent and timeout_sec should be get from ConfigManager
250163
self.distribution_algorithm = DistributionAlgorithm(
251164
self.power_distributor_exponent
252165
)
253-
self._broken_components = _BrokenComponents(self.broken_component_timeout_sec)
166+
167+
batteries = component_graph.components(
168+
component_category={ComponentCategory.BATTERY}
169+
)
170+
171+
self._battery_pool = BatteryPoolStatus(
172+
battery_ids={battery.component_id for battery in batteries},
173+
max_blocking_duration_sec=30.0,
174+
max_data_age_sec=10.0,
175+
)
254176

255177
self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
256178
component_graph
@@ -327,6 +249,7 @@ async def run(self) -> None:
327249
as broken for some time.
328250
"""
329251
await self._create_channels()
252+
await self._battery_pool.async_init()
330253

331254
# Wait few seconds to get data from the channels created above.
332255
await asyncio.sleep(self._wait_for_data_sec)
@@ -388,26 +311,25 @@ async def run(self) -> None:
388311

389312
if len(failed_batteries) > 0:
390313
succeed_batteries = set(battery_distribution.keys()) - failed_batteries
391-
await user.channel.send(
392-
PartialFailure(
393-
request=request,
394-
succeed_power=distributed_power_value,
395-
succeed_batteries=succeed_batteries,
396-
failed_power=failed_power,
397-
failed_batteries=failed_batteries,
398-
excess_power=distribution.remaining_power,
399-
)
314+
response = PartialFailure(
315+
request=request,
316+
succeed_power=distributed_power_value,
317+
succeed_batteries=succeed_batteries,
318+
failed_power=failed_power,
319+
failed_batteries=failed_batteries,
320+
excess_power=distribution.remaining_power,
400321
)
401322
else:
402-
await user.channel.send(
403-
Success(
404-
request=request,
405-
succeed_power=distributed_power_value,
406-
used_batteries=set(battery_distribution.keys()),
407-
excess_power=distribution.remaining_power,
408-
)
323+
response = Success(
324+
request=request,
325+
succeed_power=distributed_power_value,
326+
used_batteries=set(battery_distribution.keys()),
327+
excess_power=distribution.remaining_power,
409328
)
410329

330+
self._battery_pool.update_last_request_status(response)
331+
await user.channel.send(response)
332+
411333
def _check_request(self, request: Request) -> Optional[Result]:
412334
"""Check whether the given request if correct.
413335
@@ -580,7 +502,7 @@ def _get_components_pairs(
580502

581503
return bat_inv_map, inv_bat_map
582504

583-
def _get_components_data(self, batteries: Iterable[int]) -> List[InvBatPair]:
505+
def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
584506
"""Get data for the given batteries and adjacent inverters.
585507
586508
Args:
@@ -594,7 +516,7 @@ def _get_components_data(self, batteries: Iterable[int]) -> List[InvBatPair]:
594516
"""
595517
pairs_data: List[InvBatPair] = []
596518

597-
for battery_id in self._broken_components.get_working_subset(batteries):
519+
for battery_id in self._battery_pool.get_working_batteries(batteries):
598520
if battery_id not in self._battery_receivers:
599521
raise KeyError(
600522
f"No battery {battery_id}, "
@@ -607,53 +529,21 @@ def _get_components_data(self, batteries: Iterable[int]) -> List[InvBatPair]:
607529
battery_id
608530
].peek()
609531

610-
if not self._is_component_data_valid(battery_id, battery_data):
532+
if battery_data is None:
533+
_logger.warning("None returned from battery receiver %d.", battery_id)
611534
continue
612535

613536
inverter_data: Optional[InverterData] = self._inverter_receivers[
614537
inverter_id
615538
].peek()
616539

617-
if not self._is_component_data_valid(inverter_id, inverter_data):
540+
if inverter_data is None:
541+
_logger.warning("None returned from inverter receiver %d.", inverter_id)
618542
continue
619543

620-
# None case already checked but mypy don't see that.
621-
if battery_data is not None and inverter_data is not None:
622-
pairs_data.append(InvBatPair(battery_data, inverter_data))
544+
pairs_data.append(InvBatPair(battery_data, inverter_data))
623545
return pairs_data
624546

625-
def _is_component_data_valid(
626-
self, component_id: int, component_data: Union[None, BatteryData, InverterData]
627-
) -> bool:
628-
"""Check whether the component data from microgrid are correct.
629-
630-
Args:
631-
component_id: component id
632-
component_data: component data instance
633-
634-
Returns:
635-
True if data are correct, false otherwise
636-
"""
637-
if component_data is None:
638-
_logger.warning(
639-
"No data from component %d.",
640-
component_id,
641-
)
642-
return False
643-
644-
now = datetime.now(timezone.utc)
645-
time_delta = now - component_data.timestamp
646-
if time_delta.total_seconds() > self.component_data_timeout_sec:
647-
_logger.warning(
648-
"Component %d data are stale. Last timestamp: %s, now: %s",
649-
component_id,
650-
str(component_data.timestamp),
651-
str(now),
652-
)
653-
return False
654-
655-
return True
656-
657547
async def _create_channels(self) -> None:
658548
"""Create channels to get data of components in microgrid."""
659549
for battery_id, inverter_id in self._bat_inv_map.items():
@@ -710,7 +600,6 @@ def _parse_result(
710600
battery_id,
711601
str(err),
712602
)
713-
self._broken_components.mark_as_broken(battery_id)
714603
except asyncio.exceptions.CancelledError:
715604
failed_power += distribution[inverter_id]
716605
failed_batteries.add(battery_id)
@@ -719,7 +608,6 @@ def _parse_result(
719608
battery_id,
720609
request_timeout_sec,
721610
)
722-
self._broken_components.mark_as_broken(battery_id)
723611

724612
return failed_power, failed_batteries
725613

0 commit comments

Comments
 (0)