
Commit 715a6b8

Move type definitions to _types.py

Signed-off-by: Sahas Subramanian <[email protected]>

1 parent dbe823f commit 715a6b8

4 files changed, +174 -143 lines changed

src/frequenz/client/reporting/_client.py

Lines changed: 5 additions & 141 deletions
@@ -53,148 +53,12 @@
 from frequenz.client.common.metric import Metric
 from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp

-MetricSample = namedtuple(
-    "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"]
+from ._types import (
+    AggregatedMetric,
+    ComponentsDataBatch,
+    MetricSample,
+    SensorsDataBatch,
 )
-"""Type for a sample of a time series incl. metric type, microgrid and component ID
-
-A named tuple was chosen to allow safe access to the fields while keeping the
-simplicity of a tuple. This data type can be easily used to create a numpy array
-or a pandas DataFrame.
-"""
-
-
-@dataclass(frozen=True)
-class GenericDataBatch:
-    """Base class for batches of microgrid data (components or sensors).
-
-    This class serves as a base for handling batches of data related to microgrid
-    components or sensors. It manages the received protocol buffer (PB) data,
-    provides access to relevant items via specific attributes, and includes
-    functionality to work with bounds if applicable.
-    """
-
-    _data_pb: Any
-    id_attr: str
-    items_attr: str
-    has_bounds: bool = False
-
-    def is_empty(self) -> bool:
-        """Check if the batch contains valid data.
-
-        Returns:
-            True if the batch contains no valid data.
-        """
-        items = getattr(self._data_pb, self.items_attr, [])
-        if not items:
-            return True
-        for item in items:
-            if not getattr(item, "metric_samples", []) and not getattr(
-                item, "states", []
-            ):
-                return True
-        return False
-
-    def __iter__(self) -> Iterator[MetricSample]:
-        """Get generator that iterates over all values in the batch.
-
-        Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
-        message is supported.
-
-
-        Yields:
-            A named tuple with the following fields:
-            * timestamp: The timestamp of the metric sample.
-            * microgrid_id: The microgrid ID.
-            * component_id: The component ID.
-            * metric: The metric name.
-            * value: The metric value.
-        """
-        mid = self._data_pb.microgrid_id
-        items = getattr(self._data_pb, self.items_attr)
-
-        for item in items:
-            cid = getattr(item, self.id_attr)
-            for sample in getattr(item, "metric_samples", []):
-                ts = sample.sampled_at.ToDatetime().replace(tzinfo=timezone.utc)
-                met = Metric.from_proto(sample.metric).name
-                value = (
-                    sample.value.simple_metric.value
-                    if sample.value.HasField("simple_metric")
-                    else None
-                )
-                yield MetricSample(ts, mid, cid, met, value)
-
-                if self.has_bounds:
-                    for i, bound in enumerate(sample.bounds):
-                        if bound.lower:
-                            yield MetricSample(
-                                ts, mid, cid, f"{met}_bound_{i}_lower", bound.lower
-                            )
-                        if bound.upper:
-                            yield MetricSample(
-                                ts, mid, cid, f"{met}_bound_{i}_upper", bound.upper
-                            )
-
-            for state in getattr(item, "states", []):
-                ts = state.sampled_at.ToDatetime().replace(tzinfo=timezone.utc)
-                for name, category in {
-                    "state": getattr(state, "states", []),
-                    "warning": getattr(state, "warnings", []),
-                    "error": getattr(state, "errors", []),
-                }.items():
-                    if not isinstance(category, abc.Iterable):
-                        continue
-                    for s in category:
-                        yield MetricSample(ts, mid, cid, name, s)
-
-
-@dataclass(frozen=True)
-class ComponentsDataBatch(GenericDataBatch):
-    """Batch of microgrid components data."""
-
-    def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse):
-        """Initialize the ComponentsDataBatch.
-
-        Args:
-            data_pb: The underlying protobuf message.
-        """
-        super().__init__(
-            data_pb, id_attr="component_id", items_attr="components", has_bounds=True
-        )
-
-
-@dataclass(frozen=True)
-class SensorsDataBatch(GenericDataBatch):
-    """Batch of microgrid sensors data."""
-
-    def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse):
-        """Initialize the SensorsDataBatch.
-
-        Args:
-            data_pb: The underlying protobuf message.
-        """
-        super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors")
-
-
-@dataclass(frozen=True)
-class AggregatedMetric:
-    """An aggregated metric sample returned by the Reporting service."""
-
-    _data_pb: PBAggregatedStreamResponse
-    """The underlying protobuf message."""
-
-    def sample(self) -> MetricSample:
-        """Return the aggregated metric sample."""
-        return MetricSample(
-            timestamp=self._data_pb.sample.sampled_at.ToDatetime().replace(
-                tzinfo=timezone.utc
-            ),
-            microgrid_id=self._data_pb.aggregation_config.microgrid_id,
-            component_id=self._data_pb.aggregation_config.aggregation_formula,
-            metric=self._data_pb.aggregation_config.metric,
-            value=self._data_pb.sample.sample.value,
-        )


 class ReportingApiClient(BaseApiClient[ReportingStub]):
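
Since _client.py now only re-imports the moved definitions, code that still imports them from frequenz.client.reporting._client resolves to the same objects that _types defines. A small sketch (an illustration, not part of this commit) of that equivalence:

    # Illustration only: both module paths point at the same objects after the move.
    from frequenz.client.reporting import _client, _types

    assert _client.MetricSample is _types.MetricSample
    assert _client.ComponentsDataBatch is _types.ComponentsDataBatch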

src/frequenz/client/reporting/_types.py

Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@
+# License: MIT
+# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
+
+"""Types for the Reporting API client."""
+
+from collections import namedtuple
+from collections.abc import Iterable, Iterator
+from dataclasses import dataclass
+from datetime import timezone
+from typing import Any
+
+# pylint: disable=no-name-in-module
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ReceiveMicrogridSensorsDataStreamResponse as PBReceiveMicrogridSensorsDataStreamResponse,
+)
+
+# pylint: enable=no-name-in-module
+from frequenz.client.common.metric import Metric
+
+MetricSample = namedtuple(
+    "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"]
+)
+"""Type for a sample of a time series incl. metric type, microgrid and component ID
+
+A named tuple was chosen to allow safe access to the fields while keeping the
+simplicity of a tuple. This data type can be easily used to create a numpy array
+or a pandas DataFrame.
+"""
+
+
+@dataclass(frozen=True)
+class GenericDataBatch:
+    """Base class for batches of microgrid data (components or sensors).
+
+    This class serves as a base for handling batches of data related to microgrid
+    components or sensors. It manages the received protocol buffer (PB) data,
+    provides access to relevant items via specific attributes, and includes
+    functionality to work with bounds if applicable.
+    """
+
+    _data_pb: Any
+    id_attr: str
+    items_attr: str
+    has_bounds: bool = False
+
+    def is_empty(self) -> bool:
+        """Check if the batch contains valid data.
+
+        Returns:
+            True if the batch contains no valid data.
+        """
+        items = getattr(self._data_pb, self.items_attr, [])
+        if not items:
+            return True
+        for item in items:
+            if not getattr(item, "metric_samples", []) and not getattr(
+                item, "states", []
+            ):
+                return True
+        return False
+
+    def __iter__(self) -> Iterator[MetricSample]:
+        """Get generator that iterates over all values in the batch.
+
+        Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
+        message is supported.
+
+
+        Yields:
+            A named tuple with the following fields:
+            * timestamp: The timestamp of the metric sample.
+            * microgrid_id: The microgrid ID.
+            * component_id: The component ID.
+            * metric: The metric name.
+            * value: The metric value.
+        """
+        mid = self._data_pb.microgrid_id
+        items = getattr(self._data_pb, self.items_attr)
+
+        for item in items:
+            cid = getattr(item, self.id_attr)
+            for sample in getattr(item, "metric_samples", []):
+                ts = sample.sampled_at.ToDatetime().replace(tzinfo=timezone.utc)
+                met = Metric.from_proto(sample.metric).name
+                value = (
+                    sample.value.simple_metric.value
+                    if sample.value.HasField("simple_metric")
+                    else None
+                )
+                yield MetricSample(ts, mid, cid, met, value)
+
+                if self.has_bounds:
+                    for i, bound in enumerate(sample.bounds):
+                        if bound.lower:
+                            yield MetricSample(
+                                ts, mid, cid, f"{met}_bound_{i}_lower", bound.lower
+                            )
+                        if bound.upper:
+                            yield MetricSample(
+                                ts, mid, cid, f"{met}_bound_{i}_upper", bound.upper
+                            )
+
+            for state in getattr(item, "states", []):
+                ts = state.sampled_at.ToDatetime().replace(tzinfo=timezone.utc)
+                for name, category in {
+                    "state": getattr(state, "states", []),
+                    "warning": getattr(state, "warnings", []),
+                    "error": getattr(state, "errors", []),
+                }.items():
+                    if not isinstance(category, Iterable):
+                        continue
+                    for s in category:
+                        yield MetricSample(ts, mid, cid, name, s)
+
+
+@dataclass(frozen=True)
+class ComponentsDataBatch(GenericDataBatch):
+    """Batch of microgrid components data."""
+
+    def __init__(self, data_pb: PBReceiveMicrogridComponentsDataStreamResponse):
+        """Initialize the ComponentsDataBatch.
+
+        Args:
+            data_pb: The underlying protobuf message.
+        """
+        super().__init__(
+            data_pb, id_attr="component_id", items_attr="components", has_bounds=True
+        )
+
+
+@dataclass(frozen=True)
+class SensorsDataBatch(GenericDataBatch):
+    """Batch of microgrid sensors data."""
+
+    def __init__(self, data_pb: PBReceiveMicrogridSensorsDataStreamResponse):
+        """Initialize the SensorsDataBatch.
+
+        Args:
+            data_pb: The underlying protobuf message.
+        """
+        super().__init__(data_pb, id_attr="sensor_id", items_attr="sensors")
+
+
+@dataclass(frozen=True)
+class AggregatedMetric:
+    """An aggregated metric sample returned by the Reporting service."""
+
+    _data_pb: PBAggregatedStreamResponse
+    """The underlying protobuf message."""
+
+    def sample(self) -> MetricSample:
+        """Return the aggregated metric sample."""
+        return MetricSample(
+            timestamp=self._data_pb.sample.sampled_at.ToDatetime().replace(
+                tzinfo=timezone.utc
+            ),
+            microgrid_id=self._data_pb.aggregation_config.microgrid_id,
+            component_id=self._data_pb.aggregation_config.aggregation_formula,
+            metric=self._data_pb.aggregation_config.metric,
+            value=self._data_pb.sample.sample.value,
+        )
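
The MetricSample docstring above notes that the namedtuple is convenient for building a numpy array or a pandas DataFrame. A minimal sketch of that use (assuming pandas is installed; the timestamps, IDs, metric name, and values below are made up for illustration):

    from datetime import datetime, timezone

    import pandas as pd

    from frequenz.client.reporting._types import MetricSample

    samples = [
        MetricSample(
            timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
            microgrid_id=1,
            component_id=7,
            metric="AC_ACTIVE_POWER",
            value=42.0,
        ),
        MetricSample(
            timestamp=datetime(2024, 1, 1, 0, 15, tzinfo=timezone.utc),
            microgrid_id=1,
            component_id=7,
            metric="AC_ACTIVE_POWER",
            value=43.5,
        ),
    ]

    # pandas picks up the namedtuple field names as column names.
    df = pd.DataFrame(samples)
    print(df)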

src/frequenz/client/reporting/cli/__main__.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 from frequenz.client.common.metric import Metric

 from frequenz.client.reporting import ReportingApiClient
-from frequenz.client.reporting._client import MetricSample
+from frequenz.client.reporting._types import MetricSample


 def main() -> None:

tests/test_client_reporting.py

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@
 from frequenz.client.base.client import BaseApiClient

 from frequenz.client.reporting import ReportingApiClient
-from frequenz.client.reporting._client import ComponentsDataBatch
+from frequenz.client.reporting._types import ComponentsDataBatch


 @pytest.mark.asyncio
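
As a complement to the updated test import, here is a minimal sketch (not part of the test suite) showing that an empty stream response produces an empty batch; it uses only the message and class names that appear in this diff:

    # pylint: disable=no-name-in-module
    from frequenz.api.reporting.v1.reporting_pb2 import (
        ReceiveMicrogridComponentsDataStreamResponse,
    )

    from frequenz.client.reporting._types import ComponentsDataBatch

    # An empty protobuf response contains no components, so the batch reports
    # itself as empty and iterating it yields no MetricSample tuples.
    batch = ComponentsDataBatch(ReceiveMicrogridComponentsDataStreamResponse())
    assert batch.is_empty()
    assert list(batch) == []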

0 commit comments
