Skip to content

Commit 75f0f48

Browse files
Add metric fetcher for the battery pool
Metric fetcher defines how to subscribe for the component metrics and receive the component metrics data. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 1ad4f71 commit 75f0f48

File tree

3 files changed

+524
-0
lines changed

3 files changed

+524
-0
lines changed

src/frequenz/sdk/_internal/_constants.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,14 @@
88

99
RECEIVER_MAX_SIZE = 50
1010
"""Default buffer size of the receiver."""
11+
12+
WAIT_FOR_COMPONENT_DATA_SEC: float = 2
13+
"""Delay the start of the application to wait for the data."""
14+
15+
MAX_BATTERY_DATA_AGE_SEC: float = 2
16+
"""Max time difference for the battery or inverter data to be considered as reliable.
17+
18+
If battery or inverter stopped sending data, then this is the maximum time when its
19+
last message should be considered as valid. After that time, component data
20+
should not be used.
21+
"""
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Module to define how to subscribe and fetch component data."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import logging
10+
import math
11+
from abc import ABC, abstractmethod
12+
from datetime import datetime, timezone
13+
from typing import Any, Generic, Iterable, Optional, Set, TypeVar
14+
15+
from frequenz.channels import ChannelClosedError, Receiver
16+
17+
from ..._internal._constants import MAX_BATTERY_DATA_AGE_SEC
18+
from ..._internal.asyncio import AsyncConstructible
19+
from ...actor._data_sourcing.microgrid_api_source import (
20+
_BatteryDataMethods,
21+
_InverterDataMethods,
22+
)
23+
from ...microgrid import get as get_microgrid
24+
from ...microgrid.component import (
25+
BatteryData,
26+
ComponentCategory,
27+
ComponentData,
28+
ComponentMetricId,
29+
InverterData,
30+
)
31+
from ._component_metrics import ComponentMetricsData
32+
33+
_logger = logging.getLogger(__name__)
34+
35+
T = TypeVar("T", bound=ComponentData)
36+
37+
38+
class ComponentMetricFetcher(AsyncConstructible, ABC):
39+
"""Define how to subscribe for and fetch the component metrics data."""
40+
41+
_component_id: int
42+
_metrics: Iterable[ComponentMetricId]
43+
44+
@classmethod
45+
async def async_new(
46+
cls, component_id: int, metrics: Iterable[ComponentMetricId]
47+
) -> Self: # type: ignore[name-defined] # pylint: disable=undefined-variable
48+
"""Create an instance of this class.
49+
50+
Subscribe for the given component metrics and return them if method
51+
`fetch_next` is called.
52+
53+
Args:
54+
component_id: component id
55+
metrics: metrics that should be fetched from this component.
56+
57+
Returns:
58+
This class instance.
59+
"""
60+
self: ComponentMetricFetcher = ComponentMetricFetcher.__new__(cls)
61+
self._component_id = component_id
62+
self._metrics = metrics
63+
return self
64+
65+
@abstractmethod
66+
async def fetch_next(self) -> Optional[ComponentMetricsData]:
67+
"""Fetch metrics for this component."""
68+
69+
70+
class LatestMetricsFetcher(ComponentMetricFetcher, Generic[T], ABC):
71+
"""Subscribe for the latest component data and extract the needed metrics."""
72+
73+
_receiver: Receiver[T]
74+
_max_waiting_time: float
75+
76+
@classmethod
77+
async def async_new(
78+
cls,
79+
component_id: int,
80+
metrics: Iterable[ComponentMetricId],
81+
) -> Self: # type: ignore[name-defined] # pylint: disable=undefined-variable:
82+
"""Create instance of this class.
83+
84+
Subscribe for the requested component data and fetch only the latest component
85+
metrics.
86+
87+
Args:
88+
component_id: component id
89+
metrics: metrics
90+
91+
Raises:
92+
ValueError: If any requested metric id is not supported.
93+
94+
Returns:
95+
This class instance
96+
"""
97+
self: LatestMetricsFetcher[T] = await super().async_new(component_id, metrics)
98+
99+
for metric in metrics:
100+
# pylint: disable=protected-access
101+
if metric not in self._supported_metrics():
102+
category = self._component_category()
103+
raise ValueError(f"Metric {metric} not supported for {category}")
104+
105+
# pylint: disable=protected-access
106+
self._receiver = await self._subscribe()
107+
self._max_waiting_time = MAX_BATTERY_DATA_AGE_SEC
108+
return self
109+
110+
async def fetch_next(self) -> Optional[ComponentMetricsData]:
111+
"""Fetch the latest component metrics.
112+
113+
Returns:
114+
Component metrics data.
115+
None if the channel was closed and fetching next element is impossible.
116+
"""
117+
try:
118+
data = await asyncio.wait_for(
119+
self._receiver.receive(), self._max_waiting_time
120+
)
121+
122+
except ChannelClosedError:
123+
_logger.exception(
124+
"Channel for component %d was closed.", self._component_id
125+
)
126+
return None
127+
except asyncio.TimeoutError:
128+
# Next time wait infinitely until we receive any message.
129+
_logger.debug("Component %d stopped sending data.", self._component_id)
130+
return ComponentMetricsData(
131+
self._component_id, datetime.now(tz=timezone.utc), {}
132+
)
133+
134+
self._max_waiting_time = MAX_BATTERY_DATA_AGE_SEC
135+
metrics = {}
136+
for mid in self._metrics:
137+
value = self._extract_metric(data, mid)
138+
# There is no guarantee that all fields in component message are populated
139+
if not math.isnan(value):
140+
metrics[mid] = value
141+
142+
return ComponentMetricsData(self._component_id, data.timestamp, metrics)
143+
144+
@abstractmethod
145+
def _extract_metric(self, data: T, mid: ComponentMetricId) -> float:
146+
...
147+
148+
@abstractmethod
149+
def _supported_metrics(self) -> Set[ComponentMetricId]:
150+
...
151+
152+
@abstractmethod
153+
def _component_category(self) -> ComponentCategory:
154+
...
155+
156+
@abstractmethod
157+
async def _subscribe(self) -> Receiver[Any]:
158+
"""Subscribe for this component data.
159+
160+
Size of the receiver buffer should should be 1 to make sure we receive only
161+
the latest component data.
162+
163+
Returns:
164+
Receiver for this component metrics.
165+
"""
166+
167+
168+
class LatestBatteryMetricsFetcher(LatestMetricsFetcher[BatteryData]):
169+
"""Subscribe for the latest battery data using MicrogridApiClient."""
170+
171+
@classmethod
172+
async def async_new(
173+
cls,
174+
component_id: int,
175+
metrics: Iterable[ComponentMetricId],
176+
) -> LatestBatteryMetricsFetcher:
177+
"""Create instance of this class.
178+
179+
Subscribe for the requested component data and fetch only the latest component
180+
metrics.
181+
182+
Args:
183+
component_id: component id
184+
metrics: metrics
185+
186+
Raises:
187+
ValueError: If any requested metric id is not supported.
188+
189+
Returns:
190+
This class instance
191+
"""
192+
self: LatestBatteryMetricsFetcher = await super().async_new(
193+
component_id, metrics
194+
)
195+
return self
196+
197+
def _supported_metrics(self) -> Set[ComponentMetricId]:
198+
return set(_BatteryDataMethods.keys())
199+
200+
def _extract_metric(self, data: BatteryData, mid: ComponentMetricId) -> float:
201+
return _BatteryDataMethods[mid](data)
202+
203+
async def _subscribe(self) -> Receiver[BatteryData]:
204+
"""Subscribe for this component data.
205+
206+
Size of the receiver buffer should should be 1 to make sure we receive only
207+
the latest component data.
208+
209+
Returns:
210+
Receiver for this component metrics.
211+
"""
212+
api = get_microgrid().api_client
213+
return await api.battery_data(self._component_id, maxsize=1)
214+
215+
def _component_category(self) -> ComponentCategory:
216+
return ComponentCategory.BATTERY
217+
218+
219+
class LatestInverterMetricsFetcher(LatestMetricsFetcher[InverterData]):
220+
"""Subscribe for the latest inverter data using MicrogridApiClient."""
221+
222+
@classmethod
223+
async def async_new(
224+
cls,
225+
component_id: int,
226+
metrics: Iterable[ComponentMetricId],
227+
) -> LatestInverterMetricsFetcher:
228+
"""Create instance of this class.
229+
230+
Subscribe for the requested component data and fetch only the latest component
231+
metrics.
232+
233+
Args:
234+
component_id: component id
235+
metrics: metrics
236+
237+
Raises:
238+
ValueError: If any requested metric id is not supported.
239+
240+
Returns:
241+
This class instance
242+
"""
243+
self: LatestInverterMetricsFetcher = await super().async_new(
244+
component_id, metrics
245+
)
246+
return self
247+
248+
def _supported_metrics(self) -> Set[ComponentMetricId]:
249+
return set(_InverterDataMethods.keys())
250+
251+
def _extract_metric(self, data: InverterData, mid: ComponentMetricId) -> float:
252+
return _InverterDataMethods[mid](data)
253+
254+
async def _subscribe(self) -> Receiver[InverterData]:
255+
"""Subscribe for this component data.
256+
257+
Size of the receiver buffer should should be 1 to make sure we receive only
258+
the latest component data.
259+
260+
Returns:
261+
Receiver for this component metrics.
262+
"""
263+
api = get_microgrid().api_client
264+
return await api.inverter_data(self._component_id, maxsize=1)
265+
266+
def _component_category(self) -> ComponentCategory:
267+
return ComponentCategory.INVERTER

0 commit comments

Comments
 (0)