Skip to content

Commit 21e569f

Browse files
Add BatteryPool implementation for aggregating battery-inverter data (#205)
BatteryPool implementation for aggregating battery-inverter metrics into higher level metrics
2 parents 6b7f178 + 2dff59e commit 21e569f

24 files changed

+2845
-34
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
from dataclasses import dataclass
1111
from typing import Any, Coroutine, Dict, List, Set # pylint: disable=unused-import
1212

13-
from frequenz.channels import Bidirectional
13+
from frequenz.channels import Bidirectional, Broadcast
1414

1515
from frequenz.sdk import microgrid
1616
from frequenz.sdk.actor.power_distributing import (
17+
BatteryStatus,
1718
Error,
1819
Ignored,
1920
OutOfBound,
@@ -123,7 +124,11 @@ async def run_test( # pylint: disable=too-many-locals
123124
user_id: channel.service_handle for user_id, channel in channels.items()
124125
}
125126

126-
distributor = PowerDistributingActor(service_channels)
127+
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
128+
129+
distributor = PowerDistributingActor(
130+
service_channels, battery_status_sender=battery_status_channel.new_sender()
131+
)
127132

128133
tasks: List[Coroutine[Any, Any, List[Result]]] = []
129134
for user_id, channel in channels.items():

examples/battery_pool.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Script with an example how to use BatteryPool."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import logging
10+
from datetime import timedelta
11+
from typing import Any, Dict
12+
13+
from frequenz.channels import Broadcast, Receiver
14+
from frequenz.channels.util import MergeNamed
15+
16+
from frequenz.sdk import microgrid
17+
from frequenz.sdk.actor import (
18+
ChannelRegistry,
19+
ComponentMetricRequest,
20+
DataSourcingActor,
21+
)
22+
from frequenz.sdk.actor.power_distributing import PowerDistributingActor
23+
from frequenz.sdk.actor.power_distributing._battery_pool_status import BatteryStatus
24+
from frequenz.sdk.microgrid.component import ComponentCategory
25+
from frequenz.sdk.timeseries.battery_pool import BatteryPool
26+
27+
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
28+
PORT = 61060
29+
30+
31+
def create_battery_pool() -> BatteryPool:
32+
"""Create battery pool.
33+
34+
It needs many instance to be created before.
35+
36+
Returns:
37+
BatteryPool instance ready to use.
38+
"""
39+
channel_registry = ChannelRegistry(name="data-registry")
40+
41+
# Create a channels for sending/receiving subscription requests
42+
data_source_request_channel = Broadcast[ComponentMetricRequest](
43+
"data-source", resend_latest=True
44+
)
45+
46+
# Instantiate a data sourcing actor
47+
_ = DataSourcingActor(
48+
request_receiver=data_source_request_channel.new_receiver(
49+
"data_sourcing_receiver"
50+
),
51+
registry=channel_registry,
52+
)
53+
54+
battery_status_channel = Broadcast[BatteryStatus]("batteries-status")
55+
_ = PowerDistributingActor(
56+
users_channels={},
57+
battery_status_sender=battery_status_channel.new_sender(),
58+
)
59+
60+
batteries = microgrid.get().component_graph.components(
61+
component_category={ComponentCategory.BATTERY}
62+
)
63+
64+
return BatteryPool(
65+
batteries_id=set(battery.component_id for battery in batteries),
66+
batteries_status_receiver=battery_status_channel.new_receiver(
67+
name="battery_pool", maxsize=1
68+
),
69+
min_update_interval=timedelta(seconds=0.2),
70+
)
71+
72+
73+
async def main() -> None:
74+
"""Create the battery pool, activate all formulas and listen for any update."""
75+
logging.basicConfig(
76+
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
77+
)
78+
await microgrid.initialize(host=HOST, port=PORT)
79+
80+
battery_pool = create_battery_pool()
81+
receivers: Dict[str, Receiver[Any]] = {
82+
"soc": await battery_pool.soc(maxsize=1),
83+
"capacity": await battery_pool.capacity(maxsize=1),
84+
"power_bounds": await battery_pool.power_bounds(maxsize=1),
85+
}
86+
87+
merged_channel = MergeNamed[Any](**receivers)
88+
async for metric_name, metric in merged_channel:
89+
print(f"Received new {metric_name}: {metric}")
90+
91+
92+
asyncio.run(main())

examples/battery_status.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@
1010
import asyncio
1111
import logging
1212

13+
from frequenz.channels import Broadcast
14+
1315
from frequenz.sdk import microgrid
14-
from frequenz.sdk.actor.power_distributing._battery_pool_status import BatteryPoolStatus
16+
from frequenz.sdk.actor.power_distributing._battery_pool_status import (
17+
BatteryPoolStatus,
18+
BatteryStatus,
19+
)
1520
from frequenz.sdk.microgrid.component import ComponentCategory
1621

1722
_logger = logging.getLogger(__name__)
@@ -31,13 +36,19 @@ async def main() -> None:
3136
component_category={ComponentCategory.BATTERY}
3237
)
3338
}
39+
battery_status_channel = Broadcast[BatteryStatus]("battery-status-channel")
3440

3541
batteries_status = BatteryPoolStatus(
42+
battery_status_sender=battery_status_channel.new_sender(),
3643
battery_ids=batteries,
3744
max_data_age_sec=5,
3845
max_blocking_duration_sec=30,
3946
)
4047

48+
battery_status_receiver = battery_status_channel.new_receiver()
49+
async for status in battery_status_receiver:
50+
print(f"Received new battery status {status}")
51+
4152
await batteries_status.join()
4253

4354

examples/power_distribution.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ResamplerConfig,
2828
)
2929
from frequenz.sdk.actor.power_distributing import (
30+
BatteryStatus,
3031
PowerDistributingActor,
3132
Request,
3233
Result,
@@ -194,11 +195,14 @@ async def run() -> None:
194195
)
195196
}
196197

198+
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
199+
197200
power_distributor = PowerDistributingActor(
198201
users_channels={
199202
key: channel.service_handle
200203
for key, channel in power_distributor_channels.items()
201204
},
205+
battery_status_sender=battery_status_channel.new_sender(),
202206
)
203207

204208
# Channel to communicate between actors.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Module with constants shared between instances of the sdk.
5+
6+
To be replaced by ConfigManager.
7+
"""
8+
9+
RECEIVER_MAX_SIZE = 50
10+
"""Default buffer size of the receiver."""
11+
12+
WAIT_FOR_COMPONENT_DATA_SEC: float = 2
13+
"""Delay the start of the application to wait for the data."""
14+
15+
MAX_BATTERY_DATA_AGE_SEC: float = 2
16+
"""Max time difference for the battery or inverter data to be considered as reliable.
17+
18+
If battery or inverter stopped sending data, then this is the maximum time when its
19+
last message should be considered as valid. After that time, component data
20+
should not be used.
21+
"""

src/frequenz/sdk/actor/power_distributing/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
PowerDistributingActor and send requests for charging or discharging power.
1111
"""
1212

13+
from ._battery_pool_status import BatteryStatus
1314
from .power_distributing import PowerDistributingActor
1415
from .request import Request
1516
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
@@ -23,4 +24,5 @@
2324
"Ignored",
2425
"OutOfBound",
2526
"PartialFailure",
27+
"BatteryStatus",
2628
]

src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from dataclasses import dataclass
1010
from typing import Dict, Set
1111

12-
from frequenz.channels import Broadcast, Receiver
12+
from frequenz.channels import Broadcast, Receiver, Sender
1313
from frequenz.channels.util import MergeNamed
1414

1515
from ..._internal.asyncio import cancel_and_await
@@ -72,13 +72,16 @@ class BatteryPoolStatus:
7272
def __init__(
7373
self,
7474
battery_ids: Set[int],
75+
battery_status_sender: Sender[BatteryStatus],
7576
max_data_age_sec: float,
7677
max_blocking_duration_sec: float,
7778
) -> None:
7879
"""Create BatteryPoolStatus instance.
7980
8081
Args:
8182
battery_ids: set of batteries ids that should be stored in pool.
83+
battery_status_sender: The sender used for sending the status of the
84+
batteries in the pool.
8285
max_data_age_sec: If component stopped sending data, then
8386
this is the maximum time when its last message should be considered as
8487
valid. After that time, component won't be used until it starts sending
@@ -121,7 +124,7 @@ def __init__(
121124
**receivers,
122125
)
123126

124-
self._task = asyncio.create_task(self._run())
127+
self._task = asyncio.create_task(self._run(battery_status_sender))
125128

126129
async def join(self) -> None:
127130
"""Await for the battery pool, and return when the task completes.
@@ -146,23 +149,30 @@ async def stop(self) -> None:
146149
)
147150
await self._battery_status_channel.stop()
148151

149-
async def _run(self) -> None:
150-
"""Start tracking batteries status."""
152+
async def _run(self, battery_status_sender: Sender[BatteryStatus]) -> None:
153+
"""Start tracking batteries status.
154+
155+
Args:
156+
battery_status_sender: The sender used for sending the status of the
157+
batteries in the pool.
158+
"""
151159
while True:
152160
try:
153-
await self._update_status(self._battery_status_channel)
161+
await self._update_status(battery_status_sender)
154162
except Exception as err: # pylint: disable=broad-except
155163
_logger.error(
156164
"BatteryPoolStatus failed with error: %s. Restarting.", err
157165
)
158166

159-
async def _update_status(self, status_channel: MergeNamed[Status]) -> None:
167+
async def _update_status(
168+
self, battery_status_sender: Sender[BatteryStatus]
169+
) -> None:
160170
"""Wait for any battery to change status and update status.
161171
162172
Args:
163-
status_channel: Receivers packed in Select object.
173+
battery_status_sender: Sender to send the current status of the batteries.
164174
"""
165-
async for channel_name, status in status_channel:
175+
async for channel_name, status in self._battery_status_channel:
166176
battery_id = self._batteries[channel_name].battery_id
167177
if status == Status.WORKING:
168178
self._current_status.working.add(battery_id)
@@ -174,7 +184,7 @@ async def _update_status(self, status_channel: MergeNamed[Status]) -> None:
174184
self._current_status.working.discard(battery_id)
175185
self._current_status.uncertain.discard(battery_id)
176186

177-
# In the future here we should send status to the subscribed actors
187+
await battery_status_sender.send(self._current_status)
178188

179189
async def update_status(
180190
self, succeed_batteries: Set[int], failed_batteries: Set[int]

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
)
3030

3131
import grpc
32-
from frequenz.channels import Bidirectional, Peekable, Receiver
32+
from frequenz.channels import Bidirectional, Peekable, Receiver, Sender
3333
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
3434

3535
from ... import microgrid
@@ -44,7 +44,7 @@
4444
InverterData,
4545
)
4646
from ...power import DistributionAlgorithm, DistributionResult, InvBatPair
47-
from ._battery_pool_status import BatteryPoolStatus
47+
from ._battery_pool_status import BatteryPoolStatus, BatteryStatus
4848
from .request import Request
4949
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
5050

@@ -143,13 +143,16 @@ class PowerDistributingActor:
143143
def __init__(
144144
self,
145145
users_channels: Dict[str, Bidirectional.Handle[Result, Request]],
146+
battery_status_sender: Sender[BatteryStatus],
146147
wait_for_data_sec: float = 2,
147148
) -> None:
148149
"""Create class instance.
149150
150151
Args:
151152
users_channels: BidirectionalHandle for each user. Key should be
152153
user id and value should be BidirectionalHandle.
154+
battery_status_sender: Channel for sending information which batteries are
155+
working.
153156
wait_for_data_sec: How long actor should wait before processing first
154157
request. It is a time needed to collect first components data.
155158
"""
@@ -185,6 +188,7 @@ def __init__(
185188

186189
self._all_battery_status = BatteryPoolStatus(
187190
battery_ids=set(self._bat_inv_map.keys()),
191+
battery_status_sender=battery_status_sender,
188192
max_blocking_duration_sec=30.0,
189193
max_data_age_sec=10.0,
190194
)

0 commit comments

Comments
 (0)