Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
* `grid.current` -> `grid.current_per_phase`
* `ev_charger_pool.current` -> `ev_charger_pool.current_per_phase`

## New Features
* Passing a `request_timeout` in calls to `*_pool.propose_power` is no longer supported. It may be specified at application startup, through the new optional `api_power_request_timeout` parameter in the `microgrid.initialize()` method.

<!-- Here goes the main new features and examples or instructions on how to use them -->
## New Features

- Classes Bounds and SystemBounds now work with the `in` operator
- Classes `Bounds` and `SystemBounds` now implement the `__contains__` method, allowing the use of the `in` operator to check whether a value falls within the bounds or not.

## Bug Fixes

Expand Down
1 change: 1 addition & 0 deletions benchmarks/power_distribution/power_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async def run_test( # pylint: disable=too-many-locals
requests_receiver=power_request_channel.new_receiver(),
results_sender=power_result_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
api_power_request_timeout=timedelta(seconds=5.0),
):
tasks: list[Coroutine[Any, Any, list[Result]]] = []
tasks.append(send_requests(batteries, num_requests))
Expand Down
4 changes: 0 additions & 4 deletions src/frequenz/sdk/actor/_power_managing/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import abc
import dataclasses
import datetime
import enum
import typing

Expand Down Expand Up @@ -164,9 +163,6 @@ class Proposal:
This is used by the power manager to determine the age of the proposal.
"""

request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0)
"""The maximum amount of time to wait for the request to be fulfilled."""

set_operating_point: bool
"""Whether this proposal sets the operating point power or the normal power."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,11 @@ async def _send_updated_target_power(
proposal,
must_send,
)
request_timeout = (
proposal.request_timeout if proposal else timedelta(seconds=5.0)
)
if target_power is not None:
await self._power_distributing_requests_sender.send(
power_distributing.Request(
power=target_power,
component_ids=component_ids,
request_timeout=request_timeout,
adjust_power=True,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,15 @@ def _add(key: str, value: dict[int, set[int]] | None) -> None:
return mapping


class BatteryManager(ComponentManager):
class BatteryManager(ComponentManager): # pylint: disable=too-many-instance-attributes
"""Class to manage the data streams for batteries."""

@override
def __init__(
self,
component_pool_status_sender: Sender[ComponentPoolStatus],
results_sender: Sender[Result],
api_power_request_timeout: timedelta,
):
"""Initialize this instance.

Expand All @@ -142,8 +143,11 @@ def __init__(
streams, to dynamically adjust the values based on the health of the
individual batteries.
results_sender: Channel sender to send the power distribution results to.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
"""
self._results_sender = results_sender
self._api_power_request_timeout = api_power_request_timeout
self._batteries = connection_manager.get().component_graph.components(
component_categories={ComponentCategory.BATTERY}
)
Expand Down Expand Up @@ -277,7 +281,7 @@ async def _distribute_power(
)

failed_power, failed_batteries = await self._set_distributed_power(
distribution, request.request_timeout
distribution, self._api_power_request_timeout
)

response: Success | PartialFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

_logger = logging.getLogger(__name__)

_DEFAULT_API_REQUEST_TIMEOUT = timedelta(seconds=5.0)


class EVChargerManager(ComponentManager):
"""Manage ev chargers for the power distributor."""
Expand All @@ -43,15 +41,19 @@ def __init__(
self,
component_pool_status_sender: Sender[ComponentPoolStatus],
results_sender: Sender[Result],
api_power_request_timeout: timedelta,
):
"""Initialize the ev charger data manager.

Args:
component_pool_status_sender: Channel for sending information about which
components are expected to be working.
results_sender: Channel for sending results of power distribution.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
"""
self._results_sender = results_sender
self._api_power_request_timeout = api_power_request_timeout
self._ev_charger_ids = self._get_ev_charger_ids()
self._evc_states = EvcStates()
self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache(
Expand Down Expand Up @@ -226,7 +228,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
*[await api.ev_charger_data(evc_id) for evc_id in self._ev_charger_ids]
)
target_power_rx = self._target_power_channel.new_receiver()
api_request_timeout = _DEFAULT_API_REQUEST_TIMEOUT
latest_target_powers: dict[int, Power] = {}
async for selected in select(ev_charger_data_rx, target_power_rx):
target_power_changes = {}
Expand Down Expand Up @@ -260,7 +261,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals

elif selected_from(selected, target_power_rx):
self._latest_request = selected.message
api_request_timeout = selected.message.request_timeout
self._target_power = selected.message.power
_logger.debug("New target power: %s", self._target_power)
used_power = self._evc_states.get_ev_total_used_power()
Expand Down Expand Up @@ -288,7 +288,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals

latest_target_powers.update(target_power_changes)
result = await self._set_api_power(
api, target_power_changes, api_request_timeout
api, target_power_changes, self._api_power_request_timeout
)
await self._results_sender.send(result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ def __init__(
self,
component_pool_status_sender: Sender[ComponentPoolStatus],
results_sender: Sender[Result],
api_power_request_timeout: timedelta,
) -> None:
"""Initialize this instance.

Args:
component_pool_status_sender: Channel for sending information about which
components are expected to be working.
results_sender: Channel for sending results of power distribution.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
"""
self._results_sender = results_sender
self._api_power_request_timeout = api_power_request_timeout
self._pv_inverter_ids = self._get_pv_inverter_ids()

self._component_pool_status_tracker = (
Expand Down Expand Up @@ -177,7 +181,7 @@ async def _set_api_power( # pylint: disable=too-many-locals
)
_, pending = await asyncio.wait(
tasks.values(),
timeout=request.request_timeout.total_seconds(),
timeout=self._api_power_request_timeout.total_seconds(),
return_when=asyncio.ALL_COMPLETED,
)
# collect the timed out tasks and cancel them while keeping the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"""


from datetime import timedelta

from frequenz.channels import Receiver, Sender
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
from typing_extensions import override
Expand Down Expand Up @@ -60,6 +62,7 @@ def __init__( # pylint: disable=too-many-arguments
results_sender: Sender[Result],
component_pool_status_sender: Sender[ComponentPoolStatus],
*,
api_power_request_timeout: timedelta,
component_category: ComponentCategory,
component_type: ComponentType | None = None,
name: str | None = None,
Expand All @@ -72,6 +75,8 @@ def __init__( # pylint: disable=too-many-arguments
results_sender: Sender for sending results to the power manager.
component_pool_status_sender: Channel for sending information about which
components are expected to be working.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
component_category: The category of the components that this actor is
responsible for.
component_type: The type of the component of the given category that this
Expand All @@ -92,22 +97,23 @@ def __init__( # pylint: disable=too-many-arguments
self._component_type = component_type
self._requests_receiver = requests_receiver
self._result_sender = results_sender
self._api_power_request_timeout = api_power_request_timeout

self._component_manager: ComponentManager
if component_category == ComponentCategory.BATTERY:
self._component_manager = BatteryManager(
component_pool_status_sender, results_sender
component_pool_status_sender, results_sender, api_power_request_timeout
)
elif component_category == ComponentCategory.EV_CHARGER:
self._component_manager = EVChargerManager(
component_pool_status_sender, results_sender
component_pool_status_sender, results_sender, api_power_request_timeout
)
elif (
component_category == ComponentCategory.INVERTER
and component_type == InverterType.SOLAR
):
self._component_manager = PVManager(
component_pool_status_sender, results_sender
component_pool_status_sender, results_sender, api_power_request_timeout
)
else:
raise ValueError(
Expand Down
4 changes: 0 additions & 4 deletions src/frequenz/sdk/actor/power_distributing/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import dataclasses
from collections import abc
from datetime import timedelta

from ...timeseries._quantities import Power

Expand All @@ -20,9 +19,6 @@ class Request:
component_ids: abc.Set[int]
"""The component ids of the components to be used for this request."""

request_timeout: timedelta = timedelta(seconds=5.0)
"""The maximum amount of time to wait for the request to be fulfilled."""

adjust_power: bool = True
"""Whether to adjust the power to match the bounds.

Expand Down
24 changes: 20 additions & 4 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import typing
from collections import abc
from dataclasses import dataclass
from datetime import timedelta

from frequenz.channels import Broadcast, Sender
from frequenz.client.microgrid import ComponentCategory, InverterType
Expand Down Expand Up @@ -79,11 +80,14 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes
def __init__(
self,
resampler_config: ResamplerConfig,
api_power_request_timeout: timedelta = timedelta(seconds=5.0),
) -> None:
"""Create a `DataPipeline` instance.

Args:
resampler_config: Config to pass on to the resampler.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
"""
from ..actor import ChannelRegistry

Expand All @@ -97,13 +101,18 @@ def __init__(
self._resampling_actor: _ActorInfo | None = None

self._battery_power_wrapper = PowerWrapper(
self._channel_registry, component_category=ComponentCategory.BATTERY
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
component_category=ComponentCategory.BATTERY,
)
self._ev_power_wrapper = PowerWrapper(
self._channel_registry, component_category=ComponentCategory.EV_CHARGER
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
component_category=ComponentCategory.EV_CHARGER,
)
self._pv_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
component_category=ComponentCategory.INVERTER,
component_type=InverterType.SOLAR,
)
Expand Down Expand Up @@ -485,11 +494,18 @@ async def _stop(self) -> None:
_DATA_PIPELINE: _DataPipeline | None = None


async def initialize(resampler_config: ResamplerConfig) -> None:
async def initialize(
resampler_config: ResamplerConfig,
api_power_request_timeout: timedelta = timedelta(seconds=5.0),
) -> None:
"""Initialize a `DataPipeline` instance.

Args:
resampler_config: Config to pass on to the resampler.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API. When requests to components timeout, they will
be marked as blocked for a short duration, during which time they
will be unavailable from the corresponding component pools.

Raises:
RuntimeError: if the DataPipeline is already initialized.
Expand All @@ -498,7 +514,7 @@ async def initialize(resampler_config: ResamplerConfig) -> None:

if _DATA_PIPELINE is not None:
raise RuntimeError("DataPipeline is already initialized.")
_DATA_PIPELINE = _DataPipeline(resampler_config)
_DATA_PIPELINE = _DataPipeline(resampler_config, api_power_request_timeout)


def frequency() -> GridFrequency:
Expand Down
6 changes: 6 additions & 0 deletions src/frequenz/sdk/microgrid/_power_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import logging
import typing
from datetime import timedelta

from frequenz.channels import Broadcast

Expand Down Expand Up @@ -38,13 +39,16 @@ def __init__(
self,
channel_registry: ChannelRegistry,
*,
api_power_request_timeout: timedelta,
component_category: ComponentCategory,
component_type: ComponentType | None = None,
):
"""Initialize the power control.

Args:
channel_registry: A channel registry for use in the actors.
api_power_request_timeout: Timeout to use when making power requests to
the microgrid API.
component_category: The category of the components that actors started by
this instance of the PowerWrapper will be responsible for.
component_type: The type of the component of the given category that this
Expand All @@ -58,6 +62,7 @@ def __init__(
self._component_category = component_category
self._component_type = component_type
self._channel_registry = channel_registry
self._api_power_request_timeout = api_power_request_timeout

self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast(
name="Component Status Channel", resend_latest=True
Expand Down Expand Up @@ -145,6 +150,7 @@ def _start_power_distributing_actor(self) -> None:
self._power_distributing_actor = PowerDistributingActor(
component_category=self._component_category,
component_type=self._component_type,
api_power_request_timeout=self._api_power_request_timeout,
requests_receiver=self._power_distribution_requests_channel.new_receiver(),
results_sender=self._power_distribution_results_channel.new_sender(),
component_pool_status_sender=self.status_channel.new_sender(),
Expand Down
Loading