Skip to content

Commit 89de962

Browse files
committed
Add a PVManager implementation
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 0fcb11f commit 89de962

File tree

3 files changed

+240
-0
lines changed

3 files changed

+240
-0
lines changed

src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
from ._battery_manager import BatteryManager
77
from ._component_manager import ComponentManager
88
from ._ev_charger_manager import EVChargerManager
9+
from ._pv_inverter_manager import PVManager
910

1011
__all__ = [
1112
"BatteryManager",
1213
"ComponentManager",
1314
"EVChargerManager",
15+
"PVManager",
1416
]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage PV inverters for the power distributor."""
5+
6+
from ._pv_inverter_manager import PVManager
7+
8+
__all__ = ["PVManager"]
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage PV inverters for the power distributor."""
5+
6+
import asyncio
7+
import collections.abc
8+
import logging
9+
from datetime import timedelta
10+
11+
import grpc
12+
from frequenz.channels import Broadcast, Sender
13+
from frequenz.client.microgrid import ComponentCategory, InverterData, InverterType
14+
from typing_extensions import override
15+
16+
from ....._internal._channels import LatestValueCache
17+
from ....._internal._math import is_close_to_zero
18+
from .....microgrid import connection_manager
19+
from .....timeseries import Power
20+
from ..._component_pool_status_tracker import ComponentPoolStatusTracker
21+
from ..._component_status import ComponentPoolStatus, PVInverterStatusTracker
22+
from ...request import Request
23+
from ...result import PartialFailure, Result, Success
24+
from .._component_manager import ComponentManager
25+
26+
_logger = logging.getLogger(__name__)
27+
28+
29+
class PVManager(ComponentManager):
30+
"""Manage PV inverters for the power distributor."""
31+
32+
@override
33+
def __init__(
34+
self,
35+
component_pool_status_sender: Sender[ComponentPoolStatus],
36+
results_sender: Sender[Result],
37+
) -> None:
38+
"""Initialize this instance.
39+
40+
Args:
41+
component_pool_status_sender: Channel for sending information about which
42+
components are expected to be working.
43+
results_sender: Channel for sending results of power distribution.
44+
"""
45+
self._results_sender = results_sender
46+
self._pv_inverter_ids = self._get_pv_inverter_ids()
47+
48+
self._component_pool_status_tracker = (
49+
ComponentPoolStatusTracker(
50+
component_ids=self._pv_inverter_ids,
51+
component_status_sender=component_pool_status_sender,
52+
max_data_age=timedelta(seconds=10.0),
53+
max_blocking_duration=timedelta(seconds=30.0),
54+
component_status_tracker_type=PVInverterStatusTracker,
55+
)
56+
if self._pv_inverter_ids
57+
else None
58+
)
59+
self._component_data_caches: dict[int, LatestValueCache[InverterData]] = {}
60+
self._target_power = Power.zero()
61+
self._target_power_channel = Broadcast[Request](name="target_power")
62+
self._target_power_tx = self._target_power_channel.new_sender()
63+
self._task: asyncio.Task[None] | None = None
64+
65+
@override
66+
def component_ids(self) -> collections.abc.Set[int]:
67+
"""Return the set of PV inverter ids."""
68+
return self._pv_inverter_ids
69+
70+
@override
71+
async def start(self) -> None:
72+
"""Start the PV inverter manager."""
73+
self._component_data_caches = {
74+
inv_id: LatestValueCache(
75+
await connection_manager.get().api_client.inverter_data(inv_id)
76+
)
77+
for inv_id in self._pv_inverter_ids
78+
}
79+
80+
@override
81+
async def stop(self) -> None:
82+
"""Stop the PV inverter manager."""
83+
await asyncio.gather(
84+
*[tracker.stop() for tracker in self._component_data_caches.values()]
85+
)
86+
if self._component_pool_status_tracker:
87+
await self._component_pool_status_tracker.stop()
88+
89+
@override
90+
async def distribute_power(self, request: Request) -> None:
91+
"""Distribute the requested power to the PV inverters.
92+
93+
Args:
94+
request: Request to get the distribution for.
95+
96+
Raises:
97+
ValueError: If no PV inverters are present in the component graph, but
98+
component_ids are provided in the request.
99+
"""
100+
remaining_power = request.power
101+
allocations: dict[int, Power] = {}
102+
if not self._component_pool_status_tracker:
103+
if not request.component_ids:
104+
await self._results_sender.send(
105+
Success(
106+
succeeded_components=set(),
107+
succeeded_power=Power.zero(),
108+
excess_power=remaining_power,
109+
request=request,
110+
)
111+
)
112+
return
113+
raise ValueError(
114+
"Cannot distribute power to PV inverters without any inverters"
115+
)
116+
working_components = list(
117+
self._component_pool_status_tracker.get_working_components(
118+
request.component_ids
119+
)
120+
)
121+
122+
# When sorting by lower bounds, which are negative for PV inverters, we have to
123+
# reverse the order, so that the inverters with the higher bounds i.e., the
124+
# least absolute value are first.
125+
working_components.sort(
126+
key=lambda inv_id: self._component_data_caches[inv_id]
127+
.get()
128+
.active_power_inclusion_lower_bound,
129+
reverse=True,
130+
)
131+
132+
num_components = len(working_components)
133+
for idx, inv_id in enumerate(working_components):
134+
# Request powers are negative for PV inverters. When remaining power is
135+
# greater than 0.0, we can stop allocating further.
136+
if remaining_power > Power.zero() or is_close_to_zero(
137+
remaining_power.as_watts()
138+
):
139+
break
140+
distribution = remaining_power / float(num_components - idx)
141+
inv_data = self._component_data_caches[inv_id]
142+
if not inv_data.has_value():
143+
allocations[inv_id] = Power.zero()
144+
# Can't get device bounds, so can't use inverter.
145+
continue
146+
discharge_bounds = Power.from_watts(
147+
inv_data.get().active_power_inclusion_lower_bound
148+
)
149+
# Because all 3 values are negative or 0, we use max, to get the value
150+
# with the least absolute value.
151+
allocated_power = max(remaining_power, discharge_bounds, distribution)
152+
allocations[inv_id] = allocated_power
153+
remaining_power -= allocated_power
154+
155+
_logger.debug(
156+
"Distributing %s to PV inverters %s",
157+
request.power,
158+
allocations,
159+
)
160+
await self._set_api_power(request, allocations, remaining_power)
161+
162+
async def _set_api_power(
163+
self, request: Request, allocations: dict[int, Power], remaining_power: Power
164+
) -> None:
165+
api_client = connection_manager.get().api_client
166+
tasks: dict[int, asyncio.Task[None]] = {}
167+
for component_id, power in allocations.items():
168+
tasks[component_id] = asyncio.create_task(
169+
api_client.set_power(component_id, power.as_watts())
170+
)
171+
_, pending = await asyncio.wait(
172+
tasks.values(),
173+
timeout=request.request_timeout.total_seconds(),
174+
return_when=asyncio.ALL_COMPLETED,
175+
)
176+
for task in pending:
177+
task.cancel()
178+
await asyncio.gather(*pending, return_exceptions=True)
179+
180+
failed_components: set[int] = set()
181+
succeeded_components: set[int] = set()
182+
failed_power = Power.zero()
183+
for component_id, task in tasks.items():
184+
exc = task.exception()
185+
if exc is not None:
186+
failed_components.add(component_id)
187+
failed_power += allocations[component_id]
188+
else:
189+
succeeded_components.add(component_id)
190+
191+
match task.exception():
192+
case asyncio.CancelledError:
193+
_logger.warning(
194+
"Timeout while setting power to EV charger %s", component_id
195+
)
196+
case grpc.aio.AioRpcError as err:
197+
_logger.warning(
198+
"Error while setting power to EV charger %s: %s",
199+
component_id,
200+
err,
201+
)
202+
if failed_components:
203+
await self._results_sender.send(
204+
PartialFailure(
205+
failed_components=failed_components,
206+
succeeded_components=succeeded_components,
207+
failed_power=failed_power,
208+
succeeded_power=self._target_power - failed_power,
209+
excess_power=remaining_power,
210+
request=request,
211+
)
212+
)
213+
await self._results_sender.send(
214+
Success(
215+
succeeded_components=succeeded_components,
216+
succeeded_power=self._target_power,
217+
excess_power=remaining_power,
218+
request=request,
219+
)
220+
)
221+
222+
def _get_pv_inverter_ids(self) -> collections.abc.Set[int]:
223+
"""Return the IDs of all PV inverters present in the component graph."""
224+
return {
225+
inv.component_id
226+
for inv in connection_manager.get().component_graph.components(
227+
component_categories={ComponentCategory.INVERTER}
228+
)
229+
if inv.type == InverterType.SOLAR
230+
}

0 commit comments

Comments
 (0)