Skip to content

Commit 8f8988e

Browse files
committed
Add LogicalMeter implementation
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 102f57a commit 8f8988e

File tree

3 files changed

+289
-1
lines changed

3 files changed

+289
-1
lines changed

src/frequenz/sdk/timeseries/logical_meter/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33

44
"""A logical meter for calculating high level metrics for a microgrid."""
55

6-
from .logical_meter import LogicalMeter
6+
from ._logical_meter import LogicalMeter
77

88
__all__ = ["LogicalMeter"]
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A logical meter for calculating high level metrics for a microgrid."""
5+
6+
import asyncio
7+
import uuid
8+
from typing import Dict, List
9+
10+
from frequenz.channels import Broadcast, Receiver, Sender
11+
12+
from ...actor import ChannelRegistry, ComponentMetricRequest
13+
from ...microgrid.component import ComponentMetricId
14+
from .._sample import Sample
15+
from ._formula_builder import FormulaBuilder
16+
from ._formula_engine import FormulaEngine
17+
18+
19+
class LogicalMeter:
20+
"""A logical meter for calculating high level metrics in a microgrid.
21+
22+
LogicalMeter can be used to run formulas on resampled component metric streams.
23+
24+
Formulas can have Component IDs that are preceeded by a pound symbol("#"), and these
25+
operators: +, -, *, /, (, ).
26+
27+
For example, the input string: "#20 + #5" is a formula for adding metrics from two
28+
components with ids 20 and 5.
29+
"""
30+
31+
def __init__(
32+
self,
33+
channel_registry: ChannelRegistry,
34+
resampler_subscription_sender: Sender[ComponentMetricRequest],
35+
) -> None:
36+
"""Create a `LogicalMeter instance`.
37+
38+
Args:
39+
channel_registry: A channel registry instance shared with the resampling
40+
actor.
41+
resampler_subscription_sender: A sender for sending metric requests to the
42+
resampling actor.
43+
"""
44+
self._channel_registry = channel_registry
45+
self._resampler_subscription_sender = resampler_subscription_sender
46+
47+
# Use a randomly generated uuid to create a unique namespace name for the local
48+
# meter to use when communicating with the resampling actor.
49+
self._namespace = f"logical-meter-{uuid.uuid4()}"
50+
51+
self._output_channels: Dict[str, Broadcast[Sample]] = {}
52+
self._tasks: List[asyncio.Task[None]] = []
53+
54+
async def _engine_from_formula_string(
55+
self, formula: str, metric_id: ComponentMetricId
56+
) -> FormulaEngine:
57+
builder = FormulaBuilder(
58+
self._namespace,
59+
self._channel_registry,
60+
self._resampler_subscription_sender,
61+
metric_id,
62+
)
63+
return await builder.from_string(formula)
64+
65+
async def _run_formula(
66+
self, formula: FormulaEngine, sender: Sender[Sample]
67+
) -> None:
68+
"""Run the formula repeatedly and send the results to a channel.
69+
70+
Args:
71+
formula: The formula to run.
72+
sender: A sender for sending the formula results to.
73+
"""
74+
while msg := await formula.apply():
75+
await sender.send(msg)
76+
77+
async def start_formula(
78+
self,
79+
formula: str,
80+
component_metric_id: ComponentMetricId,
81+
) -> Receiver[Sample]:
82+
"""Start execution of the given formula name.
83+
84+
Args:
85+
formula: formula to execute.
86+
component_metric_id: The metric ID to use when fetching receivers from the
87+
resampling actor.
88+
89+
Returns:
90+
A Receiver that streams values with the formulas applied.
91+
"""
92+
channel_key = formula + component_metric_id.value
93+
if channel_key in self._output_channels:
94+
return self._output_channels[channel_key].new_receiver()
95+
96+
formula_engine = await self._engine_from_formula_string(
97+
formula,
98+
component_metric_id,
99+
)
100+
out_chan = Broadcast[Sample](channel_key)
101+
self._output_channels[channel_key] = out_chan
102+
self._tasks.append(
103+
asyncio.create_task(
104+
self._run_formula(formula_engine, out_chan.new_sender())
105+
)
106+
)
107+
return out_chan.new_receiver()
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the logical meter."""
5+
6+
import time
7+
from typing import Iterator, Tuple
8+
9+
from frequenz.api.microgrid import microgrid_pb2
10+
from frequenz.api.microgrid.common_pb2 import AC, Metric
11+
from frequenz.api.microgrid.meter_pb2 import Data as MeterData
12+
from frequenz.api.microgrid.meter_pb2 import Meter
13+
from frequenz.api.microgrid.microgrid_pb2 import ComponentData, ComponentIdParam
14+
from frequenz.channels import Broadcast, Sender
15+
from google.protobuf.timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module
16+
from grpc.aio import grpc
17+
from pytest_mock import MockerFixture
18+
19+
from frequenz.sdk import microgrid
20+
from frequenz.sdk.actor import (
21+
ChannelRegistry,
22+
ComponentMetricRequest,
23+
ComponentMetricsResamplingActor,
24+
DataSourcingActor,
25+
)
26+
from frequenz.sdk.microgrid.component import ComponentMetricId
27+
from frequenz.sdk.timeseries.logical_meter import LogicalMeter
28+
from frequenz.sdk.timeseries.logical_meter._formula_builder import FormulaBuilder
29+
from tests.microgrid import mock_api
30+
31+
32+
class TestLogicalMeter:
33+
"""Tests for the logical meter."""
34+
35+
async def setup(
36+
self, mocker: MockerFixture
37+
) -> Tuple[Sender[ComponentMetricRequest], ChannelRegistry]:
38+
"""Initialize a mock microgrid api for a test.
39+
40+
Because we can't create a __init__ or multiple instances of the Test class, we
41+
use the `setup` method as a constructor, and call it once before each test.
42+
"""
43+
microgrid._microgrid._MICROGRID = None # pylint: disable=protected-access
44+
servicer = mock_api.MockMicrogridServicer()
45+
46+
# pylint: disable=unused-argument
47+
def get_component_data(
48+
request: ComponentIdParam, context: grpc.ServicerContext
49+
) -> Iterator[ComponentData]:
50+
"""Return an iterator for mock ComponentData."""
51+
# pylint: disable=stop-iteration-return
52+
53+
def next_msg(value: float) -> ComponentData:
54+
timestamp = Timestamp()
55+
timestamp.GetCurrentTime()
56+
return ComponentData(
57+
id=request.id,
58+
ts=timestamp,
59+
meter=Meter(
60+
data=MeterData(ac=AC(power_active=Metric(value=value)))
61+
),
62+
)
63+
64+
for value in range(1, 10):
65+
yield next_msg(value=value + request.id)
66+
time.sleep(0.1)
67+
68+
mocker.patch.object(servicer, "GetComponentData", get_component_data)
69+
70+
servicer.add_component(
71+
1, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_GRID
72+
)
73+
servicer.add_component(
74+
3, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_JUNCTION
75+
)
76+
servicer.add_component(
77+
4, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER
78+
)
79+
servicer.add_component(
80+
7, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER
81+
)
82+
servicer.add_component(
83+
8, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_INVERTER
84+
)
85+
servicer.add_component(
86+
9, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_BATTERY
87+
)
88+
89+
servicer.add_connection(1, 3)
90+
servicer.add_connection(3, 4)
91+
servicer.add_connection(3, 7)
92+
servicer.add_connection(7, 8)
93+
servicer.add_connection(8, 9)
94+
95+
# pylint: disable=attribute-defined-outside-init
96+
self.server = mock_api.MockGrpcServer(servicer, port=57891)
97+
# pylint: enable=attribute-defined-outside-init
98+
await self.server.start()
99+
100+
await microgrid.initialize("[::1]", 57891)
101+
102+
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
103+
104+
data_source_request_channel = Broadcast[ComponentMetricRequest](
105+
"Data Source Request Channel"
106+
)
107+
data_source_request_sender = data_source_request_channel.new_sender()
108+
data_source_request_receiver = data_source_request_channel.new_receiver()
109+
110+
resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
111+
"Resampling Actor Request Channel"
112+
)
113+
resampling_actor_request_sender = resampling_actor_request_channel.new_sender()
114+
resampling_actor_request_receiver = (
115+
resampling_actor_request_channel.new_receiver()
116+
)
117+
118+
DataSourcingActor(
119+
request_receiver=data_source_request_receiver, registry=channel_registry
120+
)
121+
122+
ComponentMetricsResamplingActor(
123+
channel_registry=channel_registry,
124+
subscription_sender=data_source_request_sender,
125+
subscription_receiver=resampling_actor_request_receiver,
126+
resampling_period_s=0.1,
127+
)
128+
129+
return (resampling_actor_request_sender, channel_registry)
130+
131+
async def cleanup(self) -> None:
132+
"""Clean up after a test."""
133+
await self.server.stop(0.1)
134+
microgrid._microgrid._MICROGRID = None # pylint: disable=protected-access
135+
136+
async def test_1(self, mocker: MockerFixture) -> None:
137+
"""Test the LogicalMeter with the grid power formula: "#4 + #7"."""
138+
request_sender, channel_registry = await self.setup(mocker)
139+
logical_meter = LogicalMeter(
140+
channel_registry,
141+
request_sender,
142+
)
143+
144+
grid_power_recv = await logical_meter.start_formula(
145+
"#4 + #7", ComponentMetricId.ACTIVE_POWER
146+
)
147+
148+
# Create a `FormulaBuilder` instance, just in order to reuse its
149+
# `_get_resampled_receiver` function implementation.
150+
151+
# pylint: disable=protected-access
152+
builder = FormulaBuilder(
153+
logical_meter._namespace,
154+
channel_registry,
155+
request_sender,
156+
ComponentMetricId.ACTIVE_POWER,
157+
)
158+
comp_4 = await builder._get_resampled_receiver(4)
159+
comp_7 = await builder._get_resampled_receiver(7)
160+
# pylint: enable=protected-access
161+
162+
results = []
163+
comp_4_data = []
164+
comp_7_data = []
165+
for _ in range(10):
166+
val = await comp_4.receive()
167+
assert val is not None and val.value is not None and val.value > 0.0
168+
comp_4_data.append(val.value)
169+
170+
val = await comp_7.receive()
171+
assert val is not None and val.value is not None and val.value > 0.0
172+
comp_7_data.append(val.value)
173+
174+
val = await grid_power_recv.receive()
175+
assert val is not None
176+
results.append(val.value)
177+
await self.cleanup()
178+
179+
assert results == [
180+
val_4 + val_7 for val_4, val_7 in zip(comp_4_data, comp_7_data)
181+
]

0 commit comments

Comments
 (0)