Skip to content

Commit 8d0ed27

Browse files
committed
Make FormulaEngine the owner of the run task and output channel
... instead of the `LogicalMeter`. Also add a `name` parameter to the `FormulaEngine` and the formula builders. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 0262294 commit 8d0ed27

File tree

10 files changed

+84
-63
lines changed

10 files changed

+84
-63
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
from __future__ import annotations
77

88
import asyncio
9+
import logging
910
from datetime import datetime
1011
from typing import Dict, List, Optional, Set, Tuple
1112

12-
from frequenz.channels import Receiver
13+
from frequenz.channels import Broadcast, Receiver
1314

1415
from .. import Sample
1516
from ._formula_steps import (
@@ -23,6 +24,8 @@
2324
Subtractor,
2425
)
2526

27+
logger = logging.Logger(__name__)
28+
2629
_operator_precedence = {
2730
"(": 0,
2831
"/": 1,
@@ -40,17 +43,24 @@ class FormulaEngine:
4043
"""
4144

4245
def __init__(
43-
self, steps: List[FormulaStep], metric_fetchers: Dict[str, MetricFetcher]
46+
self,
47+
name: str,
48+
steps: List[FormulaStep],
49+
metric_fetchers: Dict[str, MetricFetcher],
4450
) -> None:
4551
"""Create a `FormulaEngine` instance.
4652
4753
Args:
54+
name: A name for the formula.
4855
steps: Steps for the engine to execute, in post-fix order.
4956
metric_fetchers: Fetchers for each metric stream the formula depends on.
5057
"""
58+
self._name = name
5159
self._steps = steps
5260
self._metric_fetchers = metric_fetchers
5361
self._first_run = True
62+
self._channel = Broadcast[Sample](self._name)
63+
self._task = None
5464

5565
async def _synchronize_metric_timestamps(
5666
self, metrics: Set[asyncio.Task[Optional[Sample]]]
@@ -93,12 +103,13 @@ async def _synchronize_metric_timestamps(
93103
metric_ts = next_val.timestamp
94104
if metric_ts > latest_ts:
95105
raise RuntimeError(
96-
"Unable to synchronize timestamps of resampled metrics"
106+
"Unable to synchronize resampled metric timestamps, "
107+
f"for formula: {self._name}"
97108
)
98109
self._first_run = False
99110
return latest_ts
100111

101-
async def apply(self) -> Sample:
112+
async def _apply(self) -> Sample:
102113
"""Fetch the latest metrics, apply the formula once and return the result.
103114
104115
Returns:
@@ -118,7 +129,9 @@ async def apply(self) -> Sample:
118129
)
119130

120131
if pending or any(res.result() is None for res in iter(ready_metrics)):
121-
raise RuntimeError("Some resampled metrics didn't arrive")
132+
raise RuntimeError(
133+
f"Some resampled metrics didn't arrive, for formula: {self._name}"
134+
)
122135

123136
if self._first_run:
124137
metric_ts = await self._synchronize_metric_timestamps(ready_metrics)
@@ -133,10 +146,40 @@ async def apply(self) -> Sample:
133146
# if all steps were applied and the formula was correct, there should only be a
134147
# single value in the evaluation stack, and that would be the formula result.
135148
if len(eval_stack) != 1:
136-
raise RuntimeError("Formula application failed.")
149+
raise RuntimeError(f"Formula application failed: {self._name}")
137150

138151
return Sample(metric_ts, eval_stack[0])
139152

153+
async def _run(self) -> None:
154+
sender = self._channel.new_sender()
155+
while True:
156+
try:
157+
msg = await self._apply()
158+
except asyncio.CancelledError:
159+
logger.exception("FormulaEngine task cancelled: %s", self._name)
160+
break
161+
except Exception as err: # pylint: disable=broad-except
162+
logger.warning(
163+
"Formula application failed: %s. Error: %s", self._name, err
164+
)
165+
else:
166+
await sender.send(msg)
167+
168+
def new_receiver(self) -> Receiver[Sample]:
169+
"""Create a new receiver that streams the output of the formula engine.
170+
171+
Args:
172+
name: An optional name for the receiver.
173+
max_size: The size of the receiver's buffer.
174+
175+
Returns:
176+
A receiver that streams output `Sample`s from the formula engine.
177+
"""
178+
if self._task is None:
179+
self._task = asyncio.create_task(self._run())
180+
181+
return self._channel.new_receiver()
182+
140183

141184
class FormulaBuilder:
142185
"""Builds a post-fix formula engine that operates on `Sample` receivers.
@@ -161,10 +204,13 @@ class FormulaBuilder:
161204
add the values and return the result.
162205
"""
163206

164-
def __init__(
165-
self,
166-
) -> None:
167-
"""Create a `FormulaBuilder` instance."""
207+
def __init__(self, name: str) -> None:
208+
"""Create a `FormulaBuilder` instance.
209+
210+
Args:
211+
name: A name for the formula being built.
212+
"""
213+
self._name = name
168214
self._build_stack: List[FormulaStep] = []
169215
self._steps: List[FormulaStep] = []
170216
self._metric_fetchers: Dict[str, MetricFetcher] = {}
@@ -242,4 +288,4 @@ def build(self) -> FormulaEngine:
242288
while self._build_stack:
243289
self._steps.append(self._build_stack.pop())
244290

245-
return FormulaEngine(self._steps, self._metric_fetchers)
291+
return FormulaEngine(self._name, self._steps, self._metric_fetchers)

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/_battery_power_formula.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async def generate(
3232
FormulaGenerationError: If a battery has a non-inverter predecessor
3333
in the component graph.
3434
"""
35-
builder = self._get_builder(ComponentMetricId.ACTIVE_POWER)
35+
builder = self._get_builder("battery-power", ComponentMetricId.ACTIVE_POWER)
3636
component_graph = microgrid.get().component_graph
3737
battery_inverters = list(
3838
comp

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/_battery_soc_formula.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def generate(
6262
FormulaGenerationError: If a battery has a non-inverter predecessor
6363
in the component graph.
6464
"""
65-
builder = self._get_builder(ComponentMetricId.ACTIVE_POWER)
65+
builder = self._get_builder("soc", ComponentMetricId.ACTIVE_POWER)
6666
component_graph = microgrid.get().component_graph
6767
inv_bat_pairs = {
6868
comp: component_graph.successors(comp.component_id)

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/_formula_generator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ def __init__(
4444
self._namespace = namespace
4545

4646
def _get_builder(
47-
self, component_metric_id: ComponentMetricId
47+
self, name: str, component_metric_id: ComponentMetricId
4848
) -> ResampledFormulaBuilder:
4949
builder = ResampledFormulaBuilder(
5050
self._namespace,
51+
name,
5152
self._channel_registry,
5253
self._resampler_subscription_sender,
5354
component_metric_id,

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/_grid_power_formula.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async def generate(
2323
Raises:
2424
ComponentNotFound: when the component graph doesn't have a `GRID` component.
2525
"""
26-
builder = self._get_builder(ComponentMetricId.ACTIVE_POWER)
26+
builder = self._get_builder("grid-power", ComponentMetricId.ACTIVE_POWER)
2727
component_graph = microgrid.get().component_graph
2828
grid_component = next(
2929
(

src/frequenz/sdk/timeseries/logical_meter/_formula_generators/_pv_power_formula.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def generate(self) -> FormulaEngine:
2121
Raises:
2222
ComponentNotFound: if there are no PV inverters in the component graph.
2323
"""
24-
builder = self._get_builder(ComponentMetricId.ACTIVE_POWER)
24+
builder = self._get_builder("pv-power", ComponentMetricId.ACTIVE_POWER)
2525

2626
component_graph = microgrid.get().component_graph
2727
pv_inverters = list(

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import uuid
1111
from typing import Dict, List, Type
1212

13-
from frequenz.channels import Broadcast, Receiver, Sender
13+
from frequenz.channels import Receiver, Sender
1414

1515
from ...actor import ChannelRegistry, ComponentMetricRequest
1616
from ...microgrid import ComponentGraph
@@ -63,40 +63,21 @@ def __init__(
6363
# meter to use when communicating with the resampling actor.
6464
self._namespace = f"logical-meter-{uuid.uuid4()}"
6565
self._component_graph = component_graph
66-
self._output_channels: Dict[str, Broadcast[Sample]] = {}
66+
self._engines: Dict[str, FormulaEngine] = {}
6767
self._tasks: List[asyncio.Task[None]] = []
6868

6969
async def _engine_from_formula_string(
7070
self, formula: str, metric_id: ComponentMetricId, nones_are_zeros: bool
7171
) -> FormulaEngine:
7272
builder = ResampledFormulaBuilder(
7373
self._namespace,
74+
formula,
7475
self._channel_registry,
7576
self._resampler_subscription_sender,
7677
metric_id,
7778
)
7879
return await builder.from_string(formula, nones_are_zeros)
7980

80-
async def _run_formula(
81-
self, formula: FormulaEngine, sender: Sender[Sample]
82-
) -> None:
83-
"""Run the formula repeatedly and send the results to a channel.
84-
85-
Args:
86-
formula: The formula to run.
87-
sender: A sender for sending the formula results to.
88-
"""
89-
while True:
90-
try:
91-
msg = await formula.apply()
92-
except asyncio.CancelledError:
93-
logger.exception("LogicalMeter task cancelled")
94-
break
95-
except Exception as err: # pylint: disable=broad-except
96-
logger.warning("Formula application failed: %s", err)
97-
else:
98-
await sender.send(msg)
99-
10081
async def start_formula(
10182
self,
10283
formula: str,
@@ -116,40 +97,29 @@ async def start_formula(
11697
A Receiver that streams values with the formulas applied.
11798
"""
11899
channel_key = formula + component_metric_id.value
119-
if channel_key in self._output_channels:
120-
return self._output_channels[channel_key].new_receiver()
100+
if channel_key in self._engines:
101+
return self._engines[channel_key].new_receiver()
121102

122103
formula_engine = await self._engine_from_formula_string(
123104
formula, component_metric_id, nones_are_zeros
124105
)
125-
out_chan = Broadcast[Sample](channel_key)
126-
self._output_channels[channel_key] = out_chan
127-
self._tasks.append(
128-
asyncio.create_task(
129-
self._run_formula(formula_engine, out_chan.new_sender())
130-
)
131-
)
132-
return out_chan.new_receiver()
106+
self._engines[channel_key] = formula_engine
107+
108+
return formula_engine.new_receiver()
133109

134110
async def _get_formula_stream(
135111
self,
136112
channel_key: str,
137113
generator: Type[FormulaGenerator],
138114
) -> Receiver[Sample]:
139-
if channel_key in self._output_channels:
140-
return self._output_channels[channel_key].new_receiver()
115+
if channel_key in self._engines:
116+
return self._engines[channel_key].new_receiver()
141117

142-
formula_engine = await generator(
118+
engine = await generator(
143119
self._namespace, self._channel_registry, self._resampler_subscription_sender
144120
).generate()
145-
out_chan = Broadcast[Sample](channel_key)
146-
self._output_channels[channel_key] = out_chan
147-
self._tasks.append(
148-
asyncio.create_task(
149-
self._run_formula(formula_engine, out_chan.new_sender())
150-
)
151-
)
152-
return out_chan.new_receiver()
121+
self._engines[channel_key] = engine
122+
return engine.new_receiver()
153123

154124
async def grid_power(self) -> Receiver[Sample]:
155125
"""Fetch the grid power for the microgrid.

src/frequenz/sdk/timeseries/logical_meter/_resampled_formula_builder.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
class ResampledFormulaBuilder(FormulaBuilder):
1717
"""Provides a way to build a FormulaEngine from resampled data streams."""
1818

19-
def __init__(
19+
def __init__( # pylint: disable=too-many-arguments
2020
self,
2121
namespace: str,
22+
formula_name: str,
2223
channel_registry: ChannelRegistry,
2324
resampler_subscription_sender: Sender[ComponentMetricRequest],
2425
metric_id: ComponentMetricId,
@@ -28,6 +29,7 @@ def __init__(
2829
Args:
2930
namespace: The unique namespace to allow reuse of streams in the data
3031
pipeline.
32+
formula_name: A name for the formula.
3133
channel_registry: The channel registry instance shared with the resampling
3234
and the data sourcing actors.
3335
resampler_subscription_sender: A sender to send metric requests to the
@@ -38,7 +40,7 @@ def __init__(
3840
self._resampler_subscription_sender = resampler_subscription_sender
3941
self._namespace = namespace
4042
self._metric_id = metric_id
41-
super().__init__()
43+
super().__init__(formula_name)
4244

4345
async def _get_resampled_receiver(
4446
self, component_id: int, metric_id: ComponentMetricId

tests/timeseries/test_formula_engine.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ async def run_test(
4848
) -> None:
4949
"""Run a formula test."""
5050
channels: Dict[str, Broadcast[Sample]] = {}
51-
builder = FormulaBuilder()
51+
builder = FormulaBuilder("test_formula")
5252
for token in Tokenizer(formula):
5353
if token.type == TokenType.COMPONENT_METRIC:
5454
if token.value not in channels:
@@ -76,7 +76,8 @@ async def run_test(
7676
]
7777
)
7878
)
79-
assert (await engine.apply()).value == io_output
79+
next_val = await engine._apply() # pylint: disable=protected-access
80+
assert (next_val).value == io_output
8081
tests_passed += 1
8182
assert tests_passed == len(io_pairs)
8283

tests/timeseries/test_logical_meter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def _get_resampled_stream( # pylint: disable=too-many-arguments
3838
# pylint: disable=protected-access
3939
builder = ResampledFormulaBuilder(
4040
logical_meter._namespace,
41+
"",
4142
channel_registry,
4243
request_sender,
4344
metric_id,

0 commit comments

Comments
 (0)