Skip to content

Commit ac418b6

Browse files
committed
Add support for states in client and cli tool
States can now be requested via the client and are provided through the flat iterator. They can be identified via their category `state`, `warning` and `error`, respectively. Each individual state is provided as its own sample. Support for states is also added to the CLI tool via the `--states` flag. As of now requests for states without any metrics are not yet supported by the service. Signed-off-by: cwasicki <[email protected]>
1 parent 852843c commit ac418b6

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

src/frequenz/client/reporting/__main__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ def main() -> None:
3434
help="List of metrics to process",
3535
required=True,
3636
)
37+
parser.add_argument(
38+
"--states",
39+
action="store_true",
40+
help="Include states in the output",
41+
)
3742
parser.add_argument(
3843
"--start",
3944
type=datetime.fromisoformat,
@@ -66,6 +71,7 @@ def main() -> None:
6671
args.start,
6772
args.end,
6873
args.resolution,
74+
states=args.states,
6975
service_address=args.url,
7076
key=args.key,
7177
fmt=args.format,
@@ -81,6 +87,7 @@ async def run(
8187
start_dt: datetime,
8288
end_dt: datetime,
8389
resolution: int,
90+
states: bool,
8491
service_address: str,
8592
key: str,
8693
fmt: str,
@@ -94,6 +101,7 @@ async def run(
94101
start_dt: start datetime
95102
end_dt: end datetime
96103
resolution: resampling resolution in sec
104+
states: include states in the output
97105
service_address: service address
98106
key: API key
99107
fmt: output format
@@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]:
120128
start_dt=start_dt,
121129
end_dt=end_dt,
122130
resolution=resolution,
131+
include_states=states,
123132
)
124133

125134
if fmt == "iter":

src/frequenz/client/reporting/_client.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""Client for requests to the Reporting API."""
55

66
from collections import namedtuple
7-
from collections.abc import AsyncIterator, Iterator
7+
from collections.abc import AsyncIterator, Iterable, Iterator
88
from dataclasses import dataclass
99
from datetime import datetime
1010
from typing import cast
@@ -15,6 +15,7 @@
1515
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
1616
MicrogridComponentIDs as PBMicrogridComponentIDs,
1717
)
18+
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
1819
from frequenz.api.reporting.v1.reporting_pb2 import (
1920
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
2021
)
@@ -56,7 +57,10 @@ def is_empty(self) -> bool:
5657
"""
5758
if not self._data_pb.components:
5859
return True
59-
if not self._data_pb.components[0].metric_samples:
60+
if (
61+
not self._data_pb.components[0].metric_samples
62+
and not self._data_pb.components[0].states
63+
):
6064
return True
6165
return False
6266

@@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]:
9498
metric=met,
9599
value=value,
96100
)
101+
for state in cdata.states:
102+
ts = state.sampled_at.ToDatetime()
103+
for name, category in {
104+
"state": state.states,
105+
"warning": state.warnings,
106+
"error": state.errors,
107+
}.items():
108+
# Skip if the category is not present
109+
if not isinstance(category, Iterable):
110+
continue
111+
# Each category can have multiple states
112+
# that are provided as individual samples
113+
for s in category:
114+
yield MetricSample(
115+
timestamp=ts,
116+
microgrid_id=mid,
117+
component_id=cid,
118+
metric=name,
119+
value=s,
120+
)
97121

98122

99123
class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]):
@@ -120,6 +144,7 @@ async def list_single_component_data(
120144
start_dt: datetime,
121145
end_dt: datetime,
122146
resolution: int | None,
147+
include_states: bool = False,
123148
) -> AsyncIterator[MetricSample]:
124149
"""Iterate over the data for a single metric.
125150
@@ -130,6 +155,7 @@ async def list_single_component_data(
130155
start_dt: The start date and time.
131156
end_dt: The end date and time.
132157
resolution: The resampling resolution for the data, represented in seconds.
158+
include_states: Whether to include the state data.
133159
134160
Yields:
135161
A named tuple with the following fields:
@@ -142,6 +168,7 @@ async def list_single_component_data(
142168
start_dt=start_dt,
143169
end_dt=end_dt,
144170
resolution=resolution,
171+
include_states=include_states,
145172
):
146173
for entry in batch:
147174
yield entry
@@ -155,6 +182,7 @@ async def list_microgrid_components_data(
155182
start_dt: datetime,
156183
end_dt: datetime,
157184
resolution: int | None,
185+
include_states: bool = False,
158186
) -> AsyncIterator[MetricSample]:
159187
"""Iterate over the data for multiple microgrids and components.
160188
@@ -165,6 +193,7 @@ async def list_microgrid_components_data(
165193
start_dt: The start date and time.
166194
end_dt: The end date and time.
167195
resolution: The resampling resolution for the data, represented in seconds.
196+
include_states: Whether to include the state data.
168197
169198
Yields:
170199
A named tuple with the following fields:
@@ -180,11 +209,13 @@ async def list_microgrid_components_data(
180209
start_dt=start_dt,
181210
end_dt=end_dt,
182211
resolution=resolution,
212+
include_states=include_states,
183213
):
184214
for entry in batch:
185215
yield entry
186216

187217
# pylint: disable=too-many-arguments
218+
# pylint: disable=too-many-locals
188219
async def _list_microgrid_components_data_batch(
189220
self,
190221
*,
@@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch(
193224
start_dt: datetime,
194225
end_dt: datetime,
195226
resolution: int | None,
227+
include_states: bool = False,
196228
) -> AsyncIterator[ComponentsDataBatch]:
197229
"""Iterate over the component data batches in the stream.
198230
@@ -205,6 +237,7 @@ async def _list_microgrid_components_data_batch(
205237
start_dt: The start date and time.
206238
end_dt: The end date and time.
207239
resolution: The resampling resolution for the data, represented in seconds.
240+
include_states: Whether to include the state data.
208241
209242
Yields:
210243
A ComponentsDataBatch object of microgrid components data.
@@ -224,9 +257,19 @@ def dt2ts(dt: datetime) -> PBTimestamp:
224257
end=dt2ts(end_dt),
225258
)
226259

260+
incl_states = (
261+
PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE
262+
if include_states
263+
else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE
264+
)
265+
include_options = PBIncludeOptions(
266+
states=incl_states,
267+
)
268+
227269
stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
228270
time_filter=time_filter,
229271
resampling_options=PBResamplingOptions(resolution=resolution),
272+
include_options=include_options,
230273
)
231274

232275
metrics_pb = [metric.to_proto() for metric in metrics]

0 commit comments

Comments
 (0)