Skip to content

Commit 0c1cdb6

Browse files
authored
Add support for states in client and cli tool (frequenz-floss#97)
States can now be requested via the client and are provided through the flat iterator. They can be identified via their category `state`, `warning` and `error`, respectively. Each individual state is provided as its own sample. Support for states is also added to the CLI tool via the `--states` flag. As of now requests for states without any metrics are not yet supported by the service. Fixes frequenz-floss#25
2 parents d7664fa + fb865ff commit 0c1cdb6

File tree

3 files changed

+61
-5
lines changed

3 files changed

+61
-5
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* States can now be requested via the client and are provided through the flat iterator.
14+
They can be identified via their category `state`, `warning` and `error`, respectively.
15+
Each individual state is provided as its own sample.
16+
17+
* Support for states is also added to the CLI tool via the `--states` flag.
1418

1519
## Bug Fixes
1620

src/frequenz/client/reporting/__main__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ def main() -> None:
3434
help="List of metrics to process",
3535
required=True,
3636
)
37+
parser.add_argument(
38+
"--states",
39+
action="store_true",
40+
help="Include states in the output",
41+
)
3742
parser.add_argument(
3843
"--start",
3944
type=datetime.fromisoformat,
@@ -66,6 +71,7 @@ def main() -> None:
6671
args.start,
6772
args.end,
6873
args.resolution,
74+
states=args.states,
6975
service_address=args.url,
7076
key=args.key,
7177
fmt=args.format,
@@ -81,6 +87,7 @@ async def run(
8187
start_dt: datetime,
8288
end_dt: datetime,
8389
resolution: int,
90+
states: bool,
8491
service_address: str,
8592
key: str,
8693
fmt: str,
@@ -94,6 +101,7 @@ async def run(
94101
start_dt: start datetime
95102
end_dt: end datetime
96103
resolution: resampling resolution in sec
104+
states: include states in the output
97105
service_address: service address
98106
key: API key
99107
fmt: output format
@@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]:
120128
start_dt=start_dt,
121129
end_dt=end_dt,
122130
resolution=resolution,
131+
include_states=states,
123132
)
124133

125134
if fmt == "iter":

src/frequenz/client/reporting/_client.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""Client for requests to the Reporting API."""
55

66
from collections import namedtuple
7-
from collections.abc import AsyncIterator, Iterator
7+
from collections.abc import AsyncIterator, Iterable, Iterator
88
from dataclasses import dataclass
99
from datetime import datetime
1010
from typing import cast
@@ -15,6 +15,7 @@
1515
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
1616
MicrogridComponentIDs as PBMicrogridComponentIDs,
1717
)
18+
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
1819
from frequenz.api.reporting.v1.reporting_pb2 import (
1920
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
2021
)
@@ -56,7 +57,10 @@ def is_empty(self) -> bool:
5657
"""
5758
if not self._data_pb.components:
5859
return True
59-
if not self._data_pb.components[0].metric_samples:
60+
if (
61+
not self._data_pb.components[0].metric_samples
62+
and not self._data_pb.components[0].states
63+
):
6064
return True
6165
return False
6266

@@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]:
9498
metric=met,
9599
value=value,
96100
)
101+
for state in cdata.states:
102+
ts = state.sampled_at.ToDatetime()
103+
for name, category in {
104+
"state": state.states,
105+
"warning": state.warnings,
106+
"error": state.errors,
107+
}.items():
108+
# Skip if the category is not present
109+
if not isinstance(category, Iterable):
110+
continue
111+
# Each category can have multiple states
112+
# that are provided as individual samples
113+
for s in category:
114+
yield MetricSample(
115+
timestamp=ts,
116+
microgrid_id=mid,
117+
component_id=cid,
118+
metric=name,
119+
value=s,
120+
)
97121

98122

99123
class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]):
@@ -120,6 +144,7 @@ async def list_single_component_data(
120144
start_dt: datetime,
121145
end_dt: datetime,
122146
resolution: int | None,
147+
include_states: bool = False,
123148
) -> AsyncIterator[MetricSample]:
124149
"""Iterate over the data for a single metric.
125150
@@ -130,6 +155,7 @@ async def list_single_component_data(
130155
start_dt: The start date and time.
131156
end_dt: The end date and time.
132157
resolution: The resampling resolution for the data, represented in seconds.
158+
include_states: Whether to include the state data.
133159
134160
Yields:
135161
A named tuple with the following fields:
@@ -142,6 +168,7 @@ async def list_single_component_data(
142168
start_dt=start_dt,
143169
end_dt=end_dt,
144170
resolution=resolution,
171+
include_states=include_states,
145172
):
146173
for entry in batch:
147174
yield entry
@@ -155,6 +182,7 @@ async def list_microgrid_components_data(
155182
start_dt: datetime,
156183
end_dt: datetime,
157184
resolution: int | None,
185+
include_states: bool = False,
158186
) -> AsyncIterator[MetricSample]:
159187
"""Iterate over the data for multiple microgrids and components.
160188
@@ -165,6 +193,7 @@ async def list_microgrid_components_data(
165193
start_dt: The start date and time.
166194
end_dt: The end date and time.
167195
resolution: The resampling resolution for the data, represented in seconds.
196+
include_states: Whether to include the state data.
168197
169198
Yields:
170199
A named tuple with the following fields:
@@ -180,11 +209,13 @@ async def list_microgrid_components_data(
180209
start_dt=start_dt,
181210
end_dt=end_dt,
182211
resolution=resolution,
212+
include_states=include_states,
183213
):
184214
for entry in batch:
185215
yield entry
186216

187217
# pylint: disable=too-many-arguments
218+
# pylint: disable=too-many-locals
188219
async def _list_microgrid_components_data_batch(
189220
self,
190221
*,
@@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch(
193224
start_dt: datetime,
194225
end_dt: datetime,
195226
resolution: int | None,
227+
include_states: bool = False,
196228
) -> AsyncIterator[ComponentsDataBatch]:
197229
"""Iterate over the component data batches in the stream.
198230
@@ -205,6 +237,7 @@ async def _list_microgrid_components_data_batch(
205237
start_dt: The start date and time.
206238
end_dt: The end date and time.
207239
resolution: The resampling resolution for the data, represented in seconds.
240+
include_states: Whether to include the state data.
208241
209242
Yields:
210243
A ComponentsDataBatch object of microgrid components data.
@@ -224,17 +257,27 @@ def dt2ts(dt: datetime) -> PBTimestamp:
224257
end=dt2ts(end_dt),
225258
)
226259

227-
list_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
260+
incl_states = (
261+
PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE
262+
if include_states
263+
else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE
264+
)
265+
include_options = PBIncludeOptions(
266+
states=incl_states,
267+
)
268+
269+
stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
228270
time_filter=time_filter,
229271
resampling_options=PBResamplingOptions(resolution=resolution),
272+
include_options=include_options,
230273
)
231274

232275
metrics_pb = [metric.to_proto() for metric in metrics]
233276

234277
request = PBReceiveMicrogridComponentsDataStreamRequest(
235278
microgrid_components=microgrid_components_pb,
236279
metrics=metrics_pb,
237-
filter=list_filter,
280+
filter=stream_filter,
238281
)
239282

240283
try:

0 commit comments

Comments
 (0)