1616from frequenz .client .microgrid import ComponentCategory , ComponentType , InverterType
1717from typing_extensions import override
1818
19- from ...timeseries ._base_types import SystemBounds
19+ from ...timeseries import Power
20+ from ...timeseries ._base_types import Bounds , SystemBounds
2021from .._actor import Actor
2122from .._channel_registry import ChannelRegistry
2223from ._base_classes import Algorithm , BaseAlgorithm , Proposal , ReportRequest , _Report
2829 from .. import power_distributing
2930
3031
31- class PowerManagingActor (Actor ):
32+ class PowerManagingActor (Actor ): # pylint: disable=too-many-instance-attributes
3233 """The power manager."""
3334
3435 def __init__ ( # pylint: disable=too-many-arguments
@@ -84,10 +85,18 @@ def __init__( # pylint: disable=too-many-arguments
8485
8586 self ._system_bounds : dict [frozenset [int ], SystemBounds ] = {}
8687 self ._bound_tracker_tasks : dict [frozenset [int ], asyncio .Task [None ]] = {}
87- self ._subscriptions : dict [frozenset [int ], dict [int , Sender [_Report ]]] = {}
88+ self ._non_shifting_subscriptions : dict [
89+ frozenset [int ], dict [int , Sender [_Report ]]
90+ ] = {}
91+ self ._shifting_subscriptions : dict [
92+ frozenset [int ], dict [int , Sender [_Report ]]
93+ ] = {}
8894 self ._distribution_results : dict [frozenset [int ], power_distributing .Result ] = {}
8995
90- self ._algorithm : BaseAlgorithm = Matryoshka (
96+ self ._non_shifting_group : BaseAlgorithm = Matryoshka (
97+ max_proposal_age = timedelta (seconds = 60.0 )
98+ )
99+ self ._shifting_group : BaseAlgorithm = Matryoshka (
91100 max_proposal_age = timedelta (seconds = 60.0 )
92101 )
93102
@@ -104,14 +113,29 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
104113 if bounds is None :
105114 _logger .warning ("PowerManagingActor: No bounds for %s" , component_ids )
106115 return
107- for priority , sender in self ._subscriptions .get (component_ids , {}).items ():
108- status = self ._algorithm .get_status (
116+ for priority , sender in self ._shifting_subscriptions .get (
117+ component_ids , {}
118+ ).items ():
119+ status = self ._shifting_group .get_status (
109120 component_ids ,
110121 priority ,
111122 bounds ,
112123 self ._distribution_results .get (component_ids ),
113124 )
114125 await sender .send (status )
126+ for priority , sender in self ._non_shifting_subscriptions .get (
127+ component_ids , {}
128+ ).items ():
129+ status = self ._non_shifting_group .get_status (
130+ component_ids ,
131+ priority ,
132+ self ._calculate_shifted_bounds (
133+ bounds ,
134+ self ._shifting_group .get_target_power (component_ids ),
135+ ),
136+ self ._distribution_results .get (component_ids ),
137+ )
138+ await sender .send (status )
115139
116140 async def _bounds_tracker (
117141 self ,
@@ -130,7 +154,7 @@ async def _bounds_tracker(
130154 await self ._send_updated_target_power (component_ids , None )
131155 await self ._send_reports (component_ids )
132156
133- def _add_bounds_tracker (self , component_ids : frozenset [int ]) -> None :
157+ def _add_system_bounds_tracker (self , component_ids : frozenset [int ]) -> None :
134158 """Add a bounds tracker.
135159
136160 Args:
@@ -184,6 +208,116 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None:
184208 self ._bounds_tracker (component_ids , bounds_receiver )
185209 )
186210
211+ def _calculate_shifted_bounds (
212+ self , bounds : SystemBounds , target_power : Power | None
213+ ) -> SystemBounds :
214+ """Calculate the shifted bounds corresponding to shifting group's target power.
215+
216+ Any value regular actors choose within these bounds can be shifted by the
217+ shifting power and still remain within the actual system bounds.
218+
219+ | system bounds | shifting | shifted |
220+ | | target power | bounds |
221+ |---------------+--------------+------------|
222+ | -100 to 100 | 70 | -170 to 30 |
223+ | -100 to 100 | -50 | -50 to 150 |
224+
225+ Args:
226+ bounds: The bounds to calculate the remaining bounds from.
227+ target_power: The target power to apply.
228+
229+ Returns:
230+ The remaining bounds.
231+ """
232+ if target_power is None :
233+ return bounds
234+
235+ inclusion_bounds : Bounds [Power ] | None = None
236+ if bounds .inclusion_bounds is not None :
237+ inclusion_bounds = Bounds (
238+ bounds .inclusion_bounds .lower - target_power ,
239+ bounds .inclusion_bounds .upper - target_power ,
240+ )
241+ return SystemBounds (
242+ timestamp = bounds .timestamp ,
243+ inclusion_bounds = inclusion_bounds ,
244+ exclusion_bounds = bounds .exclusion_bounds ,
245+ )
246+
247+ def _calculate_target_power (
248+ self ,
249+ component_ids : frozenset [int ],
250+ proposal : Proposal | None ,
251+ must_send : bool = False ,
252+ ) -> Power | None :
253+ """Calculate the target power for a set of components.
254+
255+ This is the power from the non-shifting group, shifted by the power from the
256+ shifting group.
257+
258+ Args:
259+ component_ids: The component IDs for which to calculate the target power.
260+ proposal: The proposal to calculate the target power for.
261+ must_send: If `True`, a new request will be sent to the PowerDistributor,
262+ even if there's no change in power.
263+
264+ Returns:
265+ The target power.
266+ """
267+ tgt_power_shift : Power | None = None
268+ tgt_power_no_shift : Power | None = None
269+ if proposal is not None :
270+ if proposal .in_shifting_group :
271+ tgt_power_shift = self ._shifting_group .calculate_target_power (
272+ component_ids ,
273+ proposal ,
274+ self ._system_bounds [component_ids ],
275+ must_send ,
276+ )
277+ tgt_power_no_shift = self ._non_shifting_group .calculate_target_power (
278+ component_ids ,
279+ None ,
280+ self ._calculate_shifted_bounds (
281+ self ._system_bounds [component_ids ], tgt_power_shift
282+ ),
283+ must_send ,
284+ )
285+ else :
286+ tgt_power_no_shift = self ._non_shifting_group .calculate_target_power (
287+ component_ids ,
288+ proposal ,
289+ self ._system_bounds [component_ids ],
290+ must_send ,
291+ )
292+ tgt_power_shift = self ._shifting_group .calculate_target_power (
293+ component_ids ,
294+ None ,
295+ self ._calculate_shifted_bounds (
296+ self ._system_bounds [component_ids ], tgt_power_no_shift
297+ ),
298+ must_send ,
299+ )
300+ else :
301+ tgt_power_no_shift = self ._non_shifting_group .calculate_target_power (
302+ component_ids ,
303+ None ,
304+ self ._system_bounds [component_ids ],
305+ must_send ,
306+ )
307+ tgt_power_shift = self ._shifting_group .calculate_target_power (
308+ component_ids ,
309+ None ,
310+ self ._calculate_shifted_bounds (
311+ self ._system_bounds [component_ids ], tgt_power_no_shift
312+ ),
313+ must_send ,
314+ )
315+ if tgt_power_shift is not None and tgt_power_no_shift is not None :
316+ return tgt_power_shift + tgt_power_no_shift
317+ if tgt_power_shift is not None :
318+ return tgt_power_shift
319+ return tgt_power_no_shift
320+
187321 async def _send_updated_target_power (
188322 self ,
189323 component_ids : frozenset [int ],
@@ -192,10 +326,9 @@ async def _send_updated_target_power(
192326 ) -> None :
193327 from .. import power_distributing # pylint: disable=import-outside-toplevel
194328
195- target_power = self ._algorithm . calculate_target_power (
329+ target_power = self ._calculate_target_power (
196330 component_ids ,
197331 proposal ,
198- self ._system_bounds [component_ids ],
199332 must_send ,
200333 )
201334 request_timeout = (
@@ -225,7 +358,7 @@ async def _run(self) -> None:
225358 if selected_from (selected , self ._proposals_receiver ):
226359 proposal = selected .message
227360 if proposal .component_ids not in self ._bound_tracker_tasks :
228- self ._add_bounds_tracker (proposal .component_ids )
361+ self ._add_system_bounds_tracker (proposal .component_ids )
229362
230363 # TODO: must_send=True forces a new request to # pylint: disable=fixme
231364 # be sent to the PowerDistributor, even if there's no change in power.
@@ -245,22 +378,29 @@ async def _run(self) -> None:
245378 sub = selected .message
246379 component_ids = sub .component_ids
247380 priority = sub .priority
381+ in_shifting_group = sub .in_shifting_group
382+
383+ subs_set = (
384+ self ._shifting_subscriptions
385+ if in_shifting_group
386+ else self ._non_shifting_subscriptions
387+ )
248388
249- if component_ids not in self . _subscriptions :
250- self . _subscriptions [component_ids ] = {
389+ if component_ids not in subs_set :
390+ subs_set [component_ids ] = {
251391 priority : self ._channel_registry .get_or_create (
252392 _Report , sub .get_channel_name ()
253393 ).new_sender ()
254394 }
255- elif priority not in self . _subscriptions [component_ids ]:
256- self . _subscriptions [component_ids ][priority ] = (
395+ elif priority not in subs_set [component_ids ]:
396+ subs_set [component_ids ][priority ] = (
257397 self ._channel_registry .get_or_create (
258398 _Report , sub .get_channel_name ()
259399 ).new_sender ()
260400 )
261401
262- if sub . component_ids not in self ._bound_tracker_tasks :
263- self ._add_bounds_tracker ( sub . component_ids )
402+ if component_ids not in self ._bound_tracker_tasks :
403+ self ._add_system_bounds_tracker ( component_ids )
264404
265405 elif selected_from (selected , self ._power_distributing_results_receiver ):
266406 from .. import ( # pylint: disable=import-outside-toplevel
@@ -287,4 +427,7 @@ async def _run(self) -> None:
287427 await self ._send_reports (frozenset (result .request .component_ids ))
288428
289429 elif selected_from (selected , drop_old_proposals_timer ):
290- self ._algorithm .drop_old_proposals (asyncio .get_event_loop ().time ())
430+ self ._non_shifting_group .drop_old_proposals (
431+ asyncio .get_event_loop ().time ()
432+ )
433+ self ._shifting_group .drop_old_proposals (asyncio .get_event_loop ().time ())
0 commit comments