Skip to content

Commit 29d1699

Browse files
Refactor fetching from fallback_fetcher
Also simplify failure log message Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 200a398 commit 29d1699

File tree

1 file changed

+27
-22
lines changed

1 file changed

+27
-22
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,19 @@ def stream_name(self) -> str:
417417
def _is_value_valid(self, value: QuantityT | None) -> bool:
418418
return not (value is None or value.isnan() or value.isinf())
419419

420+
async def _fetch_from_fallback(
421+
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
422+
) -> Sample[QuantityT] | None:
423+
try:
424+
return await fallback_fetcher.receive()
425+
except ReceiverError[Any] as err:
426+
_logger.error(
427+
"Failed to fetch next value from fallback stream %s: %s",
428+
self._name,
429+
err,
430+
)
431+
return None
432+
420433
async def _synchronize_and_fetch_fallback(
421434
self,
422435
primary_fetcher_sample: Sample[QuantityT],
@@ -433,35 +446,27 @@ async def _synchronize_and_fetch_fallback(
433446
fetcher sample is older than the latest sample from the fallback
434447
fetcher or if the fallback fetcher fails to fetch the next value.
435448
"""
436-
# fallback_fetcher was not used, yet. We need to fetch first value.
449+
# We need to save value, because
450+
# primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp
451+
# In that case we should wait for our time window.
452+
if self._latest_fallback_sample is None:
453+
self._latest_fallback_sample = await self._fetch_from_fallback(
454+
fallback_fetcher
455+
)
456+
437457
if self._latest_fallback_sample is None:
438-
try:
439-
self._latest_fallback_sample = await fallback_fetcher.receive()
440-
except ReceiverError[Any] as err:
441-
_logger.error(
442-
"Fallback metric fetcher %s failed to fetch next value: %s."
443-
"Using primary metric fetcher.",
444-
fallback_fetcher.name,
445-
err,
446-
)
447-
return None
458+
return self._latest_fallback_sample
448459

449460
if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp:
450461
return None
451462

452463
# Synchronize the fallback fetcher with primary one
453464
while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp:
454-
try:
455-
self._latest_fallback_sample = await fallback_fetcher.receive()
456-
except ReceiverError[Any] as err:
457-
_logger.error(
458-
"Fallback metric fetcher %s failed to fetch next value: %s."
459-
"Using primary metric fetcher.",
460-
fallback_fetcher.name,
461-
err,
462-
)
463-
return None
464-
465+
self._latest_fallback_sample = await self._fetch_from_fallback(
466+
fallback_fetcher
467+
)
468+
if self._latest_fallback_sample is None:
469+
break
465470
return self._latest_fallback_sample
466471

467472
async def fetch_next_with_fallback(

0 commit comments

Comments
 (0)