Skip to content

Commit 3b69c83

Browse files
committed
Add a PVManager implementation
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent c4afeab commit 3b69c83

File tree

3 files changed

+221
-0
lines changed

3 files changed

+221
-0
lines changed

src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
from ._battery_manager import BatteryManager
77
from ._component_manager import ComponentManager
88
from ._ev_charger_manager import EVChargerManager
9+
from ._pv_inverter_manager import PVManager
910

1011
__all__ = [
1112
"BatteryManager",
1213
"ComponentManager",
1314
"EVChargerManager",
15+
"PVManager",
1416
]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage PV inverters for the power distributor."""
5+
6+
from ._pv_inverter_manager import PVManager
7+
8+
__all__ = ["PVManager"]
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manage PV inverters for the power distributor."""
5+
6+
import asyncio
7+
import collections.abc
8+
import logging
9+
from datetime import timedelta
10+
11+
import grpc
12+
from frequenz.channels import Broadcast, Sender
13+
from frequenz.client.microgrid import ComponentCategory, InverterData, InverterType
14+
from typing_extensions import override
15+
16+
from ....._internal._channels import LatestValueCache
17+
from ....._internal._math import is_close_to_zero
18+
from .....microgrid import connection_manager
19+
from .....timeseries import Power
20+
from ..._component_pool_status_tracker import ComponentPoolStatusTracker
21+
from ..._component_status import ComponentPoolStatus, PVInverterStatusTracker
22+
from ...request import Request
23+
from ...result import PartialFailure, Result, Success
24+
from .._component_manager import ComponentManager
25+
26+
_logger = logging.getLogger(__name__)
27+
28+
29+
class PVManager(ComponentManager):
30+
"""Manage PV inverters for the power distributor."""
31+
32+
@override
33+
def __init__(
34+
self,
35+
component_pool_status_sender: Sender[ComponentPoolStatus],
36+
results_sender: Sender[Result],
37+
) -> None:
38+
"""Initialize this instance.
39+
40+
Args:
41+
component_pool_status_sender: Channel for sending information about which
42+
components are expected to be working.
43+
results_sender: Channel for sending results of power distribution.
44+
"""
45+
self._results_sender = results_sender
46+
self._pv_inverter_ids = self._get_pv_inverter_ids()
47+
48+
self._component_pool_status_tracker = (
49+
ComponentPoolStatusTracker(
50+
component_ids=self._pv_inverter_ids,
51+
component_status_sender=component_pool_status_sender,
52+
max_data_age=timedelta(seconds=10.0),
53+
max_blocking_duration=timedelta(seconds=30.0),
54+
component_status_tracker_type=PVInverterStatusTracker,
55+
)
56+
if self._pv_inverter_ids
57+
else None
58+
)
59+
self._component_data_caches: dict[int, LatestValueCache[InverterData]] = {}
60+
self._target_power = Power.zero()
61+
self._target_power_channel = Broadcast[Request](name="target_power")
62+
self._target_power_tx = self._target_power_channel.new_sender()
63+
self._task: asyncio.Task[None] | None = None
64+
self._latest_request: Request = Request(Power.zero(), set())
65+
66+
@override
67+
def component_ids(self) -> collections.abc.Set[int]:
68+
"""Return the set of PV inverter ids."""
69+
return self._pv_inverter_ids
70+
71+
@override
72+
async def start(self) -> None:
73+
"""Start the PV inverter manager."""
74+
self._component_data_caches = {
75+
inv_id: LatestValueCache(
76+
await connection_manager.get().api_client.inverter_data(inv_id)
77+
)
78+
for inv_id in self._pv_inverter_ids
79+
}
80+
81+
@override
82+
async def stop(self) -> None:
83+
"""Stop the PV inverter manager."""
84+
await asyncio.gather(
85+
*[tracker.stop() for tracker in self._component_data_caches.values()]
86+
)
87+
if self._component_pool_status_tracker:
88+
await self._component_pool_status_tracker.stop()
89+
90+
@override
91+
async def distribute_power(self, request: Request) -> None:
92+
"""Distribute the requested power to the PV inverters.
93+
94+
Args:
95+
request: Request to get the distribution for.
96+
97+
Raises:
98+
ValueError: If no PV inverters are present in the component graph, but
99+
component_ids are provided in the request.
100+
"""
101+
remaining_power = request.power
102+
allocations: dict[int, Power] = {}
103+
if not self._component_pool_status_tracker:
104+
if not request.component_ids:
105+
await self._results_sender.send(
106+
Success(
107+
succeeded_components=set(),
108+
succeeded_power=Power.zero(),
109+
excess_power=remaining_power,
110+
request=request,
111+
)
112+
)
113+
return
114+
raise ValueError(
115+
"Cannot distribute power to PV inverters without any inverters"
116+
)
117+
for inv_id in self._component_pool_status_tracker.get_working_components(
118+
request.component_ids
119+
):
120+
if remaining_power < Power.zero() or is_close_to_zero(
121+
remaining_power.as_watts()
122+
):
123+
break
124+
inv_data = self._component_data_caches[inv_id]
125+
if not inv_data.has_value():
126+
allocations[inv_id] = Power.zero()
127+
# Can't get device bounds, so can't use inverter.
128+
continue
129+
discharge_bounds = Power.from_watts(
130+
inv_data.get().active_power_inclusion_lower_bound
131+
)
132+
allocated_power = min(remaining_power, discharge_bounds)
133+
allocations[inv_id] = allocated_power
134+
remaining_power -= allocated_power
135+
136+
_logger.debug(
137+
"Distributing %s to PV inverters %s",
138+
request.power,
139+
allocations,
140+
)
141+
await self._set_api_power(request, allocations, remaining_power)
142+
143+
async def _set_api_power(
144+
self, request: Request, allocations: dict[int, Power], remaining_power: Power
145+
) -> None:
146+
api_client = connection_manager.get().api_client
147+
tasks: dict[int, asyncio.Task[None]] = {}
148+
for component_id, power in allocations.items():
149+
tasks[component_id] = asyncio.create_task(
150+
api_client.set_power(component_id, power.as_watts())
151+
)
152+
_, pending = await asyncio.wait(
153+
tasks.values(),
154+
timeout=request.request_timeout.total_seconds(),
155+
return_when=asyncio.ALL_COMPLETED,
156+
)
157+
for task in pending:
158+
task.cancel()
159+
await asyncio.gather(*pending, return_exceptions=True)
160+
161+
failed_components: set[int] = set()
162+
succeeded_components: set[int] = set()
163+
failed_power = Power.zero()
164+
for component_id, task in tasks.items():
165+
exc = task.exception()
166+
if exc is not None:
167+
failed_components.add(component_id)
168+
failed_power += allocations[component_id]
169+
else:
170+
succeeded_components.add(component_id)
171+
172+
match task.exception():
173+
case asyncio.CancelledError:
174+
_logger.warning(
175+
"Timeout while setting power to EV charger %s", component_id
176+
)
177+
case grpc.aio.AioRpcError as err:
178+
_logger.warning(
179+
"Error while setting power to EV charger %s: %s",
180+
component_id,
181+
err,
182+
)
183+
if failed_components:
184+
await self._results_sender.send(
185+
PartialFailure(
186+
failed_components=failed_components,
187+
succeeded_components=succeeded_components,
188+
failed_power=failed_power,
189+
succeeded_power=self._target_power - failed_power,
190+
excess_power=remaining_power,
191+
request=self._latest_request,
192+
)
193+
)
194+
await self._results_sender.send(
195+
Success(
196+
succeeded_components=succeeded_components,
197+
succeeded_power=self._target_power,
198+
excess_power=remaining_power,
199+
request=self._latest_request,
200+
)
201+
)
202+
203+
def _get_pv_inverter_ids(self) -> collections.abc.Set[int]:
204+
"""Return the IDs of all PV inverters present in the component graph."""
205+
return {
206+
inv.component_id
207+
for inv in connection_manager.get().component_graph.components(
208+
component_categories={ComponentCategory.INVERTER}
209+
)
210+
if inv.type == InverterType.SOLAR
211+
}

0 commit comments

Comments
 (0)