Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

- The `MovingWindow` now has an async `wait_for_samples` method that waits for a given number of samples to become available in the moving window and then returns.

- Add stop method to the FormulaEngine. Now it is possible to stop custom formulas.

- Stop fallback formulas when primary formula starts working again.

## Bug Fixes

- Fixed a bug that was preventing power proposals to go through if there once existed some proposals with overlapping component IDs, even if the old proposals have expired.

- Fixed a bug that was causing formulas to fallback to CHPs, when the CHP meters didn't have data. CHPs are not supported in the data sourcing actor and in the client, so we can't fallback to CHPs.

- Fixed bug with formulas raising exception when stopped.
11 changes: 6 additions & 5 deletions src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,15 @@ def __init__(
self._channel: Broadcast[Sample[QuantityT]] = Broadcast(name=self._name)
self._task: asyncio.Task[None] | None = None

async def _stop(self) -> None:
async def stop(self) -> None:
"""Stop a running formula engine."""
if self._task is None:
return
await cancel_and_await(self._task)

_, fetchers = self._builder.finalize()
for fetcher in fetchers.values():
fetcher.stream.close()
await fetcher.stop()

@classmethod
def from_receiver(
Expand Down Expand Up @@ -313,7 +314,7 @@ async def _run(self) -> None:
try:
msg = await evaluator.apply()
except asyncio.CancelledError:
_logger.exception("FormulaEngine task cancelled: %s", self._name)
_logger.debug("FormulaEngine task cancelled: %s", self._name)
raise
except Exception as err: # pylint: disable=broad-except
_logger.warning(
Expand Down Expand Up @@ -428,7 +429,7 @@ def __init__(
FormulaEngine[QuantityT],
] = phase_streams

async def _stop(self) -> None:
async def stop(self) -> None:
"""Stop a running formula engine."""
if self._task is None:
return
Expand Down Expand Up @@ -582,7 +583,7 @@ async def _run(self) -> None:
phase_3.value,
)
except asyncio.CancelledError:
_logger.exception("FormulaEngine task cancelled: %s", self._name)
_logger.debug("FormulaEngine task cancelled: %s", self._name)
break
else:
await sender.send(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ def from_3_phase_current_formula_generator(
async def stop(self) -> None:
"""Stop all formula engines in the pool."""
for string_engine in self._string_engines.values():
await string_engine._stop() # pylint: disable=protected-access
await string_engine.stop()
for power_engine in self._power_engines.values():
await power_engine._stop() # pylint: disable=protected-access
await power_engine.stop()
for power_3_phase_engine in self._power_3_phase_engines.values():
await power_3_phase_engine._stop() # pylint: disable=protected-access
await power_3_phase_engine.stop()
for current_engine in self._current_engines.values():
await current_engine._stop() # pylint: disable=protected-access
await current_engine.stop()
for reactive_power_engine in self._reactive_power_engines.values():
await reactive_power_engine._stop() # pylint: disable=protected-access
await reactive_power_engine.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,13 @@ def consume(self) -> Sample[QuantityT]:
), f"Fallback metric fetcher: {self.name} was not started"

return self._receiver.consume()

async def stop(self) -> None:
"""Stop fallback metric fetcher, by closing the connection."""
if self._formula_engine is not None:
await self._formula_engine.stop()
self._formula_engine = None

if self._receiver is not None:
self._receiver.close()
self._receiver = None
191 changes: 109 additions & 82 deletions src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import logging
import math
from abc import ABC, abstractmethod
from typing import Any, Generic
from typing import Generic

from frequenz.channels import Receiver, ReceiverError
from frequenz.channels import Receiver, ReceiverError, ReceiverStoppedError

from .._base_types import QuantityT, Sample

Expand Down Expand Up @@ -368,6 +368,10 @@ def is_running(self) -> bool:
def start(self) -> None:
"""Initialize the metric fetcher and start fetching samples."""

@abstractmethod
async def stop(self) -> None:
"""Stope the fetcher if is running."""


class MetricFetcher(Generic[QuantityT], FormulaStep):
"""A formula step for fetching a value from a metric Receiver."""
Expand Down Expand Up @@ -396,6 +400,7 @@ def __init__(
self._nones_are_zeros = nones_are_zeros
self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback
self._latest_fallback_sample: Sample[QuantityT] | None = None
self._is_stopped = False

@property
def stream(self) -> Receiver[Sample[QuantityT]]:
Expand All @@ -406,6 +411,17 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
"""
return self._stream

async def stop(self) -> None:
"""Stop metric fetcher.

If metric fetcher is stopped, it can't be started again.
There is no use-case now to start it again.
"""
self._is_stopped = True
self.stream.close()
if self._fallback:
await self._fallback.stop()

def stream_name(self) -> str:
"""Return the name of the stream.

Expand All @@ -417,86 +433,69 @@ def stream_name(self) -> str:
def _is_value_valid(self, value: QuantityT | None) -> bool:
return not (value is None or value.isnan() or value.isinf())

async def _fetch_from_fallback(
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
) -> Sample[QuantityT] | None:
try:
return await fallback_fetcher.receive()
except ReceiverStoppedError:
if self._is_stopped:
_logger.debug(
"Stream for fallback metric fetcher %s closed.",
fallback_fetcher.name,
)
else:
_logger.error(
"Failed to fetch next value from %s. Fallback stream closed.",
self._name,
)
return None
except ReceiverError as err:
_logger.error(
"Failed to fetch next value from fallback stream %s: %s",
self._name,
err,
)
return None

async def _synchronize_and_fetch_fallback(
self,
primary_fetcher_sample: Sample[QuantityT],
primary_fetcher_value: Sample[QuantityT] | None,
fallback_fetcher: FallbackMetricFetcher[QuantityT],
) -> Sample[QuantityT] | None:
"""Synchronize the fallback fetcher and return the fallback value.

Args:
primary_fetcher_sample: The sample fetched from the primary fetcher.
primary_fetcher_value: The sample fetched from the primary fetcher.
fallback_fetcher: The fallback metric fetcher.

Returns:
The value from the synchronized stream. Returns None if the primary
fetcher sample is older than the latest sample from the fallback
fetcher or if the fallback fetcher fails to fetch the next value.
"""
# fallback_fetcher was not used, yet. We need to fetch first value.
# We need to save value, because
# primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp
# In that case we should wait for our time window.
if self._latest_fallback_sample is None:
try:
self._latest_fallback_sample = await fallback_fetcher.receive()
except ReceiverError[Any] as err:
_logger.error(
"Fallback metric fetcher %s failed to fetch next value: %s."
"Using primary metric fetcher.",
fallback_fetcher.name,
err,
)
return None
self._latest_fallback_sample = await self._fetch_from_fallback(
fallback_fetcher
)

if primary_fetcher_value is None or self._latest_fallback_sample is None:
return self._latest_fallback_sample

if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp:
if primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp:
return None

# Synchronize the fallback fetcher with primary one
while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp:
try:
self._latest_fallback_sample = await fallback_fetcher.receive()
except ReceiverError[Any] as err:
_logger.error(
"Fallback metric fetcher %s failed to fetch next value: %s."
"Using primary metric fetcher.",
fallback_fetcher.name,
err,
)
return None

return self._latest_fallback_sample

async def fetch_next_with_fallback(
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
) -> Sample[QuantityT]:
"""Fetch the next value from the primary and fallback streams.

Return the value from the stream that returns a valid value.
If any stream raises an exception, then return the value from
the other stream.

Args:
fallback_fetcher: The fallback metric fetcher.

Returns:
The value fetched from either the primary or fallback stream.
"""
try:
primary = await self._stream.receive()
except ReceiverError[Any] as err:
_logger.error(
"Primary metric fetcher %s failed to fetch next value: %s."
"Using fallback metric fetcher.",
self._name,
err,
while primary_fetcher_value.timestamp > self._latest_fallback_sample.timestamp:
self._latest_fallback_sample = await self._fetch_from_fallback(
fallback_fetcher
)
return await fallback_fetcher.receive()

fallback = await self._synchronize_and_fetch_fallback(primary, fallback_fetcher)
if fallback is None:
return primary

if self._is_value_valid(primary.value):
return primary
return fallback
if self._latest_fallback_sample is None:
break
return self._latest_fallback_sample

async def fetch_next(self) -> Sample[QuantityT] | None:
"""Fetch the next value from the stream.
Expand All @@ -506,34 +505,62 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
Returns:
The fetched Sample.
"""
if self._is_stopped:
_logger.error(
"Metric fetcher %s stopped. Can't fetch new value.", self._name
)
return None

self._next_value = await self._fetch_next()
return self._next_value

async def _fetch_next(self) -> Sample[QuantityT] | None:
if self._fallback is None:
return await self._stream.receive()

if self._fallback.is_running:
return await self.fetch_next_with_fallback(self._fallback)

next_value = None
# First fetch from primary stream
primary_value: Sample[QuantityT] | None = None
try:
next_value = await self._stream.receive()
except ReceiverError[Any] as err:
primary_value = await self._stream.receive()
except ReceiverStoppedError:
if self._is_stopped:
_logger.debug("Stream for metric fetcher %s closed.", self._name)
return None
_logger.error(
"Failed to fetch next value from %s. Primary stream closed.",
self._name,
)
except ReceiverError as err:
_logger.error("Failed to fetch next value from %s: %s", self._name, err)
else:
if self._is_value_valid(next_value.value):
return next_value

_logger.warning(
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
self._name,
self._fallback.name,
# We have no fallback, so we just return primary value even if it is not correct.
if self._fallback is None:
return primary_value

is_primary_value_valid = primary_value is not None and self._is_value_valid(
primary_value.value
)
# start fallback formula but don't wait for it because it has to
# synchronize. Just return invalid value.
self._fallback.start()
return next_value

if is_primary_value_valid:
# Primary stream is good again, so we can stop fallback and return primary_value.
if self._fallback.is_running:
_logger.info(
"Primary metric %s is good again, stopping fallback metric fetcher %s",
self._name,
self._fallback.name,
)
await self._fallback.stop()
return primary_value

if not self._fallback.is_running:
_logger.warning(
"Primary metric %s is invalid. Running fallback metric fetcher: %s",
self._name,
self._fallback.name,
)
# We started fallback, but it has to subscribe.
# We will receive fallback values since the next time window.
self._fallback.start()
return primary_value

return await self._synchronize_and_fetch_fallback(primary_value, self._fallback)

@property
def value(self) -> Sample[QuantityT] | None:
Expand Down
2 changes: 2 additions & 0 deletions tests/microgrid/test_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ async def test_grid_fallback_formula_without_grid_meter(mocker: MockerFixture) -
([2000, 1000], [-200, 1000, None], Power.from_watts(3000)),
# battery start working
([2000, 10], [-200, 1000, 100], Power.from_watts(2110)),
# No primary value, start fallback formula
([2000, None], [-200, 1000, 100], None),
([2000, None], [-200, 1000, 100], Power.from_watts(2900)),
]
# fmt: on
Expand Down
5 changes: 3 additions & 2 deletions tests/timeseries/_battery_pool/test_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,9 @@ async def test_battery_power_fallback_formula(
([-1.0, None], [100.0, 100.0, None], Power.from_watts(200.0)),
# Case 4: bat_inv_meter is available, ignore fallback inverters
([-1.0, 10], [100.0, 100.0, None], Power.from_watts(10.0)),
# Case 4: all components are unavailable (None). Return 0 according to the
# "nones-are-zero" rule.
# Case 4: all components are unavailable (None). Start fallback formula.
# Returned power = 0 according to the "nones-are-zero" rule.
([-1.0, None], [None, None, None], None),
([-1.0, None], [None, None, None], Power.from_watts(0.0)),
# Case 5: Components becomes available
([-1.0, None], [None, None, 100.0], Power.from_watts(100.0)),
Expand Down
Loading
Loading