Skip to content

Commit 5c0eed3

Browse files
Refactor _fetch_next to prioritize the primary stream
When the primary stream has invalid data, then look at the fallback stream. This approach doesn't assume that once a fallback is started, it is always running. In a subsequent commit, we will implement stopping of fallback fetchers when the primary has recovered. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 254095c commit 5c0eed3

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -515,30 +515,36 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
515515
return self._next_value
516516

517517
async def _fetch_next(self) -> Sample[QuantityT] | None:
518-
if self._fallback is None:
519-
return await self._stream.receive()
520-
521-
if self._fallback.is_running:
522-
return await self.fetch_next_with_fallback(self._fallback)
523-
518+
# First fetch from primary stream
524519
next_value = None
525520
try:
526521
next_value = await self._stream.receive()
527522
except ReceiverError[Any] as err:
528523
_logger.error("Failed to fetch next value from %s: %s", self._name, err)
529-
else:
530-
if self._is_value_valid(next_value.value):
531-
return next_value
532524

533-
_logger.warning(
534-
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
535-
self._name,
536-
self._fallback.name,
525+
# We have no fallback, so we just return primary value even if it is not correct.
526+
if self._fallback is None:
527+
return next_value
528+
529+
is_primary_value_valid = next_value is not None and self._is_value_valid(
530+
next_value.value
537531
)
538-
# start fallback formula but don't wait for it because it has to
539-
# synchronize. Just return invalid value.
540-
self._fallback.start()
541-
return next_value
532+
533+
if is_primary_value_valid:
534+
return next_value
535+
536+
if not self._fallback.is_running:
537+
_logger.warning(
538+
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
539+
self._name,
540+
self._fallback.name,
541+
)
542+
# We started fallback, but it has to subscribe.
543+
# We will receive fallback values since the next time window.
544+
self._fallback.start()
545+
return next_value
546+
547+
return await self._synchronize_and_fetch_fallback(next_value, self._fallback)
542548

543549
@property
544550
def value(self) -> Sample[QuantityT] | None:

0 commit comments

Comments
 (0)