88import logging
99import math
1010from abc import ABC , abstractmethod
11- from typing import Any , Generic
11+ from typing import Generic
1212
13- from frequenz .channels import Receiver , ReceiverError
13+ from frequenz .channels import Receiver , ReceiverError , ReceiverStoppedError
1414
1515from .._base_types import QuantityT , Sample
1616
@@ -368,6 +368,10 @@ def is_running(self) -> bool:
368368 def start (self ) -> None :
369369 """Initialize the metric fetcher and start fetching samples."""
370370
@abstractmethod
async def stop(self) -> None:
    """Stop the fetcher if it is running."""
374+
371375
372376class MetricFetcher (Generic [QuantityT ], FormulaStep ):
373377 """A formula step for fetching a value from a metric Receiver."""
@@ -396,6 +400,7 @@ def __init__(
396400 self ._nones_are_zeros = nones_are_zeros
397401 self ._fallback : FallbackMetricFetcher [QuantityT ] | None = fallback
398402 self ._latest_fallback_sample : Sample [QuantityT ] | None = None
403+ self ._is_stopped = False
399404
400405 @property
401406 def stream (self ) -> Receiver [Sample [QuantityT ]]:
@@ -406,6 +411,17 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
406411 """
407412 return self ._stream
408413
async def stop(self) -> None:
    """Stop the metric fetcher and, if one is configured, its fallback.

    The fetcher is flagged as stopped, its stream is closed and the
    fallback fetcher (when present) is stopped as well.

    If metric fetcher is stopped, it can't be started again.
    There is no use-case now to start it again.
    """
    # Raise the flag before closing the stream: the receive paths check it to
    # tell an expected closed stream apart from an unexpected one.
    self._is_stopped = True
    self.stream.close()
    # Test against None explicitly, so a fallback object that happens to
    # evaluate as falsy is still stopped.
    if self._fallback is not None:
        await self._fallback.stop()
424+
409425 def stream_name (self ) -> str :
410426 """Return the name of the stream.
411427
@@ -417,86 +433,69 @@ def stream_name(self) -> str:
def _is_value_valid(self, value: QuantityT | None) -> bool:
    """Tell whether `value` can be used as a measurement.

    Args:
        value: The quantity to check, or `None`.

    Returns:
        `True` if `value` is not `None` and neither its `isnan()` nor its
        `isinf()` check is true, `False` otherwise.
    """
    if value is None:
        return False
    return not (value.isnan() or value.isinf())
419435
async def _fetch_from_fallback(
    self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
) -> Sample[QuantityT] | None:
    """Receive one sample from the fallback fetcher, logging any failure.

    Args:
        fallback_fetcher: The fallback metric fetcher to receive from.

    Returns:
        The received sample, or `None` if receiving raised
        `ReceiverStoppedError` or another `ReceiverError`.
    """
    try:
        sample = await fallback_fetcher.receive()
    except ReceiverStoppedError:
        # A closed stream is only worth an error when this fetcher was not
        # stopped; after a stop it is reported at debug level.
        if not self._is_stopped:
            _logger.error(
                "Failed to fetch next value from %s. Fallback stream closed.",
                self._name,
            )
            return None
        _logger.debug(
            "Stream for fallback metric fetcher %s closed.",
            fallback_fetcher.name,
        )
        return None
    except ReceiverError as err:
        _logger.error(
            "Failed to fetch next value from fallback stream %s: %s",
            self._name,
            err,
        )
        return None
    else:
        return sample
460+
async def _synchronize_and_fetch_fallback(
    self,
    primary_fetcher_value: Sample[QuantityT] | None,
    fallback_fetcher: FallbackMetricFetcher[QuantityT],
) -> Sample[QuantityT] | None:
    """Synchronize the fallback fetcher and return the fallback value.

    The latest fallback sample is kept in `self._latest_fallback_sample`
    between calls: when the fallback stream is ahead of the primary one,
    the sample that was already received has to wait for a later call.

    Args:
        primary_fetcher_value: The sample fetched from the primary fetcher,
            or `None` if the primary fetcher produced no sample.
        fallback_fetcher: The fallback metric fetcher.

    Returns:
        The value from the synchronized stream. Returns None if the primary
        fetcher sample is older than the latest sample from the fallback
        fetcher or if the fallback fetcher fails to fetch the next value.
        If there is no primary sample, the fallback sample currently held
        is returned without synchronization.
    """
    # Nothing held from an earlier call, so read a sample from the fallback.
    if self._latest_fallback_sample is None:
        self._latest_fallback_sample = await self._fetch_from_fallback(
            fallback_fetcher
        )

    if self._latest_fallback_sample is None:
        return None

    if primary_fetcher_value is None:
        # There is no timestamp to synchronize against. Hand out the held
        # sample and forget it, so the next call reads a new sample instead
        # of returning this one over and over.
        held_sample = self._latest_fallback_sample
        self._latest_fallback_sample = None
        return held_sample

    # The fallback is ahead of the primary stream: keep the sample for a
    # later call and report that there is nothing for this one.
    if primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp:
        return None

    # Synchronize the fallback fetcher with primary one
    while primary_fetcher_value.timestamp > self._latest_fallback_sample.timestamp:
        self._latest_fallback_sample = await self._fetch_from_fallback(
            fallback_fetcher
        )
        if self._latest_fallback_sample is None:
            break
    return self._latest_fallback_sample
500499
501500 async def fetch_next (self ) -> Sample [QuantityT ] | None :
502501 """Fetch the next value from the stream.
@@ -506,34 +505,62 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
506505 Returns:
507506 The fetched Sample.
508507 """
508+ if self ._is_stopped :
509+ _logger .error (
510+ "Metric fetcher %s stopped. Can't fetch new value." , self ._name
511+ )
512+ return None
513+
509514 self ._next_value = await self ._fetch_next ()
510515 return self ._next_value
511516
async def _fetch_next(self) -> Sample[QuantityT] | None:
    """Fetch the next sample, switching to the fallback when needed.

    Returns:
        The primary sample when it is valid or when no fallback is
        configured or running yet; otherwise the result of synchronizing
        with the running fallback fetcher. `None` when the primary stream
        was closed after this fetcher had been stopped.
    """
    sample: Sample[QuantityT] | None = None
    try:
        sample = await self._stream.receive()
    except ReceiverStoppedError:
        if self._is_stopped:
            _logger.debug("Stream for metric fetcher %s closed.", self._name)
            return None
        _logger.error(
            "Failed to fetch next value from %s. Primary stream closed.",
            self._name,
        )
    except ReceiverError as err:
        _logger.error("Failed to fetch next value from %s: %s", self._name, err)

    fallback = self._fallback
    # Without a fallback the primary result is all there is, valid or not.
    if fallback is None:
        return sample

    if sample is not None and self._is_value_valid(sample.value):
        # The primary stream delivers good data, so a running fallback is
        # no longer needed.
        if fallback.is_running:
            _logger.info(
                "Primary metric %s is good again, stopping fallback metric fetcher %s",
                self._name,
                fallback.name,
            )
            await fallback.stop()
        return sample

    if fallback.is_running:
        return await self._synchronize_and_fetch_fallback(sample, fallback)

    _logger.warning(
        "Primary metric %s is invalid. Running fallback metric fetcher: %s",
        self._name,
        fallback.name,
    )
    # The fallback was only just started and still has to subscribe, so this
    # call returns the primary result; fallback values come on later calls.
    fallback.start()
    return sample
537564
538565 @property
539566 def value (self ) -> Sample [QuantityT ] | None :
0 commit comments