Skip to content

Commit e0cde3c

Browse files
Add BatterPool interface
BatteryPool is and interface for user to receive the aggregated battery-inverter data. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 75f0f48 commit e0cde3c

File tree

2 files changed

+206
-0
lines changed

2 files changed

+206
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage a pool of batteries."""
5+
6+
from ._result_types import Bound, CapacityMetrics, PowerMetrics, SoCMetrics
7+
from .battery_pool import BatteryPool
8+
9+
__all__ = [
10+
"BatteryPool",
11+
"PowerMetrics",
12+
"SoCMetrics",
13+
"CapacityMetrics",
14+
"Bound",
15+
]
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""User interface for requesting aggregated battery-inverter data."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
from collections.abc import Set
10+
from datetime import timedelta
11+
from typing import Any
12+
13+
from frequenz.channels import Receiver
14+
15+
from ... import microgrid
16+
from ..._internal._constants import RECEIVER_MAX_SIZE
17+
from ..._internal.asyncio import cancel_and_await
18+
from ...actor.power_distributing._battery_pool_status import BatteryStatus
19+
from ...microgrid.component import ComponentCategory
20+
from ._methods import AggregateMethod, SendOnUpdate
21+
from ._metric_calculator import CapacityCalculator, PowerBoundsCalculator, SoCCalculator
22+
from ._result_types import CapacityMetrics, PowerMetrics, SoCMetrics
23+
24+
25+
class BatteryPool:
26+
"""Calculate high level metrics for a pool of the batteries.
27+
28+
BatterPool accepts subset of the battery ids and provides methods methods for
29+
fetching high level metrics for this subset.
30+
"""
31+
32+
def __init__(
33+
self,
34+
batteries_status_receiver: Receiver[BatteryStatus],
35+
min_update_interval: timedelta,
36+
batteries_id: Set[int] | None = None,
37+
) -> None:
38+
"""Create the class instance.
39+
40+
Args:
41+
batteries_status_receiver: Receiver to receive status of the batteries.
42+
Receivers should has maxsize = 1 to fetch only the latest status.
43+
Battery status channel should has resend_latest = True.
44+
It should send information when any battery changed status.
45+
Battery status should include status of the inverter adjacent to this
46+
battery.
47+
min_update_interval: Some metrics in BatteryPool are send only when they
48+
change. For these metrics min_update_interval is the minimum time
49+
interval between the following messages.
50+
Note that this argument is similar to the resampling period
51+
argument in the ComponentMetricsResamplingActor. But as opposed to
52+
ResamplingActor, timestamp returned in the resulting message will be
53+
the timestamp of the last received component data.
54+
It is currently impossible to use resampling actor for these metrics,
55+
because we can't specify resampling function for them.
56+
batteries_id: Subset of the batteries that should be included in the
57+
battery pool. If None or empty, then all batteries from the microgrid
58+
will be used.
59+
"""
60+
if batteries_id:
61+
self._batteries: Set[int] = batteries_id
62+
else:
63+
self._batteries = self._get_all_batteries()
64+
65+
self._working_batteries: set[int] = set()
66+
67+
self._update_battery_status_task = asyncio.create_task(
68+
self._update_battery_status(batteries_status_receiver)
69+
)
70+
71+
self._min_update_interval = min_update_interval
72+
self._active_methods: dict[str, AggregateMethod[Any]] = {}
73+
74+
@property
75+
def battery_ids(self) -> Set[int]:
76+
"""Return ids of the batteries in the pool.
77+
78+
Returns:
79+
Ids of the batteries in the pool
80+
"""
81+
return self._batteries
82+
83+
async def soc(
84+
self, maxsize: int | None = RECEIVER_MAX_SIZE
85+
) -> Receiver[SoCMetrics | None]:
86+
"""Get receiver to receive new soc metrics when they change.
87+
88+
Soc formulas are described in the receiver return type.
89+
None will be send if there is no component to calculate metric.
90+
91+
Args:
92+
maxsize: Maxsize of the receiver channel.
93+
94+
Returns:
95+
Receiver for this metric.
96+
"""
97+
method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()
98+
99+
if method_name not in self._active_methods:
100+
calculator = SoCCalculator(self._batteries)
101+
self._active_methods[method_name] = SendOnUpdate(
102+
metric_calculator=calculator,
103+
working_batteries=self._working_batteries,
104+
min_update_interval=self._min_update_interval,
105+
)
106+
107+
running_method = self._active_methods[method_name]
108+
return running_method.new_receiver(maxsize)
109+
110+
async def capacity(
111+
self, maxsize: int | None = RECEIVER_MAX_SIZE
112+
) -> Receiver[CapacityMetrics | None]:
113+
"""Get receiver to receive new capacity metrics when they change.
114+
115+
Capacity formulas are described in the receiver return type.
116+
None will be send if there is no component to calculate metrics.
117+
118+
Args:
119+
maxsize: Maxsize of the receiver channel.
120+
121+
Returns:
122+
Receiver for this metric.
123+
"""
124+
method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()
125+
126+
if method_name not in self._active_methods:
127+
calculator = CapacityCalculator(self._batteries)
128+
self._active_methods[method_name] = SendOnUpdate(
129+
metric_calculator=calculator,
130+
working_batteries=self._working_batteries,
131+
min_update_interval=self._min_update_interval,
132+
)
133+
134+
running_method = self._active_methods[method_name]
135+
return running_method.new_receiver(maxsize)
136+
137+
async def power_bounds(
138+
self, maxsize: int | None = RECEIVER_MAX_SIZE
139+
) -> Receiver[PowerMetrics | None]:
140+
"""Get receiver to receive new power bounds when they change.
141+
142+
Power bounds formulas are described in the receiver return type.
143+
None will be send if there is no component to calculate metrics.
144+
145+
Args:
146+
maxsize: Maxsize of the receivers channel.
147+
148+
Returns:
149+
Receiver for this metric.
150+
"""
151+
method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()
152+
153+
if method_name not in self._active_methods:
154+
calculator = PowerBoundsCalculator(self._batteries)
155+
self._active_methods[method_name] = SendOnUpdate(
156+
metric_calculator=calculator,
157+
working_batteries=self._working_batteries,
158+
min_update_interval=self._min_update_interval,
159+
)
160+
161+
running_method = self._active_methods[method_name]
162+
return running_method.new_receiver(maxsize)
163+
164+
async def stop(self) -> None:
165+
"""Stop all pending async tasks."""
166+
await asyncio.gather(
167+
*[method.stop() for method in self._active_methods.values()],
168+
cancel_and_await(self._update_battery_status_task),
169+
)
170+
171+
def _get_all_batteries(self) -> Set[int]:
172+
"""Get all batteries from the microgrid.
173+
174+
Returns:
175+
All batteries in the microgrid.
176+
"""
177+
graph = microgrid.get().component_graph
178+
return {
179+
battery.component_id
180+
for battery in graph.components(
181+
component_category={ComponentCategory.BATTERY}
182+
)
183+
}
184+
185+
async def _update_battery_status(self, receiver: Receiver[BatteryStatus]) -> None:
186+
async for status in receiver:
187+
self._working_batteries = status.get_working_batteries(
188+
self._batteries # type: ignore[arg-type]
189+
)
190+
for item in self._active_methods.values():
191+
item.update_working_batteries(self._working_batteries)

0 commit comments

Comments
 (0)