Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
from ....microgrid import connection_manager
from ....microgrid.component import BatteryData, ComponentCategory, InverterData
from ....timeseries._quantities import Power
from .._battery_status_tracker import BatteryStatusTracker
from .._component_pool_status_tracker import ComponentPoolStatusTracker
from .._component_status import ComponentPoolStatus
from .._component_status import BatteryStatusTracker, ComponentPoolStatus
from .._distribution_algorithm import (
AggregatedBatteryData,
BatteryDistributionAlgorithm,
Expand Down Expand Up @@ -146,8 +145,8 @@ def __init__(
self._component_pool_status_tracker = ComponentPoolStatusTracker(
component_ids=set(self._battery_ids),
component_status_sender=component_pool_status_sender,
max_blocking_duration_sec=30.0,
max_data_age_sec=10.0,
max_blocking_duration=timedelta(seconds=30.0),
max_data_age=timedelta(seconds=10.0),
component_status_tracker_type=BatteryStatusTracker,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@


import asyncio
import contextlib
import logging
from collections import abc
from datetime import timedelta

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import Merge
Expand All @@ -32,10 +34,10 @@ class ComponentPoolStatusTracker:

def __init__( # pylint: disable=too-many-arguments
self,
component_ids: set[int],
component_ids: abc.Set[int],
component_status_sender: Sender[ComponentPoolStatus],
max_data_age_sec: float,
max_blocking_duration_sec: float,
max_data_age: timedelta,
max_blocking_duration: timedelta,
component_status_tracker_type: type[ComponentStatusTracker],
) -> None:
"""Create ComponentPoolStatusTracker instance.
Expand All @@ -44,18 +46,17 @@ def __init__( # pylint: disable=too-many-arguments
component_ids: set of component ids whose status is to be tracked.
component_status_sender: The sender used for sending the status of the
tracked components.
max_data_age_sec: If a component stops sending data, then this is the
maximum time for which its last message should be considered as
valid. After that time, the component won't be used until it starts
sending data.
max_blocking_duration_sec: This value tell what should be the maximum
timeout used for blocking failing component.
component_status_tracker_type: component status tracker to use
for tracking the status of the components.
max_data_age: If a component stops sending data, then this is the maximum
time for which its last message should be considered as valid. After
that time, the component won't be used until it starts sending data.
max_blocking_duration: This value tell what should be the maximum timeout
used for blocking failing component.
component_status_tracker_type: component status tracker to use for tracking
the status of the components.
"""
self._component_ids = component_ids
self._max_data_age_sec = max_data_age_sec
self._max_blocking_duration_sec = max_blocking_duration_sec
self._max_data_age = max_data_age
self._max_blocking_duration = max_blocking_duration
self._component_status_sender = component_status_sender
self._component_status_tracker_type = component_status_tracker_type

Expand Down Expand Up @@ -83,9 +84,6 @@ async def join(self) -> None:
async def stop(self) -> None:
"""Stop the ComponentPoolStatusTracker instance."""
await cancel_and_await(self._task)
await asyncio.gather(
*[tracker.stop() for tracker in self._component_status_trackers],
)
await self._merged_status_receiver.stop()

def _make_merged_status_receiver(
Expand All @@ -99,8 +97,8 @@ def _make_merged_status_receiver(
)
tracker = self._component_status_tracker_type(
component_id=component_id,
max_data_age_sec=self._max_data_age_sec,
max_blocking_duration_sec=self._max_blocking_duration_sec,
max_data_age=self._max_data_age,
max_blocking_duration=self._max_blocking_duration,
status_sender=channel.new_sender(),
set_power_result_receiver=self._set_power_result_channel.new_receiver(),
)
Expand All @@ -110,14 +108,17 @@ def _make_merged_status_receiver(

async def _run(self) -> None:
"""Start tracking component status."""
while True:
try:
await self._update_status()
except Exception as err: # pylint: disable=broad-except
_logger.error(
"ComponentPoolStatus failed with error: %s. Restarting.", err
)
await asyncio.sleep(1.0)
async with contextlib.AsyncExitStack() as stack:
for tracker in self._component_status_trackers:
await stack.enter_async_context(tracker)
while True:
try:
await self._update_status()
except Exception as err: # pylint: disable=broad-except
_logger.error(
"ComponentPoolStatus failed with error: %s. Restarting.", err
)
await asyncio.sleep(1.0)

async def _update_status(self) -> None:
async for status in self._merged_status_receiver:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Status tracking for components."""

from ._battery_status_tracker import BatteryStatusTracker
from ._component_status import (
ComponentPoolStatus,
ComponentStatus,
ComponentStatusEnum,
ComponentStatusTracker,
SetPowerResult,
)

__all__ = [
"BatteryStatusTracker",
"ComponentPoolStatus",
"ComponentStatus",
"ComponentStatusEnum",
"ComponentStatusTracker",
"SetPowerResult",
]
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
from frequenz.channels.util import Timer, select, selected_from
from typing_extensions import override

from ..._internal._asyncio import cancel_and_await
from ...microgrid import connection_manager
from ...microgrid.component import (
from ....microgrid import connection_manager
from ....microgrid.component import (
BatteryData,
ComponentCategory,
ComponentData,
InverterData,
)
from ..._background_service import BackgroundService
from ._component_status import (
ComponentStatus,
ComponentStatusEnum,
Expand Down Expand Up @@ -55,55 +55,54 @@ class _ComponentStreamStatus:

@dataclass
class _BlockingStatus:
min_duration_sec: float
"""The minimum blocking duration (in seconds)."""
min_duration: timedelta
"""The minimum blocking duration."""

max_duration_sec: float
"""The maximum blocking duration (in seconds)."""
max_duration: timedelta
"""The maximum blocking duration."""

last_blocking_duration_sec: float = 0.0
"""Last blocking duration (in seconds)."""
last_blocking_duration: timedelta = timedelta(seconds=0.0)
"""Last blocking duration."""

blocked_until: datetime | None = None
"""Until when the battery is blocked."""

def __post_init__(self) -> None:
assert self.min_duration_sec <= self.max_duration_sec, (
f"Minimum blocking duration ({self.min_duration_sec}) cannot be greater "
f"than maximum blocking duration ({self.max_duration_sec})"
assert self.min_duration <= self.max_duration, (
f"Minimum blocking duration ({self.min_duration}) cannot be greater "
f"than maximum blocking duration ({self.max_duration})"
)
self.last_blocking_duration_sec = self.min_duration_sec
self.last_blocking_duration = self.min_duration
self._timedelta_zero = timedelta(seconds=0.0)

def block(self) -> float:
def block(self) -> timedelta:
"""Block battery.

Battery can be unblocked using `self.unblock()` method.

Returns:
For how long (in seconds) the battery is blocked.
The duration for which the battery is blocked.
"""
now = datetime.now(tz=timezone.utc)

# If is not blocked
if self.blocked_until is None:
self.last_blocking_duration_sec = self.min_duration_sec
self.blocked_until = now + timedelta(
seconds=self.last_blocking_duration_sec
)
return self.last_blocking_duration_sec
self.last_blocking_duration = self.min_duration
self.blocked_until = now + self.last_blocking_duration
return self.last_blocking_duration

# If still blocked, then do nothing
if self.blocked_until > now:
return 0.0
return self._timedelta_zero

# If previous blocking time expired, then blocked it once again.
# Increase last blocking time, unless it reach the maximum.
self.last_blocking_duration_sec = min(
2 * self.last_blocking_duration_sec, self.max_duration_sec
self.last_blocking_duration = min(
2 * self.last_blocking_duration, self.max_duration
)
self.blocked_until = now + timedelta(seconds=self.last_blocking_duration_sec)
self.blocked_until = now + self.last_blocking_duration

return self.last_blocking_duration_sec
return self.last_blocking_duration

def unblock(self) -> None:
"""Unblock battery.
Expand All @@ -127,7 +126,7 @@ def is_blocked(self) -> bool:
return self.blocked_until > datetime.now(tz=timezone.utc)


class BatteryStatusTracker(ComponentStatusTracker):
class BatteryStatusTracker(ComponentStatusTracker, BackgroundService):
"""Class for tracking if battery is working.

Status updates are sent out only when there is a status change.
Expand Down Expand Up @@ -166,20 +165,19 @@ class BatteryStatusTracker(ComponentStatusTracker):
def __init__( # pylint: disable=too-many-arguments
self,
component_id: int,
max_data_age_sec: float,
max_blocking_duration_sec: float,
max_data_age: timedelta,
max_blocking_duration: timedelta,
status_sender: Sender[ComponentStatus],
set_power_result_receiver: Receiver[SetPowerResult],
) -> None:
"""Create class instance.

Args:
component_id: Id of this battery
max_data_age_sec: If component stopped sending data, then
this is the maximum time when its last message should be considered as
valid. After that time, component won't be used until it starts sending
data.
max_blocking_duration_sec: This value tell what should be the maximum
max_data_age: If component stopped sending data, then this is the maximum
time when its last message should be considered as valid. After that
time, component won't be used until it starts sending data.
max_blocking_duration: This value tell what should be the maximum
timeout used for blocking failing component.
status_sender: Channel to send status updates.
set_power_result_receiver: Channel to receive results of the requests to the
Expand All @@ -188,13 +186,18 @@ def __init__( # pylint: disable=too-many-arguments
Raises:
RuntimeError: If battery has no adjacent inverter.
"""
self._max_data_age = max_data_age_sec
BackgroundService.__init__(self, name=f"BatteryStatusTracker({component_id})")
self._max_data_age = max_data_age
self._status_sender = status_sender
self._set_power_result_receiver = set_power_result_receiver

# First battery is considered as not working.
# Change status after first messages are received.
self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING
self._blocking_status: _BlockingStatus = _BlockingStatus(
1.0, max_blocking_duration_sec
timedelta(seconds=1.0), max_blocking_duration
)
self._timedelta_zero = timedelta(seconds=0.0)

inverter_id = self._find_adjacent_inverter_id(component_id)
if inverter_id is None:
Expand All @@ -204,17 +207,22 @@ def __init__( # pylint: disable=too-many-arguments

self._battery: _ComponentStreamStatus = _ComponentStreamStatus(
component_id,
data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)),
data_recv_timer=Timer.timeout(max_data_age),
)
self._inverter: _ComponentStreamStatus = _ComponentStreamStatus(
inverter_id,
data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)),
data_recv_timer=Timer.timeout(max_data_age),
)

# Select needs receivers that can be get in async way only.

self._task: asyncio.Task[None] = asyncio.create_task(
self._run(status_sender, set_power_result_receiver)
@override
def start(self) -> None:
"""Start the BatteryStatusTracker instance."""
self._tasks.add(
asyncio.create_task(
self._run(self._status_sender, self._set_power_result_receiver)
)
)

@property
Expand All @@ -226,10 +234,6 @@ def battery_id(self) -> int:
"""
return self._battery.component_id

async def stop(self) -> None:
"""Stop tracking battery status."""
await cancel_and_await(self._task)

def _handle_status_battery(self, bat_data: BatteryData) -> None:
self._battery.last_msg_correct = (
self._is_message_reliable(bat_data)
Expand Down Expand Up @@ -259,9 +263,9 @@ def _handle_status_set_power_result(self, result: SetPowerResult) -> None:
):
duration = self._blocking_status.block()

if duration > 0:
if duration > self._timedelta_zero:
_logger.warning(
"battery %d failed last response. block it for %f sec",
"battery %d failed last response. block it for %s",
self.battery_id,
duration,
)
Expand Down Expand Up @@ -345,7 +349,7 @@ async def _run(
if (
datetime.now(tz=timezone.utc)
- self._battery.last_msg_timestamp
) < timedelta(seconds=self._max_data_age):
) < self._max_data_age:
# This means that we have received data from the battery
# since the timer triggered, but the timer event arrived
# late, so we can ignore it.
Expand All @@ -356,7 +360,7 @@ async def _run(
if (
datetime.now(tz=timezone.utc)
- self._inverter.last_msg_timestamp
) < timedelta(seconds=self._max_data_age):
) < self._max_data_age:
# This means that we have received data from the inverter
# since the timer triggered, but the timer event arrived
# late, so we can ignore it.
Expand Down Expand Up @@ -505,7 +509,7 @@ def _is_timestamp_outdated(self, timestamp: datetime) -> bool:
_True if timestamp is to old, False otherwise
"""
now = datetime.now(tz=timezone.utc)
diff = (now - timestamp).total_seconds()
diff = now - timestamp
return diff > self._max_data_age

def _is_message_reliable(self, message: ComponentData) -> bool:
Expand Down
Loading