Skip to content
5 changes: 4 additions & 1 deletion src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from frequenz.channels import Broadcast, Sender
from frequenz.client.microgrid import ComponentCategory, InverterType

from frequenz.sdk.microgrid._power_managing._base_classes import Algorithm
from frequenz.sdk.microgrid._power_managing._base_classes import Algorithm, DefaultPower

from .._internal._channels import ChannelRegistry
from ..actor._actor import Actor
Expand Down Expand Up @@ -107,19 +107,22 @@ def __init__(
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=Algorithm.SHIFTING_MATRYOSHKA,
component_category=ComponentCategory.BATTERY,
default_power=DefaultPower.ZERO,
)
self._ev_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=Algorithm.MATRYOSHKA,
component_category=ComponentCategory.EV_CHARGER,
default_power=DefaultPower.MAX,
)
self._pv_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=Algorithm.MATRYOSHKA,
component_category=ComponentCategory.INVERTER,
component_type=InverterType.SOLAR,
default_power=DefaultPower.MIN,
)

self._logical_meter: LogicalMeter | None = None
Expand Down
18 changes: 14 additions & 4 deletions src/frequenz/sdk/microgrid/_power_managing/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ def __hash__(self) -> int:
return hash((self.priority, self.source_id))


class DefaultPower(enum.Enum):
"""The default power for a component category."""

ZERO = "zero"
"""The default power is 0 W."""

MIN = "min"
"""The default power is the minimum power of the component."""

MAX = "max"
"""The default power is the maximum power of the component."""


class Algorithm(enum.Enum):
"""The available algorithms for the power manager."""

Expand All @@ -215,7 +228,6 @@ def calculate_target_power(
component_ids: frozenset[int],
proposal: Proposal | None,
system_bounds: SystemBounds,
must_return_power: bool = False,
) -> Power | None:
"""Calculate and return the target power for the given components.

Expand All @@ -224,12 +236,10 @@ def calculate_target_power(
proposal: If given, the proposal to added to the bucket, before the target
power is calculated.
system_bounds: The system bounds for the components in the proposal.
must_return_power: If `True`, the algorithm must return a target power,
even if it hasn't changed since the last call.

Returns:
The new target power for the components, or `None` if the target power
didn't change.
couldn't be calculated.
"""

# The arguments for this method are tightly coupled to the `Matryoshka` algorithm.
Expand Down
49 changes: 30 additions & 19 deletions src/frequenz/sdk/microgrid/_power_managing/_matryoshka.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from ... import timeseries
from . import _bounds
from ._base_classes import BaseAlgorithm, Proposal, _Report
from ._base_classes import BaseAlgorithm, DefaultPower, Proposal, _Report

if typing.TYPE_CHECKING:
from ...timeseries._base_types import SystemBounds
Expand All @@ -39,17 +39,20 @@
class Matryoshka(BaseAlgorithm):
"""The matryoshka algorithm."""

def __init__(self, max_proposal_age: timedelta) -> None:
def __init__(
self, max_proposal_age: timedelta, default_power: DefaultPower
) -> None:
"""Create a new instance of the matryoshka algorithm."""
self._max_proposal_age_sec = max_proposal_age.total_seconds()
self._default_power = default_power
self._component_buckets: dict[frozenset[int], set[Proposal]] = {}
self._target_power: dict[frozenset[int], Power] = {}

def _calc_target_power(
self,
proposals: set[Proposal],
system_bounds: SystemBounds,
) -> Power:
) -> Power | None:
"""Calculate the target power for the given components.

Args:
Expand Down Expand Up @@ -80,7 +83,7 @@ def _calc_target_power(
):
exclusion_bounds = system_bounds.exclusion_bounds

target_power = Power.zero()
target_power = None
for next_proposal in sorted(proposals, reverse=True):
if upper_bound < lower_bound:
break
Expand Down Expand Up @@ -158,7 +161,6 @@ def calculate_target_power(
component_ids: frozenset[int],
proposal: Proposal | None,
system_bounds: SystemBounds,
must_return_power: bool = False,
) -> Power | None:
"""Calculate and return the target power for the given components.

Expand All @@ -167,12 +169,10 @@ def calculate_target_power(
proposal: If given, the proposal to added to the bucket, before the target
power is calculated.
system_bounds: The system bounds for the components in the proposal.
must_return_power: If `True`, the algorithm must return a target power,
even if it hasn't changed since the last call.

Returns:
The new target power for the components, or `None` if the target power
didn't change.
couldn't be calculated.

Raises: # noqa: DOC502
NotImplementedError: When the proposal contains component IDs that are
Expand All @@ -193,24 +193,35 @@ def calculate_target_power(
bucket.add(proposal)
elif not bucket:
del self._component_buckets[component_ids]
_ = self._target_power.pop(component_ids, None)

# If there has not been any proposal for the given components, don't calculate a
# target power and just return `None`.
proposals = self._component_buckets.get(component_ids)
if proposals is None:
return None

target_power = self._calc_target_power(proposals, system_bounds)
target_power = None
if proposals is not None:
target_power = self._calc_target_power(proposals, system_bounds)

if (
must_return_power
or component_ids not in self._target_power
or self._target_power[component_ids] != target_power
):
if target_power is not None:
self._target_power[component_ids] = target_power
return target_power
return None
elif self._target_power.get(component_ids) is not None:
# If the target power was previously set, but is now `None`, then we send
# the default power of the component category, to reset it immediately.
del self._target_power[component_ids]
bounds = system_bounds.inclusion_bounds
if bounds is None:
return None
match self._default_power:
case DefaultPower.MIN:
return bounds.lower
case DefaultPower.MAX:
return bounds.upper
case DefaultPower.ZERO:
return Power.zero()
case other:
typing.assert_never(other)

return target_power

@override
def get_status(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
from ...actor import Actor
from ...timeseries._base_types import SystemBounds
from .. import _data_pipeline, _power_distributing
from ._base_classes import Algorithm, BaseAlgorithm, Proposal, ReportRequest, _Report
from ._base_classes import (
Algorithm,
BaseAlgorithm,
DefaultPower,
Proposal,
ReportRequest,
_Report,
)
from ._matryoshka import Matryoshka
from ._shifting_matryoshka import ShiftingMatryoshka

Expand All @@ -40,6 +47,7 @@ def __init__( # pylint: disable=too-many-arguments
power_distributing_results_receiver: Receiver[_power_distributing.Result],
channel_registry: ChannelRegistry,
algorithm: Algorithm,
default_power: DefaultPower,
component_category: ComponentCategory,
component_type: ComponentType | None = None,
):
Expand All @@ -54,6 +62,7 @@ def __init__( # pylint: disable=too-many-arguments
results.
channel_registry: The channel registry.
algorithm: The power management algorithm to use.
default_power: The default power to use for the components.
component_category: The category of the component this power manager
instance is going to support.
component_type: The type of the component of the given category that this
Expand All @@ -66,6 +75,7 @@ def __init__( # pylint: disable=too-many-arguments
"""
self._component_category = component_category
self._component_type = component_type
self._default_power = default_power
self._bounds_subscription_receiver = bounds_subscription_receiver
self._power_distributing_requests_sender = power_distributing_requests_sender
self._power_distributing_results_receiver = power_distributing_results_receiver
Expand All @@ -79,11 +89,13 @@ def __init__( # pylint: disable=too-many-arguments
match algorithm:
case Algorithm.MATRYOSHKA:
self._algorithm: BaseAlgorithm = Matryoshka(
max_proposal_age=timedelta(seconds=60.0)
max_proposal_age=timedelta(seconds=60.0),
default_power=default_power,
)
case Algorithm.SHIFTING_MATRYOSHKA:
self._algorithm = ShiftingMatryoshka(
max_proposal_age=timedelta(seconds=60.0)
max_proposal_age=timedelta(seconds=60.0),
default_power=default_power,
)
case _:
assert_never(algorithm)
Expand Down Expand Up @@ -121,7 +133,14 @@ async def _bounds_tracker(
collective bounds of.
bounds_receiver: The receiver for power bounds.
"""
last_bounds: SystemBounds | None = None
async for bounds in bounds_receiver:
if (
last_bounds is not None
and bounds.inclusion_bounds == last_bounds.inclusion_bounds
):
continue
last_bounds = bounds
self._system_bounds[component_ids] = bounds
await self._send_updated_target_power(component_ids, None)
await self._send_reports(component_ids)
Expand Down Expand Up @@ -179,13 +198,11 @@ async def _send_updated_target_power(
self,
component_ids: frozenset[int],
proposal: Proposal | None,
must_send: bool = False,
) -> None:
target_power = self._algorithm.calculate_target_power(
component_ids,
proposal,
self._system_bounds[component_ids],
must_send,
)
if target_power is not None:
await self._power_distributing_requests_sender.send(
Expand Down Expand Up @@ -221,9 +238,7 @@ async def _run(self) -> None:
# This can be removed as soon as
# https://github.com/frequenz-floss/frequenz-sdk-python/issues/293 is
# implemented.
await self._send_updated_target_power(
proposal.component_ids, proposal, must_send=True
)
await self._send_updated_target_power(proposal.component_ids, proposal)
await self._send_reports(proposal.component_ids)

elif selected_from(selected, self._bounds_subscription_receiver):
Expand Down Expand Up @@ -258,7 +273,7 @@ async def _run(self) -> None:
if not last_result_partial_failure:
last_result_partial_failure = True
await self._send_updated_target_power(
frozenset(request.component_ids), None, must_send=True
frozenset(request.component_ids), None
)
case _power_distributing.Success():
last_result_partial_failure = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from ... import timeseries
from . import _bounds
from ._base_classes import BaseAlgorithm, Proposal, _Report
from ._base_classes import BaseAlgorithm, DefaultPower, Proposal, _Report

if typing.TYPE_CHECKING:
from ...timeseries._base_types import SystemBounds
Expand Down Expand Up @@ -55,8 +55,13 @@ class ShiftingMatryoshka(BaseAlgorithm):
Details about the algorithm can be found in the [microgrid module documentation](https://frequenz-floss.github.io/frequenz-sdk-python/v1.0-dev/user-guide/microgrid-concepts/#frequenz.sdk.microgrid--setting-power).
""" # noqa: E501 (line too long)

def __init__(self, max_proposal_age: timedelta) -> None:
def __init__(
self,
max_proposal_age: timedelta,
default_power: DefaultPower,
) -> None:
"""Create a new instance of the matryoshka algorithm."""
self._default_power = default_power
self._max_proposal_age_sec = max_proposal_age.total_seconds()
self._component_buckets: dict[frozenset[int], set[Proposal]] = {}
self._target_power: dict[frozenset[int], Power] = {}
Expand Down Expand Up @@ -218,7 +223,6 @@ def calculate_target_power(
component_ids: frozenset[int],
proposal: Proposal | None,
system_bounds: SystemBounds,
must_return_power: bool = False,
) -> Power | None:
"""Calculate and return the target power for the given components.

Expand All @@ -227,12 +231,10 @@ def calculate_target_power(
proposal: If given, the proposal to added to the bucket, before the target
power is calculated.
system_bounds: The system bounds for the components in the proposal.
must_return_power: If `True`, the algorithm must return a target power,
even if it hasn't changed since the last call.

Returns:
The new target power for the components, or `None` if the target power
didn't change.
couldn't be calculated.

Raises: # noqa: DOC502
NotImplementedError: When the proposal contains component IDs that are
Expand All @@ -253,18 +255,29 @@ def calculate_target_power(
bucket.add(proposal)
elif not bucket:
del self._component_buckets[component_ids]
_ = self._target_power.pop(component_ids, None)

target_power, _ = self._calc_targets(component_ids, system_bounds)

if target_power is not None and (
must_return_power
or component_ids not in self._target_power
or self._target_power[component_ids] != target_power
):
if target_power is not None:
self._target_power[component_ids] = target_power
return target_power
return None
elif self._target_power.get(component_ids) is not None:
# If the target power was previously set, but is now `None`, then we send
# the default power of the component category, to reset it immediately.
del self._target_power[component_ids]
bounds = system_bounds.inclusion_bounds
if bounds is None:
return None
match self._default_power:
case DefaultPower.MIN:
return bounds.lower
case DefaultPower.MAX:
return bounds.upper
case DefaultPower.ZERO:
return Power.zero()
case other:
typing.assert_never(other)

return target_power

@override
def get_status( # pylint: disable=too-many-locals
Expand Down
Loading
Loading