Skip to content

Commit 7f3dcb8

Browse files
Implement a stop method on MetricFetcher
And use it to stop the fallback fetcher when it is no longer needed. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 03e90d8 commit 7f3dcb8

File tree

7 files changed

+39
-7
lines changed

7 files changed

+39
-7
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ async def stop(self) -> None:
117117
if self._task is None:
118118
return
119119
await cancel_and_await(self._task)
120+
120121
_, fetchers = self._builder.finalize()
121122
for fetcher in fetchers.values():
122-
fetcher.stream.close()
123+
await fetcher.stop()
123124

124125
@classmethod
125126
def from_receiver(

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,13 @@ def consume(self) -> Sample[QuantityT]:
7777
), f"Fallback metric fetcher: {self.name} was not started"
7878

7979
return self._receiver.consume()
80+
81+
async def stop(self) -> None:
82+
"""Stop fallback metric fetcher, by closing the connection."""
83+
if self._formula_engine is not None:
84+
await self._formula_engine.stop()
85+
self._formula_engine = None
86+
87+
if self._receiver is not None:
88+
self._receiver.close()
89+
self._receiver = None

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ def is_running(self) -> bool:
368368
def start(self) -> None:
369369
"""Initialize the metric fetcher and start fetching samples."""
370370

371+
@abstractmethod
372+
async def stop(self) -> None:
373+
"""Stope the fetcher if is running."""
374+
371375

372376
class MetricFetcher(Generic[QuantityT], FormulaStep):
373377
"""A formula step for fetching a value from a metric Receiver."""
@@ -406,6 +410,16 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
406410
"""
407411
return self._stream
408412

413+
async def stop(self) -> None:
414+
"""Stop metric fetcher.
415+
416+
If metric fetcher is stopped, it can't be started again.
417+
There is no use-case now to start it again.
418+
"""
419+
self.stream.close()
420+
if self._fallback:
421+
await self._fallback.stop()
422+
409423
def stream_name(self) -> str:
410424
"""Return the name of the stream.
411425
@@ -497,6 +511,9 @@ async def _fetch_next(self) -> Sample[QuantityT] | None:
497511
)
498512

499513
if is_primary_value_valid:
514+
# Primary stream is good again, so we can stop fallback and return primary_value.
515+
if self._fallback.is_running:
516+
await self._fallback.stop()
500517
return primary_value
501518

502519
if not self._fallback.is_running:

tests/microgrid/test_grid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,8 @@ async def test_grid_fallback_formula_without_grid_meter(mocker: MockerFixture) -
480480
([2000, 1000], [-200, 1000, None], Power.from_watts(3000)),
481481
# battery start working
482482
([2000, 10], [-200, 1000, 100], Power.from_watts(2110)),
483+
# No primary value, start fallback formula
484+
([2000, None], [-200, 1000, 100], None),
483485
([2000, None], [-200, 1000, 100], Power.from_watts(2900)),
484486
]
485487
# fmt: on

tests/timeseries/_battery_pool/test_battery_pool.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -642,8 +642,9 @@ async def test_battery_power_fallback_formula(
642642
([-1.0, None], [100.0, 100.0, None], Power.from_watts(200.0)),
643643
# Case 4: bat_inv_meter is available, ignore fallback inverters
644644
([-1.0, 10], [100.0, 100.0, None], Power.from_watts(10.0)),
645-
# Case 4: all components are unavailable (None). Return 0 according to the
646-
# "nones-are-zero" rule.
645+
# Case 4: all components are unavailable (None). Start fallback formula.
646+
# Returned power = 0 according to the "nones-are-zero" rule.
647+
([-1.0, None], [None, None, None], None),
647648
([-1.0, None], [None, None, None], Power.from_watts(0.0)),
648649
# Case 5: Components becomes available
649650
([-1.0, None], [None, None, 100.0], Power.from_watts(100.0)),

tests/timeseries/test_consumer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ async def test_consumer_power_fallback_formula_with_grid_meter(
113113
([None, 100, -50], [100], [-200, -300], None),
114114
([None, 200, -50], [100], [-200, -300], None),
115115
([100, 100, -50], [100], [-200, -300], Power.from_watts(350)),
116-
# Case 5: Only grid meter is working
116+
# Case 5: Only grid meter is working, subscribe for fallback formula
117+
([100, None, None], [None], [None, None], None),
117118
([100, None, None], [None], [None, None], Power.from_watts(100)),
118119
([-500, None, None], [None], [None, None], Power.from_watts(-500)),
119120
# Case 6: Nothing is working

tests/timeseries/test_producer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,9 @@ async def test_producer_fallback_formula(self, mocker: MockerFixture) -> None:
148148
([None, None], [None, None], [300.0], Power.from_watts(300.0)),
149149
([-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)),
150150
([-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)),
151-
# Case 8: Meter is unavailable but we already subscribed for inverter
152-
# So don't return None in this case. Just proper formula result.
153-
([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160.0)),
151+
# Case 8: Meter is unavailable, start fallback formula.
152+
([None, -200.0], [-10.0, -100.0], [50.0], None),
153+
([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160)),
154154

155155
]
156156
# fmt: on

0 commit comments

Comments
 (0)