Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- Refactored state extraction to return `StateRecord` objects with descriptive enum names (e.g., "ACTIVE", "FAULT") instead of raw integers for `state_value`, improving clarity and downstream usability.

## New Features

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ requires-python = ">= 3.11, < 4"
dependencies = [
"typing-extensions >= 4.6.1, < 5",
"frequenz-client-reporting >= 0.18.0, < 0.19.0",
"frequenz-client-common >= 0.3.0, < 0.4",
"frequenz-client-common >= 0.3.3, < 0.4",
]
dynamic = ["version"]

Expand Down
186 changes: 119 additions & 67 deletions src/frequenz/reporting/_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,54 @@
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A highlevel interface for the reporting API."""

import enum
import logging
from collections import namedtuple
from datetime import datetime, timedelta
from itertools import groupby
from typing import Any
from typing import NamedTuple

from frequenz.client.common.enum_proto import enum_from_proto
from frequenz.client.common.metric import Metric
from frequenz.client.common.microgrid.components import (
ComponentErrorCode,
ComponentStateCode,
)
from frequenz.client.reporting import ReportingApiClient
from frequenz.client.reporting._types import MetricSample

_logger = logging.getLogger(__name__)

CumulativeEnergy = namedtuple(
"CumulativeEnergy", ["start_time", "end_time", "consumption", "production"]
)
"""Type for cumulative energy consumption and production over a specified time."""


class StateRecord(NamedTuple):
"""A record of a component state change.

A named tuple was chosen to allow safe access to the fields while keeping
the simplicity of a tuple. This data type can be easily used to create a
numpy array or a pandas DataFrame.
"""

microgrid_id: int
"""The ID of the microgrid."""

component_id: str
"""The ID of the component within the microgrid."""

state_type: str
"""The type of the state (e.g., "state", "warning", "error")."""

state_value: str
"""The value of the state (e.g., "ON", "OFF", "ERROR" etc.)."""

start_time: datetime | None
"""The start time of the state change."""

end_time: datetime | None
"""The end time of the state change."""


# pylint: disable-next=too-many-arguments
Expand Down Expand Up @@ -156,9 +190,9 @@ async def fetch_and_extract_state_durations(
start_time: datetime,
end_time: datetime,
resampling_period: timedelta | None,
alert_states: list[int],
alert_states: list[ComponentStateCode],
include_warnings: bool = True,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
) -> tuple[list[StateRecord], list[StateRecord]]:
"""Fetch data using the Reporting API and extract state durations and alert records.

Args:
Expand All @@ -172,14 +206,13 @@ async def fetch_and_extract_state_durations(
end_time: The end date and time for the period.
resampling_period: The period for resampling the data. If None, data
will be returned in its original resolution
alert_states: List of component state values that should trigger an alert.
include_warnings: Whether to include warning state values in the alert
records.
alert_states: List of ComponentStateCode names that should trigger an alert.
include_warnings: Whether to include warning states in the alert records.

Returns:
A tuple containing two lists:
- all_states: Contains all state records including start and end times.
- alert_records: Contains filtered records matching the alert criteria.
A tuple containing:
- A list of StateRecord instances representing the state changes.
- A list of StateRecord instances that match the alert criteria.
"""
samples = await _fetch_component_data(
client=client,
Expand All @@ -192,30 +225,23 @@ async def fetch_and_extract_state_durations(
include_bounds=False,
Copy link

Copilot AI Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass include_states=True to _fetch_component_data so that state metrics are actually fetched for processing.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check again

)

all_states, alert_records = extract_state_durations(
samples, alert_states, include_warnings
)
all_states = _extract_state_records(samples, include_warnings)
alert_records = _filter_alerts(all_states, alert_states, include_warnings)
return all_states, alert_records


def extract_state_durations(
def _extract_state_records(
samples: list[MetricSample],
alert_states: list[int],
include_warnings: bool = True,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""
Extract state durations and alert records based on state transitions.
include_warnings: bool,
) -> list[StateRecord]:
"""Extract state records from the provided samples.

Args:
samples: List of MetricSample instances containing the reporting data.
alert_states: List of component state values that should trigger an alert.
Component error codes are reported by default.
include_warnings: Whether to include warning state values in the alert records.
include_warnings: Whether to include warning states in the alert records.

Returns:
A tuple containing two lists:
- all_states: Contains all state records including start and end times.
- alert_records: Contains filtered records matching the alert criteria.
A list of StateRecord instances representing the state changes.
"""
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
state_metrics = ["state"] + alert_metrics
Expand All @@ -225,96 +251,122 @@ def extract_state_durations(
)

if not filtered_samples:
return [], []
return []

# Group samples by (microgrid_id, component_id, metric)
all_states = []
for key, group in groupby(
filtered_samples, key=lambda s: (s.microgrid_id, s.component_id, s.metric)
):
states = _process_group_samples(key, list(group))
all_states.extend(states)
all_states.extend(_process_sample_group(key, list(group)))

all_states.sort(
key=lambda x: (x["microgrid_id"], x["component_id"], x["start_time"])
)

alert_records = _filter_alerts(all_states, alert_states, alert_metrics)
return all_states, alert_records
all_states.sort(key=lambda x: (x.microgrid_id, x.component_id, x.start_time))
return all_states


def _process_group_samples(
key: tuple[int, int, str],
group_samples: list["MetricSample"],
) -> list[dict[str, Any]]:
def _process_sample_group(
key: tuple[int, str, str],
group_samples: list[MetricSample],
) -> list[StateRecord]:
"""Process samples for a single group to extract state durations.

Args:
key: Tuple containing microgrid ID, component ID, and metric.
group_samples: List of samples for the group.

Returns:
List of state records.
A list of StateRecord instances representing the state changes.
"""
mid, cid, metric = key
if not group_samples:
return []

state_records = []
current_state_value = None
start_time = None
current_state_value: float | None = None
start_time: datetime | None = None
enum_class = ComponentStateCode if metric == "state" else ComponentErrorCode

for sample in group_samples:
if current_state_value != sample.value:
# Close previous state run
if current_state_value is not None:
state_records.append(
{
"microgrid_id": mid,
"component_id": cid,
"state_type": metric,
"state_value": current_state_value,
"start_time": start_time,
"end_time": sample.timestamp,
}
StateRecord(
microgrid_id=mid,
component_id=cid,
state_type=metric,
state_value=_resolve_enum_name(current_state_value, enum_class),
start_time=start_time,
end_time=sample.timestamp,
)
)
# Start new state run
current_state_value = sample.value
start_time = sample.timestamp

# Close the last state run
state_records.append(
{
"microgrid_id": mid,
"component_id": cid,
"state_type": metric,
"state_value": current_state_value,
"start_time": start_time,
"end_time": None,
}
StateRecord(
microgrid_id=mid,
component_id=cid,
state_type=metric,
state_value=(
_resolve_enum_name(current_state_value, enum_class)
if current_state_value is not None
else ""
),
start_time=start_time,
end_time=None,
)
)

return state_records


def _resolve_enum_name(value: float, enum_class: type[enum.Enum]) -> str:
"""Resolve the name of an enum member from its integer value.

Args:
value: The integer value of the enum.
enum_class: The enum class to convert the value to.

Returns:
The name of the enum member if it exists, otherwise the value as a string.
"""
result = enum_from_proto(int(value), enum_class)
if isinstance(result, int):
_logger.warning(
"Unknown enum value %s for %s, returning the integer value as a string.",
value,
enum_class.__name__,
)
return str(result)
return result.name


def _filter_alerts(
all_states: list[dict[str, Any]],
alert_states: list[int],
alert_metrics: list[str],
) -> list[dict[str, Any]]:
all_states: list[StateRecord],
alert_states: list[ComponentStateCode],
include_warnings: bool,
) -> list[StateRecord]:
"""Identify alert records from all states.

Args:
all_states: List of all state records.
alert_states: List of component state values that should trigger an alert.
alert_metrics: List of metric names that should trigger an alert.
alert_states: List of ComponentStateCode names that should trigger an alert.
include_warnings: Whether to include warning states in the alert records.

Returns:
List of alert records.
A list of StateRecord instances that match the alert criteria.
"""
alert_metrics = ["warning", "error"] if include_warnings else ["error"]
_alert_state_names = {state.name for state in alert_states}
return [
state
for state in all_states
if (
(state["state_type"] == "state" and state["state_value"] in alert_states)
or (state["state_type"] in alert_metrics)
(state.state_type == "state" and state.state_value in _alert_state_names)
or (state.state_type in alert_metrics)
)
]

Expand Down
Loading
Loading