Skip to content

Commit 9c0b590

Browse files
Add a formula generator for SoC in the LogicalMeter (#137)
This formula calculates and streams the average SoC of the active batteries. This is temporarily part of the `LogicalMeter` and will be moved to the `BatteryPool` within the next few releases.
2 parents b972541 + de2457b commit 9c0b590

File tree

9 files changed

+281
-14
lines changed

9 files changed

+281
-14
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Formula Engine Exceptions."""
5+
6+
7+
class FormulaEngineError(Exception):
8+
"""An error occured while fetching metrics or applying the formula on them."""

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77

88
import asyncio
99
from datetime import datetime
10-
from typing import Dict, List, Optional, Set
10+
from typing import Dict, List, Optional, Set, Tuple
1111

1212
from frequenz.channels import Receiver
1313

1414
from .. import Sample
1515
from ._formula_steps import (
1616
Adder,
17+
Averager,
1718
Divider,
1819
FormulaStep,
1920
MetricFetcher,
@@ -218,6 +219,20 @@ def push_metric(
218219
)
219220
self._steps.append(fetcher)
220221

222+
def push_average(self, metrics: List[Tuple[str, Receiver[Sample], bool]]) -> None:
223+
"""Push an average calculator into the engine.
224+
225+
Args:
226+
metrics: list of arguments to pass to each `MetricFetcher`.
227+
"""
228+
fetchers: List[MetricFetcher] = []
229+
for metric in metrics:
230+
fetcher = self._metric_fetchers.setdefault(
231+
metric[0], MetricFetcher(*metric)
232+
)
233+
fetchers.append(fetcher)
234+
self._steps.append(Averager(fetchers))
235+
221236
def build(self) -> FormulaEngine:
222237
"""Finalize and build the formula engine.
223238

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""Generators for formulas from component graphs."""
55

66
from ._battery_power_formula import BatteryPowerFormula
7+
from ._battery_soc_formula import BatterySoCFormula
78
from ._formula_generator import (
89
ComponentNotFound,
910
FormulaGenerationError,
@@ -22,6 +23,7 @@
2223
#
2324
"GridPowerFormula",
2425
"BatteryPowerFormula",
26+
"BatterySoCFormula",
2527
"PVPowerFormula",
2628
#
2729
# Exceptions
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Formula generator from component graph for Grid Power."""
5+
6+
import asyncio
7+
8+
from frequenz.channels import Receiver
9+
10+
from .....sdk import microgrid
11+
from ....microgrid.component import ComponentCategory, ComponentMetricId, InverterType
12+
from ... import Sample
13+
from .._formula_engine import FormulaEngine
14+
from ._formula_generator import (
15+
ComponentNotFound,
16+
FormulaGenerationError,
17+
FormulaGenerator,
18+
)
19+
20+
21+
class _ActiveBatteryReceiver(Receiver[Sample]):
22+
"""Returns a Sample from a battery, only if the attached inverter is active."""
23+
24+
def __init__(self, inv_recv: Receiver[Sample], bat_recv: Receiver[Sample]):
25+
self._inv_recv = inv_recv
26+
self._bat_recv = bat_recv
27+
28+
async def ready(self) -> None:
29+
"""Wait until the next Sample is ready."""
30+
await asyncio.gather(self._inv_recv.ready(), self._bat_recv.ready())
31+
32+
def consume(self) -> Sample:
33+
"""Return the next Sample.
34+
35+
Returns:
36+
the next Sample.
37+
"""
38+
inv = self._inv_recv.consume()
39+
bat = self._bat_recv.consume()
40+
if inv.value is None:
41+
return inv
42+
return bat
43+
44+
45+
class BatterySoCFormula(FormulaGenerator):
46+
"""Creates a formula engine from the component graph for calculating battery soc."""
47+
48+
async def generate(
49+
self,
50+
) -> FormulaEngine:
51+
"""Make a formula for the average battery soc of a microgrid.
52+
53+
If there's no data coming from an inverter or a battery, the corresponding
54+
battery will be excluded from the calculation.
55+
56+
Returns:
57+
A formula engine that will calculate average battery soc values.
58+
59+
Raises:
60+
ComponentNotFound: if there are no batteries in the component graph, or if
61+
they don't have an inverter as a predecessor.
62+
FormulaGenerationError: If a battery has a non-inverter predecessor
63+
in the component graph.
64+
"""
65+
builder = self._get_builder(ComponentMetricId.ACTIVE_POWER)
66+
component_graph = microgrid.get().component_graph
67+
inv_bat_pairs = {
68+
comp: component_graph.successors(comp.component_id)
69+
for comp in component_graph.components()
70+
if comp.category == ComponentCategory.INVERTER
71+
and comp.type == InverterType.BATTERY
72+
}
73+
74+
if not inv_bat_pairs:
75+
raise ComponentNotFound(
76+
"Unable to find any battery inverters in the component graph."
77+
)
78+
79+
soc_streams = []
80+
for inv, bats in inv_bat_pairs.items():
81+
bat = list(bats)[0]
82+
if len(bats) != 1:
83+
raise FormulaGenerationError(
84+
f"Expected exactly one battery for inverter {inv}, got {bats}"
85+
)
86+
87+
# pylint: disable=protected-access
88+
soc_recv = _ActiveBatteryReceiver(
89+
await builder._get_resampled_receiver(
90+
inv.component_id, ComponentMetricId.ACTIVE_POWER
91+
),
92+
await builder._get_resampled_receiver(
93+
bat.component_id, ComponentMetricId.SOC
94+
),
95+
)
96+
# pylint: enable=protected-access
97+
98+
soc_streams.append((f"{bat.component_id}", soc_recv, False))
99+
100+
builder.push_average(soc_streams)
101+
102+
return builder.build()

src/frequenz/sdk/timeseries/logical_meter/_formula_steps.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33

44
"""Steps for building formula engines with."""
55

6+
from __future__ import annotations
7+
68
from abc import ABC, abstractmethod
79
from typing import List, Optional
810

911
from frequenz.channels import Receiver
1012

1113
from .. import Sample
14+
from ._exceptions import FormulaEngineError
1215

1316

1417
class FormulaStep(ABC):
@@ -157,6 +160,54 @@ def apply(self, _: List[Optional[float]]) -> None:
157160
"""No-op."""
158161

159162

163+
class Averager(FormulaStep):
164+
"""A formula step for calculating average."""
165+
166+
def __init__(self, fetchers: List[MetricFetcher]) -> None:
167+
"""Create an `Averager` instance.
168+
169+
Args:
170+
fetchers: MetricFetchers for the metrics that need to be averaged.
171+
"""
172+
self._fetchers = fetchers
173+
174+
def __repr__(self) -> str:
175+
"""Return a string representation of the step.
176+
177+
Returns:
178+
A string representation of the step.
179+
"""
180+
return f"avg({', '.join(repr(f) for f in self._fetchers)})"
181+
182+
def apply(self, eval_stack: List[Optional[float]]) -> None:
183+
"""Calculate average of given metrics, push the average to the eval_stack.
184+
185+
Args:
186+
eval_stack: An evaluation stack, to append the calculated average to.
187+
188+
Raises:
189+
FormulaEngineError: when metric fetchers are unable to fetch values.
190+
"""
191+
value_count = 0
192+
total = 0.0
193+
for fetcher in self._fetchers:
194+
next_val = fetcher.value
195+
if next_val is None:
196+
raise FormulaEngineError(
197+
"Unable to fetch a value from the resampling actor."
198+
)
199+
if next_val.value is None:
200+
continue
201+
value_count += 1
202+
total += next_val.value
203+
if value_count == 0:
204+
avg = 0.0
205+
else:
206+
avg = total / value_count
207+
208+
eval_stack.append(avg)
209+
210+
160211
class MetricFetcher(FormulaStep):
161212
"""A formula step for fetching a value from a metric Receiver."""
162213

@@ -186,6 +237,15 @@ async def fetch_next(self) -> Optional[Sample]:
186237
self._next_value = await self._stream.receive()
187238
return self._next_value
188239

240+
@property
241+
def value(self) -> Optional[Sample]:
242+
"""Get the next value in the stream.
243+
244+
Returns:
245+
Next value in the stream.
246+
"""
247+
return self._next_value
248+
189249
def __repr__(self) -> str:
190250
"""Return a string representation of the step.
191251

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from ._formula_engine import FormulaEngine
2020
from ._formula_generators import (
2121
BatteryPowerFormula,
22+
BatterySoCFormula,
2223
FormulaGenerator,
2324
GridPowerFormula,
2425
PVPowerFormula,
@@ -187,3 +188,14 @@ async def pv_power(self) -> Receiver[Sample]:
187188
A *new* receiver that will stream PV power production values.
188189
"""
189190
return await self._get_formula_stream("pv_power", PVPowerFormula)
191+
192+
async def _soc(self) -> Receiver[Sample]:
193+
"""Fetch the SoC of the active batteries in the microgrid.
194+
195+
NOTE: This method is part of the logical meter only temporarily, and will get
196+
moved to the `BatteryPool` within the next few releases.
197+
198+
Returns:
199+
A *new* receiver that will stream average SoC of active batteries.
200+
"""
201+
return await self._get_formula_stream("soc", BatterySoCFormula)

src/frequenz/sdk/timeseries/logical_meter/_resampled_formula_builder.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,19 @@ def __init__(
4040
self._metric_id = metric_id
4141
super().__init__()
4242

43-
async def _get_resampled_receiver(self, component_id: int) -> Receiver[Sample]:
43+
async def _get_resampled_receiver(
44+
self, component_id: int, metric_id: ComponentMetricId
45+
) -> Receiver[Sample]:
4446
"""Get a receiver with the resampled data for the given component id.
4547
46-
This receiver would contain data for the `metric_id` specified when creating the
47-
`ResampledFormulaBuilder` instance.
48-
4948
Args:
5049
component_id: The component id for which to get a resampled data receiver.
50+
metric_id: A metric ID to fetch for all components in this formula.
5151
5252
Returns:
5353
A receiver to stream resampled data for the given component id.
5454
"""
55-
request = ComponentMetricRequest(
56-
self._namespace, component_id, self._metric_id, None
57-
)
55+
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
5856
await self._resampler_subscription_sender.send(request)
5957
return self._channel_registry.new_receiver(request.get_channel_name())
6058

@@ -68,7 +66,7 @@ async def push_component_metric(
6866
nones_are_zeros: Whether to treat None values from the stream as 0s. If
6967
False, the returned value will be a None.
7068
"""
71-
receiver = await self._get_resampled_receiver(component_id)
69+
receiver = await self._get_resampled_receiver(component_id, self._metric_id)
7270
self.push_metric(f"#{component_id}", receiver, nones_are_zeros)
7371

7472
async def from_string(

tests/timeseries/mock_microgrid.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
from typing import Iterator, Tuple
1010

1111
from frequenz.api.microgrid import microgrid_pb2
12-
from frequenz.api.microgrid.common_pb2 import AC, Metric
12+
from frequenz.api.microgrid.battery_pb2 import Battery
13+
from frequenz.api.microgrid.battery_pb2 import Data as BatteryData
14+
from frequenz.api.microgrid.common_pb2 import AC, Metric, MetricAggregation
1315
from frequenz.api.microgrid.inverter_pb2 import Data as InverterData
1416
from frequenz.api.microgrid.inverter_pb2 import Inverter
1517
from frequenz.api.microgrid.inverter_pb2 import Type as InverterType
@@ -120,19 +122,36 @@ def inverter_msg(value: float) -> ComponentData:
120122
),
121123
)
122124

125+
def battery_msg(value: float) -> ComponentData:
126+
timestamp = Timestamp()
127+
timestamp.GetCurrentTime()
128+
return ComponentData(
129+
id=request.id,
130+
ts=timestamp,
131+
battery=Battery(data=BatteryData(soc=MetricAggregation(avg=value))),
132+
)
133+
123134
if request.id % 10 == cls.inverter_id_suffix:
124135
next_msg = inverter_msg
125136
elif (
126137
request.id % 10 == cls.meter_id_suffix
127138
or request.id == cls.main_meter_id
128139
):
129140
next_msg = meter_msg
141+
elif request.id % 10 == cls.battery_id_suffix:
142+
next_msg = battery_msg
130143
else:
131144
raise RuntimeError(
132145
f"Component id {request.id} unsupported by MockMicrogrid"
133146
)
134147
for value in range(1, 10):
135-
yield next_msg(value=value + request.id)
148+
# for inverters with component_id > 100, send only half the messages.
149+
if request.id % 10 == cls.inverter_id_suffix:
150+
if request.id < 100 or value <= 5:
151+
yield next_msg(value=value + request.id)
152+
else:
153+
yield next_msg(value=value + request.id)
154+
136155
time.sleep(0.1)
137156

138157
mocker.patch.object(servicer, "GetComponentData", get_component_data)

0 commit comments

Comments
 (0)