Skip to content

Commit 1ba3043

Browse files
Use a flag to identify if a ReceiverStoppedError is expected
If it happens because the fetcher is being stopped, then only log a DEBUG message. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 7f3dcb8 commit 1ba3043

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from abc import ABC, abstractmethod
1111
from typing import Any, Generic
1212

13-
from frequenz.channels import Receiver, ReceiverError
13+
from frequenz.channels import Receiver, ReceiverError, ReceiverStoppedError
1414

1515
from .._base_types import QuantityT, Sample
1616

@@ -400,6 +400,7 @@ def __init__(
400400
self._nones_are_zeros = nones_are_zeros
401401
self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback
402402
self._latest_fallback_sample: Sample[QuantityT] | None = None
403+
self._is_stopped = False
403404

404405
@property
405406
def stream(self) -> Receiver[Sample[QuantityT]]:
@@ -416,6 +417,7 @@ async def stop(self) -> None:
416417
If metric fetcher is stopped, it can't be started again.
417418
There is no use-case now to start it again.
418419
"""
420+
self._is_stopped = True
419421
self.stream.close()
420422
if self._fallback:
421423
await self._fallback.stop()
@@ -436,6 +438,18 @@ async def _fetch_from_fallback(
436438
) -> Sample[QuantityT] | None:
437439
try:
438440
return await fallback_fetcher.receive()
441+
except ReceiverStoppedError:
442+
if self._is_stopped:
443+
_logger.debug(
444+
"Stream for fallback metric fetcher %s closed.",
445+
fallback_fetcher.name,
446+
)
447+
else:
448+
_logger.error(
449+
"Failed to fetch next value from %s. Fallback stream closed.",
450+
self._name,
451+
)
452+
return None
439453
except ReceiverError[Any] as err:
440454
_logger.error(
441455
"Failed to fetch next value from fallback stream %s: %s",
@@ -491,6 +505,12 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
491505
Returns:
492506
The fetched Sample.
493507
"""
508+
if self._is_stopped:
509+
_logger.error(
510+
"Metric fetcher %s stopped. Can't fetch new value.", self._name
511+
)
512+
return None
513+
494514
self._next_value = await self._fetch_next()
495515
return self._next_value
496516

@@ -499,6 +519,14 @@ async def _fetch_next(self) -> Sample[QuantityT] | None:
499519
primary_value: Sample[QuantityT] | None = None
500520
try:
501521
primary_value = await self._stream.receive()
522+
except ReceiverStoppedError:
523+
if self._is_stopped:
524+
_logger.debug("Stream for metric fetcher %s closed.", self._name)
525+
return None
526+
_logger.error(
527+
"Failed to fetch next value from %s. Primary stream closed.",
528+
self._name,
529+
)
502530
except ReceiverError[Any] as err:
503531
_logger.error("Failed to fetch next value from %s: %s", self._name, err)
504532

0 commit comments

Comments
 (0)