Skip to content

Commit 295b668

Browse files
committed
Synchronize streams before running logical_meter tests
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent d1806db commit 295b668

File tree

1 file changed

+48
-1
lines changed

1 file changed

+48
-1
lines changed

tests/timeseries/test_logical_meter.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55

66
from __future__ import annotations
77

8+
from datetime import datetime
89
from frequenz.channels import Receiver, Sender
910
from pytest_mock import MockerFixture
1011

1112
from frequenz.sdk import microgrid
1213
from frequenz.sdk.actor import ChannelRegistry, ComponentMetricRequest
1314
from frequenz.sdk.microgrid.component import ComponentMetricId
14-
from frequenz.sdk.timeseries import Sample
15+
from frequenz.sdk.timeseries import Sample, Sample3Phase
1516
from frequenz.sdk.timeseries.logical_meter import LogicalMeter
17+
from frequenz.sdk.timeseries.logical_meter._formula_engine import (
18+
FormulaReceiver,
19+
FormulaReceiver3Phase,
20+
)
1621
from frequenz.sdk.timeseries.logical_meter._resampled_formula_builder import (
1722
ResampledFormulaBuilder,
1823
)
@@ -49,6 +54,34 @@ async def _get_resampled_stream( # pylint: disable=too-many-arguments
4954
)
5055
# pylint: enable=protected-access
5156

57+
async def _synchronize_receivers(
58+
self,
59+
receivers: list[FormulaReceiver | FormulaReceiver3Phase | Receiver[Sample]],
60+
) -> None:
61+
by_ts: dict[
62+
datetime, List[FormulaReceiver | FormulaReceiver3Phase | Receiver[Sample]]
63+
] = {}
64+
for recv in receivers:
65+
while True:
66+
sample = await recv.receive()
67+
assert sample is not None
68+
if isinstance(sample, Sample) and sample.value is None:
69+
continue
70+
if isinstance(sample, Sample3Phase) and sample.value_p1 is None:
71+
continue
72+
by_ts.setdefault(sample.timestamp, []).append(recv)
73+
break
74+
latest_ts = max(by_ts)
75+
76+
for sample_ts, recvs in by_ts.items():
77+
if sample_ts == latest_ts:
78+
continue
79+
while sample_ts < latest_ts:
80+
for recv in recvs:
81+
val = await recv.receive()
82+
assert val is not None
83+
sample_ts = val.timestamp
84+
5285
async def test_grid_power_1(self, mocker: MockerFixture) -> None:
5386
"""Test the grid power formula with a grid side meter."""
5487
mockgrid = await MockMicrogrid.new(mocker, grid_side_meter=True)
@@ -71,6 +104,7 @@ async def test_grid_power_1(self, mocker: MockerFixture) -> None:
71104
ComponentMetricId.ACTIVE_POWER,
72105
)
73106

107+
await self._synchronize_receivers([grid_power_recv, main_meter_recv])
74108
results = []
75109
main_meter_data = []
76110
for _ in range(10):
@@ -113,6 +147,8 @@ async def test_grid_power_2(
113147
for meter_id in mockgrid.meter_ids
114148
]
115149

150+
await self._synchronize_receivers([grid_power_recv, *meter_receivers])
151+
116152
results = []
117153
meter_sums = []
118154
for _ in range(10):
@@ -172,6 +208,10 @@ async def test_battery_and_pv_power( # pylint: disable=too-many-locals
172208
for meter_id in mockgrid.pv_inverter_ids
173209
]
174210

211+
await self._synchronize_receivers(
212+
[battery_power_recv, pv_power_recv, *bat_inv_receivers, *pv_inv_receivers]
213+
)
214+
175215
battery_results = []
176216
pv_results = []
177217
battery_inv_sums = []
@@ -232,6 +272,8 @@ async def test_soc(self, mocker: MockerFixture) -> None:
232272
for bat_id in mockgrid.battery_ids
233273
]
234274

275+
await self._synchronize_receivers([soc_recv, *bat_receivers])
276+
235277
for ctr in range(10):
236278
bat_vals = []
237279
for recv in bat_receivers:
@@ -381,6 +423,11 @@ async def test_3_phase_formulas(self, mocker: MockerFixture) -> None:
381423
"net_current"
382424
)
383425
net_current_recv = engine.new_receiver()
426+
427+
await self._synchronize_receivers(
428+
[grid_current_recv, ev_current_recv, net_current_recv]
429+
)
430+
384431
for _ in range(10):
385432
grid_amps = await grid_current_recv.receive()
386433
ev_amps = await ev_current_recv.receive()

0 commit comments

Comments
 (0)