Skip to content

Commit 2237e4e

Browse files
Change MetricFetcher fetch_next algorithm
This introduces 3 changes: 1. Add a stop method to MetricFetcher. It closes the receivers and stops the fallback metric fetcher. 2. Catch ReceiverStoppedError during the fetch_next operation. When a MetricFetcher stops, we close the receiver, which raises ReceiverStoppedError. Previously it was uncaught and caused lots of exceptions when a formula stopped. 3. Stop FallbackMetricFetcher when the primary stream is good again. Now that we are able to stop MetricFetcher without errors, we can stop FallbackMetricFetcher. Additionally, the fetch_next method was simplified. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 200a398 commit 2237e4e

File tree

7 files changed

+124
-87
lines changed

7 files changed

+124
-87
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ async def stop(self) -> None:
117117
if self._task is None:
118118
return
119119
await cancel_and_await(self._task)
120+
120121
_, fetchers = self._builder.finalize()
121122
for fetcher in fetchers.values():
122-
fetcher.stream.close()
123+
await fetcher.stop()
123124

124125
@classmethod
125126
def from_receiver(

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,13 @@ def consume(self) -> Sample[QuantityT]:
7777
), f"Fallback metric fetcher: {self.name} was not started"
7878

7979
return self._receiver.consume()
80+
81+
async def stop(self) -> None:
82+
"""Stop fallback metric fetcher, by closing the connection."""
83+
if self._formula_engine is not None:
84+
await self._formula_engine.stop()
85+
self._formula_engine = None
86+
87+
if self._receiver is not None:
88+
self._receiver.close()
89+
self._receiver = None

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 102 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from abc import ABC, abstractmethod
1111
from typing import Any, Generic
1212

13-
from frequenz.channels import Receiver, ReceiverError
13+
from frequenz.channels import Receiver, ReceiverError, ReceiverStoppedError
1414

1515
from .._base_types import QuantityT, Sample
1616

@@ -368,6 +368,10 @@ def is_running(self) -> bool:
368368
def start(self) -> None:
369369
"""Initialize the metric fetcher and start fetching samples."""
370370

371+
@abstractmethod
372+
async def stop(self) -> None:
373+
"""Stop the fetcher if it is running."""
374+
371375

372376
class MetricFetcher(Generic[QuantityT], FormulaStep):
373377
"""A formula step for fetching a value from a metric Receiver."""
@@ -396,6 +400,7 @@ def __init__(
396400
self._nones_are_zeros = nones_are_zeros
397401
self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback
398402
self._latest_fallback_sample: Sample[QuantityT] | None = None
403+
self._is_stopped = False
399404

400405
@property
401406
def stream(self) -> Receiver[Sample[QuantityT]]:
@@ -406,6 +411,17 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
406411
"""
407412
return self._stream
408413

414+
async def stop(self) -> None:
415+
"""Stop metric fetcher.
416+
417+
If metric fetcher is stopped, it can't be started again.
418+
There is no use-case now to start it again.
419+
"""
420+
self._is_stopped = True
421+
self.stream.close()
422+
if self._fallback:
423+
await self._fallback.stop()
424+
409425
def stream_name(self) -> str:
410426
"""Return the name of the stream.
411427
@@ -417,86 +433,69 @@ def stream_name(self) -> str:
417433
def _is_value_valid(self, value: QuantityT | None) -> bool:
418434
return not (value is None or value.isnan() or value.isinf())
419435

436+
async def _fetch_from_fallback(
437+
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
438+
) -> Sample[QuantityT] | None:
439+
try:
440+
return await fallback_fetcher.receive()
441+
except ReceiverStoppedError:
442+
if self._is_stopped:
443+
_logger.debug(
444+
"Stream for fallback metric fetcher %s closed.",
445+
fallback_fetcher.name,
446+
)
447+
else:
448+
_logger.error(
449+
"Failed to fetch next value from %s. Fallback stream closed.",
450+
self._name,
451+
)
452+
return None
453+
except ReceiverError[Any] as err:
454+
_logger.error(
455+
"Failed to fetch next value from fallback stream %s: %s",
456+
self._name,
457+
err,
458+
)
459+
return None
460+
420461
async def _synchronize_and_fetch_fallback(
421462
self,
422-
primary_fetcher_sample: Sample[QuantityT],
463+
primary_fetcher_value: Sample[QuantityT] | None,
423464
fallback_fetcher: FallbackMetricFetcher[QuantityT],
424465
) -> Sample[QuantityT] | None:
425466
"""Synchronize the fallback fetcher and return the fallback value.
426467
427468
Args:
428-
primary_fetcher_sample: The sample fetched from the primary fetcher.
469+
primary_fetcher_value: The sample fetched from the primary fetcher.
429470
fallback_fetcher: The fallback metric fetcher.
430471
431472
Returns:
432473
The value from the synchronized stream. Returns None if the primary
433474
fetcher sample is older than the latest sample from the fallback
434475
fetcher or if the fallback fetcher fails to fetch the next value.
435476
"""
436-
# fallback_fetcher was not used, yet. We need to fetch first value.
477+
# We need to keep the latest fallback sample, because it may be newer than the primary one:
478+
# primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp
479+
# In that case we should wait for our time window.
437480
if self._latest_fallback_sample is None:
438-
try:
439-
self._latest_fallback_sample = await fallback_fetcher.receive()
440-
except ReceiverError[Any] as err:
441-
_logger.error(
442-
"Fallback metric fetcher %s failed to fetch next value: %s."
443-
"Using primary metric fetcher.",
444-
fallback_fetcher.name,
445-
err,
446-
)
447-
return None
481+
self._latest_fallback_sample = await self._fetch_from_fallback(
482+
fallback_fetcher
483+
)
448484

449-
if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp:
485+
if primary_fetcher_value is None or self._latest_fallback_sample is None:
486+
return self._latest_fallback_sample
487+
488+
if primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp:
450489
return None
451490

452491
# Synchronize the fallback fetcher with primary one
453-
while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp:
454-
try:
455-
self._latest_fallback_sample = await fallback_fetcher.receive()
456-
except ReceiverError[Any] as err:
457-
_logger.error(
458-
"Fallback metric fetcher %s failed to fetch next value: %s."
459-
"Using primary metric fetcher.",
460-
fallback_fetcher.name,
461-
err,
462-
)
463-
return None
464-
465-
return self._latest_fallback_sample
466-
467-
async def fetch_next_with_fallback(
468-
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
469-
) -> Sample[QuantityT]:
470-
"""Fetch the next value from the primary and fallback streams.
471-
472-
Return the value from the stream that returns a valid value.
473-
If any stream raises an exception, then return the value from
474-
the other stream.
475-
476-
Args:
477-
fallback_fetcher: The fallback metric fetcher.
478-
479-
Returns:
480-
The value fetched from either the primary or fallback stream.
481-
"""
482-
try:
483-
primary = await self._stream.receive()
484-
except ReceiverError[Any] as err:
485-
_logger.error(
486-
"Primary metric fetcher %s failed to fetch next value: %s."
487-
"Using fallback metric fetcher.",
488-
self._name,
489-
err,
492+
while primary_fetcher_value.timestamp > self._latest_fallback_sample.timestamp:
493+
self._latest_fallback_sample = await self._fetch_from_fallback(
494+
fallback_fetcher
490495
)
491-
return await fallback_fetcher.receive()
492-
493-
fallback = await self._synchronize_and_fetch_fallback(primary, fallback_fetcher)
494-
if fallback is None:
495-
return primary
496-
497-
if self._is_value_valid(primary.value):
498-
return primary
499-
return fallback
496+
if self._latest_fallback_sample is None:
497+
break
498+
return self._latest_fallback_sample
500499

501500
async def fetch_next(self) -> Sample[QuantityT] | None:
502501
"""Fetch the next value from the stream.
@@ -506,34 +505,57 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
506505
Returns:
507506
The fetched Sample.
508507
"""
508+
if self._is_stopped:
509+
_logger.error(
510+
"Metric fetcher %s stopped. Can't fetch new value.", self._name
511+
)
512+
return None
513+
509514
self._next_value = await self._fetch_next()
510515
return self._next_value
511516

512517
async def _fetch_next(self) -> Sample[QuantityT] | None:
513-
if self._fallback is None:
514-
return await self._stream.receive()
515-
516-
if self._fallback.is_running:
517-
return await self.fetch_next_with_fallback(self._fallback)
518-
519-
next_value = None
518+
# First fetch from primary stream
519+
primary_value: Sample[QuantityT] | None = None
520520
try:
521-
next_value = await self._stream.receive()
521+
primary_value = await self._stream.receive()
522+
except ReceiverStoppedError:
523+
if self._is_stopped:
524+
_logger.debug("Stream for metric fetcher %s closed.", self._name)
525+
return None
526+
_logger.error(
527+
"Failed to fetch next value from %s. Primary stream closed.",
528+
self._name,
529+
)
522530
except ReceiverError[Any] as err:
523531
_logger.error("Failed to fetch next value from %s: %s", self._name, err)
524-
else:
525-
if self._is_value_valid(next_value.value):
526-
return next_value
527532

528-
_logger.warning(
529-
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
530-
self._name,
531-
self._fallback.name,
533+
# We have no fallback, so we just return primary value even if it is not correct.
534+
if self._fallback is None:
535+
return primary_value
536+
537+
is_primary_value_valid = primary_value is not None and self._is_value_valid(
538+
primary_value.value
532539
)
533-
# start fallback formula but don't wait for it because it has to
534-
# synchronize. Just return invalid value.
535-
self._fallback.start()
536-
return next_value
540+
541+
if is_primary_value_valid:
542+
# Primary stream is good again, so we can stop fallback and return primary_value.
543+
if self._fallback.is_running:
544+
await self._fallback.stop()
545+
return primary_value
546+
547+
if not self._fallback.is_running:
548+
_logger.warning(
549+
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
550+
self._name,
551+
self._fallback.name,
552+
)
553+
# We started fallback, but it has to subscribe.
554+
# We will receive fallback values starting from the next time window.
555+
self._fallback.start()
556+
return primary_value
557+
558+
return await self._synchronize_and_fetch_fallback(primary_value, self._fallback)
537559

538560
@property
539561
def value(self) -> Sample[QuantityT] | None:

tests/microgrid/test_grid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,8 @@ async def test_grid_fallback_formula_without_grid_meter(mocker: MockerFixture) -
480480
([2000, 1000], [-200, 1000, None], Power.from_watts(3000)),
481481
# battery start working
482482
([2000, 10], [-200, 1000, 100], Power.from_watts(2110)),
483+
# No primary value, start fallback formula
484+
([2000, None], [-200, 1000, 100], None),
483485
([2000, None], [-200, 1000, 100], Power.from_watts(2900)),
484486
]
485487
# fmt: on

tests/timeseries/_battery_pool/test_battery_pool.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -642,8 +642,9 @@ async def test_battery_power_fallback_formula(
642642
([-1.0, None], [100.0, 100.0, None], Power.from_watts(200.0)),
643643
# Case 4: bat_inv_meter is available, ignore fallback inverters
644644
([-1.0, 10], [100.0, 100.0, None], Power.from_watts(10.0)),
645-
# Case 4: all components are unavailable (None). Return 0 according to the
646-
# "nones-are-zero" rule.
645+
# Case 4: all components are unavailable (None). Start fallback formula.
646+
# Returned power = 0 according to the "nones-are-zero" rule.
647+
([-1.0, None], [None, None, None], None),
647648
([-1.0, None], [None, None, None], Power.from_watts(0.0)),
648649
# Case 5: Components becomes available
649650
([-1.0, None], [None, None, 100.0], Power.from_watts(100.0)),

tests/timeseries/test_consumer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ async def test_consumer_power_fallback_formula_with_grid_meter(
113113
([None, 100, -50], [100], [-200, -300], None),
114114
([None, 200, -50], [100], [-200, -300], None),
115115
([100, 100, -50], [100], [-200, -300], Power.from_watts(350)),
116-
# Case 5: Only grid meter is working
116+
# Case 5: Only grid meter is working, subscribe for fallback formula
117+
([100, None, None], [None], [None, None], None),
117118
([100, None, None], [None], [None, None], Power.from_watts(100)),
118119
([-500, None, None], [None], [None, None], Power.from_watts(-500)),
119120
# Case 6: Nothing is working

tests/timeseries/test_producer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,9 @@ async def test_producer_fallback_formula(self, mocker: MockerFixture) -> None:
148148
([None, None], [None, None], [300.0], Power.from_watts(300.0)),
149149
([-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)),
150150
([-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)),
151-
# Case 8: Meter is unavailable but we already subscribed for inverter
152-
# So don't return None in this case. Just proper formula result.
153-
([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160.0)),
151+
# Case 8: Meter is unavailable, start fallback formula.
152+
([None, -200.0], [-10.0, -100.0], [50.0], None),
153+
([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160)),
154154

155155
]
156156
# fmt: on

0 commit comments

Comments
 (0)