Skip to content

Commit aab52be

Browse files
committed
Handle proposals from power shifting actors in the PowerManager
This is done by having an additional `Matryoshka` instance for resolving requests from shifting actors. And the target power thus obtained is added to the target power from the regular actors, which in effect, shifts the target power of the regular actors by the target power of the shifting actors. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 437487e commit aab52be

File tree

1 file changed

+158
-15
lines changed

1 file changed

+158
-15
lines changed

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 158 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
1717
from typing_extensions import override
1818

19-
from ...timeseries._base_types import SystemBounds
19+
from ...timeseries import Power
20+
from ...timeseries._base_types import Bounds, SystemBounds
2021
from .._actor import Actor
2122
from .._channel_registry import ChannelRegistry
2223
from ._base_classes import Algorithm, BaseAlgorithm, Proposal, ReportRequest, _Report
@@ -28,7 +29,7 @@
2829
from .. import power_distributing
2930

3031

31-
class PowerManagingActor(Actor):
32+
class PowerManagingActor(Actor): # pylint: disable=too-many-instance-attributes
3233
"""The power manager."""
3334

3435
def __init__( # pylint: disable=too-many-arguments
@@ -84,10 +85,18 @@ def __init__( # pylint: disable=too-many-arguments
8485

8586
self._system_bounds: dict[frozenset[int], SystemBounds] = {}
8687
self._bound_tracker_tasks: dict[frozenset[int], asyncio.Task[None]] = {}
87-
self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {}
88+
self._non_shifting_subscriptions: dict[
89+
frozenset[int], dict[int, Sender[_Report]]
90+
] = {}
91+
self._shifting_subscriptions: dict[
92+
frozenset[int], dict[int, Sender[_Report]]
93+
] = {}
8894
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}
8995

90-
self._algorithm: BaseAlgorithm = Matryoshka(
96+
self._non_shifting_group: BaseAlgorithm = Matryoshka(
97+
max_proposal_age=timedelta(seconds=60.0)
98+
)
99+
self._shifting_group: BaseAlgorithm = Matryoshka(
91100
max_proposal_age=timedelta(seconds=60.0)
92101
)
93102

@@ -104,14 +113,29 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
104113
if bounds is None:
105114
_logger.warning("PowerManagingActor: No bounds for %s", component_ids)
106115
return
107-
for priority, sender in self._subscriptions.get(component_ids, {}).items():
108-
status = self._algorithm.get_status(
116+
for priority, sender in self._shifting_subscriptions.get(
117+
component_ids, {}
118+
).items():
119+
status = self._shifting_group.get_status(
109120
component_ids,
110121
priority,
111122
bounds,
112123
self._distribution_results.get(component_ids),
113124
)
114125
await sender.send(status)
126+
for priority, sender in self._non_shifting_subscriptions.get(
127+
component_ids, {}
128+
).items():
129+
status = self._non_shifting_group.get_status(
130+
component_ids,
131+
priority,
132+
self._calculate_remaining_bounds(
133+
bounds,
134+
self._shifting_group.get_target_power(component_ids),
135+
),
136+
self._distribution_results.get(component_ids),
137+
)
138+
await sender.send(status)
115139

116140
async def _bounds_tracker(
117141
self,
@@ -184,6 +208,116 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None:
184208
self._bounds_tracker(component_ids, bounds_receiver)
185209
)
186210

211+
def _calculate_remaining_bounds(
212+
self, bounds: SystemBounds, target_power: Power | None
213+
) -> SystemBounds:
214+
"""Calculate the remaining bounds after a target power is applied.
215+
216+
This is the values that can be added to the target power and still remain within
217+
the actual system bounds.
218+
219+
| system bounds | shifting | shifted |
220+
| | target power | bounds |
221+
|---------------+--------------+------------|
222+
| -100 to 100 | 70 | -170 to 30 |
223+
| -100 to 100 | -50 | -50 to 150 |
224+
225+
Args:
226+
bounds: The bounds to calculate the remaining bounds from.
227+
target_power: The target power to apply.
228+
229+
Returns:
230+
The remaining bounds.
231+
"""
232+
if target_power is None:
233+
return bounds
234+
235+
inclusion_bounds: Bounds[Power] | None = None
236+
if bounds.inclusion_bounds is not None:
237+
inclusion_bounds = Bounds(
238+
bounds.inclusion_bounds.lower - target_power,
239+
bounds.inclusion_bounds.upper - target_power,
240+
)
241+
return SystemBounds(
242+
timestamp=bounds.timestamp,
243+
inclusion_bounds=inclusion_bounds,
244+
exclusion_bounds=bounds.exclusion_bounds,
245+
)
246+
247+
def _calculate_target_power(
248+
self,
249+
component_ids: frozenset[int],
250+
proposal: Proposal | None,
251+
must_send: bool = False,
252+
) -> Power | None:
253+
"""Calculate the target power for a set of components.
254+
255+
This is the power from the non-shifting group, shifted by the power from the
256+
shifting group.
257+
258+
Args:
259+
component_ids: The component IDs for which to calculate the target power.
260+
proposal: The proposal to calculate the target power for.
261+
must_send: If `True`, a new request will be sent to the PowerDistributor,
262+
even if there's no change in power.
263+
264+
Returns:
265+
The target power.
266+
"""
267+
tgt_power_shift: Power | None = None
268+
tgt_power_no_shift: Power | None = None
269+
if proposal is not None:
270+
if proposal.in_shifting_group:
271+
tgt_power_shift = self._shifting_group.calculate_target_power(
272+
component_ids,
273+
proposal,
274+
self._system_bounds[component_ids],
275+
must_send,
276+
)
277+
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
278+
component_ids,
279+
None,
280+
self._calculate_remaining_bounds(
281+
self._system_bounds[component_ids], tgt_power_shift
282+
),
283+
must_send,
284+
)
285+
else:
286+
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
287+
component_ids,
288+
proposal,
289+
self._system_bounds[component_ids],
290+
must_send,
291+
)
292+
tgt_power_shift = self._shifting_group.calculate_target_power(
293+
component_ids,
294+
None,
295+
self._calculate_remaining_bounds(
296+
self._system_bounds[component_ids], tgt_power_no_shift
297+
),
298+
must_send,
299+
)
300+
else:
301+
tgt_power_no_shift = self._non_shifting_group.calculate_target_power(
302+
component_ids,
303+
None,
304+
self._system_bounds[component_ids],
305+
must_send,
306+
)
307+
tgt_power_shift = self._shifting_group.calculate_target_power(
308+
component_ids,
309+
None,
310+
self._calculate_remaining_bounds(
311+
self._system_bounds[component_ids], tgt_power_no_shift
312+
),
313+
must_send,
314+
)
315+
if tgt_power_shift is not None and tgt_power_no_shift is not None:
316+
return tgt_power_shift + tgt_power_no_shift
317+
if tgt_power_shift is not None:
318+
return tgt_power_shift
319+
return tgt_power_no_shift
320+
187321
async def _send_updated_target_power(
188322
self,
189323
component_ids: frozenset[int],
@@ -192,10 +326,9 @@ async def _send_updated_target_power(
192326
) -> None:
193327
from .. import power_distributing # pylint: disable=import-outside-toplevel
194328

195-
target_power = self._algorithm.calculate_target_power(
329+
target_power = self._calculate_target_power(
196330
component_ids,
197331
proposal,
198-
self._system_bounds[component_ids],
199332
must_send,
200333
)
201334
request_timeout = (
@@ -245,22 +378,29 @@ async def _run(self) -> None:
245378
sub = selected.message
246379
component_ids = sub.component_ids
247380
priority = sub.priority
381+
in_shifting_group = sub.in_shifting_group
382+
383+
subs_set = (
384+
self._shifting_subscriptions
385+
if in_shifting_group
386+
else self._non_shifting_subscriptions
387+
)
248388

249-
if component_ids not in self._subscriptions:
250-
self._subscriptions[component_ids] = {
389+
if component_ids not in subs_set:
390+
subs_set[component_ids] = {
251391
priority: self._channel_registry.get_or_create(
252392
_Report, sub.get_channel_name()
253393
).new_sender()
254394
}
255-
elif priority not in self._subscriptions[component_ids]:
256-
self._subscriptions[component_ids][priority] = (
395+
elif priority not in subs_set[component_ids]:
396+
subs_set[component_ids][priority] = (
257397
self._channel_registry.get_or_create(
258398
_Report, sub.get_channel_name()
259399
).new_sender()
260400
)
261401

262-
if sub.component_ids not in self._bound_tracker_tasks:
263-
self._add_system_bounds_tracker(sub.component_ids)
402+
if component_ids not in self._bound_tracker_tasks:
403+
self._add_system_bounds_tracker(component_ids)
264404

265405
elif selected_from(selected, self._power_distributing_results_receiver):
266406
from .. import ( # pylint: disable=import-outside-toplevel
@@ -287,4 +427,7 @@ async def _run(self) -> None:
287427
await self._send_reports(frozenset(result.request.component_ids))
288428

289429
elif selected_from(selected, drop_old_proposals_timer):
290-
self._algorithm.drop_old_proposals(asyncio.get_event_loop().time())
430+
self._non_shifting_group.drop_old_proposals(
431+
asyncio.get_event_loop().time()
432+
)
433+
self._shifting_group.drop_old_proposals(asyncio.get_event_loop().time())

0 commit comments

Comments
 (0)