6 changes: 5 additions & 1 deletion RELEASE_NOTES.md
@@ -10,7 +10,11 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* States can now be requested via the client and are provided through the flat iterator.
They can be identified by their category: `state`, `warning`, or `error`.
Each individual state is provided as its own sample (see the sketch below).

* Support for states is also added to the CLI tool via the `--states` flag (example invocation below).
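A minimal sketch of reading state samples through the flat iterator. The method name, parameter names, and the `include_states` flag come from this PR; the import paths, the service address, API key, metric name, and microgrid/component IDs are placeholders or assumptions:

```python
# Sketch only: import paths, credentials, and IDs are assumptions.
import asyncio
from datetime import datetime, timedelta

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient  # assumed re-export

async def show_states() -> None:
    client = ReportingApiClient(
        service_address="grpc://localhost:50051",  # placeholder address
        key="MY_API_KEY",  # placeholder key
    )
    end = datetime.now()
    start = end - timedelta(hours=1)
    async for sample in client.list_microgrid_components_data(
        microgrid_components=[(1, [2, 3])],  # hypothetical microgrid/component IDs
        metrics=[Metric.AC_ACTIVE_POWER],  # assumed enum member name
        start_dt=start,
        end_dt=end,
        resolution=None,
        include_states=True,  # the new flag from this PR
    ):
        # State samples reuse the category name ("state", "warning",
        # "error") in the `metric` field of the flat MetricSample tuple.
        if sample.metric in ("state", "warning", "error"):
            print(sample)

asyncio.run(show_states())
```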
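On the command line, the new flag just extends an existing invocation, e.g. `python -m frequenz.client.reporting --metrics AC_ACTIVE_POWER --start 2024-01-01T00:00:00 --end 2024-01-02T00:00:00 --states` (placeholder values; only `--states` is introduced here, and flags not visible in this diff should be checked against the CLI's `--help`).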

## Bug Fixes

9 changes: 9 additions & 0 deletions src/frequenz/client/reporting/__main__.py
@@ -34,6 +34,11 @@ def main() -> None:
help="List of metrics to process",
required=True,
)
parser.add_argument(
"--states",
action="store_true",
help="Include states in the output",
)
parser.add_argument(
"--start",
type=datetime.fromisoformat,
@@ -66,6 +71,7 @@ def main() -> None:
args.start,
args.end,
args.resolution,
states=args.states,
service_address=args.url,
key=args.key,
fmt=args.format,
@@ -81,6 +87,7 @@ async def run(
start_dt: datetime,
end_dt: datetime,
resolution: int,
states: bool,
service_address: str,
key: str,
fmt: str,
@@ -94,6 +101,7 @@
start_dt: start datetime
end_dt: end datetime
resolution: resampling resolution in sec
states: include states in the output
service_address: service address
key: API key
fmt: output format
@@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]:
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=states,
)

if fmt == "iter":
51 changes: 47 additions & 4 deletions src/frequenz/client/reporting/_client.py
@@ -4,7 +4,7 @@
"""Client for requests to the Reporting API."""

from collections import namedtuple
from collections.abc import AsyncIterator, Iterator
from collections.abc import AsyncIterator, Iterable, Iterator
from dataclasses import dataclass
from datetime import datetime
from typing import cast
@@ -15,6 +15,7 @@
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
MicrogridComponentIDs as PBMicrogridComponentIDs,
)
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
)
@@ -56,7 +57,10 @@ def is_empty(self) -> bool:
"""
if not self._data_pb.components:
return True
if not self._data_pb.components[0].metric_samples:
if (
not self._data_pb.components[0].metric_samples
and not self._data_pb.components[0].states
):
return True
return False

@@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]:
metric=met,
value=value,
)
for state in cdata.states:
ts = state.sampled_at.ToDatetime()
for name, category in {
"state": state.states,
"warning": state.warnings,
"error": state.errors,
}.items():
# Skip if the category is not present
if not isinstance(category, Iterable):
continue
# Each category can have multiple states
# that are provided as individual samples
for s in category:
yield MetricSample(
timestamp=ts,
microgrid_id=mid,
component_id=cid,
metric=name,
value=s,
)

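    # Illustration only (not part of the change): a state entry with
    # states=[1] and warnings=[2, 3] is flattened into one sample per
    # individual value, with the category name standing in for the
    # metric (timestamp/IDs here are hypothetical):
    #   MetricSample(timestamp=ts, microgrid_id=1, component_id=2,
    #                metric="state", value=1)
    #   MetricSample(timestamp=ts, microgrid_id=1, component_id=2,
    #                metric="warning", value=2)
    #   MetricSample(timestamp=ts, microgrid_id=1, component_id=2,
    #                metric="warning", value=3)
    # Downstream consumers can thus distinguish state samples from
    # regular metric samples by the `metric` field alone.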

class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]):
@@ -120,6 +144,7 @@ async def list_single_component_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.

@@ -130,6 +155,7 @@
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.

Yields:
A named tuple with the following fields:
@@ -142,6 +168,7 @@
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry
@@ -155,6 +182,7 @@
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for multiple microgrids and components.

@@ -165,6 +193,7 @@
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.

Yields:
A named tuple with the following fields:
@@ -180,11 +209,13 @@
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry

# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
async def _list_microgrid_components_data_batch(
self,
*,
Expand All @@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[ComponentsDataBatch]:
"""Iterate over the component data batches in the stream.

@@ -205,6 +237,7 @@
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.

Yields:
A ComponentsDataBatch object of microgrid components data.
@@ -224,17 +257,27 @@ def dt2ts(dt: datetime) -> PBTimestamp:
end=dt2ts(end_dt),
)

list_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
incl_states = (
PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE
if include_states
else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE
)
include_options = PBIncludeOptions(
states=incl_states,
)

stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
time_filter=time_filter,
resampling_options=PBResamplingOptions(resolution=resolution),
include_options=include_options,
)

metrics_pb = [metric.to_proto() for metric in metrics]

request = PBReceiveMicrogridComponentsDataStreamRequest(
microgrid_components=microgrid_components_pb,
metrics=metrics_pb,
filter=list_filter,
filter=stream_filter,
)

try: