Skip to content

Commit 2d91d3b

Browse files
committed
Clone the Matryoshka implementation and name it ShiftingMatryoshka
The shifting logic will be implemented in the next commit. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent c0ad345 commit 2d91d3b

File tree

3 files changed

+847
-0
lines changed

3 files changed

+847
-0
lines changed

src/frequenz/sdk/microgrid/_power_managing/_base_classes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class Algorithm(enum.Enum):
203203
"""The available algorithms for the power manager."""
204204

205205
MATRYOSHKA = "matryoshka"
206+
SHIFTING_MATRYOSHKA = "shifting_matryoshka"
206207

207208

208209
class BaseAlgorithm(abc.ABC):
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A power manager implementation that uses the matryoshka algorithm.
5+
6+
When there are multiple proposals from different actors for the same set of components,
7+
the matryoshka algorithm will consider the priority of the actors, the bounds they set
8+
and their preferred power to determine the target power for the components.
9+
10+
The preferred power of lower priority actors will take precedence as long as they
11+
respect the bounds set by higher priority actors. If lower priority actors request
12+
power values outside the bounds set by higher priority actors, the target power will
13+
be the closest value to the preferred power that is within the bounds.
14+
15+
When there is only a single proposal for a set of components, its preferred power would
16+
be the target power, as long as it falls within the system power bounds for the
17+
components.
18+
"""
19+
20+
from __future__ import annotations
21+
22+
import logging
23+
import typing
24+
from datetime import timedelta
25+
26+
from frequenz.quantities import Power
27+
from typing_extensions import override
28+
29+
from ... import timeseries
30+
from . import _bounds
31+
from ._base_classes import BaseAlgorithm, Proposal, _Report
32+
33+
if typing.TYPE_CHECKING:
34+
from ...timeseries._base_types import SystemBounds
35+
36+
_logger = logging.getLogger(__name__)
37+
38+
39+
class ShiftingMatryoshka(BaseAlgorithm):
40+
"""The matryoshka algorithm."""
41+
42+
def __init__(self, max_proposal_age: timedelta) -> None:
43+
"""Create a new instance of the matryoshka algorithm."""
44+
self._max_proposal_age_sec = max_proposal_age.total_seconds()
45+
self._component_buckets: dict[frozenset[int], set[Proposal]] = {}
46+
self._target_power: dict[frozenset[int], Power] = {}
47+
48+
def _calc_target_power(
49+
self,
50+
proposals: set[Proposal],
51+
system_bounds: SystemBounds,
52+
) -> Power:
53+
"""Calculate the target power for the given components.
54+
55+
Args:
56+
proposals: The proposals for the given components.
57+
system_bounds: The system bounds for the components in the proposal.
58+
59+
Returns:
60+
The new target power for the components.
61+
"""
62+
lower_bound = (
63+
system_bounds.inclusion_bounds.lower
64+
if system_bounds.inclusion_bounds
65+
# if a target power exists from a previous proposal, and the system bounds
66+
# have become unavailable, force the target power to be zero, by narrowing
67+
# the bounds to zero.
68+
else Power.zero()
69+
)
70+
upper_bound = (
71+
system_bounds.inclusion_bounds.upper
72+
if system_bounds.inclusion_bounds
73+
else Power.zero()
74+
)
75+
76+
exclusion_bounds = None
77+
if system_bounds.exclusion_bounds is not None and (
78+
system_bounds.exclusion_bounds.lower != Power.zero()
79+
or system_bounds.exclusion_bounds.upper != Power.zero()
80+
):
81+
exclusion_bounds = system_bounds.exclusion_bounds
82+
83+
target_power = Power.zero()
84+
for next_proposal in sorted(proposals, reverse=True):
85+
if upper_bound < lower_bound:
86+
break
87+
if next_proposal.preferred_power:
88+
match _bounds.clamp_to_bounds(
89+
next_proposal.preferred_power,
90+
lower_bound,
91+
upper_bound,
92+
exclusion_bounds,
93+
):
94+
case (None, power) | (power, None) if power:
95+
target_power = power
96+
case (power_low, power_high) if power_low and power_high:
97+
if (
98+
power_high - next_proposal.preferred_power
99+
< next_proposal.preferred_power - power_low
100+
):
101+
target_power = power_high
102+
else:
103+
target_power = power_low
104+
105+
proposal_lower = next_proposal.bounds.lower or lower_bound
106+
proposal_upper = next_proposal.bounds.upper or upper_bound
107+
# If the bounds from the current proposal are fully within the exclusion
108+
# bounds, then don't use them to narrow the bounds further. This allows
109+
# subsequent proposals to not be blocked by the current proposal.
110+
match _bounds.check_exclusion_bounds_overlap(
111+
proposal_lower, proposal_upper, exclusion_bounds
112+
):
113+
case (True, True):
114+
continue
115+
lower_bound = max(lower_bound, proposal_lower)
116+
upper_bound = min(upper_bound, proposal_upper)
117+
lower_bound, upper_bound = _bounds.adjust_exclusion_bounds(
118+
lower_bound, upper_bound, exclusion_bounds
119+
)
120+
121+
return target_power
122+
123+
def _validate_component_ids(
124+
self,
125+
component_ids: frozenset[int],
126+
proposal: Proposal | None,
127+
system_bounds: SystemBounds,
128+
) -> bool:
129+
if component_ids not in self._component_buckets:
130+
# if there are no previous proposals and there are no system bounds, then
131+
# don't calculate a target power and fail the validation.
132+
if (
133+
system_bounds.inclusion_bounds is None
134+
and system_bounds.exclusion_bounds is None
135+
):
136+
if proposal is not None:
137+
_logger.warning(
138+
"PowerManagingActor: No system bounds available for component "
139+
+ "IDs %s, but a proposal was given. The proposal will be "
140+
+ "ignored.",
141+
component_ids,
142+
)
143+
return False
144+
145+
for bucket in self._component_buckets:
146+
if any(component_id in bucket for component_id in component_ids):
147+
raise NotImplementedError(
148+
f"PowerManagingActor: component IDs {component_ids} are already"
149+
+ " part of another bucket. Overlapping buckets are not"
150+
+ " yet supported."
151+
)
152+
return True
153+
154+
@override
155+
def calculate_target_power(
156+
self,
157+
component_ids: frozenset[int],
158+
proposal: Proposal | None,
159+
system_bounds: SystemBounds,
160+
must_return_power: bool = False,
161+
) -> Power | None:
162+
"""Calculate and return the target power for the given components.
163+
164+
Args:
165+
component_ids: The component IDs to calculate the target power for.
166+
proposal: If given, the proposal to added to the bucket, before the target
167+
power is calculated.
168+
system_bounds: The system bounds for the components in the proposal.
169+
must_return_power: If `True`, the algorithm must return a target power,
170+
even if it hasn't changed since the last call.
171+
172+
Returns:
173+
The new target power for the components, or `None` if the target power
174+
didn't change.
175+
176+
Raises: # noqa: DOC502
177+
NotImplementedError: When the proposal contains component IDs that are
178+
already part of another bucket.
179+
"""
180+
if not self._validate_component_ids(component_ids, proposal, system_bounds):
181+
return None
182+
183+
if proposal is not None:
184+
bucket = self._component_buckets.setdefault(component_ids, set())
185+
if proposal in bucket:
186+
bucket.remove(proposal)
187+
if (
188+
proposal.preferred_power is not None
189+
or proposal.bounds.lower is not None
190+
or proposal.bounds.upper is not None
191+
):
192+
bucket.add(proposal)
193+
elif not bucket:
194+
del self._component_buckets[component_ids]
195+
_ = self._target_power.pop(component_ids, None)
196+
197+
# If there has not been any proposal for the given components, don't calculate a
198+
# target power and just return `None`.
199+
proposals = self._component_buckets.get(component_ids)
200+
if proposals is None:
201+
return None
202+
203+
target_power = self._calc_target_power(proposals, system_bounds)
204+
205+
if (
206+
must_return_power
207+
or component_ids not in self._target_power
208+
or self._target_power[component_ids] != target_power
209+
):
210+
self._target_power[component_ids] = target_power
211+
return target_power
212+
return None
213+
214+
@override
215+
def get_status(
216+
self,
217+
component_ids: frozenset[int],
218+
priority: int,
219+
system_bounds: SystemBounds,
220+
) -> _Report:
221+
"""Get the bounds for the algorithm.
222+
223+
Args:
224+
component_ids: The IDs of the components to get the bounds for.
225+
priority: The priority of the actor for which the bounds are requested.
226+
system_bounds: The system bounds for the components.
227+
228+
Returns:
229+
The target power and the available bounds for the given components, for
230+
the given priority.
231+
"""
232+
target_power = self._target_power.get(component_ids)
233+
if system_bounds.inclusion_bounds is None:
234+
return _Report(
235+
target_power=target_power,
236+
_inclusion_bounds=None,
237+
_exclusion_bounds=system_bounds.exclusion_bounds,
238+
)
239+
240+
lower_bound = system_bounds.inclusion_bounds.lower
241+
upper_bound = system_bounds.inclusion_bounds.upper
242+
243+
exclusion_bounds = None
244+
if system_bounds.exclusion_bounds is not None and (
245+
system_bounds.exclusion_bounds.lower != Power.zero()
246+
or system_bounds.exclusion_bounds.upper != Power.zero()
247+
):
248+
exclusion_bounds = system_bounds.exclusion_bounds
249+
250+
for next_proposal in sorted(
251+
self._component_buckets.get(component_ids, []), reverse=True
252+
):
253+
if next_proposal.priority <= priority:
254+
break
255+
proposal_lower = next_proposal.bounds.lower or lower_bound
256+
proposal_upper = next_proposal.bounds.upper or upper_bound
257+
match _bounds.check_exclusion_bounds_overlap(
258+
proposal_lower, proposal_upper, exclusion_bounds
259+
):
260+
case (True, True):
261+
continue
262+
calc_lower_bound = max(lower_bound, proposal_lower)
263+
calc_upper_bound = min(upper_bound, proposal_upper)
264+
if calc_lower_bound <= calc_upper_bound:
265+
lower_bound, upper_bound = _bounds.adjust_exclusion_bounds(
266+
calc_lower_bound, calc_upper_bound, exclusion_bounds
267+
)
268+
else:
269+
break
270+
return _Report(
271+
target_power=target_power,
272+
_inclusion_bounds=timeseries.Bounds[Power](
273+
lower=lower_bound, upper=upper_bound
274+
),
275+
_exclusion_bounds=system_bounds.exclusion_bounds,
276+
)
277+
278+
@override
279+
def drop_old_proposals(self, loop_time: float) -> None:
280+
"""Drop old proposals.
281+
282+
This will remove all proposals that have not been updated for longer than
283+
`max_proposal_age`.
284+
285+
Args:
286+
loop_time: The current loop time.
287+
"""
288+
buckets_to_delete: list[frozenset[int]] = []
289+
for component_ids, proposals in self._component_buckets.items():
290+
to_delete: list[Proposal] = []
291+
for proposal in proposals:
292+
if (loop_time - proposal.creation_time) > self._max_proposal_age_sec:
293+
to_delete.append(proposal)
294+
for proposal in to_delete:
295+
proposals.remove(proposal)
296+
if not proposals:
297+
buckets_to_delete.append(component_ids)
298+
299+
for component_ids in buckets_to_delete:
300+
del self._component_buckets[component_ids]
301+
_ = self._target_power.pop(component_ids, None)

0 commit comments

Comments
 (0)