Skip to content

Commit 15335c0

Browse files
committed
WIP fix test_producer.py
1 parent 4c2efc0 commit 15335c0

File tree

4 files changed

+57
-23
lines changed

4 files changed

+57
-23
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""A post-fix formula evaluator that operates on `Sample` receivers."""
55

66
import asyncio
7+
import logging
78
from collections.abc import Callable
89
from datetime import datetime
910
from math import isinf, isnan
@@ -12,6 +13,8 @@
1213
from .._base_types import QuantityT, Sample
1314
from ._formula_steps import FormulaStep, MetricFetcher
1415

16+
_logger = logging.getLogger(__name__)
17+
1518

1619
class FormulaEvaluator(Generic[QuantityT]):
1720
"""A post-fix formula evaluator that operates on `Sample` receivers."""
@@ -97,10 +100,18 @@ async def apply(self) -> Sample[QuantityT]:
97100
RuntimeError: if some samples didn't arrive, or if formula application
98101
failed.
99102
"""
103+
104+
async def fetch_next(
105+
name: str, fetcher: MetricFetcher[QuantityT]
106+
) -> Sample[QuantityT] | None:
107+
val = await fetcher.fetch_next()
108+
_logger.error(f">> FormulaEvaluator: {name=} {val=}")
109+
return val
110+
100111
eval_stack: list[float] = []
101112
ready_metrics, pending = await asyncio.wait(
102113
[
103-
asyncio.create_task(fetcher.fetch_next(), name=name)
114+
asyncio.create_task(fetch_next(name, fetcher), name=name)
104115
for name, fetcher in self._metric_fetchers.items()
105116
],
106117
return_when=asyncio.ALL_COMPLETED,

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_producer_power_formula.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,16 @@ def generate( # noqa: DOC502
7979

8080
if self._config.allow_fallback:
8181
fallbacks = self._get_fallback_formulas(producer_components)
82+
print(
83+
f"ProducerPowerFormula: {dict({str(c): f for c, f in fallbacks.items()})}"
84+
)
8285

8386
for idx, (primary_component, fallback_formula) in enumerate(
8487
fallbacks.items()
8588
):
89+
print(f"\t{idx}: {primary_component=}")
90+
print(f"\t{is_not_meter(primary_component)=}")
91+
8692
if idx > 0:
8793
builder.push_oper("+")
8894

@@ -121,6 +127,9 @@ def _get_fallback_formulas(
121127
A dictionary mapping primary components to their FallbackFormulaMetricFetcher.
122128
"""
123129
fallbacks = self._get_metric_fallback_components(components)
130+
print(
131+
f"fallback components for {ProducerPowerFormula}: {dict({str(c): [str(cc) for cc in f] for c, f in fallbacks.items()})}"
132+
)
124133

125134
fallback_formulas: dict[
126135
Component, FallbackFormulaMetricFetcher[Power] | None

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ async def _synchronize_and_fetch_fallback(
477477
# We need to save value, because
478478
# primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp
479479
# In that case we should wait for our time window.
480+
print(f"MetricFetcher<{self._name}>: {self._latest_fallback_sample=}")
480481
if self._latest_fallback_sample is None:
481482
self._latest_fallback_sample = await self._fetch_from_fallback(
482483
fallback_fetcher
@@ -516,9 +517,11 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
516517

517518
async def _fetch_next(self) -> Sample[QuantityT] | None:
518519
# First fetch from primary stream
520+
print(f"MetricFetcher<{self._name}>: Fetching from primary stream")
519521
primary_value: Sample[QuantityT] | None = None
520522
try:
521523
primary_value = await self._stream.receive()
524+
print(f"MetricFetcher<{self._name}>: RECEIVED {primary_value=}")
522525
except ReceiverStoppedError:
523526
if self._is_stopped:
524527
_logger.debug("Stream for metric fetcher %s closed.", self._name)
@@ -532,13 +535,15 @@ async def _fetch_next(self) -> Sample[QuantityT] | None:
532535

533536
# We have no fallback, so we just return primary value even if it is not correct.
534537
if self._fallback is None:
538+
print(f"MetricFetcher<{self._name}>: NO FALLBACK, returning primary")
535539
return primary_value
536540

537541
is_primary_value_valid = primary_value is not None and self._is_value_valid(
538542
primary_value.value
539543
)
540544

541545
if is_primary_value_valid:
546+
print(f"MetricFetcher<{self._name}>: Primary value is valid")
542547
# Primary stream is good again, so we can stop fallback and return primary_value.
543548
if self._fallback.is_running:
544549
_logger.info(
@@ -560,6 +565,7 @@ async def _fetch_next(self) -> Sample[QuantityT] | None:
560565
self._fallback.start()
561566
return primary_value
562567

568+
print(f"MetricFetcher<{self._name}>: Fetching from fallback stream")
563569
return await self._synchronize_and_fetch_fallback(primary_value, self._fallback)
564570

565571
@property

tests/timeseries/test_producer.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
"""Test the logical component for calculating high level producer metrics."""
55

6+
import asyncio
67
from contextlib import AsyncExitStack
78

89
from frequenz.quantities import Power
@@ -114,59 +115,66 @@ async def test_producer_fallback_formula(self, mocker: MockerFixture) -> None:
114115

115116
# fmt: off
116117
expected_input_output: list[
117-
tuple[list[float | None], list[float | None], list[float | None], Power | None]
118+
tuple[float, list[float | None], list[float | None], list[float | None], Power | None]
118119
] = [
119-
# ([pv_meter_power], [pv_inverter_power], [chp_power], expected_power)
120+
# (test number, [pv_meter_power], [pv_inverter_power], [chp_power], expected_power)
121+
# Case 1: All components are available
120122
# Add power from meters and chp
121-
([-1.0, -2.0], [None, -200.0], [300], Power.from_watts(297.0)),
122-
([-1.0, -10], [-100.0, -200.0], [400], Power.from_watts(389.0)),
123+
(1.1, [-1.0, -2.0], [None, -200.0], [300], Power.from_watts(297.0)),
124+
(1.2, [-1.0, -10], [-100.0, -200.0], [400], Power.from_watts(389.0)),
123125
# Case 2: The first meter is unavailable (None).
124126
# Subscribe to the fallback inverter, but return None as the result,
125127
# according to the "nones-are-zero" rule
126-
([None, -2.0], [-100, -200.0], [400], None),
128+
(2.1, [None, -2.0], [-100, -200.0], [400], None),
127129
# Case 3: First meter is unavailable (None). Fallback inverter provides
128130
# a value.
129131
# Add second meter, first inverter and chp power
130-
([None, -2.0], [-100, -200.0], [400], Power.from_watts(298.0)),
131-
([None, -2.0], [-50, -200.0], [300], Power.from_watts(248.0)),
132+
(3.1, [None, -2.0], [-100, -200.0], [400], Power.from_watts(298.0)),
133+
(3.2, [None, -2.0], [-50, -200.0], [300], Power.from_watts(248.0)),
132134
# Case 4: Both first meter and its fallback inverter are unavailable
133135
# (None). Return 0 from failing component according to the
134136
# "nones-are-zero" rule.
135-
([None, -2.0], [None, -200.0], [300], Power.from_watts(298.0)),
136-
([None, -10.0], [-20.0, -200.0], [300], Power.from_watts(270.0)),
137+
(4.1, [None, -2.0], [None, -200.0], [300], Power.from_watts(298.0)),
138+
(4.2, [None, -10.0], [-20.0, -200.0], [300], Power.from_watts(270.0)),
137139
# Case 5: CHP is unavailable. Return 0 from failing component
138140
# according to the "nones-are-zero" rule.
139-
([None, -10.0], [-20.0, -200.0], [None], Power.from_watts(-30.0)),
141+
(5.1, [None, -10.0], [-20.0, -200.0], [None], Power.from_watts(-30.0)),
140142
# Case 6: Both meters are unavailable (None). Subscribe for fallback inverter
141-
([None, None], [-20.0, -200.0], [None], None),
142-
([None, None], [-20.0, -200.0], [None], Power.from_watts(-220.0)),
143-
([None, None], [None, -200.0], [None], Power.from_watts(-200.0)),
143+
(6.1, [None, None], [-20.0, -200.0], [None], None),
144+
(6.2, [None, None], [-20.0, -200.0], [None], Power.from_watts(-220.0)),
145+
(6.3, [None, None], [None, -200.0], [None], Power.from_watts(-200.0)),
144146
# Case 7: All components are unavailable (None). Return 0 according to the
145147
# "nones-are-zero" rule.
146-
([None, None], [None, None], [None], Power.from_watts(0)),
147-
([None, None], [None, None], [None], Power.from_watts(0)),
148-
([None, None], [None, None], [300.0], Power.from_watts(300.0)),
149-
([-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)),
150-
([-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)),
148+
(7.1, [None, None], [None, None], [None], Power.from_watts(0)),
149+
(7.2, [None, None], [None, None], [None], Power.from_watts(0)),
150+
(7.3, [None, None], [None, None], [300.0], Power.from_watts(300.0)),
151+
(7.4, [-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)),
152+
(7.5, [-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)),
151153
# Case 8: Meter is unavailable, start fallback formula.
152-
([None, -200.0], [-10.0, -100.0], [50.0], None),
153-
([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160)),
154+
(8.1, [None, -200.0], [-10.0, -100.0], [50.0], None),
155+
(8.2, [None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160)),
154156

155157
]
156158
# fmt: on
157159

158-
for idx, (
160+
for (
161+
idx,
159162
meter_power,
160163
pv_inverter_power,
161164
chp_power,
162165
expected_power,
163-
) in enumerate(expected_input_output):
166+
) in expected_input_output:
167+
print(
168+
f"\n----------------------------------------------------\nTEST CASE {idx}"
169+
)
164170
await mockgrid.mock_resampler.send_chp_power(chp_power)
165171
await mockgrid.mock_resampler.send_meter_power(meter_power)
166172
await mockgrid.mock_resampler.send_pv_inverter_power(pv_inverter_power)
167173
mockgrid.mock_resampler.next_ts()
174+
await asyncio.sleep(0.1)
168175

169176
result = await producer_power_receiver.receive()
177+
print(f">>>>> result: {result}")
170178
assert result.value == expected_power, (
171179
f"Test case {idx} failed:"
172180
+ f" meter_power: {meter_power}"

0 commit comments

Comments
 (0)