Skip to content

Commit e58f3ba

Browse files
committed
Update FormulaEngine and LogicalMeter to use FormulaChannels
... instead of `Broadcast`s. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 9c607fc commit e58f3ba

File tree

3 files changed

+87
-13
lines changed

3 files changed

+87
-13
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(
6464
self._steps = steps
6565
self._metric_fetchers = metric_fetchers
6666
self._first_run = True
67-
self._channel = Broadcast[Sample](self._name)
67+
self._channel = FormulaChannel(self._name, self)
6868
self._task = None
6969

7070
async def _synchronize_metric_timestamps(
@@ -170,7 +170,9 @@ async def _run(self) -> None:
170170
else:
171171
await sender.send(msg)
172172

173-
def new_receiver(self) -> Receiver[Sample]:
173+
def new_receiver(
174+
self, name: Optional[str] = None, max_size: int = 50
175+
) -> FormulaReceiver:
174176
"""Create a new receiver that streams the output of the formula engine.
175177
176178
Args:
@@ -183,7 +185,7 @@ def new_receiver(self) -> Receiver[Sample]:
183185
if self._task is None:
184186
self._task = asyncio.create_task(self._run())
185187

186-
return self._channel.new_receiver()
188+
return self._channel.new_receiver(name, max_size)
187189

188190

189191
class FormulaBuilder:
@@ -585,3 +587,24 @@ def build(self, name: str, nones_are_zeros: bool = False) -> FormulaEngine:
585587
self._engine = builder.build()
586588

587589
return self._engine
590+
591+
def new_receiver(
592+
self, name: Optional[str] = None, max_size: int = 50
593+
) -> FormulaReceiver:
594+
"""Get a new receiver from the corresponding engine.
595+
596+
Args:
597+
name: optional name for the receiver.
598+
max_size: size of the receiver's buffer.
599+
600+
Returns:
601+
A FormulaReceiver that streams formula output `Sample`s.
602+
603+
Raises:
604+
RuntimeError: If `build` hasn't been called yet.
605+
"""
606+
if self._engine is None:
607+
raise RuntimeError(
608+
"Please call `build()` first, before calls to `new_receiver()`"
609+
)
610+
return self._engine.new_receiver(name, max_size)

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@
1010
import uuid
1111
from typing import Dict, List, Type
1212

13-
from frequenz.channels import Receiver, Sender
13+
from frequenz.channels import Sender
1414

1515
from ...actor import ChannelRegistry, ComponentMetricRequest
1616
from ...microgrid import ComponentGraph
1717
from ...microgrid.component import ComponentMetricId
18-
from .. import Sample
19-
from ._formula_engine import FormulaEngine
18+
from ._formula_engine import FormulaEngine, FormulaReceiver
2019
from ._formula_generators import (
2120
BatteryPowerFormula,
2221
BatterySoCFormula,
@@ -83,7 +82,7 @@ async def start_formula(
8382
formula: str,
8483
component_metric_id: ComponentMetricId,
8584
nones_are_zeros: bool = False,
86-
) -> Receiver[Sample]:
85+
) -> FormulaReceiver:
8786
"""Start execution of the given formula name.
8887
8988
Args:
@@ -94,7 +93,7 @@ async def start_formula(
9493
False, the returned value will be a None.
9594
9695
Returns:
97-
A Receiver that streams values with the formulas applied.
96+
A FormulaReceiver that streams values with the formulas applied.
9897
"""
9998
channel_key = formula + component_metric_id.value
10099
if channel_key in self._engines:
@@ -111,7 +110,7 @@ async def _get_formula_stream(
111110
self,
112111
channel_key: str,
113112
generator: Type[FormulaGenerator],
114-
) -> Receiver[Sample]:
113+
) -> FormulaReceiver:
115114
if channel_key in self._engines:
116115
return self._engines[channel_key].new_receiver()
117116

@@ -121,7 +120,7 @@ async def _get_formula_stream(
121120
self._engines[channel_key] = engine
122121
return engine.new_receiver()
123122

124-
async def grid_power(self) -> Receiver[Sample]:
123+
async def grid_power(self) -> FormulaReceiver:
125124
"""Fetch the grid power for the microgrid.
126125
127126
If a formula engine to calculate grid power is not already running, it
@@ -134,7 +133,7 @@ async def grid_power(self) -> Receiver[Sample]:
134133
"""
135134
return await self._get_formula_stream("grid_power", GridPowerFormula)
136135

137-
async def battery_power(self) -> Receiver[Sample]:
136+
async def battery_power(self) -> FormulaReceiver:
138137
"""Fetch the cumulative battery power in the microgrid.
139138
140139
If a formula engine to calculate cumulative battery power is not
@@ -147,7 +146,7 @@ async def battery_power(self) -> Receiver[Sample]:
147146
"""
148147
return await self._get_formula_stream("battery_power", BatteryPowerFormula)
149148

150-
async def pv_power(self) -> Receiver[Sample]:
149+
async def pv_power(self) -> FormulaReceiver:
151150
"""Fetch the PV power production in the microgrid.
152151
153152
If a formula engine to calculate PV power production is not
@@ -159,7 +158,7 @@ async def pv_power(self) -> Receiver[Sample]:
159158
"""
160159
return await self._get_formula_stream("pv_power", PVPowerFormula)
161160

162-
async def _soc(self) -> Receiver[Sample]:
161+
async def _soc(self) -> FormulaReceiver:
163162
"""Fetch the SoC of the active batteries in the microgrid.
164163
165164
NOTE: This method is part of the logical meter only temporarily, and will get

tests/timeseries/test_logical_meter.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,55 @@ async def test_soc(self, mocker: MockerFixture) -> None:
248248
assert (await soc_recv.receive()).value == sum(bat_vals) / len(bat_vals)
249249

250250
await mockgrid.cleanup()
251+
252+
async def test_formula_composition( # pylint: disable=too-many-locals
253+
self,
254+
mocker: MockerFixture,
255+
) -> None:
256+
"""Test the battery power and pv power formulas."""
257+
mockgrid = await MockMicrogrid.new(mocker, grid_side_meter=False)
258+
mockgrid.add_batteries(3)
259+
mockgrid.add_solar_inverters(2)
260+
request_sender, channel_registry = await mockgrid.start()
261+
logical_meter = LogicalMeter(
262+
channel_registry,
263+
request_sender,
264+
microgrid.get().component_graph,
265+
)
266+
267+
grid_power_recv = await logical_meter.grid_power()
268+
battery_power_recv = await logical_meter.battery_power()
269+
pv_power_recv = await logical_meter.pv_power()
270+
main_meter_recv = await self._get_resampled_stream(
271+
logical_meter,
272+
channel_registry,
273+
request_sender,
274+
4,
275+
ComponentMetricId.ACTIVE_POWER,
276+
)
277+
278+
engine = (pv_power_recv.clone() + battery_power_recv.clone()).build("inv_power")
279+
inv_calc_recv = engine.new_receiver()
280+
281+
count = 0
282+
for _ in range(10):
283+
grid_pow = await grid_power_recv.receive()
284+
pv_pow = await pv_power_recv.receive()
285+
bat_pow = await battery_power_recv.receive()
286+
main_pow = await main_meter_recv.receive()
287+
inv_calc_pow = await inv_calc_recv.receive()
288+
289+
assert grid_pow is not None and grid_pow.value is not None
290+
assert inv_calc_pow is not None and inv_calc_pow.value is not None
291+
assert bat_pow is not None and bat_pow.value is not None
292+
assert pv_pow is not None and pv_pow.value is not None
293+
assert main_pow is not None and main_pow.value is not None
294+
295+
assert inv_calc_pow.value == pv_pow.value + bat_pow.value
296+
assert grid_pow.value == inv_calc_pow.value + main_pow.value
297+
count += 1
298+
299+
await mockgrid.cleanup()
300+
await engine._stop() # pylint: disable=protected-access
301+
302+
assert count == 10

0 commit comments

Comments
 (0)