Skip to content

Commit 151c6ab

Browse files
Change tracking batteries status feature to use channel communication
BatteryStatusTracker receives messages from components, parse it. If status change BatteryStatusTracker sends new status using channel. BatteriesStatus is now only class that listen for change of status from any batteries and updates set of working batteries. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 5b99f97 commit 151c6ab

File tree

8 files changed

+1102
-1130
lines changed

8 files changed

+1102
-1130
lines changed

src/frequenz/sdk/actor/power_distributing/_batteries_status.py

Lines changed: 141 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,75 @@
66

77
import asyncio
88
import logging
9+
from dataclasses import dataclass
910
from typing import Dict, Set
1011

11-
from ..._internal.asyncio import AsyncConstructible
12-
from ...microgrid._battery import BatteryStatus, BatteryStatusTracker
12+
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels.util import MergeNamed
14+
15+
from ..._internal.asyncio import cancel_and_await
16+
from ...microgrid._battery import BatteryStatus, BatteryStatusTracker, RequestResult
1317

1418
_logger = logging.getLogger(__name__)
1519

1620

17-
class BatteriesStatus(AsyncConstructible):
18-
"""Return status of batteries in the pool.
21+
@dataclass
22+
class Status:
23+
"""Status of the batteries."""
24+
25+
working: Set[int]
26+
"""Set of working battery ids."""
27+
28+
uncertain: Set[int]
29+
"""Set of batteries that should be used only if there are no working batteries."""
30+
31+
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
32+
"""From the given set of batteries return working batteries.
33+
34+
Args:
35+
batteries: Set of batteries
1936
20-
To create an instance of this class you should use `async_new` class method.
21-
Standard constructor (__init__) is not supported and using it will raise
22-
`NotSyncConstructible` error.
37+
Returns:
38+
Subset with working batteries.
39+
"""
40+
working = self.working.intersection(batteries)
41+
if len(working) > 0:
42+
return working
43+
return self.uncertain.intersection(batteries)
44+
45+
46+
@dataclass
47+
class _BatteryStatusChannelHelper:
48+
"""Helper class to create battery status channel.
49+
50+
Channel has only one receiver.
51+
Receiver has size 1, because we need only latest status.
2352
"""
2453

25-
# This is instance attribute.
26-
# Don't assign default value, because then it becomes class attribute.
27-
_batteries: Dict[int, BatteryStatusTracker]
54+
battery_id: int
55+
"""Id of the battery for which we should create channel."""
56+
57+
def __post_init__(self):
58+
self.name: str = f"battery-{self.battery_id}-status"
59+
channel = Broadcast[BatteryStatus](self.name)
60+
61+
receiver_name = f"{self.name}-receiver"
62+
self.receiver = channel.new_receiver(name=receiver_name, maxsize=1)
63+
self.sender = channel.new_sender()
64+
65+
66+
class BatteriesStatus:
67+
"""Track status of the batteries.
68+
69+
Send set of working and uncertain batteries, when the any battery change status.
70+
"""
2871

29-
@classmethod
30-
async def async_new(
31-
cls,
72+
def __init__(
73+
self,
3274
battery_ids: Set[int],
3375
max_data_age_sec: float,
3476
max_blocking_duration_sec: float,
35-
) -> BatteriesStatus:
77+
) -> None:
3678
"""Create BatteriesStatus instance.
3779
3880
Args:
@@ -46,75 +88,103 @@ async def async_new(
4688
4789
Raises:
4890
RuntimeError: If any battery has no adjacent inverter.
49-
50-
Returns:
51-
New instance of this class.
5291
"""
53-
self: BatteriesStatus = BatteriesStatus.__new__(cls)
54-
55-
tasks = [
56-
BatteryStatusTracker.async_new(
57-
id, max_data_age_sec, max_blocking_duration_sec
92+
# At first no battery is working, we will get notification when they start
93+
# working.
94+
self._current_status = Status(working=set(), uncertain=set())
95+
96+
# Channel for sending results of requests to the batteries
97+
request_result_channel = Broadcast[RequestResult]("battery_request_status")
98+
self._request_result_sender = request_result_channel.new_sender()
99+
100+
self._batteries: Dict[str, BatteryStatusTracker] = {}
101+
102+
# Receivers for individual battery statuses are needed to create a `MergeNamed`
103+
# object.
104+
receivers: Dict[str, Receiver[BatteryStatus]] = {}
105+
106+
for battery_id in battery_ids:
107+
channel = _BatteryStatusChannelHelper(battery_id)
108+
receivers[channel.name] = channel.receiver
109+
110+
self._batteries[channel.name] = BatteryStatusTracker(
111+
battery_id=battery_id,
112+
max_data_age_sec=max_data_age_sec,
113+
max_blocking_duration_sec=max_blocking_duration_sec,
114+
status_sender=channel.sender,
115+
request_result_receiver=request_result_channel.new_receiver(
116+
f"battery_{battery_id}_request_status"
117+
),
58118
)
59-
for id in battery_ids
60-
]
61119

62-
trackers = await asyncio.gather(*tasks)
63-
self._batteries = {tracker.battery_id: tracker for tracker in trackers}
120+
self._battery_status_channel = MergeNamed[BatteryStatus](
121+
**receivers,
122+
)
64123

65-
return self
124+
self._task = asyncio.create_task(self._run())
66125

67-
def get_working_batteries(self, battery_ids: Set[int]) -> Set[int]:
68-
"""Get subset of battery_ids with working batteries.
126+
async def stop(self) -> None:
127+
"""Stop tracking batteries status."""
128+
await asyncio.gather(
129+
*[
130+
tracker.stop() # pylint: disable=protected-access
131+
for tracker in self._batteries.values()
132+
]
133+
)
134+
await cancel_and_await(self._task)
69135

70-
Args:
71-
battery_ids: batteries ids
136+
async def _run(self) -> None:
137+
"""Start tracking batteries status."""
138+
while True:
139+
try:
140+
await self._update_status(self._battery_status_channel)
141+
except Exception as err: # pylint: disable=broad-except
142+
_logger.error("BatteriesStatus failed with error: %s. Restarting.", err)
72143

73-
Raises:
74-
KeyError: If any battery in the given batteries is not in the pool.
144+
async def _update_status(self, status_channel: MergeNamed[BatteryStatus]) -> None:
145+
"""Wait for any battery to change status and update status.
75146
76-
Returns:
77-
Subset of given batteries with working batteries.
147+
Args:
148+
status_channel: Receivers packed in Select object.
78149
"""
79-
working: Set[int] = set()
80-
uncertain: Set[int] = set()
81-
for bat_id in battery_ids:
82-
if bat_id not in battery_ids:
83-
ids = str(self._batteries.keys())
84-
raise KeyError(f"No battery {bat_id} in pool. All batteries: {ids}")
85-
battery_status = self._batteries[bat_id].get_status()
86-
if battery_status == BatteryStatus.WORKING:
87-
working.add(bat_id)
88-
elif battery_status == BatteryStatus.UNCERTAIN:
89-
uncertain.add(bat_id)
150+
async for channel_name, status in status_channel:
151+
battery_id = self._batteries[channel_name].battery_id
152+
if status == BatteryStatus.WORKING:
153+
self._current_status.working.add(battery_id)
154+
self._current_status.uncertain.discard(battery_id)
155+
elif status == BatteryStatus.UNCERTAIN:
156+
self._current_status.working.discard(battery_id)
157+
self._current_status.uncertain.add(battery_id)
158+
elif status == BatteryStatus.NOT_WORKING:
159+
self._current_status.working.discard(battery_id)
160+
self._current_status.uncertain.discard(battery_id)
161+
162+
# In the future here we should send status to the subscribed actors
163+
164+
async def update_status(
165+
self, succeed_batteries: Set[int], failed_batteries: Set[int]
166+
) -> None:
167+
"""Notify which batteries succeed and failed in the request.
168+
169+
Batteries that failed will be considered as broken and will be blocked for
170+
some time.
171+
Batteries that succeed will be unblocked.
90172
91-
if len(working) > 0:
92-
return working
93-
94-
_logger.warning(
95-
"There are no working batteries in %s. Falling back to using uncertain batteries %s.",
96-
str(battery_ids),
97-
str(uncertain),
173+
Args:
174+
succeed_batteries: Batteries that succeed request
175+
failed_batteries: Batteries that failed request
176+
"""
177+
await self._request_result_sender.send(
178+
RequestResult(succeed_batteries, failed_batteries)
98179
)
99-
return uncertain
100180

101-
def update_status(self, succeed_batteries: Set[int], failed_batteries: Set[int]):
102-
"""Update batteries in pool based on the last result from the request.
181+
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
182+
"""From the given set of batteries get working.
103183
104184
Args:
105-
succeed_batteries: Batteries that succeed request
106-
failed_batteries: Batteries that failed request.
185+
batteries: Set of batteries
186+
187+
Returns:
188+
Subset with working batteries.
107189
"""
108-
for battery_id in succeed_batteries:
109-
self._batteries[battery_id].unblock()
110-
111-
for battery_id in failed_batteries:
112-
duration = self._batteries[battery_id].block()
113-
if duration > 0:
114-
_logger.warning(
115-
"Battery %d failed last response. Block it for %f sec",
116-
battery_id,
117-
duration,
118-
)
119-
120-
self._batteries[battery_id].block()
190+
return self._current_status.get_working_batteries(batteries)

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@ def __init__(
183183
self._users_tasks = self._create_users_tasks()
184184
self._started = asyncio.Event()
185185

186+
self._batteries_status = BatteriesStatus(
187+
battery_ids=set(self._bat_inv_map.keys()),
188+
max_blocking_duration_sec=30.0,
189+
max_data_age_sec=10.0,
190+
)
191+
186192
def _create_users_tasks(self) -> List[asyncio.Task[Empty]]:
187193
"""For each user create a task to wait for request.
188194
@@ -246,11 +252,6 @@ async def run(self) -> None:
246252
await self._create_channels()
247253

248254
api = microgrid.get().api_client
249-
battery_pool = await BatteriesStatus.async_new(
250-
battery_ids=set(self._bat_inv_map.keys()),
251-
max_blocking_duration_sec=30.0,
252-
max_data_age_sec=10.0,
253-
)
254255

255256
# Wait few seconds to get data from the channels created above.
256257
await asyncio.sleep(self._wait_for_data_sec)
@@ -261,7 +262,7 @@ async def run(self) -> None:
261262

262263
try:
263264
pairs_data: List[InvBatPair] = self._get_components_data(
264-
battery_pool.get_working_batteries(request.batteries)
265+
self._batteries_status.get_working_batteries(request.batteries)
265266
)
266267
except KeyError as err:
267268
await user.channel.send(Error(request, str(err)))
@@ -316,8 +317,14 @@ async def run(self) -> None:
316317
excess_power=distribution.remaining_power,
317318
)
318319

319-
battery_pool.update_status(succeed_batteries, failed_batteries)
320-
await user.channel.send(response)
320+
asyncio.gather(
321+
*[
322+
self._batteries_status.update_status(
323+
succeed_batteries, failed_batteries
324+
),
325+
user.channel.send(response),
326+
]
327+
)
321328

322329
async def _set_distributed_power(
323330
self,
@@ -645,4 +652,5 @@ async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None:
645652
async def _stop_actor(self) -> None:
646653
"""Stop all running async tasks."""
647654
await asyncio.gather(*[cancel_and_await(t) for t in self._users_tasks])
655+
await self._batteries_status.stop()
648656
await self._stop() # type: ignore # pylint: disable=no-member

src/frequenz/sdk/microgrid/_battery/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
Stores features for the batteries.
77
"""
88

9-
from ._status import BatteryStatus, BatteryStatusTracker
9+
from ._status import BatteryStatus, BatteryStatusTracker, RequestResult
1010

1111
__all__ = [
1212
"BatteryStatusTracker",
1313
"BatteryStatus",
14+
"RequestResult",
1415
]

0 commit comments

Comments
 (0)