diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index bdc2885d8..0cbef5497 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -115,6 +115,7 @@ async def run_test( # pylint: disable=too-many-locals battery_status_channel = Broadcast[ComponentPoolStatus]("battery-status") power_result_channel = Broadcast[Result]("power-result") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), diff --git a/benchmarks/timeseries/benchmark_ringbuffer.py b/benchmarks/timeseries/benchmark_ringbuffer.py index 73dcd0842..9176c4ff2 100644 --- a/benchmarks/timeseries/benchmark_ringbuffer.py +++ b/benchmarks/timeseries/benchmark_ringbuffer.py @@ -174,14 +174,14 @@ def main() -> None: "Time to fill 29 days with data:\n\t" + f"Array: {array_times['fill']} seconds\n\t" + f"List: {list_times['fill']} seconds\n\t" - + f"Diff: {array_times['fill'] - list_times['fill']}" + + f"Diff: {array_times['fill'] - list_times['fill']}" ) print( "Day-Slices into 29 days with data:\n\t" + f"Array: {array_times['test']/num_runs} seconds\n\t" + f"List: {list_times['test']/num_runs} seconds\n\t" - + f"Diff: {array_times['test']/num_runs - list_times['test']/num_runs}" + + f"Diff: {array_times['test']/num_runs - list_times['test']/num_runs}" ) print(f" {''.join(['='] * (num_runs + 1))}") @@ -195,7 +195,7 @@ def main() -> None: "Avg of windows of 29 days and running average & mean on every day:\n\t" + f"Array: {slicing_array_times['avg']/num_runs} seconds\n\t" + f"List: {slicing_list_times['avg']/num_runs} seconds\n\t" - + f"Diff: {slicing_array_times['avg']/num_runs - slicing_list_times['avg']/num_runs}" + + f"Diff: {slicing_array_times['avg']/num_runs - slicing_list_times['avg']/num_runs}" ) print( @@ -203,7 +203,7 @@ def main() -> None: + f"Array: {slicing_array_times['median']/num_runs} seconds\n\t" + f"List: {slicing_list_times['median']/num_runs} seconds\n\t" + "Diff: " - + f"{slicing_array_times['median']/num_runs - slicing_list_times['median']/num_runs}" + + f"{slicing_array_times['median']/num_runs - slicing_list_times['median']/num_runs}" ) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 394a7dadd..bb3946958 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -133,17 +133,22 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None: microgrid, ) - if self._component_category is not ComponentCategory.BATTERY: + bounds_receiver: Receiver[SystemBounds] + # pylint: disable=protected-access + if self._component_category is ComponentCategory.BATTERY: + battery_pool = microgrid.battery_pool(component_ids) + bounds_receiver = battery_pool._system_power_bounds.new_receiver() + elif self._component_category is ComponentCategory.EV_CHARGER: + ev_charger_pool = microgrid.ev_charger_pool(component_ids) + bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver() + # pylint: enable=protected-access + else: err = ( "PowerManagingActor: Unsupported component category: " f"{self._component_category}" ) _logger.error(err) raise NotImplementedError(err) - battery_pool = microgrid.battery_pool(component_ids) - # pylint: disable=protected-access - bounds_receiver = battery_pool._system_power_bounds.new_receiver() - # pylint: enable=protected-access self._system_bounds[component_ids] = SystemBounds( timestamp=datetime.now(tz=timezone.utc), diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py index d9cb07266..0ec7d84be 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py @@ -5,8 +5,10 @@ from ._battery_manager import BatteryManager from ._component_manager import ComponentManager +from ._ev_charger_manager import EVChargerManager __all__ = [ "BatteryManager", "ComponentManager", + "EVChargerManager", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py new file mode 100644 index 000000000..e0e4beca7 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/__init__.py @@ -0,0 +1,10 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage ev chargers for the power distributor.""" + +from ._ev_charger_manager import EVChargerManager + +__all__ = [ + "EVChargerManager", +] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py new file mode 100644 index 000000000..81e659f56 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py @@ -0,0 +1,27 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Configuration for the power distributor's EV charger manager.""" + +from collections import abc +from dataclasses import dataclass, field +from datetime import timedelta + +from .....timeseries import Current + + +@dataclass(frozen=True) +class EVDistributionConfig: + """Configuration for the power distributor's EV charger manager.""" + + component_ids: abc.Set[int] + """The component ids of the EV chargers.""" + + min_current: Current = field(default_factory=lambda: Current.from_amperes(6.0)) + """The minimum current that can be allocated to an EV charger.""" + + initial_current: Current = field(default_factory=lambda: Current.from_amperes(10.0)) + """The initial current that can be allocated to an EV charger.""" + + increase_power_interval: timedelta = timedelta(seconds=30) + """The interval at which the power can be increased for an EV charger.""" diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py new file mode 100644 index 000000000..9662ba0b4 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -0,0 +1,333 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage EV chargers for the power distributor.""" + +import asyncio +import collections.abc +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Broadcast, Sender +from frequenz.channels.util import Merge, select, selected_from +from typing_extensions import override + +from frequenz.sdk import microgrid + +from ....._internal._channels import LatestValueCache +from ....._internal._math import is_close_to_zero +from .....microgrid.component import ComponentCategory, EVChargerData +from .....timeseries import Power, Sample3Phase, Voltage +from ..._component_pool_status_tracker import ComponentPoolStatusTracker + +# from .._component_pool_status_tracker import ComponentPoolStatusTracker +from ..._component_status import ComponentPoolStatus, EVChargerStatusTracker +from ...request import Request +from ...result import Result, Success +from .._component_manager import ComponentManager +from ._config import EVDistributionConfig +from ._states import EvcState, EvcStates + +_logger = logging.getLogger(__name__) + + +class EVChargerManager(ComponentManager): + """Manage ev chargers for the power distributor.""" + + @override + def __init__( + self, + component_pool_status_sender: Sender[ComponentPoolStatus], + ): + """Initialize the ev charger data manager. + + Args: + component_pool_status_sender: Channel for sending information about which + components are expected to be working. + """ + self._ev_charger_ids = self._get_ev_charger_ids() + self._evc_states = EvcStates() + self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache( + microgrid.voltage().new_receiver() + ) + self._config = EVDistributionConfig(component_ids=self._ev_charger_ids) + self._component_pool_status_tracker = ComponentPoolStatusTracker( + component_ids=self._ev_charger_ids, + component_status_sender=component_pool_status_sender, + max_data_age=timedelta(seconds=10.0), + max_blocking_duration=timedelta(seconds=30.0), + component_status_tracker_type=EVChargerStatusTracker, + ) + self._target_power = Power.zero() + self._target_power_channel = Broadcast[Power]("target_power") + self._target_power_tx = self._target_power_channel.new_sender() + self._task = None + + @override + def component_ids(self) -> collections.abc.Set[int]: + """Return the set of ev charger ids.""" + return self._ev_charger_ids + + @override + async def start(self) -> None: + """Start the ev charger data manager.""" + self._task = asyncio.create_task(self._run_forever()) + + @override + async def distribute_power(self, request: Request) -> Result: + """Distribute the requested power to the ev chargers. + + Args: + request: Request to get the distribution for. + + Returns: + Result of the distribution. + """ + await self._target_power_tx.send(request.power) + + # TODO: check max_power check based on sum of bounds + return Success( + request, + Power.zero(), + set(), + Power.zero(), + ) + + @override + async def stop(self) -> None: + """Stop the ev charger data manager.""" + + def _get_ev_charger_ids(self) -> collections.abc.Set[int]: + return { + evc.component_id + for evc in microgrid.connection_manager.get().component_graph.components( + component_categories={ComponentCategory.EV_CHARGER} + ) + } + + def _allocate_new_ev(self, component_id: int) -> list[tuple[int, Power]]: + available_power = ( + self._target_power - self._evc_states.get_total_allocated_power() + ) + voltage = self._voltage_cache.get().min() + if voltage is None: + _logger.warning( + "Voltage data is not available. Cannot allocate power to EV charger %s", + component_id, + ) + return [] + initial_power = voltage * self._config.initial_current + if available_power > initial_power: + return [(component_id, initial_power)] + + min_power = voltage * self._config.min_current + if available_power > min_power: + return [(component_id, min_power)] + + return [] + + def _act_on_new_data(self, ev_data: EVChargerData) -> list[tuple[int, Power]]: + component_id = ev_data.component_id + ev_connected = ev_data.is_ev_connected() + ev_previously_connected = self._evc_states.get( + component_id + ).last_data.is_ev_connected() + + # if EV is just connected, try to set config.initial_current, throttle other + # EVs if necessary + ev_newly_connected = ev_connected and not ev_previously_connected + if ev_newly_connected: + self._evc_states.get(component_id).update_state(ev_data) + _logger.info("New EV connected to EV charger %s", component_id) + return self._allocate_new_ev(component_id) + + # if EV is disconnected, set limit to 0.0. redistribution to other EVs will + # happen separately, when possible. + if not ev_connected: + if ev_previously_connected: + _logger.info("EV disconnected from EV charger %s", component_id) + self._evc_states.get(component_id).update_state(ev_data) + if self._evc_states.get(component_id).last_allocation > Power.zero(): + return [(component_id, Power.zero())] + + # else if last throttling was less than 'increase_power_interval', do nothing. + now = datetime.now(tz=timezone.utc) + last_throttling_time = self._evc_states.get(component_id).last_reallocation_time + if last_throttling_time is not None: + dur = now - last_throttling_time + if dur < self._config.increase_power_interval: + return [] + + self._evc_states.get(ev_data.component_id).update_state(ev_data) + + # if ev's bounds were previously set to zero, treat it like it is newly + # connected + evc = self._evc_states.get(component_id) + if is_close_to_zero(evc.last_allocation.as_watts()): + return self._allocate_new_ev(component_id) + + # if the ev charger is already allocated the max power, do nothing + allottable_power = Power.from_watts( + evc.last_data.active_power_inclusion_upper_bound + - evc.last_allocation.as_watts() + ) + if ( + is_close_to_zero(allottable_power.as_watts()) + or allottable_power < Power.zero() + ): + return [] + + available_power = ( + self._target_power - self._evc_states.get_total_allocated_power() + ) + allottable_power = min(allottable_power, available_power) + + target_power = min( + evc.last_allocation + allottable_power, + Power.from_watts(evc.last_data.active_power_inclusion_upper_bound), + ) + _logger.info( + "Increasing power to EV charger %s from %s to %s", + component_id, + evc.last_allocation, + target_power, + ) + return [(component_id, target_power)] + + async def _run_forever(self) -> None: + while True: + try: + await self._run() + except: # pylint: disable=bare-except + _logger.exception("Recovering from an error in EV charger manager.") + await asyncio.sleep(1.0) + + async def _run(self) -> None: + api = microgrid.connection_manager.get().api_client + ev_charger_data_rx = Merge( + *[await api.ev_charger_data(evc_id) for evc_id in self._ev_charger_ids] + ) + target_power_rx = self._target_power_channel.new_receiver() + async for selected in select(ev_charger_data_rx, target_power_rx): + bounds_changes = [] + if selected_from(selected, ev_charger_data_rx): + evc_data = selected.value + # If a new ev charger is added, add it to the state tracker, with + # now as the last reallocation time and last charging time. + # + # This means it won't be assigned any power until the reallocation + # duration has passed. + if evc_data.component_id not in self._evc_states: + now = datetime.now(tz=timezone.utc) + self._evc_states.add_evc( + EvcState( + component_id=evc_data.component_id, + last_data=evc_data, + power=Power.zero(), + last_allocation=Power.zero(), + last_reallocation_time=now, + last_charging_time=now, + ) + ) + bounds_changes = [(evc_data.component_id, Power.zero())] + + # See if the ev charger has room for more power, and if the last + # allocation was not in the last reallocation duration. + else: + bounds_changes = self._act_on_new_data(evc_data) + + elif selected_from(selected, target_power_rx): + self._target_power = selected.value + _logger.debug("New target power: %s", self._target_power) + used_power = self._evc_states.get_ev_total_used_power() + if self._target_power < used_power: + bounds_changes = self._throttle_ev_chargers( + used_power - self._target_power + ) + + if bounds_changes: + _logger.debug("Setting power to EV chargers: %s", bounds_changes) + else: + continue + now = datetime.now(tz=timezone.utc) + for component_id, power in bounds_changes: + try: + self._evc_states.get(component_id).update_last_allocation( + power, now + ) + await api.set_power(component_id, power.as_watts()) + except Exception: # pylint: disable=bare-except + _logger.error( + "Failed to set power to EV charger %s to %s", + component_id, + power, + ) + + def _throttle_ev_chargers(self, throttle_by: Power) -> list[tuple[int, Power]]: + """Reduce EV charging power to meet the target power. + + Level 1 throttling is done by reducing the power to the minimum current required + to charge the EV. When the consumption is still above the target power, level 2 + throttling is done by reducing the power to 0. + + Args: + throttle_by: The amount of power to reduce the total EV charging power by. + + Returns: + A list of new (reduced) charging current limits for a subset of ev + chargers, required to bring down the consumption by the given value. + """ + if throttle_by <= Power.zero(): + return [] + + min_power = Power.zero() + voltage = self._voltage_cache.get().min() + if voltage is None: + _logger.warning( + "Voltage data is not available. Cannot perform level 1 throttling.", + ) + else: + min_power = voltage * self._config.min_current + + evc_list = list(self._evc_states.values()) + evc_list.sort(key=lambda st: (st.power, st.last_allocation), reverse=True) + + level1_throttling_count = 0 + level1_amps_achieved = Power.zero() + + level2_throttling_count = 0 + level2_amps_achieved = Power.zero() + + for evc in evc_list: + evc_power = evc.power + evc_level1_power = Power.zero() + if evc_power > min_power: + evc_level1_power = evc_power - min_power + + if evc_power == Power.zero(): + evc_power = evc.last_allocation + + if evc_power == Power.zero(): + break + + if level1_amps_achieved < throttle_by: + level1_amps_achieved += evc_level1_power + level1_throttling_count += 1 + else: + break + if level2_amps_achieved < throttle_by: + level2_amps_achieved += evc_power + level2_throttling_count += 1 + + if level1_amps_achieved >= throttle_by: + throttling_bounds = [ + (evc.component_id, min_power) + for evc in evc_list[:level1_throttling_count] + ] + else: + throttling_bounds = [ + (evc.component_id, Power.zero()) + for evc in evc_list[:level2_throttling_count] + ] + _logger.debug("Throttling: %s", throttling_bounds) + return throttling_bounds diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py new file mode 100644 index 000000000..349eeb8c6 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_states.py @@ -0,0 +1,131 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Power distribution state tracking for ev chargers.""" + + +from dataclasses import dataclass +from datetime import datetime +from typing import Iterable + +from .....microgrid.component import EVChargerData +from .....timeseries import Power + + +@dataclass +class EvcState: + """A class for tracking state of an ev charger.""" + + component_id: int + """The component id of the ev charger.""" + + last_data: EVChargerData + """The last data received from the EV charger.""" + + power: Power + """The power currently used by the EV charger.""" + + last_allocation: Power + """The last allocation made for the EV.""" + + last_reallocation_time: datetime + """The last time the ev charger was allocated power. + + Used to make sure we don't allocate power to the ev charger too often. + """ + + last_charging_time: datetime + """The last time the ev charger was charging. + + Used to de-allocate power from the ev charger if it has not been charging + for a while. + """ + + def update_last_allocation(self, allocation: Power, alloc_time: datetime) -> None: + """Update the last allocation and related timestamps. + + Args: + allocation: The most allocation allocation made for the EV. + alloc_time: The time at which the allocation was made. + """ + self.last_allocation = allocation + self.last_reallocation_time = alloc_time + self.last_charging_time = alloc_time + + def update_state( + self, + latest_ev_data: EVChargerData, + ) -> None: + """Update EvcState from component data. + + Args: + latest_ev_data: latest ev data from component data stream. + """ + self.power = Power.from_watts(latest_ev_data.active_power) + self.last_data = latest_ev_data + + if self.power > Power.zero(): + self.last_charging_time = latest_ev_data.timestamp + + +class EvcStates: + """Tracks states of all ev chargers.""" + + _states: dict[int, EvcState] + + def __init__(self) -> None: + """Initialize the EvcStates object.""" + self._states = {} + + def get_ev_total_used_power(self) -> Power: + """Return the total power consumed by all EV Chargers.""" + total_used = Power.zero() + for evc in self._states.values(): + total_used += evc.power + return total_used + + def get_total_allocated_power(self) -> Power: + """Return the total power allocated to all EV Chargers.""" + total_allocated = Power.zero() + for evc in self._states.values(): + total_allocated += evc.last_allocation + return total_allocated + + def get(self, component_id: int) -> EvcState: + """Return a reference to the EvcState object with the given component_id. + + Args: + component_id: identifies the object to retrieve. + + Returns: + The EvcState object with the given component_id. + """ + return self._states[component_id] + + def add_evc(self, state: EvcState) -> None: + """Add the given EvcState object to the list. + + Args: + state: The EvcState object to add to the list. + """ + self._states[state.component_id] = state + + def values(self) -> Iterable[EvcState]: + """Return an iterator over all EvcState objects. + + Returns: + An iterator over all EvcState objects. + """ + return self._states.values() + + def __contains__(self, component_id: int) -> bool: + """Check if the given component_id has an associated EvcState object. + + Args: + component_id: The component id to test. + + Returns: + Boolean indicating whether the given component_id is a known + EvCharger. + """ + return component_id in self._states diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py index 8fb33415c..929cbc726 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py @@ -11,6 +11,7 @@ ComponentStatusTracker, SetPowerResult, ) +from ._ev_charger_status_tracker import EVChargerStatusTracker __all__ = [ "BatteryStatusTracker", @@ -18,5 +19,6 @@ "ComponentStatus", "ComponentStatusEnum", "ComponentStatusTracker", + "EVChargerStatusTracker", "SetPowerResult", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py new file mode 100644 index 000000000..3995385f5 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py @@ -0,0 +1,194 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Background service that tracks the status of an EV charger.""" + + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.channels import Receiver, Sender +from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from +from typing_extensions import override + +from frequenz.sdk.microgrid import connection_manager +from frequenz.sdk.microgrid.component import ( + EVChargerCableState, + EVChargerComponentState, + EVChargerData, +) + +from ..._background_service import BackgroundService +from ._blocking_status import BlockingStatus +from ._component_status import ( + ComponentStatus, + ComponentStatusEnum, + ComponentStatusTracker, + SetPowerResult, +) + +_logger = logging.getLogger(__name__) + + +class EVChargerStatusTracker(ComponentStatusTracker, BackgroundService): + """Status tracker for EV chargers.""" + + @override + def __init__( # pylint: disable=too-many-arguments + self, + component_id: int, + max_data_age: timedelta, + max_blocking_duration: timedelta, + status_sender: Sender[ComponentStatus], + set_power_result_receiver: Receiver[SetPowerResult], + ) -> None: + """Create an `EVChargerStatusTracker` instance. + + Args: + component_id: ID of the EV charger to monitor the status of. + max_data_age: max duration to wait for, before marking a component as + NOT_WORKING, unless new data arrives. + max_blocking_duration: duration for which the component status should be + UNCERTAIN if a request to the component failed unexpectedly. + status_sender: Channel sender to send status updates to. + set_power_result_receiver: Receiver to fetch PowerDistributor responses + from, to get the status of the most recent request made for an EV + Charger. + """ + self._component_id = component_id + self._max_data_age = max_data_age + self._status_sender = status_sender + self._set_power_result_receiver = set_power_result_receiver + + self._last_status = ComponentStatusEnum.NOT_WORKING + self._blocking_status = BlockingStatus( + min_duration=timedelta(seconds=1.0), + max_duration=max_blocking_duration, + ) + + BackgroundService.__init__(self, name=f"EVChargerStatusTracker({component_id})") + + @override + def start(self) -> None: + """Start the status tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + def _is_working(self, ev_data: EVChargerData) -> bool: + """Return whether the given EV charger can be assigned power. + + This is True when an EV is connected and the charger is in a healthy state. + """ + return ev_data.cable_state in ( + EVChargerCableState.EV_PLUGGED, + EVChargerCableState.EV_LOCKED, + ) and ev_data.component_state in ( + EVChargerComponentState.READY, + EVChargerComponentState.CHARGING, + EVChargerComponentState.DISCHARGING, + ) + + def _is_stale(self, ev_data: EVChargerData) -> bool: + """Return whether the given data is stale.""" + now = datetime.now(tz=timezone.utc) + stale = now - ev_data.timestamp > self._max_data_age + return stale + + async def _run_forever(self) -> None: + """Run the status tracker forever.""" + while True: + try: + await self._run() + except: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in EVChargerStatusTracker" + ) + await asyncio.sleep(1.0) + + def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum: + """Handle new EV charger data.""" + if self._is_stale(ev_data): + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s data is stale. Last timestamp: %s", + self._component_id, + ev_data.timestamp, + ) + return ComponentStatusEnum.NOT_WORKING + + if self._is_working(ev_data): + if self._last_status == ComponentStatusEnum.NOT_WORKING: + _logger.warning( + "EV charger %s is in WORKING state.", + self._component_id, + ) + return ComponentStatusEnum.WORKING + + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in NOT_WORKING state. " + "Cable state: %s, component state: %s", + self._component_id, + ev_data.cable_state, + ev_data.component_state, + ) + return ComponentStatusEnum.NOT_WORKING + + def _handle_set_power_result( + self, set_power_result: SetPowerResult + ) -> ComponentStatusEnum: + """Handle a new set power result.""" + if self._component_id in set_power_result.succeeded: + return ComponentStatusEnum.WORKING + + self._blocking_status.block() + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "EV charger %s is in UNCERTAIN state. Set power result: %s", + self._component_id, + set_power_result, + ) + return ComponentStatusEnum.UNCERTAIN + + async def _run(self) -> None: + """Run the status tracker.""" + api_client = connection_manager.get().api_client + ev_data_rx = await api_client.ev_charger_data(self._component_id) + set_power_result_rx = self._set_power_result_receiver + # TODO: Add missing data timer once resets are fixed in channels + # missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift()) + missing_data_timer = Timer(timedelta(seconds=100.0), SkipMissedAndDrift()) + async for selected in select( + ev_data_rx, set_power_result_rx, missing_data_timer + ): + new_status = ComponentStatusEnum.NOT_WORKING + if selected_from(selected, ev_data_rx): + missing_data_timer.reset() + new_status = self._handle_ev_data(selected.value) + elif selected_from(selected, set_power_result_rx): + new_status = self._handle_set_power_result(selected.value) + elif selected_from(selected, missing_data_timer): + _logger.warning( + "No EV charger %s data received for %s. Setting status to NOT_WORKING.", + self._component_id, + self._max_data_age, + ) + + # Send status update if status changed + if ( + self._blocking_status.is_blocked() + and new_status != ComponentStatusEnum.NOT_WORKING + ): + new_status = ComponentStatusEnum.UNCERTAIN + + if new_status != self._last_status: + _logger.info( + "EV charger %s status changed from %s to %s", + self._component_id, + self._last_status, + new_status, + ) + self._last_status = new_status + await self._status_sender.send( + ComponentStatus(self._component_id, new_status) + ) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 4a213b94f..19bd822bc 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -17,7 +17,8 @@ from frequenz.channels import Receiver, Sender from ...actor._actor import Actor -from ._component_managers import BatteryManager, ComponentManager +from ...microgrid.component import ComponentCategory +from ._component_managers import BatteryManager, ComponentManager, EVChargerManager from ._component_status import ComponentPoolStatus from .request import Request from .result import Result @@ -51,6 +52,7 @@ class PowerDistributingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, + component_category: ComponentCategory, requests_receiver: Receiver[Request], results_sender: Sender[Result], component_pool_status_sender: Sender[ComponentPoolStatus], @@ -61,6 +63,8 @@ def __init__( # pylint: disable=too-many-arguments """Create class instance. Args: + component_category: The category of the components that this actor is + responsible for. requests_receiver: Receiver for receiving power requests from the power manager. results_sender: Sender for sending results to the power manager. @@ -70,15 +74,25 @@ def __init__( # pylint: disable=too-many-arguments request. It is a time needed to collect first components data. name: The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. + + Raises: + ValueError: If the given component category is not supported. """ super().__init__(name=name) + self._component_category = component_category self._requests_receiver = requests_receiver self._result_sender = results_sender self._wait_for_data_sec = wait_for_data_sec - self._component_manager: ComponentManager = BatteryManager( - component_pool_status_sender - ) + self._component_manager: ComponentManager + if component_category == ComponentCategory.BATTERY: + self._component_manager = BatteryManager(component_pool_status_sender) + elif component_category == ComponentCategory.EV_CHARGER: + self._component_manager = EVChargerManager(component_pool_status_sender) + else: + raise ValueError( + f"PowerDistributor doesn't support controlling: {component_category}" + ) async def _run(self) -> None: # pylint: disable=too-many-locals """Run actor main function. diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index 422bf0cc3..fed22d20b 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -18,6 +18,7 @@ from frequenz.channels import Broadcast, Sender from ..actor._actor import Actor +from ..microgrid.component import ComponentCategory from ..timeseries._grid_frequency import GridFrequency from ..timeseries._voltage_streamer import VoltageStreamer from ..timeseries.grid import Grid @@ -37,6 +38,9 @@ ) from ..timeseries.consumer import Consumer from ..timeseries.ev_charger_pool import EVChargerPool + from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import ( + EVChargerPoolReferenceStore, + ) from ..timeseries.logical_meter import LogicalMeter from ..timeseries.producer import Producer @@ -88,13 +92,18 @@ def __init__( self._data_sourcing_actor: _ActorInfo | None = None self._resampling_actor: _ActorInfo | None = None - self._battery_power_wrapper = PowerWrapper(self._channel_registry) + self._battery_power_wrapper = PowerWrapper( + ComponentCategory.BATTERY, self._channel_registry + ) + self._ev_power_wrapper = PowerWrapper( + ComponentCategory.EV_CHARGER, self._channel_registry + ) self._logical_meter: LogicalMeter | None = None self._consumer: Consumer | None = None self._producer: Producer | None = None self._grid: Grid | None = None - self._ev_charger_pools: dict[frozenset[int], EVChargerPool] = {} + self._ev_charger_pools: dict[frozenset[int], EVChargerPoolReferenceStore] = {} self._battery_pools: dict[frozenset[int], BatteryPoolReferenceStore] = {} self._frequency_instance: GridFrequency | None = None self._voltage_instance: VoltageStreamer | None = None @@ -154,7 +163,9 @@ def producer(self) -> Producer: def ev_charger_pool( self, - ev_charger_ids: set[int] | None = None, + ev_charger_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, ) -> EVChargerPool: """Return the corresponding EVChargerPool instance for the given ids. @@ -164,11 +175,20 @@ def ev_charger_pool( Args: ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the EVChargerPool. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. Returns: An EVChargerPool instance. """ from ..timeseries.ev_charger_pool import EVChargerPool + from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import ( + EVChargerPoolReferenceStore, + ) + + if not self._ev_power_wrapper.started: + self._ev_power_wrapper.start() # We use frozenset to make a hashable key from the input set. key: frozenset[int] = frozenset() @@ -176,12 +196,21 @@ def ev_charger_pool( key = frozenset(ev_charger_ids) if key not in self._ev_charger_pools: - self._ev_charger_pools[key] = EVChargerPool( + self._ev_charger_pools[key] = EVChargerPoolReferenceStore( channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), + status_receiver=self._ev_power_wrapper.status_channel.new_receiver( + maxsize=1 + ), + power_manager_requests_sender=( + self._ev_power_wrapper.proposal_channel.new_sender() + ), + power_manager_bounds_subs_sender=( + self._ev_power_wrapper.bounds_subscription_channel.new_sender() + ), component_ids=ev_charger_ids, ) - return self._ev_charger_pools[key] + return EVChargerPool(self._ev_charger_pools[key], name, priority) def grid(self) -> Grid: """Return the grid measuring point.""" @@ -352,21 +381,32 @@ def producer() -> Producer: return _get().producer() -def ev_charger_pool(ev_charger_ids: set[int] | None = None) -> EVChargerPool: - """Return the corresponding EVChargerPool instance for the given ids. +def ev_charger_pool( + ev_charger_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, +) -> EVChargerPool: + """Return a new `EVChargerPool` instance for the given parameters. + + The priority value is used to resolve conflicts when multiple actors are trying to + propose different power values for the same set of EV chargers. - If an EVChargerPool instance for the given ids doesn't exist, a new one is - created and returned. + !!! note + When specifying priority, bigger values indicate higher priority. The default + priority is the lowest possible value. Args: ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the EVChargerPool. If not specified, all EV Chargers available in the component graph are used. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. Returns: - An EVChargerPool instance. + An `EVChargerPool` instance. """ - return _get().ev_charger_pool(ev_charger_ids) + return _get().ev_charger_pool(ev_charger_ids, name, priority) def battery_pool( diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index cc2748737..36b4efff5 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -34,12 +34,17 @@ class PowerWrapper: """Wrapper around the power managing and power distributing actors.""" - def __init__(self, channel_registry: ChannelRegistry): + def __init__( + self, component_category: ComponentCategory, channel_registry: ChannelRegistry + ): """Initialize the power control. Args: + component_category: The category of the components that actors started by + this instance of the PowerWrapper will be responsible for. channel_registry: A channel registry for use in the actors. """ + self._component_category = component_category self._channel_registry = channel_registry self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast( @@ -74,10 +79,10 @@ def _start_power_managing_actor(self) -> None: # constraint needs to be relaxed if the actor is extended to support other # components. if not component_graph.components( - component_categories={ComponentCategory.BATTERY} + component_categories={self._component_category} ): _logger.warning( - "No batteries found in the component graph. " + f"No {self._component_category} found in the component graph. " "The power managing actor will not be started." ) return @@ -85,7 +90,7 @@ def _start_power_managing_actor(self) -> None: from ..actor._power_managing._power_managing_actor import PowerManagingActor self._power_managing_actor = PowerManagingActor( - component_category=ComponentCategory.BATTERY, + component_category=self._component_category, proposals_receiver=self.proposal_channel.new_receiver(), bounds_subscription_receiver=( self.bounds_subscription_channel.new_receiver() @@ -109,10 +114,10 @@ def _start_power_distributing_actor(self) -> None: component_graph = microgrid.connection_manager.get().component_graph if not component_graph.components( - component_categories={ComponentCategory.BATTERY} + component_categories={self._component_category} ): _logger.warning( - "No batteries found in the component graph. " + f"No {self._component_category} found in the component graph. " "The power distributing actor will not be started." ) return @@ -123,6 +128,7 @@ def _start_power_distributing_actor(self) -> None: # Until the PowerManager is implemented, support for multiple use-case actors # will not be available in the high level interface. self._power_distributing_actor = PowerDistributingActor( + component_category=self._component_category, requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), component_pool_status_sender=self.status_channel.new_sender(), diff --git a/src/frequenz/sdk/microgrid/component/_component_data.py b/src/frequenz/sdk/microgrid/component/_component_data.py index 05524cb88..984fa3c73 100644 --- a/src/frequenz/sdk/microgrid/component/_component_data.py +++ b/src/frequenz/sdk/microgrid/component/_component_data.py @@ -478,9 +478,10 @@ def is_ev_connected(self) -> bool: When the charger is not in an error state, whether an EV is connected to the charger. """ - return self.component_state not in ( - EVChargerComponentState.AUTHORIZATION_REJECTED, - EVChargerComponentState.ERROR, + return self.component_state in ( + EVChargerComponentState.READY, + EVChargerComponentState.CHARGING, + EVChargerComponentState.DISCHARGING, ) and self.cable_state in ( EVChargerCableState.EV_LOCKED, EVChargerCableState.EV_PLUGGED, diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 05268593b..26b69bde3 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -230,7 +230,7 @@ async def propose_discharge( ) @property - def battery_ids(self) -> abc.Set[int]: + def component_ids(self) -> abc.Set[int]: """Return ids of the batteries in the pool. Returns: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py b/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py index 592c8fce6..999538785 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py @@ -3,14 +3,11 @@ """Interactions with EV Chargers.""" -from ._ev_charger_pool import EVChargerData, EVChargerPool, EVChargerPoolError -from ._set_current_bounds import ComponentCurrentLimit -from ._state_tracker import EVChargerState +from ._ev_charger_pool import EVChargerPool, EVChargerPoolError +from ._result_types import EVChargerPoolReport __all__ = [ - "ComponentCurrentLimit", "EVChargerPool", - "EVChargerData", "EVChargerPoolError", - "EVChargerState", + "EVChargerPoolReport", ] diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 940ee7602..6963c9fec 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -5,52 +5,30 @@ import asyncio -import logging +import typing import uuid -from asyncio import Task from collections import abc -from dataclasses import dataclass from datetime import timedelta -from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender - -from ..._internal._asyncio import cancel_and_await -from ...actor import ChannelRegistry, ComponentMetricRequest -from ...microgrid import connection_manager -from ...microgrid.component import ComponentCategory, ComponentMetricId -from .. import Sample, Sample3Phase -from .._quantities import Current, Power, Quantity +from ..._internal._channels import ReceiverFetcher +from ...actor import _power_managing +from ...timeseries import Bounds +from .._base_types import SystemBounds +from .._quantities import Current, Power from ..formula_engine import FormulaEngine, FormulaEngine3Phase -from ..formula_engine._formula_engine_pool import FormulaEnginePool from ..formula_engine._formula_generators import ( EVChargerCurrentFormula, EVChargerPowerFormula, FormulaGeneratorConfig, ) -from ._set_current_bounds import BoundsSetter, ComponentCurrentLimit -from ._state_tracker import EVChargerState, StateTracker - -_logger = logging.getLogger(__name__) +from ._ev_charger_pool_reference_store import EVChargerPoolReferenceStore +from ._result_types import EVChargerPoolReport class EVChargerPoolError(Exception): """An error that occurred in any of the EVChargerPool methods.""" -@dataclass(frozen=True) -class EVChargerData: - """Data for an EV Charger, including the 3-phase current and the component state.""" - - component_id: int - """The component ID of the EV Charger.""" - - current: Sample3Phase[Current] - """The 3-phase current of the EV Charger.""" - - state: EVChargerState - """The state of the EV Charger.""" - - class EVChargerPool: """An interface for interaction with pools of EV Chargers. @@ -64,21 +42,13 @@ class EVChargerPool: and 3-phase [`current`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.current] measurements of the EV Chargers in the pool. - - The - [`component_data`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.component_data] - method for fetching the 3-phase current and state of individual EV Chargers in - the pool. - - The - [`set_bounds`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.set_bounds] - method for limiting the max current of individual EV Chargers in the pool. """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, - channel_registry: ChannelRegistry, - resampler_subscription_sender: Sender[ComponentMetricRequest], - component_ids: set[int] | None = None, - repeat_interval: timedelta = timedelta(seconds=3.0), + ev_charger_pool_ref: EVChargerPoolReferenceStore, + name: str | None, + priority: int, ) -> None: """Create an `EVChargerPool` instance. @@ -88,43 +58,75 @@ def __init__( method for creating `EVChargerPool` instances. Args: - channel_registry: A channel registry instance shared with the resampling - actor. - resampler_subscription_sender: A sender for sending metric requests to the - resampling actor. - component_ids: An optional list of component_ids belonging to this pool. If - not specified, IDs of all EV Chargers in the microgrid will be fetched - from the component graph. - repeat_interval: Interval after which to repeat the last set bounds to the - microgrid API, if no new calls to `set_bounds` have been made. + ev_charger_pool_ref: The EV charger pool reference store instance. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor using this wrapper. """ - self._channel_registry: ChannelRegistry = channel_registry - self._repeat_interval: timedelta = repeat_interval - self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( - resampler_subscription_sender - ) - self._component_ids: set[int] = set() - if component_ids is not None: - self._component_ids = component_ids - else: - graph = connection_manager.get().component_graph - self._component_ids = { - evc.component_id - for evc in graph.components( - component_categories={ComponentCategory.EV_CHARGER} - ) - } - self._state_tracker: StateTracker | None = None - self._status_streams: dict[int, tuple[Task[None], Broadcast[EVChargerData]]] = ( - {} - ) - self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}" - self._formula_pool: FormulaEnginePool = FormulaEnginePool( - self._namespace, - self._channel_registry, - self._resampler_subscription_sender, + self._ev_charger_pool = ev_charger_pool_ref + unique_id = str(uuid.uuid4()) + self._source_id = unique_id if name is None else f"{name}-{unique_id}" + self._priority = priority + + async def propose_power( + self, + power: Power | None, + *, + request_timeout: timedelta = timedelta(seconds=5.0), + bounds: Bounds[Power | None] = Bounds(None, None), + ) -> None: + """Send a proposal to the power manager for the pool's set of EV chargers. + + This proposal is for the maximum power that can be set for the EV chargers in + the pool. The actual consumption might be lower based on the number of phases + an EV is drawing power from, and its current state of charge. + + Power values need to follow the Passive Sign Convention (PSC). That is, positive + values indicate charge power and negative values indicate discharge power. + Discharging from EV chargers is currently not supported. + + If the same EV chargers are shared by multiple actors, the power manager will + consider the priority of the actors, the bounds they set, and their preferred + power, when calculating the target power for the EV chargers + + The preferred power of lower priority actors will take precedence as long as + they respect the bounds set by higher priority actors. If lower priority actors + request power values outside of the bounds set by higher priority actors, the + target power will be the closest value to the preferred power that is within the + bounds. + + When there are no other actors trying to use the same EV chargers, the actor's + preferred power would be set as the target power, as long as it falls within the + system power bounds for the EV chargers. + + The result of the request can be accessed using the receiver returned from the + [`power_status`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power_status] + method, which also streams the bounds that an actor should comply with, based on + its priority. + + Args: + power: The power to propose for the EV chargers in the pool. If `None`, + this proposal will not have any effect on the target power, unless + bounds are specified. If both are `None`, it is equivalent to not + having a proposal or withdrawing a previous one. + request_timeout: The timeout for the request. + bounds: The power bounds for the proposal. These bounds will apply to + actors with a lower priority, and can be overridden by bounds from + actors with a higher priority. If None, the power bounds will be set to + the maximum power of the batteries in the pool. This is currently and + experimental feature. + """ + await self._ev_charger_pool.power_manager_requests_sender.send( + _power_managing.Proposal( + source_id=self._source_id, + preferred_power=power, + bounds=bounds, + component_ids=self._ev_charger_pool.component_ids, + priority=self._priority, + creation_time=asyncio.get_running_loop().time(), + request_timeout=request_timeout, + ) ) - self._bounds_setter: BoundsSetter | None = None @property def component_ids(self) -> abc.Set[int]: @@ -133,7 +135,7 @@ def component_ids(self) -> abc.Set[int]: Returns: Set of managed component IDs. """ - return self._component_ids + return self._ev_charger_pool.component_ids @property def current(self) -> FormulaEngine3Phase[Current]: @@ -151,10 +153,14 @@ def current(self) -> FormulaEngine3Phase[Current]: A FormulaEngine that will calculate and stream the total current of all EV Chargers. """ - engine = self._formula_pool.from_3_phase_current_formula_generator( - "ev_charger_total_current", - EVChargerCurrentFormula, - FormulaGeneratorConfig(component_ids=self._component_ids), + engine = ( + self._ev_charger_pool.formula_pool.from_3_phase_current_formula_generator( + "ev_charger_total_current", + EVChargerCurrentFormula, + FormulaGeneratorConfig( + component_ids=self._ev_charger_pool.component_ids + ), + ) ) assert isinstance(engine, FormulaEngine3Phase) return engine @@ -175,181 +181,53 @@ def power(self) -> FormulaEngine[Power]: A FormulaEngine that will calculate and stream the total power of all EV Chargers. """ - engine = self._formula_pool.from_power_formula_generator( + engine = self._ev_charger_pool.formula_pool.from_power_formula_generator( "ev_charger_power", EVChargerPowerFormula, FormulaGeneratorConfig( - component_ids=self._component_ids, + component_ids=self._ev_charger_pool.component_ids, ), ) assert isinstance(engine, FormulaEngine) return engine - def component_data(self, component_id: int) -> Receiver[EVChargerData]: - """Stream 3-phase current values and state of an EV Charger. + @property + def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: + """Get a receiver to receive new power status reports when they change. - Args: - component_id: id of the EV Charger for which data is requested. + These include + - the current inclusion/exclusion bounds available for the pool's priority, + - the current target power for the pool's set of batteries, + - the result of the last distribution request for the pool's set of batteries. Returns: - A receiver that streams objects containing 3-phase current and state of - an EV Charger. + A receiver that will stream power status reports for the pool's priority. """ - if recv := self._status_streams.get(component_id, None): - task, output_chan = recv - if not task.done(): - return output_chan.new_receiver() - _logger.warning("Restarting component_status for id: %s", component_id) - else: - output_chan = Broadcast[EVChargerData]( - f"evpool-component_status-{component_id}" + sub = _power_managing.ReportRequest( + source_id=self._source_id, + priority=self._priority, + component_ids=self._ev_charger_pool.component_ids, + ) + self._ev_charger_pool.power_bounds_subs[sub.get_channel_name()] = ( + asyncio.create_task( + self._ev_charger_pool.power_manager_bounds_subs_sender.send(sub) ) - - task = asyncio.create_task( - self._stream_component_data(component_id, output_chan.new_sender()) ) + channel = self._ev_charger_pool.channel_registry.get_or_create( + _power_managing._Report, # pylint: disable=protected-access + sub.get_channel_name(), + ) + channel.resend_latest = True - self._status_streams[component_id] = (task, output_chan) - - return output_chan.new_receiver() - - async def set_bounds(self, component_id: int, max_current: Current) -> None: - """Send given max current bound for the given EV Charger to the microgrid API. - - Bounds are used to limit the max current drawn by an EV, although the exact - value will be determined by the EV. - - Args: - component_id: ID of EV Charger to set the current bounds to. - max_current: maximum current that an EV can draw from this EV Charger. - """ - if not self._bounds_setter: - self._bounds_setter = BoundsSetter(self._repeat_interval) - await self._bounds_setter.set(component_id, max_current.as_amperes()) - - def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]: - """Return a `Sender` for setting EV Charger current bounds with. - - Bounds are used to limit the max current drawn by an EV, although the exact - value will be determined by the EV. - - Returns: - A new `Sender`. - """ - if not self._bounds_setter: - self._bounds_setter = BoundsSetter(self._repeat_interval) - return self._bounds_setter.new_bounds_sender() + # More details on why the cast is needed here: + # https://github.com/frequenz-floss/frequenz-sdk-python/issues/823 + return typing.cast(ReceiverFetcher[EVChargerPoolReport], channel) async def stop(self) -> None: """Stop all tasks and channels owned by the EVChargerPool.""" - if self._bounds_setter: - await self._bounds_setter.stop() - if self._state_tracker: - await self._state_tracker.stop() - await self._formula_pool.stop() - for stream in self._status_streams.values(): - task, chan = stream - await chan.close() - await cancel_and_await(task) - - async def _get_current_streams(self, component_id: int) -> tuple[ - Receiver[Sample[Quantity]], - Receiver[Sample[Quantity]], - Receiver[Sample[Quantity]], - ]: - """Fetch current streams from the resampler for each phase. - - Args: - component_id: id of EV Charger for which current streams are being fetched. - - Returns: - A tuple of 3 receivers stream resampled current values for the given - component id, one for each phase. - """ - - async def resampler_subscribe( - metric_id: ComponentMetricId, - ) -> Receiver[Sample[Quantity]]: - request = ComponentMetricRequest( - namespace="ev-pool", - component_id=component_id, - metric_id=metric_id, - start_time=None, - ) - await self._resampler_subscription_sender.send(request) - return self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ).new_receiver() - - return ( - await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_1), - await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_2), - await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_3), - ) + await self._ev_charger_pool.stop() - async def _stream_component_data( - self, - component_id: int, - sender: Sender[EVChargerData], - ) -> None: - """Stream 3-phase current values and state of an EV Charger. - - Args: - component_id: id of the EV Charger for which data is requested. - sender: A sender to stream EV Charger data to. - - Raises: - ChannelClosedError: If the channels from the resampler are closed. - """ - if not self._state_tracker: - self._state_tracker = StateTracker(self._component_ids) - - (phase_1_rx, phase_2_rx, phase_3_rx) = await self._get_current_streams( - component_id - ) - while True: - try: - (phase_1, phase_2, phase_3) = ( - await phase_1_rx.receive(), - await phase_2_rx.receive(), - await phase_3_rx.receive(), - ) - except ChannelClosedError: - _logger.exception("Streams closed for component_id=%s.", component_id) - raise - - sample = Sample3Phase( - timestamp=phase_1.timestamp, - value_p1=( - None - if phase_1.value is None - else Current.from_amperes(phase_1.value.base_value) - ), - value_p2=( - None - if phase_2.value is None - else Current.from_amperes(phase_2.value.base_value) - ), - value_p3=( - None - if phase_3.value is None - else Current.from_amperes(phase_3.value.base_value) - ), - ) - - if ( - phase_1.value is None - and phase_2.value is None - and phase_3.value is None - ): - state = EVChargerState.MISSING - else: - state = self._state_tracker.get(component_id) - - await sender.send( - EVChargerData( - component_id=component_id, - current=sample, - state=state, - ) - ) + @property + def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: + """Return a receiver for the system power bounds.""" + return self._ev_charger_pool.bounds_channel diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py new file mode 100644 index 000000000..ecc40eeb9 --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -0,0 +1,103 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manages shared state/tasks for a set of EV chargers.""" + + +import asyncio +import uuid +from collections import abc + +from frequenz.channels import Broadcast, Receiver, Sender + +from ...actor import ChannelRegistry, ComponentMetricRequest +from ...actor._power_managing._base_classes import Proposal, ReportRequest +from ...actor.power_distributing import ComponentPoolStatus +from ...microgrid import connection_manager +from ...microgrid.component import ComponentCategory +from .._base_types import SystemBounds +from ..formula_engine._formula_engine_pool import FormulaEnginePool +from ._system_bounds_tracker import EVCSystemBoundsTracker + + +class EVChargerPoolReferenceStore: + """A class for maintaining the shared state/tasks for a set of pool of EV chargers. + + This includes ownership of + - the formula engine pool and metric calculators. + - the tasks for calculating system bounds for the EV chargers. + + These are independent of the priority of the actors and can be shared between + multiple users of the same set of EV chargers. + + They are exposed through the EVChargerPool class. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + channel_registry: ChannelRegistry, + resampler_subscription_sender: Sender[ComponentMetricRequest], + status_receiver: Receiver[ComponentPoolStatus], + power_manager_requests_sender: Sender[Proposal], + power_manager_bounds_subs_sender: Sender[ReportRequest], + component_ids: abc.Set[int] | None = None, + ): + """Create an instance of the class. + + Args: + channel_registry: A channel registry instance shared with the resampling + actor. + resampler_subscription_sender: A sender for sending metric requests to the + resampling actor. + status_receiver: A receiver that streams the status of the EV Chargers in + the pool. + power_manager_requests_sender: A Channel sender for sending power + requests to the power managing actor. + power_manager_bounds_subs_sender: A Channel sender for sending power bounds + subscription requests to the power managing actor. + component_ids: An optional list of component_ids belonging to this pool. If + not specified, IDs of all EV Chargers in the microgrid will be fetched + from the component graph. + """ + self.channel_registry = channel_registry + self.resampler_subscription_sender = resampler_subscription_sender + self.status_receiver = status_receiver + self.power_manager_requests_sender = power_manager_requests_sender + self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + + if component_ids is not None: + self.component_ids: frozenset[int] = frozenset(component_ids) + else: + graph = connection_manager.get().component_graph + self.component_ids = frozenset( + { + evc.component_id + for evc in graph.components( + component_categories={ComponentCategory.EV_CHARGER} + ) + } + ) + + self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} + + self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" + self.formula_pool = FormulaEnginePool( + self.namespace, + self.channel_registry, + self.resampler_subscription_sender, + ) + + self.bounds_channel: Broadcast[SystemBounds] = Broadcast( + name=f"System Bounds for EV Chargers: {component_ids}" + ) + self.bounds_tracker: EVCSystemBoundsTracker = EVCSystemBoundsTracker( + self.component_ids, + self.status_receiver, + self.bounds_channel.new_sender(), + ) + self.bounds_tracker.start() + + async def stop(self) -> None: + """Stop all tasks and channels owned by the EVChargerPool.""" + await self.formula_pool.stop() + await self.bounds_tracker.stop() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py new file mode 100644 index 000000000..a3c8686fc --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py @@ -0,0 +1,36 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Types for exposing EV charger pool reports.""" + +import typing + +from ...actor import power_distributing +from .._base_types import Bounds +from .._quantities import Power + + +class EVChargerPoolReport(typing.Protocol): + """A status report for an EV chargers pool.""" + + target_power: Power | None + """The currently set power for the batteries.""" + + distribution_result: power_distributing.Result | None + """The result of the last power distribution. + + This is `None` if no power distribution has been performed yet. + """ + + @property + def bounds(self) -> Bounds[Power] | None: + """The usable bounds for the batteries. + + These bounds are adjusted to any restrictions placed by actors with higher + priorities. + + There might be exclusion zones within these bounds. If necessary, the + [`adjust_to_bounds`][frequenz.sdk.timeseries.battery_pool.BatteryPoolReport.adjust_to_bounds] + method may be used to check if a desired power value fits the bounds, or to get + the closest possible power values that do fit the bounds. + """ diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py deleted file mode 100644 index de216b22c..000000000 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py +++ /dev/null @@ -1,133 +0,0 @@ -# License: MIT -# Copyright © 2023 Frequenz Energy-as-a-Service GmbH - -"""A task for sending EV Charger power bounds to the microgrid API.""" - -import asyncio -import logging -from dataclasses import dataclass -from datetime import timedelta - -from frequenz.channels import Broadcast, Sender -from frequenz.channels.util import Timer, select, selected_from - -from ..._internal._asyncio import cancel_and_await -from ..._internal._channels import LatestValueCache -from ...microgrid import connection_manager -from ...microgrid.component import ComponentCategory - -_logger = logging.getLogger(__name__) - - -@dataclass -class ComponentCurrentLimit: - """A current limit, to be sent to the EV Charger.""" - - component_id: int - """The component ID of the EV Charger.""" - - max_amps: float - """The maximum current in amps, that an EV can draw from this EV Charger.""" - - -class BoundsSetter: - """A task for sending EV Charger power bounds to the microgrid API. - - Also, periodically resends the last set bounds to the microgrid API, if no new - bounds have been set. - """ - - _NUM_PHASES = 3 - """Number of phases in the microgrid.""" - - def __init__(self, repeat_interval: timedelta) -> None: - """Create a `BoundsSetter` instance. - - Args: - repeat_interval: Interval after which to repeat the last set bounds to the - microgrid API, if no new calls to `set_bounds` have been made. - """ - self._repeat_interval = repeat_interval - - self._task: asyncio.Task[None] = asyncio.create_task(self._run()) - self._bounds_chan: Broadcast[ComponentCurrentLimit] = Broadcast("BoundsSetter") - self._bounds_rx = self._bounds_chan.new_receiver() - self._bounds_tx = self._bounds_chan.new_sender() - - async def set(self, component_id: int, max_amps: float) -> None: - """Send the given current limit to the microgrid for the given component id. - - Args: - component_id: ID of EV Charger to set the current bounds to. - max_amps: maximum current in amps, that an EV can draw from this EV Charger. - """ - await self._bounds_tx.send(ComponentCurrentLimit(component_id, max_amps)) - - def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]: - """Return a `Sender` for setting EV Charger current bounds with. - - Returns: - A new `Sender`. - """ - return self._bounds_chan.new_sender() - - async def stop(self) -> None: - """Stop the BoundsSetter.""" - await self._bounds_chan.close() - await cancel_and_await(self._task) - - async def _run(self) -> None: - """Wait for new bounds and forward them to the microgrid API. - - Also, periodically resend the last set bounds to the microgrid API, if no new - bounds have been set. - - Raises: - RuntimeError: If no meters are found in the component graph. - ValueError: If the meter channel is closed. - """ - api_client = connection_manager.get().api_client - graph = connection_manager.get().component_graph - meters = graph.components(component_categories={ComponentCategory.METER}) - if not meters: - err = "No meters found in the component graph." - _logger.error(err) - raise RuntimeError(err) - - meter_data = LatestValueCache( - await api_client.meter_data(next(iter(meters)).component_id) - ) - latest_bound: dict[int, ComponentCurrentLimit] = {} - - bound_chan = self._bounds_rx - timer = Timer.timeout(timedelta(self._repeat_interval.total_seconds())) - - async for selected in select(bound_chan, timer): - meter = meter_data.get() - if meter is None: - raise ValueError("Meter channel closed.") - - if selected_from(selected, bound_chan): - bound: ComponentCurrentLimit = selected.value - if ( - bound.component_id in latest_bound - and latest_bound[bound.component_id] == bound - ): - continue - latest_bound[bound.component_id] = bound - min_voltage = min(meter.voltage_per_phase) - _logger.info("sending new bounds: %s", bound) - await api_client.set_bounds( - bound.component_id, - 0, - bound.max_amps * min_voltage * self._NUM_PHASES, - ) - elif selected_from(selected, timer): - for bound in latest_bound.values(): - min_voltage = min(meter.voltage_per_phase) - _logger.debug("resending bounds: %s", bound) - await api_client.set_bounds( - bound.component_id, - 0, - bound.max_amps * min_voltage * self._NUM_PHASES, - ) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_state_tracker.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_state_tracker.py deleted file mode 100644 index abbf6c629..000000000 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_state_tracker.py +++ /dev/null @@ -1,135 +0,0 @@ -# License: MIT -# Copyright © 2023 Frequenz Energy-as-a-Service GmbH - -"""State tracking for EV Charger pools.""" - -from __future__ import annotations - -import asyncio -from enum import Enum - -from frequenz.channels import Receiver -from frequenz.channels.util import Merge - -from ... import microgrid -from ..._internal._asyncio import cancel_and_await -from ...microgrid.component import ( - EVChargerCableState, - EVChargerComponentState, - EVChargerData, -) - - -class EVChargerState(Enum): - """State of individual EV charger.""" - - UNSPECIFIED = "UNSPECIFIED" - """The state is unspecified.""" - - MISSING = "MISSING" - """The EV Charger is missing.""" - - IDLE = "IDLE" - """The EV Charger is idle.""" - - EV_PLUGGED = "EV_PLUGGED" - """The EV Charger has an EV plugged in.""" - - EV_LOCKED = "EV_LOCKED" - """The EV Charger has an EV plugged in and locked.""" - - ERROR = "ERROR" - """The EV Charger is in an error state.""" - - @classmethod - def from_ev_charger_data(cls, data: EVChargerData) -> EVChargerState: - """Create an `EVChargerState` instance from component data. - - Args: - data: ev charger data coming from microgrid. - - Returns: - An `EVChargerState` instance. - """ - if data.component_state == EVChargerComponentState.UNSPECIFIED: - return EVChargerState.UNSPECIFIED - if data.component_state in ( - EVChargerComponentState.AUTHORIZATION_REJECTED, - EVChargerComponentState.ERROR, - ): - return EVChargerState.ERROR - - if data.cable_state == EVChargerCableState.UNSPECIFIED: - return EVChargerState.UNSPECIFIED - if data.cable_state == EVChargerCableState.EV_LOCKED: - return EVChargerState.EV_LOCKED - if data.cable_state == EVChargerCableState.EV_PLUGGED: - return EVChargerState.EV_PLUGGED - return EVChargerState.IDLE - - def is_ev_connected(self) -> bool: - """Check whether an EV is connected to the charger. - - Returns: - Whether an EV is connected to the charger. - """ - return self in (EVChargerState.EV_PLUGGED, EVChargerState.EV_LOCKED) - - -class StateTracker: - """A class for keeping track of the states of all EV Chargers in a pool.""" - - def __init__(self, component_ids: set[int]) -> None: - """Create a `_StateTracker` instance. - - Args: - component_ids: EV Charger component ids to track the states of. - """ - self._component_ids = component_ids - self._task: asyncio.Task[None] = asyncio.create_task(self._run()) - self._merged_stream: Merge[EVChargerData] | None = None - - # Initialize all components to the `MISSING` state. This will change as data - # starts arriving from the individual components. - self._states: dict[int, EVChargerState] = { - component_id: EVChargerState.MISSING for component_id in component_ids - } - - def get(self, component_id: int) -> EVChargerState: - """Return the current state of the EV Charger with the given component ID. - - Args: - component_id: id of the EV Charger whose state is being fetched. - - Returns: - An `EVChargerState` value corresponding to the given component id. - """ - return self._states[component_id] - - def _update( - self, - data: EVChargerData, - ) -> None: - """Update the state of an EV Charger, from a new data point. - - Args: - data: component data from the microgrid, for an EV Charger in the pool. - """ - evc_id = data.component_id - new_state = EVChargerState.from_ev_charger_data(data) - self._states[evc_id] = new_state - - async def _run(self) -> None: - api_client = microgrid.connection_manager.get().api_client - streams: list[Receiver[EVChargerData]] = await asyncio.gather( - *[api_client.ev_charger_data(cid) for cid in self._component_ids] - ) - self._merged_stream = Merge(*streams) - async for data in self._merged_stream: - self._update(data) - - async def stop(self) -> None: - """Stop the status tracker.""" - await cancel_and_await(self._task) - if self._merged_stream: - await self._merged_stream.stop() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py new file mode 100644 index 000000000..c0183a365 --- /dev/null +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py @@ -0,0 +1,137 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""System bounds tracker for the EV chargers.""" + + +import asyncio +import logging +from collections import abc +from datetime import datetime + +from frequenz.channels import Receiver, Sender +from frequenz.channels.util import Merge, select, selected_from + +from ... import microgrid +from ...actor import BackgroundService +from ...actor.power_distributing._component_status import ComponentPoolStatus +from .. import Power +from .._base_types import Bounds, SystemBounds + +_logger = logging.getLogger(__name__) + + +class EVCSystemBoundsTracker(BackgroundService): + """Track the system bounds for the EV chargers.""" + + def __init__( + self, + component_ids: abc.Set[int], + status_receiver: Receiver[ComponentPoolStatus], + bounds_sender: Sender[SystemBounds], + ): + """Initialize the system bounds tracker. + + Args: + component_ids: The ids of the components to track. + status_receiver: A receiver that streams the status of the EV Chargers in + the pool. + bounds_sender: A sender to send the system bounds to. + """ + super().__init__() + + self._component_ids = component_ids + self._status_receiver = status_receiver + self._bounds_sender = bounds_sender + self._latest_component_data: dict[int, microgrid.component.EVChargerData] = {} + self._last_sent_bounds: SystemBounds | None = None + self._component_pool_status = ComponentPoolStatus(set(), set()) + + def start(self) -> None: + """Start the EV charger system bounds tracker.""" + self._tasks.add(asyncio.create_task(self._run())) + + async def _send_bounds(self) -> None: + if not self._latest_component_data: + return + inclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_inclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_inclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + exclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_exclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_exclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + + if ( + self._last_sent_bounds is None + or inclusion_bounds != self._last_sent_bounds.inclusion_bounds + or exclusion_bounds != self._last_sent_bounds.exclusion_bounds + ): + self._last_sent_bounds = SystemBounds( + timestamp=max( + msg.timestamp for msg in self._latest_component_data.values() + ), + inclusion_bounds=inclusion_bounds, + exclusion_bounds=exclusion_bounds, + ) + await self._bounds_sender.send(self._last_sent_bounds) + + async def _run(self) -> None: + """Run the system bounds tracker.""" + api_client = microgrid.connection_manager.get().api_client + status_rx = self._status_receiver + ev_data_rx = Merge( + *( + await asyncio.gather( + *[api_client.ev_charger_data(cid) for cid in self._component_ids] + ) + ) + ) + + try: + async for selected in select(status_rx, ev_data_rx): + if selected_from(selected, status_rx): + self._component_pool_status = selected.value + to_pop = [] + for comp_id in self._latest_component_data: + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + to_pop.append(comp_id) + for comp_id in to_pop: + self._latest_component_data.pop(comp_id, None) + elif selected_from(selected, ev_data_rx): + data = selected.value + comp_id = data.component_id + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + continue + self._latest_component_data[data.component_id] = data + + await self._send_bounds() + except: + _logger.exception("bounds tracker failed") diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 56bf0636f..fd5e71e00 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -183,6 +183,7 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None: results_channel = Broadcast[Result]("power_distributor results") battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -210,6 +211,7 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No results_channel = Broadcast[Result]("power_distributor results") battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -273,6 +275,7 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -330,6 +333,7 @@ async def test_power_distributor_exclusion_bounds( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -429,6 +433,7 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -505,6 +510,7 @@ async def test_two_batteries_one_broken_one_inverters( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -557,6 +563,7 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -605,6 +612,7 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -687,6 +695,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -770,6 +779,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -832,6 +842,7 @@ async def test_connected_but_not_requested_batteries( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -886,6 +897,7 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: await self._patch_battery_pool_status(mocks, mocker, request.component_ids) battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -937,6 +949,7 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1007,6 +1020,7 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1049,6 +1063,7 @@ async def test_power_distributor_invalid_battery_id( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1090,6 +1105,7 @@ async def test_power_distributor_one_user_adjust_power_consume( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1133,6 +1149,7 @@ async def test_power_distributor_one_user_adjust_power_supply( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1176,6 +1193,7 @@ async def test_power_distributor_one_user_adjust_power_success( battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1212,6 +1230,7 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1262,6 +1281,7 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus]("battery_status") async with PowerDistributingActor( + component_category=ComponentCategory.BATTERY, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index d65a3c7ab..3766dd005 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -659,7 +659,7 @@ async def run_capacity_test( # pylint: disable=too-many-locals ) compare_messages(msg, expected, WAIT_FOR_COMPONENT_DATA_SEC + 0.2) - batteries_in_pool = list(battery_pool.battery_ids) + batteries_in_pool = list(battery_pool.component_ids) scenarios: list[Scenario[Sample[Energy]]] = [ Scenario( batteries_in_pool[0], @@ -851,7 +851,7 @@ async def run_soc_test(setup_args: SetupArgs) -> None: ) compare_messages(msg, expected, WAIT_FOR_COMPONENT_DATA_SEC + 0.2) - batteries_in_pool = list(battery_pool.battery_ids) + batteries_in_pool = list(battery_pool.component_ids) scenarios: list[Scenario[Sample[Percentage]]] = [ Scenario( batteries_in_pool[0], @@ -1007,7 +1007,7 @@ async def run_power_bounds_test( # pylint: disable=too-many-locals ) compare_messages(msg, expected, WAIT_FOR_COMPONENT_DATA_SEC + 0.2) - batteries_in_pool = list(battery_pool.battery_ids) + batteries_in_pool = list(battery_pool.component_ids) scenarios: list[Scenario[SystemBounds]] = [ Scenario( next(iter(bat_invs_map[batteries_in_pool[0]])), @@ -1241,7 +1241,7 @@ async def run_temperature_test( # pylint: disable=too-many-locals expected = Sample(now, value=Temperature.from_celsius(25.0)) compare_messages(msg, expected, WAIT_FOR_COMPONENT_DATA_SEC + 0.2) - batteries_in_pool = list(battery_pool.battery_ids) + batteries_in_pool = list(battery_pool.component_ids) bat_0, bat_1 = batteries_in_pool scenarios: list[Scenario[Sample[Temperature]]] = [ Scenario( diff --git a/tests/timeseries/test_ev_charger_pool.py b/tests/timeseries/test_ev_charger_pool.py index 5e6620a14..136e1d949 100644 --- a/tests/timeseries/test_ev_charger_pool.py +++ b/tests/timeseries/test_ev_charger_pool.py @@ -4,89 +4,16 @@ """Tests for the `EVChargerPool`.""" -import asyncio -from contextlib import AsyncExitStack, asynccontextmanager -from typing import Any, AsyncIterator - from pytest_mock import MockerFixture from frequenz.sdk import microgrid -from frequenz.sdk.microgrid.component import ( - EVChargerCableState, - EVChargerComponentState, -) -from frequenz.sdk.timeseries._quantities import Current, Power -from frequenz.sdk.timeseries.ev_charger_pool._state_tracker import ( - EVChargerState, - StateTracker, -) +from frequenz.sdk.timeseries._quantities import Power from tests.timeseries.mock_microgrid import MockMicrogrid -@asynccontextmanager -async def new_state_tracker(*args: Any, **kwargs: Any) -> AsyncIterator[StateTracker]: - """Create a state tracker.""" - tracker = StateTracker(*args, **kwargs) - try: - yield tracker - finally: - await tracker.stop() - - class TestEVChargerPool: """Tests for the `EVChargerPool`.""" - async def test_state_updates(self, mocker: MockerFixture) -> None: - """Test ev charger state updates are visible.""" - mockgrid = MockMicrogrid( - grid_meter=False, - api_client_streaming=True, - sample_rate_s=0.01, - mocker=mocker, - ) - mockgrid.add_ev_chargers(5) - - async with mockgrid, new_state_tracker(set(mockgrid.evc_ids)) as state_tracker: - await asyncio.sleep(0.05) - - async def check_states( - expected: dict[int, EVChargerState], - ) -> None: - await mockgrid.send_ev_charger_data( - [0.0] * 5 # for testing status updates, the values don't matter. - ) - await asyncio.sleep(0.05) - for comp_id, exp_state in expected.items(): - assert state_tracker.get(comp_id) == exp_state - - # check that all chargers are in idle state. - expected_states = { - evc_id: EVChargerState.IDLE for evc_id in mockgrid.evc_ids - } - assert len(expected_states) == 5 - await check_states(expected_states) - - # check that EV_PLUGGED state gets set - evc_2_id = mockgrid.evc_ids[2] - mockgrid.evc_cable_states[evc_2_id] = EVChargerCableState.EV_PLUGGED - mockgrid.evc_component_states[evc_2_id] = EVChargerComponentState.READY - expected_states[evc_2_id] = EVChargerState.EV_PLUGGED - await check_states(expected_states) - - # check that EV_LOCKED state gets set - evc_3_id = mockgrid.evc_ids[3] - mockgrid.evc_cable_states[evc_3_id] = EVChargerCableState.EV_LOCKED - mockgrid.evc_component_states[evc_3_id] = EVChargerComponentState.READY - expected_states[evc_3_id] = EVChargerState.EV_LOCKED - await check_states(expected_states) - - # check that ERROR state gets set - evc_1_id = mockgrid.evc_ids[1] - mockgrid.evc_cable_states[evc_1_id] = EVChargerCableState.EV_LOCKED - mockgrid.evc_component_states[evc_1_id] = EVChargerComponentState.ERROR - expected_states[evc_1_id] = EVChargerState.ERROR - await check_states(expected_states) - async def test_ev_power( # pylint: disable=too-many-locals self, mocker: MockerFixture, @@ -104,106 +31,3 @@ async def test_ev_power( # pylint: disable=too-many-locals await mockgrid.mock_resampler.send_evc_power([2.0, 4.0, -10.0]) assert (await power_receiver.receive()).value == Power.from_watts(-4.0) - - async def test_ev_component_data(self, mocker: MockerFixture) -> None: - """Test the component_data method of EVChargerPool.""" - mockgrid = MockMicrogrid( - grid_meter=False, - api_client_streaming=True, - sample_rate_s=0.05, - mocker=mocker, - ) - mockgrid.add_ev_chargers(1) - - async with mockgrid, AsyncExitStack() as stack: - evc_id = mockgrid.evc_ids[0] - ev_pool = microgrid.ev_charger_pool() - stack.push_async_callback(ev_pool.stop) - - recv = ev_pool.component_data(evc_id) - - await mockgrid.send_ev_charger_data( - [0.0] # only the status gets used from this. - ) - await asyncio.sleep(0.05) - await mockgrid.mock_resampler.send_evc_current([[2, 3, 5]]) - status = await recv.receive() - assert ( - status.current.value_p1, - status.current.value_p2, - status.current.value_p3, - ) == ( - Current.from_amperes(2), - Current.from_amperes(3), - Current.from_amperes(5), - ) - assert status.state == EVChargerState.MISSING - - await mockgrid.send_ev_charger_data( - [0.0] # only the status gets used from this. - ) - await asyncio.sleep(0.05) - await mockgrid.mock_resampler.send_evc_current([[2, 3, None]]) - status = await recv.receive() - assert ( - status.current.value_p1, - status.current.value_p2, - status.current.value_p3, - ) == ( - Current.from_amperes(2), - Current.from_amperes(3), - None, - ) - assert status.state == EVChargerState.IDLE - - await mockgrid.send_ev_charger_data( - [0.0] # only the status gets used from this. - ) - await asyncio.sleep(0.05) - await mockgrid.mock_resampler.send_evc_current([[None, None, None]]) - status = await recv.receive() - assert ( - status.current.value_p1, - status.current.value_p2, - status.current.value_p3, - ) == ( - None, - None, - None, - ) - assert status.state == EVChargerState.MISSING - - mockgrid.evc_cable_states[evc_id] = EVChargerCableState.EV_PLUGGED - await mockgrid.send_ev_charger_data( - [0.0] # only the status gets used from this. - ) - await asyncio.sleep(0.05) - await mockgrid.mock_resampler.send_evc_current([[None, None, None]]) - status = await recv.receive() - assert ( - status.current.value_p1, - status.current.value_p2, - status.current.value_p3, - ) == ( - None, - None, - None, - ) - assert status.state == EVChargerState.MISSING - - await mockgrid.send_ev_charger_data( - [0.0] # only the status gets used from this. - ) - await asyncio.sleep(0.05) - await mockgrid.mock_resampler.send_evc_current([[4, None, None]]) - status = await recv.receive() - assert ( - status.current.value_p1, - status.current.value_p2, - status.current.value_p3, - ) == ( - Current.from_amperes(4), - None, - None, - ) - assert status.state == EVChargerState.EV_PLUGGED