diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index 8f6e16960..0fe910f71 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -19,9 +19,8 @@ from ....microgrid import connection_manager from ....microgrid.component import BatteryData, ComponentCategory, InverterData from ....timeseries._quantities import Power -from .._battery_status_tracker import BatteryStatusTracker from .._component_pool_status_tracker import ComponentPoolStatusTracker -from .._component_status import ComponentPoolStatus +from .._component_status import BatteryStatusTracker, ComponentPoolStatus from .._distribution_algorithm import ( AggregatedBatteryData, BatteryDistributionAlgorithm, @@ -146,8 +145,8 @@ def __init__( self._component_pool_status_tracker = ComponentPoolStatusTracker( component_ids=set(self._battery_ids), component_status_sender=component_pool_status_sender, - max_blocking_duration_sec=30.0, - max_data_age_sec=10.0, + max_blocking_duration=timedelta(seconds=30.0), + max_data_age=timedelta(seconds=10.0), component_status_tracker_type=BatteryStatusTracker, ) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py index 691902797..992e8b75c 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py @@ -5,8 +5,10 @@ import asyncio +import contextlib import logging from collections import abc +from datetime import timedelta from frequenz.channels import Broadcast, Receiver, Sender from frequenz.channels.util import Merge @@ -32,10 +34,10 @@ class ComponentPoolStatusTracker: def __init__( # pylint: disable=too-many-arguments self, - component_ids: set[int], + component_ids: abc.Set[int], component_status_sender: Sender[ComponentPoolStatus], - max_data_age_sec: float, - max_blocking_duration_sec: float, + max_data_age: timedelta, + max_blocking_duration: timedelta, component_status_tracker_type: type[ComponentStatusTracker], ) -> None: """Create ComponentPoolStatusTracker instance. @@ -44,18 +46,17 @@ def __init__( # pylint: disable=too-many-arguments component_ids: set of component ids whose status is to be tracked. component_status_sender: The sender used for sending the status of the tracked components. - max_data_age_sec: If a component stops sending data, then this is the - maximum time for which its last message should be considered as - valid. After that time, the component won't be used until it starts - sending data. - max_blocking_duration_sec: This value tell what should be the maximum - timeout used for blocking failing component. - component_status_tracker_type: component status tracker to use - for tracking the status of the components. + max_data_age: If a component stops sending data, then this is the maximum + time for which its last message should be considered as valid. After + that time, the component won't be used until it starts sending data. + max_blocking_duration: This value tell what should be the maximum timeout + used for blocking failing component. + component_status_tracker_type: component status tracker to use for tracking + the status of the components. """ self._component_ids = component_ids - self._max_data_age_sec = max_data_age_sec - self._max_blocking_duration_sec = max_blocking_duration_sec + self._max_data_age = max_data_age + self._max_blocking_duration = max_blocking_duration self._component_status_sender = component_status_sender self._component_status_tracker_type = component_status_tracker_type @@ -83,9 +84,6 @@ async def join(self) -> None: async def stop(self) -> None: """Stop the ComponentPoolStatusTracker instance.""" await cancel_and_await(self._task) - await asyncio.gather( - *[tracker.stop() for tracker in self._component_status_trackers], - ) await self._merged_status_receiver.stop() def _make_merged_status_receiver( @@ -99,8 +97,8 @@ def _make_merged_status_receiver( ) tracker = self._component_status_tracker_type( component_id=component_id, - max_data_age_sec=self._max_data_age_sec, - max_blocking_duration_sec=self._max_blocking_duration_sec, + max_data_age=self._max_data_age, + max_blocking_duration=self._max_blocking_duration, status_sender=channel.new_sender(), set_power_result_receiver=self._set_power_result_channel.new_receiver(), ) @@ -110,14 +108,17 @@ def _make_merged_status_receiver( async def _run(self) -> None: """Start tracking component status.""" - while True: - try: - await self._update_status() - except Exception as err: # pylint: disable=broad-except - _logger.error( - "ComponentPoolStatus failed with error: %s. Restarting.", err - ) - await asyncio.sleep(1.0) + async with contextlib.AsyncExitStack() as stack: + for tracker in self._component_status_trackers: + await stack.enter_async_context(tracker) + while True: + try: + await self._update_status() + except Exception as err: # pylint: disable=broad-except + _logger.error( + "ComponentPoolStatus failed with error: %s. Restarting.", err + ) + await asyncio.sleep(1.0) async def _update_status(self) -> None: async for status in self._merged_status_receiver: diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py new file mode 100644 index 000000000..8fb33415c --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py @@ -0,0 +1,22 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Status tracking for components.""" + +from ._battery_status_tracker import BatteryStatusTracker +from ._component_status import ( + ComponentPoolStatus, + ComponentStatus, + ComponentStatusEnum, + ComponentStatusTracker, + SetPowerResult, +) + +__all__ = [ + "BatteryStatusTracker", + "ComponentPoolStatus", + "ComponentStatus", + "ComponentStatusEnum", + "ComponentStatusTracker", + "SetPowerResult", +] diff --git a/src/frequenz/sdk/actor/power_distributing/_battery_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py similarity index 87% rename from src/frequenz/sdk/actor/power_distributing/_battery_status_tracker.py rename to src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py index adcb88436..6d6643cc6 100644 --- a/src/frequenz/sdk/actor/power_distributing/_battery_status_tracker.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py @@ -20,14 +20,14 @@ from frequenz.channels.util import Timer, select, selected_from from typing_extensions import override -from ..._internal._asyncio import cancel_and_await -from ...microgrid import connection_manager -from ...microgrid.component import ( +from ....microgrid import connection_manager +from ....microgrid.component import ( BatteryData, ComponentCategory, ComponentData, InverterData, ) +from ..._background_service import BackgroundService from ._component_status import ( ComponentStatus, ComponentStatusEnum, @@ -55,55 +55,54 @@ class _ComponentStreamStatus: @dataclass class _BlockingStatus: - min_duration_sec: float - """The minimum blocking duration (in seconds).""" + min_duration: timedelta + """The minimum blocking duration.""" - max_duration_sec: float - """The maximum blocking duration (in seconds).""" + max_duration: timedelta + """The maximum blocking duration.""" - last_blocking_duration_sec: float = 0.0 - """Last blocking duration (in seconds).""" + last_blocking_duration: timedelta = timedelta(seconds=0.0) + """Last blocking duration.""" blocked_until: datetime | None = None """Until when the battery is blocked.""" def __post_init__(self) -> None: - assert self.min_duration_sec <= self.max_duration_sec, ( - f"Minimum blocking duration ({self.min_duration_sec}) cannot be greater " - f"than maximum blocking duration ({self.max_duration_sec})" + assert self.min_duration <= self.max_duration, ( + f"Minimum blocking duration ({self.min_duration}) cannot be greater " + f"than maximum blocking duration ({self.max_duration})" ) - self.last_blocking_duration_sec = self.min_duration_sec + self.last_blocking_duration = self.min_duration + self._timedelta_zero = timedelta(seconds=0.0) - def block(self) -> float: + def block(self) -> timedelta: """Block battery. Battery can be unblocked using `self.unblock()` method. Returns: - For how long (in seconds) the battery is blocked. + The duration for which the battery is blocked. """ now = datetime.now(tz=timezone.utc) # If is not blocked if self.blocked_until is None: - self.last_blocking_duration_sec = self.min_duration_sec - self.blocked_until = now + timedelta( - seconds=self.last_blocking_duration_sec - ) - return self.last_blocking_duration_sec + self.last_blocking_duration = self.min_duration + self.blocked_until = now + self.last_blocking_duration + return self.last_blocking_duration # If still blocked, then do nothing if self.blocked_until > now: - return 0.0 + return self._timedelta_zero # If previous blocking time expired, then blocked it once again. # Increase last blocking time, unless it reach the maximum. - self.last_blocking_duration_sec = min( - 2 * self.last_blocking_duration_sec, self.max_duration_sec + self.last_blocking_duration = min( + 2 * self.last_blocking_duration, self.max_duration ) - self.blocked_until = now + timedelta(seconds=self.last_blocking_duration_sec) + self.blocked_until = now + self.last_blocking_duration - return self.last_blocking_duration_sec + return self.last_blocking_duration def unblock(self) -> None: """Unblock battery. @@ -127,7 +126,7 @@ def is_blocked(self) -> bool: return self.blocked_until > datetime.now(tz=timezone.utc) -class BatteryStatusTracker(ComponentStatusTracker): +class BatteryStatusTracker(ComponentStatusTracker, BackgroundService): """Class for tracking if battery is working. Status updates are sent out only when there is a status change. @@ -166,8 +165,8 @@ class BatteryStatusTracker(ComponentStatusTracker): def __init__( # pylint: disable=too-many-arguments self, component_id: int, - max_data_age_sec: float, - max_blocking_duration_sec: float, + max_data_age: timedelta, + max_blocking_duration: timedelta, status_sender: Sender[ComponentStatus], set_power_result_receiver: Receiver[SetPowerResult], ) -> None: @@ -175,11 +174,10 @@ def __init__( # pylint: disable=too-many-arguments Args: component_id: Id of this battery - max_data_age_sec: If component stopped sending data, then - this is the maximum time when its last message should be considered as - valid. After that time, component won't be used until it starts sending - data. - max_blocking_duration_sec: This value tell what should be the maximum + max_data_age: If component stopped sending data, then this is the maximum + time when its last message should be considered as valid. After that + time, component won't be used until it starts sending data. + max_blocking_duration: This value tell what should be the maximum timeout used for blocking failing component. status_sender: Channel to send status updates. set_power_result_receiver: Channel to receive results of the requests to the @@ -188,13 +186,18 @@ def __init__( # pylint: disable=too-many-arguments Raises: RuntimeError: If battery has no adjacent inverter. """ - self._max_data_age = max_data_age_sec + BackgroundService.__init__(self, name=f"BatteryStatusTracker({component_id})") + self._max_data_age = max_data_age + self._status_sender = status_sender + self._set_power_result_receiver = set_power_result_receiver + # First battery is considered as not working. # Change status after first messages are received. self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING self._blocking_status: _BlockingStatus = _BlockingStatus( - 1.0, max_blocking_duration_sec + timedelta(seconds=1.0), max_blocking_duration ) + self._timedelta_zero = timedelta(seconds=0.0) inverter_id = self._find_adjacent_inverter_id(component_id) if inverter_id is None: @@ -204,17 +207,22 @@ def __init__( # pylint: disable=too-many-arguments self._battery: _ComponentStreamStatus = _ComponentStreamStatus( component_id, - data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)), + data_recv_timer=Timer.timeout(max_data_age), ) self._inverter: _ComponentStreamStatus = _ComponentStreamStatus( inverter_id, - data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)), + data_recv_timer=Timer.timeout(max_data_age), ) # Select needs receivers that can be get in async way only. - self._task: asyncio.Task[None] = asyncio.create_task( - self._run(status_sender, set_power_result_receiver) + @override + def start(self) -> None: + """Start the BatteryStatusTracker instance.""" + self._tasks.add( + asyncio.create_task( + self._run(self._status_sender, self._set_power_result_receiver) + ) ) @property @@ -226,10 +234,6 @@ def battery_id(self) -> int: """ return self._battery.component_id - async def stop(self) -> None: - """Stop tracking battery status.""" - await cancel_and_await(self._task) - def _handle_status_battery(self, bat_data: BatteryData) -> None: self._battery.last_msg_correct = ( self._is_message_reliable(bat_data) @@ -259,9 +263,9 @@ def _handle_status_set_power_result(self, result: SetPowerResult) -> None: ): duration = self._blocking_status.block() - if duration > 0: + if duration > self._timedelta_zero: _logger.warning( - "battery %d failed last response. block it for %f sec", + "battery %d failed last response. block it for %s", self.battery_id, duration, ) @@ -345,7 +349,7 @@ async def _run( if ( datetime.now(tz=timezone.utc) - self._battery.last_msg_timestamp - ) < timedelta(seconds=self._max_data_age): + ) < self._max_data_age: # This means that we have received data from the battery # since the timer triggered, but the timer event arrived # late, so we can ignore it. @@ -356,7 +360,7 @@ async def _run( if ( datetime.now(tz=timezone.utc) - self._inverter.last_msg_timestamp - ) < timedelta(seconds=self._max_data_age): + ) < self._max_data_age: # This means that we have received data from the inverter # since the timer triggered, but the timer event arrived # late, so we can ignore it. @@ -505,7 +509,7 @@ def _is_timestamp_outdated(self, timestamp: datetime) -> bool: _True if timestamp is to old, False otherwise """ now = datetime.now(tz=timezone.utc) - diff = (now - timestamp).total_seconds() + diff = now - timestamp return diff > self._max_data_age def _is_message_reliable(self, message: ComponentData) -> bool: diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_component_status.py similarity index 80% rename from src/frequenz/sdk/actor/power_distributing/_component_status.py rename to src/frequenz/sdk/actor/power_distributing/_component_status/_component_status.py index a5c22938b..368527722 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_status.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_component_status.py @@ -8,9 +8,12 @@ from abc import ABC, abstractmethod from collections import abc from dataclasses import dataclass +from datetime import timedelta from frequenz.channels import Receiver, Sender +from ..._background_service import BackgroundService + @dataclass class ComponentPoolStatus: @@ -76,15 +79,15 @@ class SetPowerResult: """Component IDs for which the last set power command failed.""" -class ComponentStatusTracker(ABC): +class ComponentStatusTracker(BackgroundService, ABC): """Interface for specialized component status trackers to implement.""" @abstractmethod - def __init__( # pylint: disable=too-many-arguments + def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, component_id: int, - max_data_age_sec: float, - max_blocking_duration_sec: float, + max_data_age: timedelta, + max_blocking_duration: timedelta, status_sender: Sender[ComponentStatus], set_power_result_receiver: Receiver[SetPowerResult], ) -> None: @@ -92,17 +95,12 @@ def __init__( # pylint: disable=too-many-arguments Args: component_id: Id of this component - max_data_age_sec: If component stopped sending data, then - this is the maximum time when its last message should be considered as - valid. After that time, component won't be used until it starts sending - data. - max_blocking_duration_sec: This value tell what should be the maximum + max_data_age: If component stopped sending data, then this is the maximum + time when its last message should be considered as valid. After that + time, component won't be used until it starts sending data. + max_blocking_duration: This value tell what should be the maximum timeout used for blocking failing component. status_sender: Channel to send status updates. set_power_result_receiver: Channel to receive results of the requests to the components. """ - - @abstractmethod - async def stop(self) -> None: - """Stop the ComponentStatusTracker instance.""" diff --git a/tests/actor/test_battery_pool_status.py b/tests/actor/test_battery_pool_status.py index edcbe3646..17fbe9398 100644 --- a/tests/actor/test_battery_pool_status.py +++ b/tests/actor/test_battery_pool_status.py @@ -3,17 +3,18 @@ """Tests for BatteryPoolStatus.""" import asyncio +from datetime import timedelta from frequenz.channels import Broadcast from pytest_mock import MockerFixture -from frequenz.sdk.actor.power_distributing._battery_status_tracker import ( - BatteryStatusTracker, -) from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( ComponentPoolStatusTracker, ) -from frequenz.sdk.actor.power_distributing._component_status import ComponentPoolStatus +from frequenz.sdk.actor.power_distributing._component_status import ( + BatteryStatusTracker, + ComponentPoolStatus, +) from frequenz.sdk.microgrid.component import ComponentCategory from tests.timeseries.mock_microgrid import MockMicrogrid @@ -47,8 +48,8 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None: batteries_status = ComponentPoolStatusTracker( component_ids=batteries, component_status_sender=battery_status_channel.new_sender(), - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), component_status_tracker_type=BatteryStatusTracker, ) await asyncio.sleep(0.1) diff --git a/tests/actor/test_battery_status.py b/tests/actor/test_battery_status.py index 65cea1438..b4295cf5e 100644 --- a/tests/actor/test_battery_status.py +++ b/tests/actor/test_battery_status.py @@ -7,10 +7,9 @@ import asyncio import math from collections.abc import AsyncIterator, Iterable -from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Any, Generic, TypeVar +from typing import Generic, TypeVar import pytest @@ -29,10 +28,8 @@ from pytest_mock import MockerFixture from time_machine import TimeMachineFixture -from frequenz.sdk.actor.power_distributing._battery_status_tracker import ( - BatteryStatusTracker, -) from frequenz.sdk.actor.power_distributing._component_status import ( + BatteryStatusTracker, ComponentStatus, ComponentStatusEnum, SetPowerResult, @@ -146,26 +143,6 @@ async def recv_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[_Tim return _Timeout -@asynccontextmanager -async def battery_status_tracker( - *args: Any, **kwargs: Any -) -> AsyncIterator[BatteryStatusTracker]: - """Create BatteryStatusTracker with given arguments. - - Args: - *args: Arguments for BatteryStatusTracker. - **kwargs: Arguments for BatteryStatusTracker. - - Yields: - BatteryStatusTracker with given arguments. - """ - tracker = BatteryStatusTracker(*args, **kwargs) - try: - yield tracker - finally: - await tracker.stop() - - # pylint: disable=protected-access, unused-argument class TestBatteryStatus: """Tests BatteryStatusTracker.""" @@ -188,10 +165,10 @@ async def test_sync_update_status_with_messages( status_channel = Broadcast[ComponentStatus]("battery_status") set_power_result_channel = Broadcast[SetPowerResult]("set_power_result") - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ) as tracker: @@ -356,12 +333,12 @@ async def test_sync_blocking_feature(self, mocker: MockerFixture) -> None: status_channel = Broadcast[ComponentStatus]("battery_status") set_power_result_channel = Broadcast[SetPowerResult]("set_power_result") - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( # increase max_data_age_sec for blocking tests. # Otherwise it will block blocking. BATTERY_ID, - max_data_age_sec=500, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=500), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ) as tracker: @@ -494,10 +471,10 @@ async def test_sync_blocking_interrupted_with_with_max_data( status_channel = Broadcast[ComponentStatus]("battery_status") set_power_result_channel = Broadcast[SetPowerResult]("set_power_result") - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ) as tracker: @@ -542,10 +519,10 @@ async def test_sync_blocking_interrupted_with_invalid_message( status_channel = Broadcast[ComponentStatus]("battery_status") set_power_result_channel = Broadcast[SetPowerResult]("set_power_result") - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ) as tracker: @@ -603,10 +580,10 @@ async def test_timers( status_channel = Broadcast[ComponentStatus]("battery_status") set_power_result_channel = Broadcast[SetPowerResult]("set_power_result") - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ) as tracker: @@ -669,10 +646,10 @@ async def test_async_battery_status(self, mocker: MockerFixture) -> None: status_receiver = status_channel.new_receiver() set_power_result_sender = set_power_result_channel.new_sender() - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=5, - max_blocking_duration_sec=30, + max_data_age=timedelta(seconds=5), + max_blocking_duration=timedelta(seconds=30), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ): @@ -753,10 +730,10 @@ async def setup_tracker( status_receiver = status_channel.new_receiver() - async with mock_microgrid, battery_status_tracker( + async with mock_microgrid, BatteryStatusTracker( BATTERY_ID, - max_data_age_sec=0.1, - max_blocking_duration_sec=1, + max_data_age=timedelta(seconds=0.1), + max_blocking_duration=timedelta(seconds=1), status_sender=status_channel.new_sender(), set_power_result_receiver=set_power_result_channel.new_receiver(), ):