Skip to content

Commit 89af7da

Browse files
Mock Resampler in formula tests (frequenz-floss#373)
This allows most of the formula tests to be reliable. Only the test for the `EVChargerPool.component_data` method still depends on a lower level mock, and it needs it because it combines resampled data with component state values. If it is flaky, we'll have to add multi stage testing - with a fast test first, and a slower one if that fails, etc.
2 parents 34db055 + d198f49 commit 89af7da

File tree

12 files changed

+457
-396
lines changed

12 files changed

+457
-396
lines changed

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine_pool.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,8 @@ def from_generator(
110110
).generate()
111111
self._engines[channel_key] = engine
112112
return engine
113+
114+
async def stop(self) -> None:
115+
"""Stop all formula engines in the pool."""
116+
for engine in self._engines.values():
117+
await engine._stop() # pylint: disable=protected-access

src/frequenz/sdk/timeseries/_formula_engine/_formula_generators/_pv_power_formula.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,20 @@
33

44
"""Formula generator for PV Power, from the component graph."""
55

6+
from __future__ import annotations
7+
68
import logging
9+
from collections import abc
710

811
from ....microgrid import connection_manager
912
from ....microgrid.component import ComponentCategory, ComponentMetricId, InverterType
1013
from .._formula_engine import FormulaEngine
11-
from ._formula_generator import NON_EXISTING_COMPONENT_ID, FormulaGenerator, FormulaType
14+
from ._formula_generator import (
15+
NON_EXISTING_COMPONENT_ID,
16+
FormulaGenerationError,
17+
FormulaGenerator,
18+
FormulaType,
19+
)
1220

1321
_logger = logging.getLogger(__name__)
1422

@@ -27,15 +35,8 @@ def generate(self) -> FormulaEngine:
2735
"""
2836
builder = self._get_builder("pv-power", ComponentMetricId.ACTIVE_POWER)
2937

30-
component_graph = connection_manager.get().component_graph
31-
pv_inverters = list(
32-
comp
33-
for comp in component_graph.components()
34-
if comp.category == ComponentCategory.INVERTER
35-
and comp.type == InverterType.SOLAR
36-
)
37-
38-
if not pv_inverters:
38+
pv_meters = self._get_pv_meters()
39+
if not pv_meters:
3940
_logger.warning(
4041
"Unable to find any PV inverters in the component graph. "
4142
"Subscribing to the resampling actor with a non-existing "
@@ -51,11 +52,11 @@ def generate(self) -> FormulaEngine:
5152

5253
builder.push_oper("(")
5354
builder.push_oper("(")
54-
for idx, comp in enumerate(pv_inverters):
55+
for idx, comp_id in enumerate(pv_meters):
5556
if idx > 0:
5657
builder.push_oper("+")
5758

58-
builder.push_component_metric(comp.component_id, nones_are_zeros=True)
59+
builder.push_component_metric(comp_id, nones_are_zeros=True)
5960
builder.push_oper(")")
6061
if self._config.formula_type == FormulaType.PRODUCTION:
6162
builder.push_oper("*")
@@ -66,3 +67,40 @@ def generate(self) -> FormulaEngine:
6667
builder.push_clipper(0.0, None)
6768

6869
return builder.build()
70+
71+
def _get_pv_meters(self) -> abc.Set[int]:
72+
component_graph = connection_manager.get().component_graph
73+
74+
pv_inverters = list(
75+
comp
76+
for comp in component_graph.components()
77+
if comp.category == ComponentCategory.INVERTER
78+
and comp.type == InverterType.SOLAR
79+
)
80+
pv_meters: set[int] = set()
81+
82+
if not pv_inverters:
83+
return pv_meters
84+
85+
for pv_inverter in pv_inverters:
86+
predecessors = component_graph.predecessors(pv_inverter.component_id)
87+
if len(predecessors) != 1:
88+
raise FormulaGenerationError(
89+
"Expected exactly one predecessor for PV inverter "
90+
f"{pv_inverter.component_id}, but found {len(predecessors)}."
91+
)
92+
meter = next(iter(predecessors))
93+
if meter.category != ComponentCategory.METER:
94+
raise FormulaGenerationError(
95+
f"Expected predecessor of PV inverter {pv_inverter.component_id} "
96+
f"to be a meter, but found {meter.category}."
97+
)
98+
meter_successors = component_graph.successors(meter.component_id)
99+
if len(meter_successors) != 1:
100+
raise FormulaGenerationError(
101+
f"Expected exactly one successor for meter {meter.component_id}"
102+
f", connected to PV inverter {pv_inverter.component_id}"
103+
f", but found {len(meter_successors)}."
104+
)
105+
pv_meters.add(meter.component_id)
106+
return pv_meters

src/frequenz/sdk/timeseries/battery_pool/battery_pool.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import uuid
1010
from collections.abc import Set
1111
from datetime import timedelta
12-
from typing import Any
12+
from typing import Any, Awaitable
1313

1414
from frequenz.channels import Receiver, Sender
1515

@@ -78,6 +78,7 @@ def __init__( # pylint: disable=too-many-arguments
7878

7979
self._working_batteries: set[int] = set()
8080

81+
self._update_battery_status_task: asyncio.Task[None] | None = None
8182
if self._batteries:
8283
self._update_battery_status_task = asyncio.create_task(
8384
self._update_battery_status(batteries_status_receiver)
@@ -266,10 +267,13 @@ async def power_bounds(
266267

267268
async def stop(self) -> None:
268269
"""Stop all pending async tasks."""
269-
await asyncio.gather(
270-
*[method.stop() for method in self._active_methods.values()],
271-
cancel_and_await(self._update_battery_status_task),
272-
)
270+
tasks_to_stop: list[Awaitable[Any]] = [
271+
method.stop() for method in self._active_methods.values()
272+
]
273+
tasks_to_stop.append(self._formula_pool.stop())
274+
if self._update_battery_status_task:
275+
tasks_to_stop.append(cancel_and_await(self._update_battery_status_task))
276+
await asyncio.gather(*tasks_to_stop)
273277

274278
def _get_all_batteries(self) -> Set[int]:
275279
"""Get all batteries from the microgrid.

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
1717

18+
from ..._internal._asyncio import cancel_and_await
1819
from ...actor import ChannelRegistry, ComponentMetricRequest
1920
from ...microgrid import connection_manager
2021
from ...microgrid.component import ComponentCategory, ComponentMetricId
@@ -271,6 +272,11 @@ async def stop(self) -> None:
271272
await self._bounds_setter.stop()
272273
if self._state_tracker:
273274
await self._state_tracker.stop()
275+
await self._formula_pool.stop()
276+
for stream in self._status_streams.values():
277+
task, chan = stream
278+
await chan.close()
279+
await cancel_and_await(task)
274280

275281
async def _get_current_streams(
276282
self, component_id: int

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,7 @@ def chp_consumption_power(self) -> FormulaEngine:
402402
)
403403
assert isinstance(engine, FormulaEngine)
404404
return engine
405+
406+
async def stop(self) -> None:
407+
"""Stop all formula engines."""
408+
await self._formula_pool.stop()

tests/timeseries/_battery_pool/test_battery_pool.py

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import async_solipsism
1515
import pytest
16-
from frequenz.channels import Broadcast, Receiver, Sender
16+
from frequenz.channels import Receiver, Sender
1717
from pytest_mock import MockerFixture
1818

1919
from frequenz.sdk import microgrid
@@ -23,8 +23,7 @@
2323
)
2424
from frequenz.sdk.actor import ResamplerConfig
2525
from frequenz.sdk.actor.power_distributing import BatteryStatus
26-
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
27-
from frequenz.sdk.timeseries import Sample
26+
from frequenz.sdk.microgrid.component import ComponentCategory
2827
from frequenz.sdk.timeseries.battery_pool import (
2928
BatteryPool,
3029
Bound,
@@ -445,53 +444,24 @@ async def test_battery_pool_power(mocker: MockerFixture) -> None:
445444
"""Test `BatteryPool.{,production,consumption}_power` methods."""
446445
mockgrid = MockMicrogrid(grid_side_meter=True)
447446
mockgrid.add_batteries(2)
448-
await mockgrid.start(mocker)
449-
450-
channels: dict[int, Broadcast[Sample]] = {
451-
meter_id: Broadcast(f"#{meter_id}")
452-
for meter_id in [*mockgrid.meter_ids, *mockgrid.battery_inverter_ids]
453-
}
454-
senders: list[Sender[Sample]] = [
455-
channels[component_id].new_sender()
456-
for component_id in mockgrid.battery_inverter_ids
457-
]
458-
459-
async def send_resampled_data(
460-
now: datetime,
461-
meter_data: list[float | None],
462-
) -> None:
463-
"""Send resampled data to the channels."""
464-
for sender, value in zip(senders, meter_data):
465-
await sender.send(Sample(now, value))
466-
467-
def mock_resampled_receiver(
468-
_1: Any, component_id: int, _2: ComponentMetricId
469-
) -> Receiver[Sample]:
470-
return channels[component_id].new_receiver()
471-
472-
mocker.patch(
473-
"frequenz.sdk.timeseries._formula_engine._resampled_formula_builder"
474-
".ResampledFormulaBuilder._get_resampled_receiver",
475-
mock_resampled_receiver,
476-
)
447+
await mockgrid.start_mock_datapipeline(mocker)
477448

478449
battery_pool = microgrid.battery_pool()
479450
power_receiver = battery_pool.power.new_receiver()
480451
consumption_receiver = battery_pool.consumption_power.new_receiver()
481452
production_receiver = battery_pool.production_power.new_receiver()
482453

483-
now = datetime.now(tz=timezone.utc)
484-
await send_resampled_data(now, [2.0, 3.0])
454+
await mockgrid.mock_data.send_bat_inverter_power([2.0, 3.0])
485455
assert (await power_receiver.receive()).value == 5.0
486456
assert (await consumption_receiver.receive()).value == 5.0
487457
assert (await production_receiver.receive()).value == 0.0
488458

489-
await send_resampled_data(now + timedelta(seconds=1), [-2.0, -5.0])
459+
await mockgrid.mock_data.send_bat_inverter_power([-2.0, -5.0])
490460
assert (await power_receiver.receive()).value == -7.0
491461
assert (await consumption_receiver.receive()).value == 0.0
492462
assert (await production_receiver.receive()).value == 7.0
493463

494-
await send_resampled_data(now + timedelta(seconds=3), [2.0, -5.0])
464+
await mockgrid.mock_data.send_bat_inverter_power([2.0, -5.0])
495465
assert (await power_receiver.receive()).value == -3.0
496466
assert (await consumption_receiver.receive()).value == 0.0
497467
assert (await production_receiver.receive()).value == 3.0

0 commit comments

Comments
 (0)