1010from abc import ABC , abstractmethod
1111from typing import Any , Generic
1212
13- from frequenz .channels import Receiver , ReceiverError
13+ from frequenz .channels import Receiver , ReceiverError , ReceiverStoppedError
1414
1515from .._base_types import QuantityT , Sample
1616
@@ -400,6 +400,7 @@ def __init__(
400400 self ._nones_are_zeros = nones_are_zeros
401401 self ._fallback : FallbackMetricFetcher [QuantityT ] | None = fallback
402402 self ._latest_fallback_sample : Sample [QuantityT ] | None = None
403+ self ._is_stopped = False
403404
404405 @property
405406 def stream (self ) -> Receiver [Sample [QuantityT ]]:
@@ -411,7 +412,12 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
411412 return self ._stream
412413
413414 async def stop (self ) -> None :
414- """Stop metric fetcher."""
415+ """Stop metric fetcher.
416+
417+ If metric fetcher is stopped, it can't be started again.
418+ There is no use-case now to start it again.
419+ """
420+ self ._is_stopped = True
415421 self .stream .close ()
416422 if self ._fallback :
417423 await self ._fallback .stop ()
@@ -427,86 +433,69 @@ def stream_name(self) -> str:
427433 def _is_value_valid (self , value : QuantityT | None ) -> bool :
428434 return not (value is None or value .isnan () or value .isinf ())
429435
436+ async def _fetch_from_fallback (
437+ self , fallback_fetcher : FallbackMetricFetcher [QuantityT ]
438+ ) -> Sample [QuantityT ] | None :
439+ try :
440+ return await fallback_fetcher .receive ()
441+ except ReceiverStoppedError :
442+ if self ._is_stopped :
443+ _logger .debug (
444+ "Stream for fallback metric fetcher %s closed." ,
445+ fallback_fetcher .name ,
446+ )
447+ else :
448+ _logger .error (
449+ "Failed to fetch next value from %s. Fallback stream closed." ,
450+ self ._name ,
451+ )
452+ return None
453+ except ReceiverError [Any ] as err :
454+ _logger .error (
455+ "Failed to fetch next value from fallback stream %s: %s" ,
456+ self ._name ,
457+ err ,
458+ )
459+ return None
460+
430461 async def _synchronize_and_fetch_fallback (
431462 self ,
432- primary_fetcher_sample : Sample [QuantityT ],
463+ primary_fetcher_value : Sample [QuantityT ] | None ,
433464 fallback_fetcher : FallbackMetricFetcher [QuantityT ],
434465 ) -> Sample [QuantityT ] | None :
435466 """Synchronize the fallback fetcher and return the fallback value.
436467
437468 Args:
438- primary_fetcher_sample : The sample fetched from the primary fetcher.
469+ primary_fetcher_value : The sample fetched from the primary fetcher.
439470 fallback_fetcher: The fallback metric fetcher.
440471
441472 Returns:
442473 The value from the synchronized stream. Returns None if the primary
443474 fetcher sample is older than the latest sample from the fallback
444475 fetcher or if the fallback fetcher fails to fetch the next value.
445476 """
446- # fallback_fetcher was not used, yet. We need to fetch first value.
477+ # We need to save value, because
478+ # primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp
479+ # In that case we should wait for our time window.
447480 if self ._latest_fallback_sample is None :
448- try :
449- self ._latest_fallback_sample = await fallback_fetcher .receive ()
450- except ReceiverError [Any ] as err :
451- _logger .error (
452- "Fallback metric fetcher %s failed to fetch next value: %s."
453- "Using primary metric fetcher." ,
454- fallback_fetcher .name ,
455- err ,
456- )
457- return None
481+ self ._latest_fallback_sample = await self ._fetch_from_fallback (
482+ fallback_fetcher
483+ )
458484
459- if primary_fetcher_sample .timestamp < self ._latest_fallback_sample .timestamp :
485+ if primary_fetcher_value is None or self ._latest_fallback_sample is None :
486+ return self ._latest_fallback_sample
487+
488+ if primary_fetcher_value .timestamp < self ._latest_fallback_sample .timestamp :
460489 return None
461490
462491 # Synchronize the fallback fetcher with primary one
463- while primary_fetcher_sample .timestamp > self ._latest_fallback_sample .timestamp :
464- try :
465- self ._latest_fallback_sample = await fallback_fetcher .receive ()
466- except ReceiverError [Any ] as err :
467- _logger .error (
468- "Fallback metric fetcher %s failed to fetch next value: %s."
469- "Using primary metric fetcher." ,
470- fallback_fetcher .name ,
471- err ,
472- )
473- return None
474-
475- return self ._latest_fallback_sample
476-
477- async def fetch_next_with_fallback (
478- self , fallback_fetcher : FallbackMetricFetcher [QuantityT ]
479- ) -> Sample [QuantityT ]:
480- """Fetch the next value from the primary and fallback streams.
481-
482- Return the value from the stream that returns a valid value.
483- If any stream raises an exception, then return the value from
484- the other stream.
485-
486- Args:
487- fallback_fetcher: The fallback metric fetcher.
488-
489- Returns:
490- The value fetched from either the primary or fallback stream.
491- """
492- try :
493- primary = await self ._stream .receive ()
494- except ReceiverError [Any ] as err :
495- _logger .error (
496- "Primary metric fetcher %s failed to fetch next value: %s."
497- "Using fallback metric fetcher." ,
498- self ._name ,
499- err ,
492+ while primary_fetcher_value .timestamp > self ._latest_fallback_sample .timestamp :
493+ self ._latest_fallback_sample = await self ._fetch_from_fallback (
494+ fallback_fetcher
500495 )
501- return await fallback_fetcher .receive ()
502-
503- fallback = await self ._synchronize_and_fetch_fallback (primary , fallback_fetcher )
504- if fallback is None :
505- return primary
506-
507- if self ._is_value_valid (primary .value ):
508- return primary
509- return fallback
496+ if self ._latest_fallback_sample is None :
497+ break
498+ return self ._latest_fallback_sample
510499
511500 async def fetch_next (self ) -> Sample [QuantityT ] | None :
512501 """Fetch the next value from the stream.
@@ -516,34 +505,57 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
516505 Returns:
517506 The fetched Sample.
518507 """
508+ if self ._is_stopped :
509+ _logger .error (
510+ "Metric fetcher %s stopped. Can't fetch new value." , self ._name
511+ )
512+ return None
513+
519514 self ._next_value = await self ._fetch_next ()
520515 return self ._next_value
521516
522517 async def _fetch_next (self ) -> Sample [QuantityT ] | None :
523- if self ._fallback is None :
524- return await self ._stream .receive ()
525-
526- if self ._fallback .is_running :
527- return await self .fetch_next_with_fallback (self ._fallback )
528-
529- next_value = None
518+ # First fetch from primary stream
519+ primary_value : Sample [QuantityT ] | None = None
530520 try :
531- next_value = await self ._stream .receive ()
521+ primary_value = await self ._stream .receive ()
522+ except ReceiverStoppedError :
523+ if self ._is_stopped :
524+ _logger .debug ("Stream for metric fetcher %s closed." , self ._name )
525+ return None
526+ _logger .error (
527+ "Failed to fetch next value from %s. Primary stream closed." ,
528+ self ._name ,
529+ )
532530 except ReceiverError [Any ] as err :
533531 _logger .error ("Failed to fetch next value from %s: %s" , self ._name , err )
534- else :
535- if self ._is_value_valid (next_value .value ):
536- return next_value
537532
538- _logger .warning (
539- "Primary metric %s is invalid. Running fallback metric fetcher: %s" ,
540- self ._name ,
541- self ._fallback .name ,
533+ # We have no fallback, so we just return primary value even if it is not correct.
534+ if self ._fallback is None :
535+ return primary_value
536+
537+ is_primary_value_valid = primary_value is not None and self ._is_value_valid (
538+ primary_value .value
542539 )
543- # start fallback formula but don't wait for it because it has to
544- # synchronize. Just return invalid value.
545- self ._fallback .start ()
546- return next_value
540+
541+ if is_primary_value_valid :
542+ # Primary stream is good again, so we can stop fallback and return primary_value.
543+ if self ._fallback .is_running :
544+ await self ._fallback .stop ()
545+ return primary_value
546+
547+ if not self ._fallback .is_running :
548+ _logger .warning (
549+ "Primary metric %s is invalid. Running fallback metric fetcher: %s" ,
550+ self ._name ,
551+ self ._fallback .name ,
552+ )
553+ # We started fallback, but it has to subscribe.
554+ # We will receive fallback values since the next time window.
555+ self ._fallback .start ()
556+ return primary_value
557+
558+ return await self ._synchronize_and_fetch_fallback (primary_value , self ._fallback )
547559
548560 @property
549561 def value (self ) -> Sample [QuantityT ] | None :
0 commit comments