Skip to content

Commit e23af0a

Browse files
committed
Add a formula pool for storing and reusing formulas
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 0454451 commit e23af0a

File tree

1 file changed

+282
-0
lines changed

1 file changed

+282
-0
lines changed
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A formula pool for helping with tracking running formulas."""
5+
6+
7+
import logging
8+
import sys
9+
10+
from frequenz.channels import Sender
11+
from frequenz.client.microgrid.metrics import Metric
12+
from frequenz.quantities import Current, Power, Quantity, ReactivePower
13+
14+
from frequenz.sdk.timeseries.formulas._resampled_stream_fetcher import (
15+
ResampledStreamFetcher,
16+
)
17+
18+
from ..._internal._channels import ChannelRegistry
19+
from ...microgrid._data_sourcing import ComponentMetricRequest
20+
from ._formula import Formula
21+
from ._formula_3_phase import Formula3Phase
22+
from ._parser import parse
23+
24+
_logger = logging.getLogger(__name__)
25+
26+
27+
NON_EXISTING_COMPONENT_ID = sys.maxsize
28+
"""The component ID for non-existent components in the components graph.
29+
30+
The non-existing component ID is commonly used in scenarios where a formula
31+
engine requires a component ID but there are no available components in the
32+
graph to associate with it. Thus, the non-existing component ID is subscribed
33+
instead so that the formula engine can send `None` or `0` values at the same
34+
frequency as the other streams.
35+
"""
36+
37+
38+
class FormulaPool:
39+
"""Creates and owns formula engines from string formulas, or formula generators.
40+
41+
If an engine already exists with a given name, it is reused instead.
42+
"""
43+
44+
def __init__(
45+
self,
46+
namespace: str,
47+
channel_registry: ChannelRegistry,
48+
resampler_subscription_sender: Sender[ComponentMetricRequest],
49+
) -> None:
50+
"""Create a new instance.
51+
52+
Args:
53+
namespace: namespace to use with the data pipeline.
54+
channel_registry: A channel registry instance shared with the resampling
55+
actor.
56+
resampler_subscription_sender: A sender for sending metric requests to the
57+
resampling actor.
58+
"""
59+
self._namespace: str = namespace
60+
self._channel_registry: ChannelRegistry = channel_registry
61+
self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
62+
resampler_subscription_sender
63+
)
64+
65+
self._string_engines: dict[str, Formula[Quantity]] = {}
66+
self._power_engines: dict[str, Formula[Power]] = {}
67+
self._reactive_power_engines: dict[str, Formula[ReactivePower]] = {}
68+
self._current_engines: dict[str, Formula3Phase[Current]] = {}
69+
70+
self._power_3_phase_engines: dict[str, Formula3Phase[Power]] = {}
71+
self._current_3_phase_engines: dict[str, Formula3Phase[Current]] = {}
72+
73+
def from_string(
74+
self,
75+
formula_str: str,
76+
metric: Metric,
77+
) -> Formula[Quantity]:
78+
"""Get a receiver for a manual formula.
79+
80+
Args:
81+
formula_str: formula to execute.
82+
metric: The metric to use when fetching receivers from the resampling
83+
actor.
84+
85+
Returns:
86+
A Formula engine that streams values with the formulas applied.
87+
"""
88+
channel_key = formula_str + str(metric.value)
89+
if channel_key in self._string_engines:
90+
return self._string_engines[channel_key]
91+
formula = parse(
92+
name=channel_key,
93+
formula=formula_str,
94+
telemetry_fetcher=self._telemetry_fetcher(metric),
95+
create_method=Quantity,
96+
)
97+
self._string_engines[channel_key] = formula
98+
return formula
99+
100+
def from_power_formula(self, channel_key: str, formula_str: str) -> Formula[Power]:
101+
"""Get a receiver from the formula represented by the given strings.
102+
103+
Args:
104+
channel_key: A string to uniquely identify the formula. This
105+
usually includes the formula itself and the metric.
106+
formula_str: The formula string.
107+
108+
Returns:
109+
A formula engine that evaluates the given formula.
110+
"""
111+
if channel_key in self._power_engines:
112+
return self._power_engines[channel_key]
113+
114+
if formula_str == "0.0":
115+
formula_str = f"coalesce(#{NON_EXISTING_COMPONENT_ID}, 0.0)"
116+
117+
formula = parse(
118+
name=channel_key,
119+
formula=formula_str,
120+
telemetry_fetcher=self._telemetry_fetcher(Metric.AC_POWER_ACTIVE),
121+
create_method=Power.from_watts,
122+
)
123+
self._power_engines[channel_key] = formula
124+
125+
return formula
126+
127+
def from_reactive_power_formula(
128+
self, channel_key: str, formula_str: str
129+
) -> Formula[ReactivePower]:
130+
"""Get a receiver from the formula represented by the given strings.
131+
132+
Args:
133+
channel_key: A string to uniquely identify the formula. This
134+
usually includes the formula itself and the metric.
135+
formula_str: The formula string.
136+
137+
Returns:
138+
A formula engine that evaluates the given formula.
139+
"""
140+
if channel_key in self._power_engines:
141+
return self._reactive_power_engines[channel_key]
142+
143+
if formula_str == "0.0":
144+
formula_str = f"coalesce(#{NON_EXISTING_COMPONENT_ID}, 0.0)"
145+
146+
formula = parse(
147+
name=channel_key,
148+
formula=formula_str,
149+
telemetry_fetcher=self._telemetry_fetcher(Metric.AC_POWER_REACTIVE),
150+
create_method=ReactivePower.from_volt_amperes_reactive,
151+
)
152+
self._reactive_power_engines[channel_key] = formula
153+
154+
return formula
155+
156+
def from_power_3_phase_formula(
157+
self, channel_key: str, formula_str: str
158+
) -> Formula3Phase[Power]:
159+
"""Get a receiver from the 3-phase power formula represented by the given strings.
160+
161+
Args:
162+
channel_key: A string to uniquely identify the formula. This
163+
usually includes the formula itself.
164+
formula_str: The formula string.
165+
166+
Returns:
167+
A formula engine that evaluates the given formula.
168+
"""
169+
if channel_key in self._power_3_phase_engines:
170+
return self._power_3_phase_engines[channel_key]
171+
172+
if formula_str == "0.0":
173+
formula_str = f"coalesce(#{NON_EXISTING_COMPONENT_ID}, 0.0)"
174+
175+
formula = Formula3Phase(
176+
name=channel_key,
177+
phase_1=parse(
178+
name=channel_key + "_phase_1",
179+
formula=formula_str,
180+
telemetry_fetcher=self._telemetry_fetcher(
181+
Metric.AC_POWER_ACTIVE_PHASE_1
182+
),
183+
create_method=Power.from_watts,
184+
),
185+
phase_2=parse(
186+
name=channel_key + "_phase_2",
187+
formula=formula_str,
188+
telemetry_fetcher=self._telemetry_fetcher(
189+
Metric.AC_POWER_ACTIVE_PHASE_2
190+
),
191+
create_method=Power.from_watts,
192+
),
193+
phase_3=parse(
194+
name=channel_key + "_phase_3",
195+
formula=formula_str,
196+
telemetry_fetcher=self._telemetry_fetcher(
197+
Metric.AC_POWER_ACTIVE_PHASE_3
198+
),
199+
create_method=Power.from_watts,
200+
),
201+
)
202+
self._power_3_phase_engines[channel_key] = formula
203+
204+
return formula
205+
206+
def from_current_3_phase_formula(
207+
self, channel_key: str, formula_str: str
208+
) -> Formula3Phase[Current]:
209+
"""Get a receiver from the 3-phase current formula represented by the given strings.
210+
211+
Args:
212+
channel_key: A string to uniquely identify the formula. This
213+
usually includes the formula itself.
214+
formula_str: The formula string.
215+
216+
Returns:
217+
A formula engine that evaluates the given formula.
218+
"""
219+
if channel_key in self._current_3_phase_engines:
220+
return self._current_3_phase_engines[channel_key]
221+
222+
if formula_str == "0.0":
223+
formula_str = f"coalesce(#{NON_EXISTING_COMPONENT_ID}, 0.0)"
224+
225+
formula = Formula3Phase(
226+
name=channel_key,
227+
phase_1=parse(
228+
name=channel_key + "_phase_1",
229+
formula=formula_str,
230+
telemetry_fetcher=self._telemetry_fetcher(Metric.AC_CURRENT_PHASE_1),
231+
create_method=Current.from_amperes,
232+
),
233+
phase_2=parse(
234+
name=channel_key + "_phase_2",
235+
formula=formula_str,
236+
telemetry_fetcher=self._telemetry_fetcher(Metric.AC_CURRENT_PHASE_2),
237+
create_method=Current.from_amperes,
238+
),
239+
phase_3=parse(
240+
name=channel_key + "_phase_3",
241+
formula=formula_str,
242+
telemetry_fetcher=self._telemetry_fetcher(Metric.AC_CURRENT_PHASE_3),
243+
create_method=Current.from_amperes,
244+
),
245+
)
246+
self._current_3_phase_engines[channel_key] = formula
247+
248+
return formula
249+
250+
async def stop(self) -> None:
251+
"""Stop all formula engines."""
252+
for pf in self._power_engines.values():
253+
await pf.stop()
254+
self._power_engines.clear()
255+
256+
for rpf in self._reactive_power_engines.values():
257+
await rpf.stop()
258+
self._reactive_power_engines.clear()
259+
260+
for p3pf in self._power_3_phase_engines.values():
261+
await p3pf.stop()
262+
self._power_3_phase_engines.clear()
263+
264+
for c3pf in self._current_3_phase_engines.values():
265+
await c3pf.stop()
266+
self._current_3_phase_engines.clear()
267+
268+
def _telemetry_fetcher(self, metric: Metric) -> ResampledStreamFetcher:
269+
"""Create a ResampledStreamFetcher for the given metric.
270+
271+
Args:
272+
metric: The metric to fetch.
273+
274+
Returns:
275+
A ResampledStreamFetcher for the given metric.
276+
"""
277+
return ResampledStreamFetcher(
278+
self._namespace,
279+
self._channel_registry,
280+
self._resampler_subscription_sender,
281+
metric,
282+
)

0 commit comments

Comments
 (0)