Skip to content

Commit e3514cb

Browse files
committed
Implement a SystemBoundsTracker for PV inverters
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 125ff58 commit e3514cb

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Interactions with PV inverters."""
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""System bounds tracker for PV inverters."""
5+
6+
import asyncio
7+
import logging
8+
from collections import abc
9+
10+
from frequenz.channels import Receiver, Sender, merge, select, selected_from
11+
from frequenz.client.microgrid import InverterData
12+
13+
from ...actor import BackgroundService
14+
from ...actor.power_distributing._component_status import ComponentPoolStatus
15+
from ...microgrid import connection_manager
16+
from .._base_types import Bounds, SystemBounds
17+
from .._quantities import Power
18+
19+
_logger = logging.getLogger(__name__)
20+
21+
22+
class PVSystemBoundsTracker(BackgroundService):
23+
"""Track the system bounds for PV inverters.
24+
25+
System bounds are the aggregate bounds for the PV inverters in the pool that are in
26+
a working state. They are calculated from the individual bounds received from the
27+
microgrid API.
28+
29+
The system bounds are sent to the `bounds_sender` whenever they change.
30+
"""
31+
32+
def __init__(
33+
self,
34+
component_ids: abc.Set[int],
35+
status_receiver: Receiver[ComponentPoolStatus],
36+
bounds_sender: Sender[SystemBounds],
37+
):
38+
"""Initialize the system bounds tracker.
39+
40+
Args:
41+
component_ids: The ids of the components to track.
42+
status_receiver: A receiver that streams the status of the PV inverters in
43+
the pool.
44+
bounds_sender: A sender to send the system bounds to.
45+
"""
46+
super().__init__()
47+
48+
self._component_ids = component_ids
49+
self._status_receiver = status_receiver
50+
self._bounds_sender = bounds_sender
51+
self._latest_component_data: dict[int, InverterData] = {}
52+
self._last_sent_bounds: SystemBounds | None = None
53+
self._component_pool_status = ComponentPoolStatus(set(), set())
54+
55+
def start(self) -> None:
56+
"""Start the PV inverter system bounds tracker."""
57+
self._tasks.add(asyncio.create_task(self._run_forever()))
58+
59+
async def _send_bounds(self) -> None:
60+
"""Calculate and send the aggregate system bounds if they have changed."""
61+
if not self._latest_component_data:
62+
return
63+
inclusion_bounds = Bounds(
64+
lower=Power.from_watts(
65+
sum(
66+
data.active_power_inclusion_lower_bound
67+
for data in self._latest_component_data.values()
68+
)
69+
),
70+
upper=Power.from_watts(
71+
sum(
72+
data.active_power_inclusion_upper_bound
73+
for data in self._latest_component_data.values()
74+
)
75+
),
76+
)
77+
exclusion_bounds = Bounds(
78+
lower=Power.from_watts(
79+
sum(
80+
data.active_power_exclusion_lower_bound
81+
for data in self._latest_component_data.values()
82+
)
83+
),
84+
upper=Power.from_watts(
85+
sum(
86+
data.active_power_exclusion_upper_bound
87+
for data in self._latest_component_data.values()
88+
)
89+
),
90+
)
91+
92+
if (
93+
self._last_sent_bounds is None
94+
or self._last_sent_bounds.inclusion_bounds != inclusion_bounds
95+
or self._last_sent_bounds.exclusion_bounds != exclusion_bounds
96+
):
97+
new_bounds = SystemBounds(
98+
timestamp=max(
99+
data.timestamp for data in self._latest_component_data.values()
100+
),
101+
inclusion_bounds=inclusion_bounds,
102+
exclusion_bounds=exclusion_bounds,
103+
)
104+
await self._bounds_sender.send(new_bounds)
105+
106+
async def _run_forever(self) -> None:
107+
"""Run the system bounds tracker."""
108+
while True:
109+
try:
110+
await self._run()
111+
except Exception: # pylint: disable=broad-except
112+
_logger.exception(
113+
"Restarting after exception in PVSystemBoundsTracker.run()"
114+
)
115+
await asyncio.sleep(1.0)
116+
117+
async def _run(self) -> None:
118+
"""Run the system bounds tracker."""
119+
api_client = connection_manager.get().api_client
120+
status_rx = self._status_receiver
121+
pv_data_rx = merge(
122+
*(
123+
await asyncio.gather(
124+
*(
125+
api_client.inverter_data(component_id)
126+
for component_id in self._component_ids
127+
)
128+
)
129+
)
130+
)
131+
132+
async for selected in select(status_rx, pv_data_rx):
133+
if selected_from(selected, status_rx):
134+
self._component_pool_status = selected.message
135+
to_remove = []
136+
for comp_id in self._latest_component_data:
137+
if (
138+
comp_id not in self._component_pool_status.working
139+
and comp_id not in self._component_pool_status.uncertain
140+
):
141+
to_remove.append(comp_id)
142+
for comp_id in to_remove:
143+
del self._latest_component_data[comp_id]
144+
elif selected_from(selected, pv_data_rx):
145+
data = selected.message
146+
comp_id = data.component_id
147+
if (
148+
comp_id not in self._component_pool_status.working
149+
and comp_id not in self._component_pool_status.uncertain
150+
):
151+
continue
152+
self._latest_component_data[data.component_id] = data
153+
154+
await self._send_bounds()

0 commit comments

Comments
 (0)