Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
- The signature for passing config files to `MicrogridConfig.load_config()` has been changed to accept a path, a list of paths, or a directory containing the config files.
- `MicrogridData` class needs to be initialized with a `MicrogridConfig` object instead of a path to config file(s).
- Added a transactional stateful data fetcher.
- Added a new `state_analysis` module for detecting and analysing component state transitions and alerts from reporting data.
Copy link

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The word 'analysing' should be spelled 'analyzing' to match American English spelling conventions used elsewhere in the codebase.

Suggested change
- Added a new `state_analysis` module for detecting and analysing component state transitions and alerts from reporting data.
- Added a new `state_analysis` module for detecting and analyzing component state transitions and alerts from reporting data.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually lean towards British English, thank you.

- Provides structured `StateRecord` objects with human-readable enum names.
- Supports filtering for alert states and warnings.
- Includes full test coverage for transition detection and alert filtering logic.

## Bug Fixes

Expand Down
34 changes: 34 additions & 0 deletions src/frequenz/lib/notebooks/reporting/_state_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Data structures for representing component state transitions and alerts."""

from datetime import datetime
from typing import NamedTuple


class StateRecord(NamedTuple):
"""A record of a component state change.

A named tuple was chosen to allow safe access to the fields while keeping
the simplicity of a tuple. This data type can be easily used to create a
numpy array or a pandas DataFrame.
"""

microgrid_id: int
"""The ID of the microgrid."""

component_id: str
"""The ID of the component within the microgrid."""

state_type: str
"""The type of the state (e.g., "state", "warning", "error")."""

state_value: str
"""The value of the state (e.g., "ON", "OFF", "ERROR" etc.)."""

start_time: datetime | None
"""The start time of the state change."""

end_time: datetime | None
"""The end time of the state change."""
302 changes: 302 additions & 0 deletions src/frequenz/lib/notebooks/reporting/state_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Functions for analyzing microgrid component state transitions and extracting alerts."""
import logging
from datetime import datetime, timedelta

from frequenz.client.common.metric import Metric
from frequenz.client.common.microgrid.components import (
ComponentErrorCode,
ComponentStateCode,
)
from frequenz.client.reporting import ReportingApiClient
from frequenz.client.reporting._types import MetricSample

from ._state_records import StateRecord

_logger = logging.getLogger(__name__)


# pylint: disable-next=too-many-arguments
async def fetch_and_extract_state_durations(
    *,
    client: ReportingApiClient,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: list[Metric],
    start_time: datetime,
    end_time: datetime,
    resampling_period: timedelta | None,
    alert_states: list[ComponentStateCode],
    include_warnings: bool = True,
) -> tuple[list[StateRecord], list[StateRecord]]:
    """Fetch reporting data and turn it into state-duration and alert records.

    The samples are requested from the Reporting API with states included and
    bounds excluded, converted into state records, and then filtered down to
    the subset that counts as an alert.

    Args:
        client: The client used to fetch the metric samples from the Reporting API.
        microgrid_components: List of tuples where each tuple contains microgrid
            ID and corresponding component IDs.
        metrics: List of metrics to request.
            NOTE: The service will support requesting states without metrics in
            the future and this argument will be removed.
        start_time: The start date and time for the period.
        end_time: The end date and time for the period.
        resampling_period: The period for resampling the data. If None, data
            will be returned in its original resolution.
        alert_states: State codes whose names mark a "state" record as an alert.
        include_warnings: Whether warning samples are extracted and reported
            as alert records.

    Returns:
        A tuple containing:
            - A list of StateRecord instances representing the state changes.
            - A list of StateRecord instances that match the alert criteria.
    """
    fetched = await _fetch_component_data(
        client=client,
        microgrid_components=microgrid_components,
        metrics=metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=True,
        include_bounds=False,
    )

    state_records = _extract_state_records(fetched, include_warnings)
    return (
        state_records,
        _filter_alerts(state_records, alert_states, include_warnings),
    )


def _extract_state_records(
    samples: list[MetricSample], include_warnings: bool
) -> list[StateRecord]:
    """Build the state records for every component present in the samples.

    Components for which no "state" samples were received are skipped.

    Args:
        samples: List of MetricSample instances containing the reporting data.
        include_warnings: Whether warning samples are taken into account.

    Returns:
        The state records of all components, ordered by microgrid ID,
        component ID and start time.
    """
    grouped = _group_samples_by_component(samples, include_warnings)

    records: list[StateRecord] = []
    for (microgrid_id, component_id), by_metric in grouped.items():
        if "state" in by_metric:
            records += _process_sample_group(microgrid_id, component_id, by_metric)

    return sorted(
        records,
        key=lambda rec: (rec.microgrid_id, rec.component_id, rec.start_time),
    )


# pylint: disable-next=too-many-locals,too-many-branches
def _process_sample_group(
    microgrid_id: int,
    component_id: str,
    samples_by_metric: dict[str, list[MetricSample]],
) -> list[StateRecord]:
    """Turn one component's state/error/warning samples into duration records.

    The state samples are walked in timestamp order and a "state" record is
    emitted every time the state value changes. Error and warning codes are
    only tracked while the component is in the ERROR state; they are looked up
    at the timestamps of the state samples, and anything still open is closed
    as soon as a state sample with a non-ERROR value is seen.

    Args:
        microgrid_id: ID of the microgrid.
        component_id: ID of the component.
        samples_by_metric: Samples keyed by metric name. The "state" key is
            required; "error" and "warning" are used when present.

    Returns:
        A list of StateRecord instances representing the state changes and
        error/warning durations (if any). Records still open after the last
        state sample are appended last with an end time of None.
    """
    ordered_states = sorted(samples_by_metric["state"], key=lambda s: s.timestamp)
    error_state = ComponentStateCode.ERROR.value

    # Errors are always handled before warnings, so records come out in the
    # same relative order for both kinds.
    alert_kinds = ("error", "warning")
    alert_samples = {
        kind: {s.timestamp: s for s in samples_by_metric.get(kind, [])}
        for kind in alert_kinds
    }
    open_code: dict[str, float | None] = {kind: None for kind in alert_kinds}
    open_since: dict[str, datetime | None] = {kind: None for kind in alert_kinds}

    records: list[StateRecord] = []

    def emit(
        metric: str,
        val: float,
        start: datetime | None,
        end: datetime | None,
        enum_class: type[ComponentStateCode | ComponentErrorCode],
    ) -> None:
        """Append a finished record for the given metric."""
        records.append(
            StateRecord(
                microgrid_id=microgrid_id,
                component_id=component_id,
                state_type=metric,
                state_value=_resolve_enum_name(val, enum_class),
                start_time=start,
                end_time=end,
            )
        )

    def close_alert(kind: str, end: datetime | None) -> None:
        """Emit the open error/warning of this kind, if any, and forget it."""
        code = open_code[kind]
        if code is not None:
            emit(kind, code, open_since[kind], end, ComponentErrorCode)
            open_code[kind] = open_since[kind] = None

    current = current_since = None
    for sample in ordered_states:
        now = sample.timestamp

        # A new state value ends the previous state interval at this sample.
        if sample.value != current:
            if current is not None:
                emit("state", current, current_since, now, ComponentStateCode)
            current, current_since = sample.value, now

        # Outside ERROR nothing may stay open.
        if current != error_state:
            for kind in alert_kinds:
                close_alert(kind, now)

        # Inside ERROR, follow the codes reported at this exact timestamp.
        if current == error_state:
            for kind in alert_kinds:
                if now not in alert_samples[kind]:
                    continue
                new_code = alert_samples[kind][now].value
                if new_code != open_code[kind]:
                    close_alert(kind, now)
                    open_code[kind] = new_code
                    open_since[kind] = now

    # Whatever is still running when the data ends has no end time.
    if current is not None:
        emit("state", current, current_since, None, ComponentStateCode)
        if current == error_state:
            for kind in alert_kinds:
                close_alert(kind, None)
    return records


def _group_samples_by_component(
    samples: list[MetricSample], include_warnings: bool
) -> dict[tuple[int, str], dict[str, list[MetricSample]]]:
    """Bucket the relevant samples per component and per metric.

    Only "state" and "error" samples are kept, plus "warning" samples when
    `include_warnings` is set; samples of any other metric are dropped.

    Args:
        samples: List of MetricSample instances containing the reporting data.
        include_warnings: Whether "warning" samples are kept as well.

    Returns:
        A dictionary where keys are tuples of (microgrid_id, component_id) and values
        are dictionaries with metric names as keys and lists of MetricSample as values.
        Samples keep their input order within each list.
    """
    wanted = {"state", "error", "warning"} if include_warnings else {"state", "error"}

    grouped: dict[tuple[int, str], dict[str, list[MetricSample]]] = {}
    for sample in samples:
        metric = sample.metric
        if metric not in wanted:
            continue

        component_key = (sample.microgrid_id, sample.component_id)
        if component_key not in grouped:
            grouped[component_key] = {}
        per_metric = grouped[component_key]

        if metric in per_metric:
            per_metric[metric].append(sample)
        else:
            per_metric[metric] = [sample]
    return grouped


def _resolve_enum_name(
    value: float, enum_class: type[ComponentStateCode | ComponentErrorCode]
) -> str:
    """Look up the name of the enum member that a numeric sample value stands for.

    The value is converted with `int()` before being handed to the enum's
    `from_proto` constructor, so any fractional part is discarded.

    Args:
        value: The numeric value of the enum, as received in a sample.
        enum_class: The enum class to convert the value to.

    Returns:
        The name of the member returned by `enum_class.from_proto`. What is
        returned for values that do not match any member is decided by
        `from_proto` (presumably a default such as "UNSPECIFIED" — confirm
        against the enum implementation).
    """
    return enum_class.from_proto(int(value)).name  # type: ignore[arg-type]


def _filter_alerts(
    all_states: list[StateRecord],
    alert_states: list[ComponentStateCode],
    include_warnings: bool,
) -> list[StateRecord]:
    """Pick out the records that count as alerts.

    A record is an alert when it is a "state" record whose value is the name
    of one of `alert_states`, when it is an "error" record, or — only if
    `include_warnings` is set — when it is a "warning" record.

    Args:
        all_states: List of all state records.
        alert_states: State codes whose names mark a "state" record as an alert.
        include_warnings: Whether "warning" records count as alerts.

    Returns:
        A list of StateRecord instances that match the alert criteria, in the
        same order as they appear in `all_states`.
    """
    alerting_state_names = {code.name for code in alert_states}
    alerting_types = ("warning", "error") if include_warnings else ("error",)

    alerts: list[StateRecord] = []
    for record in all_states:
        kind = record.state_type
        if kind == "state":
            is_alert = record.state_value in alerting_state_names
        else:
            is_alert = kind in alerting_types
        if is_alert:
            alerts.append(record)
    return alerts


# pylint: disable-next=too-many-arguments
async def _fetch_component_data(
    *,
    client: ReportingApiClient,
    microgrid_components: list[tuple[int, list[int]]],
    metrics: list[Metric],
    start_time: datetime,
    end_time: datetime,
    resampling_period: timedelta | None,
    include_states: bool = False,
    include_bounds: bool = False,
) -> list[MetricSample]:
    """Collect all samples streamed by the Reporting API into a list.

    Args:
        client: The client used to fetch the metric samples from the Reporting API.
        microgrid_components: List of tuples where each tuple contains
            microgrid ID and corresponding component IDs.
        metrics: List of metrics to request.
        start_time: The start date and time for the period.
        end_time: The end date and time for the period.
        resampling_period: The period for resampling the data. If None, data
            will be returned in its original resolution.
        include_states: Whether to include the state data.
        include_bounds: Whether to include the bound data.

    Returns:
        List of MetricSample instances containing the reporting data, in the
        order in which the client yielded them.
    """
    stream = client.receive_microgrid_components_data(
        microgrid_components=microgrid_components,
        metrics=metrics,
        start_time=start_time,
        end_time=end_time,
        resampling_period=resampling_period,
        include_states=include_states,
        include_bounds=include_bounds,
    )

    collected: list[MetricSample] = []
    async for sample in stream:
        collected.append(sample)
    return collected
Loading
Loading