Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

* Passing a `request_timeout` in calls to `*_pool.propose_power` is no longer supported. It may be specified at application startup, through the new optional `api_power_request_timeout` parameter in the `microgrid.initialize()` method.

- Power distribution results are no longer available through the `power_status` streams in the `*Pool`s. They can now be accessed as a stream from a separate property `power_distribution_results`, which is available from all the `*Pool`s.

## New Features

- Classes `Bounds` and `SystemBounds` now implement the `__contains__` method, allowing the use of the `in` operator to check whether a value falls within the bounds or not.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
# changing the version
# (plugins.mkdocstrings.handlers.python.import)
"frequenz-client-microgrid >= 0.4.0, < 0.5.0",
"frequenz-channels >= 1.0.0-rc1, < 2.0.0",
"frequenz-channels >= 1.1.0, < 2.0.0",
"networkx >= 2.8, < 4",
"numpy >= 1.26.4, < 2",
"typing_extensions >= 4.6.1, < 5",
Expand Down
31 changes: 31 additions & 0 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._asyncio import cancel_and_await

T_co = typing.TypeVar("T_co", covariant=True)
U_co = typing.TypeVar("U_co", covariant=True)


class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
Expand All @@ -29,6 +30,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
"""


class MappingReceiverFetcher(typing.Generic[T_co, U_co]):
"""A receiver fetcher that can manipulate receivers before returning them."""

def __init__(
self,
fetcher: ReceiverFetcher[T_co],
mapping_function: typing.Callable[[Receiver[T_co]], Receiver[U_co]],
) -> None:
"""Initialize this instance.

Args:
fetcher: The underlying fetcher to get receivers from.
mapping_function: The method to be applied on new receivers before returning
them.
"""
self._fetcher = fetcher
self._mapping_function = mapping_function

def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
"""Get a receiver from the channel.

Args:
limit: The maximum size of the receiver.

Returns:
A receiver instance.
"""
return self._mapping_function(self._fetcher.new_receiver(limit=limit))


class _Sentinel:
"""A sentinel to denote that no value has been received yet."""

Expand Down
9 changes: 0 additions & 9 deletions src/frequenz/sdk/actor/_power_managing/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

if typing.TYPE_CHECKING:
from ...timeseries._base_types import SystemBounds
from .. import power_distributing


@dataclasses.dataclass(frozen=True, kw_only=True)
Expand Down Expand Up @@ -52,12 +51,6 @@ class _Report:
target_power: Power | None
"""The currently set power for the components."""

distribution_result: power_distributing.Result | None
"""The result of the last power distribution.

This is `None` if no power distribution has been performed yet.
"""

_inclusion_bounds: timeseries.Bounds[Power] | None
"""The available inclusion bounds for the components, for the actor's priority.

Expand Down Expand Up @@ -266,15 +259,13 @@ def get_status(
component_ids: frozenset[int],
priority: int,
system_bounds: SystemBounds,
distribution_result: power_distributing.Result | None,
) -> _Report:
"""Get the bounds for a set of components, for the given priority.

Args:
component_ids: The IDs of the components to get the bounds for.
priority: The priority of the actor for which the bounds are requested.
system_bounds: The system bounds for the components.
distribution_result: The result of the last power distribution.

Returns:
The bounds for the components.
Expand Down
5 changes: 0 additions & 5 deletions src/frequenz/sdk/actor/_power_managing/_matryoshka.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

if typing.TYPE_CHECKING:
from ...timeseries._base_types import SystemBounds
from .. import power_distributing

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -225,15 +224,13 @@ def get_status(
component_ids: frozenset[int],
priority: int,
system_bounds: SystemBounds,
distribution_result: power_distributing.Result | None,
) -> _Report:
"""Get the bounds for the algorithm.
Args:
component_ids: The IDs of the components to get the bounds for.
priority: The priority of the actor for which the bounds are requested.
system_bounds: The system bounds for the components.
distribution_result: The result of the last power distribution.
Returns:
The target power and the available bounds for the given components, for
Expand All @@ -245,7 +242,6 @@ def get_status(
target_power=target_power,
_inclusion_bounds=None,
_exclusion_bounds=system_bounds.exclusion_bounds,
distribution_result=distribution_result,
)

lower_bound = system_bounds.inclusion_bounds.lower
Expand Down Expand Up @@ -284,7 +280,6 @@ def get_status(
lower=lower_bound, upper=upper_bound
),
_exclusion_bounds=system_bounds.exclusion_bounds,
distribution_result=distribution_result,
)

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments
self._set_op_power_subscriptions: dict[
frozenset[int], dict[int, Sender[_Report]]
] = {}
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}

self._set_power_group: BaseAlgorithm = Matryoshka(
max_proposal_age=timedelta(seconds=60.0)
Expand Down Expand Up @@ -120,7 +119,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
component_ids,
priority,
bounds,
self._distribution_results.get(component_ids),
)
await sender.send(status)
for priority, sender in self._set_power_subscriptions.get(
Expand All @@ -133,7 +131,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
bounds,
self._set_op_power_group.get_target_power(component_ids),
),
self._distribution_results.get(component_ids),
)
await sender.send(status)

Expand Down Expand Up @@ -403,9 +400,6 @@ async def _run(self) -> None:
)

result = selected.message
self._distribution_results[frozenset(result.request.component_ids)] = (
result
)
if not isinstance(result, power_distributing.Success):
_logger.warning(
"PowerManagingActor: PowerDistributing failed: %s", result
Expand Down
9 changes: 9 additions & 0 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ def new_ev_charger_pool(
power_manager_bounds_subs_sender=(
self._ev_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._ev_power_wrapper.distribution_results_fetcher()
),
component_ids=component_ids,
)
)
Expand Down Expand Up @@ -343,6 +346,9 @@ def new_pv_pool(
power_manager_bounds_subs_sender=(
self._pv_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._pv_power_wrapper.distribution_results_fetcher()
),
component_ids=component_ids,
)

Expand Down Expand Up @@ -420,6 +426,9 @@ def new_battery_pool(
power_manager_bounds_subscription_sender=(
self._battery_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._battery_power_wrapper.distribution_results_fetcher()
),
min_update_interval=self._resampler_config.resampling_period,
batteries_id=component_ids,
)
Expand Down
6 changes: 6 additions & 0 deletions src/frequenz/sdk/microgrid/_power_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# pylint: disable=cyclic-import
from frequenz.client.microgrid import ComponentCategory, ComponentType

from .._internal._channels import ReceiverFetcher

# A number of imports had to be done inside functions where they are used, to break
# import cycles.
#
Expand Down Expand Up @@ -178,3 +180,7 @@ async def stop(self) -> None:
await self._power_distributing_actor.stop()
if self._power_managing_actor:
await self._power_managing_actor.stop()

def distribution_results_fetcher(self) -> ReceiverFetcher[Result]:
"""Return a fetcher for the power distribution results."""
return self._power_distribution_results_channel
19 changes: 17 additions & 2 deletions src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from collections import abc

from ... import timeseries
from ..._internal._channels import ReceiverFetcher
from ...actor import _power_managing
from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher
from ...actor import _power_managing, power_distributing
from ...timeseries import Energy, Percentage, Power, Sample, Temperature
from .._base_types import SystemBounds
from ..formula_engine import FormulaEngine
Expand Down Expand Up @@ -384,6 +384,21 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:

return channel

@property
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
"""Get a receiver to receive power distribution results.

Returns:
A receiver that will stream power distribution results for the pool's set of
batteries.
"""
return MappingReceiverFetcher(
self._pool_ref_store._power_dist_results_fetcher,
lambda recv: recv.filter(
lambda x: x.request.component_ids == self._pool_ref_store._batteries
),
)

@property
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
"""Get receiver to receive new power bounds when they change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
from frequenz.client.microgrid import ComponentCategory

from ..._internal._asyncio import cancel_and_await
from ..._internal._channels import ReceiverFetcher
from ...actor._channel_registry import ChannelRegistry
from ...actor._data_sourcing._component_metric_request import ComponentMetricRequest
from ...actor._power_managing._base_classes import Proposal, ReportRequest
from ...actor.power_distributing import Result
from ...actor.power_distributing._component_status import ComponentPoolStatus
from ...microgrid import connection_manager
from ..formula_engine._formula_engine_pool import FormulaEnginePool
Expand All @@ -43,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments
batteries_status_receiver: Receiver[ComponentPoolStatus],
power_manager_requests_sender: Sender[Proposal],
power_manager_bounds_subscription_sender: Sender[ReportRequest],
power_distribution_results_fetcher: ReceiverFetcher[Result],
min_update_interval: timedelta,
batteries_id: Set[int] | None = None,
) -> None:
Expand All @@ -63,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
requests to the power managing actor.
power_manager_bounds_subscription_sender: A Channel sender for sending
power bounds requests to the power managing actor.
power_distribution_results_fetcher: A ReceiverFetcher for the results from
the power distributing actor.
min_update_interval: Some metrics in BatteryPool are send only when they
change. For these metrics min_update_interval is the minimum time
interval between the following messages.
Expand Down Expand Up @@ -105,6 +110,9 @@ def __init__( # pylint: disable=too-many-arguments
self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}"
self._power_distributing_namespace: str = f"power-distributor-{self._namespace}"
self._channel_registry: ChannelRegistry = channel_registry
self._power_dist_results_fetcher: ReceiverFetcher[Result] = (
power_distribution_results_fetcher
)
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
self._namespace,
self._channel_registry,
Expand Down
8 changes: 0 additions & 8 deletions src/frequenz/sdk/timeseries/battery_pool/_result_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import abc
import typing

from ...actor import power_distributing
from .._base_types import Bounds
from .._quantities import Power

Expand All @@ -20,13 +19,6 @@ class BatteryPoolReport(typing.Protocol):
def target_power(self) -> Power | None:
"""The currently set power for the batteries."""

@property
def distribution_result(self) -> power_distributing.Result | None:
"""The result of the last power distribution.

This is `None` if no power distribution has been performed yet.
"""

@property
def bounds(self) -> Bounds[Power] | None:
"""The usable bounds for the batteries.
Expand Down
19 changes: 17 additions & 2 deletions src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import uuid
from collections import abc

from ..._internal._channels import ReceiverFetcher
from ...actor import _power_managing
from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher
from ...actor import _power_managing, power_distributing
from ...timeseries import Bounds
from .._base_types import SystemBounds
from .._quantities import Current, Power
Expand Down Expand Up @@ -227,6 +227,21 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]:

return channel

@property
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
"""Get a receiver to receive power distribution results.
Returns:
A receiver that will stream power distribution results for the pool's set of
EV chargers.
"""
return MappingReceiverFetcher(
self._pool_ref_store.power_distribution_results_fetcher,
lambda recv: recv.filter(
lambda x: x.request.component_ids == self._pool_ref_store.component_ids
),
)

async def stop(self) -> None:
"""Stop all tasks and channels owned by the EVChargerPool."""
await self._pool_ref_store.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

"""Manages shared state/tasks for a set of EV chargers."""


import asyncio
import uuid
from collections import abc

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.microgrid import ComponentCategory

from ..._internal._channels import ReceiverFetcher
from ...actor import ChannelRegistry, ComponentMetricRequest
from ...actor._power_managing._base_classes import Proposal, ReportRequest
from ...actor.power_distributing import ComponentPoolStatus
from ...actor.power_distributing import ComponentPoolStatus, Result
from ...microgrid import connection_manager
from .._base_types import SystemBounds
from ..formula_engine._formula_engine_pool import FormulaEnginePool
Expand All @@ -40,6 +40,7 @@ def __init__( # pylint: disable=too-many-arguments
status_receiver: Receiver[ComponentPoolStatus],
power_manager_requests_sender: Sender[Proposal],
power_manager_bounds_subs_sender: Sender[ReportRequest],
power_distribution_results_fetcher: ReceiverFetcher[Result],
component_ids: abc.Set[int] | None = None,
):
"""Create an instance of the class.
Expand All @@ -55,6 +56,8 @@ def __init__( # pylint: disable=too-many-arguments
requests to the power managing actor.
power_manager_bounds_subs_sender: A Channel sender for sending power bounds
subscription requests to the power managing actor.
power_distribution_results_fetcher: A ReceiverFetcher for the results from
the power distributing actor.
component_ids: An optional list of component_ids belonging to this pool. If
not specified, IDs of all EV Chargers in the microgrid will be fetched
from the component graph.
Expand All @@ -64,6 +67,7 @@ def __init__( # pylint: disable=too-many-arguments
self.status_receiver = status_receiver
self.power_manager_requests_sender = power_manager_requests_sender
self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender
self.power_distribution_results_fetcher = power_distribution_results_fetcher

if component_ids is not None:
self.component_ids: frozenset[int] = frozenset(component_ids)
Expand Down
Loading