Skip to content

Commit bf36ba6

Browse files
Battery status with channel communication (#171)
2 parents 0c2501d + 6a865d4 commit bf36ba6

File tree

10 files changed

+1415
-1445
lines changed

10 files changed

+1415
-1445
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
requires-python = ">= 3.8, < 4"
2727
dependencies = [
2828
"frequenz-api-microgrid >= 0.11.0, < 0.12.0",
29-
"frequenz-channels >= 0.12.0, < 0.13.0",
29+
"frequenz-channels >= 0.13.0, < 0.14.0",
3030
"google-api-python-client >= 2.71, < 3",
3131
"grpcio >= 1.51.1, < 2",
3232
"grpcio-tools >= 1.51.1, < 2",

src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py

Lines changed: 145 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,75 @@
66

77
import asyncio
88
import logging
9+
from dataclasses import dataclass
910
from typing import Dict, Set
1011

11-
from ..._internal.asyncio import AsyncConstructible
12-
from ...microgrid._battery import BatteryStatus, StatusTracker
13-
from .result import PartialFailure, Result, Success
12+
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels.util import MergeNamed
14+
15+
from ..._internal.asyncio import cancel_and_await
16+
from ._battery_status import BatteryStatusTracker, SetPowerResult, Status
1417

1518
_logger = logging.getLogger(__name__)
1619

1720

18-
class BatteryPoolStatus(AsyncConstructible):
19-
"""Return status of batteries in the pool.
21+
@dataclass
22+
class BatteryStatus:
23+
"""Status of the batteries."""
24+
25+
working: Set[int]
26+
"""Set of working battery ids."""
27+
28+
uncertain: Set[int]
29+
"""Set of batteries that should be used only if there are no working batteries."""
30+
31+
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
32+
"""From the given set of batteries return working batteries.
33+
34+
Args:
35+
batteries: Set of batteries
36+
37+
Returns:
38+
Subset with working batteries.
39+
"""
40+
working = self.working.intersection(batteries)
41+
if len(working) > 0:
42+
return working
43+
return self.uncertain.intersection(batteries)
44+
45+
46+
@dataclass
47+
class _BatteryStatusChannelHelper:
48+
"""Helper class to create battery status channel.
2049
21-
To create an instance of this class you should use `async_new` class method.
22-
Standard constructor (__init__) is not supported and using it will raise
23-
`NotSyncConstructible` error.
50+
Channel has only one receiver.
51+
Receiver has size 1, because we need only latest status.
2452
"""
2553

26-
# This is instance attribute.
27-
# Don't assign default value, because then it becomes class attribute.
28-
_batteries: Dict[int, StatusTracker]
54+
battery_id: int
55+
"""Id of the battery for which we should create channel."""
56+
57+
def __post_init__(self):
58+
self.name: str = f"battery-{self.battery_id}-status"
59+
channel = Broadcast[Status](self.name)
2960

30-
@classmethod
31-
async def async_new(
32-
cls,
61+
receiver_name = f"{self.name}-receiver"
62+
self.receiver = channel.new_receiver(name=receiver_name, maxsize=1)
63+
self.sender = channel.new_sender()
64+
65+
66+
class BatteryPoolStatus:
67+
"""Track status of the batteries.
68+
69+
Send set of working and uncertain batteries, when the any battery change status.
70+
"""
71+
72+
def __init__(
73+
self,
3374
battery_ids: Set[int],
3475
max_data_age_sec: float,
3576
max_blocking_duration_sec: float,
36-
) -> BatteryPoolStatus:
77+
) -> None:
3778
"""Create BatteryPoolStatus instance.
3879
3980
Args:
@@ -47,81 +88,107 @@ async def async_new(
4788
4889
Raises:
4990
RuntimeError: If any battery has no adjacent inverter.
50-
51-
Returns:
52-
New instance of this class.
5391
"""
54-
self: BatteryPoolStatus = BatteryPoolStatus.__new__(cls)
55-
56-
tasks = [
57-
StatusTracker.async_new(id, max_data_age_sec, max_blocking_duration_sec)
58-
for id in battery_ids
59-
]
92+
# At first no battery is working, we will get notification when they start
93+
# working.
94+
self._current_status = BatteryStatus(working=set(), uncertain=set())
95+
96+
# Channel for sending results of requests to the batteries
97+
request_result_channel = Broadcast[SetPowerResult]("battery_request_status")
98+
self._request_result_sender = request_result_channel.new_sender()
99+
100+
self._batteries: Dict[str, BatteryStatusTracker] = {}
101+
102+
# Receivers for individual battery statuses are needed to create a `MergeNamed`
103+
# object.
104+
receivers: Dict[str, Receiver[Status]] = {}
105+
106+
for battery_id in battery_ids:
107+
channel = _BatteryStatusChannelHelper(battery_id)
108+
receivers[channel.name] = channel.receiver
109+
110+
self._batteries[channel.name] = BatteryStatusTracker(
111+
battery_id=battery_id,
112+
max_data_age_sec=max_data_age_sec,
113+
max_blocking_duration_sec=max_blocking_duration_sec,
114+
status_sender=channel.sender,
115+
request_result_receiver=request_result_channel.new_receiver(
116+
f"battery_{battery_id}_request_status"
117+
),
118+
)
119+
120+
self._battery_status_channel = MergeNamed[Status](
121+
**receivers,
122+
)
60123

61-
trackers = await asyncio.gather(*tasks)
62-
self._batteries = {tracker.battery_id: tracker for tracker in trackers}
124+
self._task = asyncio.create_task(self._run())
63125

64-
return self
126+
async def stop(self) -> None:
127+
"""Stop tracking batteries status."""
128+
await cancel_and_await(self._task)
65129

66-
def get_working_batteries(self, battery_ids: Set[int]) -> Set[int]:
67-
"""Get subset of battery_ids with working batteries.
130+
await asyncio.gather(
131+
*[
132+
tracker.stop() # pylint: disable=protected-access
133+
for tracker in self._batteries.values()
134+
],
135+
)
136+
await self._battery_status_channel.stop()
68137

69-
Args:
70-
battery_ids: batteries ids
138+
async def _run(self) -> None:
139+
"""Start tracking batteries status."""
140+
while True:
141+
try:
142+
await self._update_status(self._battery_status_channel)
143+
except Exception as err: # pylint: disable=broad-except
144+
_logger.error(
145+
"BatteryPoolStatus failed with error: %s. Restarting.", err
146+
)
71147

72-
Raises:
73-
RuntimeError: If `async_init` method was not called at the beginning to
74-
initialized object.
75-
KeyError: If any battery in the given batteries is not in the pool.
148+
async def _update_status(self, status_channel: MergeNamed[Status]) -> None:
149+
"""Wait for any battery to change status and update status.
76150
77-
Returns:
78-
Subset of given batteries with working batteries.
151+
Args:
152+
status_channel: Receivers packed in Select object.
79153
"""
80-
working: Set[int] = set()
81-
uncertain: Set[int] = set()
82-
for bat_id in battery_ids:
83-
if bat_id not in battery_ids:
84-
ids = str(self._batteries.keys())
85-
raise KeyError(f"No battery {bat_id} in pool. All batteries: {ids}")
86-
battery_status = self._batteries[bat_id].get_status()
87-
if battery_status == BatteryStatus.WORKING:
88-
working.add(bat_id)
89-
elif battery_status == BatteryStatus.UNCERTAIN:
90-
uncertain.add(bat_id)
91-
92-
if len(working) > 0:
93-
return working
154+
async for channel_name, status in status_channel:
155+
battery_id = self._batteries[channel_name].battery_id
156+
if status == Status.WORKING:
157+
self._current_status.working.add(battery_id)
158+
self._current_status.uncertain.discard(battery_id)
159+
elif status == Status.UNCERTAIN:
160+
self._current_status.working.discard(battery_id)
161+
self._current_status.uncertain.add(battery_id)
162+
elif status == Status.NOT_WORKING:
163+
self._current_status.working.discard(battery_id)
164+
self._current_status.uncertain.discard(battery_id)
165+
166+
# In the future here we should send status to the subscribed actors
167+
168+
async def update_status(
169+
self, succeed_batteries: Set[int], failed_batteries: Set[int]
170+
) -> None:
171+
"""Notify which batteries succeed and failed in the request.
172+
173+
Batteries that failed will be considered as broken and will be blocked for
174+
some time.
175+
Batteries that succeed will be unblocked.
94176
95-
_logger.warning(
96-
"There are no working batteries in %s. Falling back to using uncertain batteries %s.",
97-
str(battery_ids),
98-
str(uncertain),
177+
Args:
178+
succeed_batteries: Batteries that succeed request
179+
failed_batteries: Batteries that failed request
180+
"""
181+
await self._request_result_sender.send(
182+
SetPowerResult(succeed_batteries, failed_batteries)
99183
)
100-
return uncertain
101184

102-
def update_last_request_status(self, result: Result):
103-
"""Update batteries in pool based on the last result from the request.
185+
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
186+
"""From the given set of batteries get working.
104187
105188
Args:
106-
result: Summary of what batteries failed and succeed in last request.
189+
batteries: Set of batteries
107190
108-
Raises:
109-
RuntimeError: If `async_init` method was not called at the beginning to
110-
initialize object.
191+
Returns:
192+
Subset with working batteries.
111193
"""
112-
if isinstance(result, Success):
113-
for bat_id in result.used_batteries:
114-
self._batteries[bat_id].unblock()
115-
116-
elif isinstance(result, PartialFailure):
117-
for bat_id in result.failed_batteries:
118-
duration = self._batteries[bat_id].block()
119-
if duration > 0:
120-
_logger.warning(
121-
"Battery %d failed last response. Block it for %f sec",
122-
bat_id,
123-
duration,
124-
)
125-
126-
for bat_id in result.succeed_batteries:
127-
self._batteries[bat_id].unblock()
194+
return self._current_status.get_working_batteries(batteries)

0 commit comments

Comments
 (0)