diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 15f2b90ec..ead79f513 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -4,8 +4,14 @@ - The `MovingWindow` now has an async `wait_for_samples` method that waits for a given number of samples to become available in the moving window and then returns. +- Add stop method to the FormulaEngine. Now it is possible to stop custom formulas. + +- Stop fallback formulas when primary formula starts working again. + ## Bug Fixes - Fixed a bug that was preventing power proposals to go through if there once existed some proposals with overlapping component IDs, even if the old proposals have expired. - Fixed a bug that was causing formulas to fallback to CHPs, when the CHP meters didn't have data. CHPs are not supported in the data sourcing actor and in the client, so we can't fallback to CHPs. + +- Fixed bug with formulas raising exception when stopped. \ No newline at end of file diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py index ffdb5a6ba..9ca956627 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py @@ -112,14 +112,15 @@ def __init__( self._channel: Broadcast[Sample[QuantityT]] = Broadcast(name=self._name) self._task: asyncio.Task[None] | None = None - async def _stop(self) -> None: + async def stop(self) -> None: """Stop a running formula engine.""" if self._task is None: return await cancel_and_await(self._task) + _, fetchers = self._builder.finalize() for fetcher in fetchers.values(): - fetcher.stream.close() + await fetcher.stop() @classmethod def from_receiver( @@ -313,7 +314,7 @@ async def _run(self) -> None: try: msg = await evaluator.apply() except asyncio.CancelledError: - _logger.exception("FormulaEngine task cancelled: %s", self._name) + _logger.debug("FormulaEngine task cancelled: %s", self._name) raise except Exception as err: # pylint: disable=broad-except _logger.warning( @@ -428,7 +429,7 @@ def __init__( FormulaEngine[QuantityT], ] = phase_streams - async def _stop(self) -> None: + async def stop(self) -> None: """Stop a running formula engine.""" if self._task is None: return @@ -582,7 +583,7 @@ async def _run(self) -> None: phase_3.value, ) except asyncio.CancelledError: - _logger.exception("FormulaEngine task cancelled: %s", self._name) + _logger.debug("FormulaEngine task cancelled: %s", self._name) break else: await sender.send(msg) diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py index dfd7b62c8..5f6c580ea 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py @@ -231,12 +231,12 @@ def from_3_phase_current_formula_generator( async def stop(self) -> None: """Stop all formula engines in the pool.""" for string_engine in self._string_engines.values(): - await string_engine._stop() # pylint: disable=protected-access + await string_engine.stop() for power_engine in self._power_engines.values(): - await power_engine._stop() # pylint: disable=protected-access + await power_engine.stop() for power_3_phase_engine in self._power_3_phase_engines.values(): - await power_3_phase_engine._stop() # pylint: disable=protected-access + await power_3_phase_engine.stop() for current_engine in self._current_engines.values(): - await current_engine._stop() # pylint: disable=protected-access + await current_engine.stop() for reactive_power_engine in self._reactive_power_engines.values(): - await reactive_power_engine._stop() # pylint: disable=protected-access + await reactive_power_engine.stop() diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py index 362cf8c18..f438ff7cc 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py @@ -77,3 +77,13 @@ def consume(self) -> Sample[QuantityT]: ), f"Fallback metric fetcher: {self.name} was not started" return self._receiver.consume() + + async def stop(self) -> None: + """Stop fallback metric fetcher, by closing the connection.""" + if self._formula_engine is not None: + await self._formula_engine.stop() + self._formula_engine = None + + if self._receiver is not None: + self._receiver.close() + self._receiver = None diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py index 2e18424c9..24ae94dbe 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py @@ -8,9 +8,9 @@ import logging import math from abc import ABC, abstractmethod -from typing import Any, Generic +from typing import Generic -from frequenz.channels import Receiver, ReceiverError +from frequenz.channels import Receiver, ReceiverError, ReceiverStoppedError from .._base_types import QuantityT, Sample @@ -368,6 +368,10 @@ def is_running(self) -> bool: def start(self) -> None: """Initialize the metric fetcher and start fetching samples.""" + @abstractmethod + async def stop(self) -> None: + """Stope the fetcher if is running.""" + class MetricFetcher(Generic[QuantityT], FormulaStep): """A formula step for fetching a value from a metric Receiver.""" @@ -396,6 +400,7 @@ def __init__( self._nones_are_zeros = nones_are_zeros self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback self._latest_fallback_sample: Sample[QuantityT] | None = None + self._is_stopped = False @property def stream(self) -> Receiver[Sample[QuantityT]]: @@ -406,6 +411,17 @@ def stream(self) -> Receiver[Sample[QuantityT]]: """ return self._stream + async def stop(self) -> None: + """Stop metric fetcher. + + If metric fetcher is stopped, it can't be started again. + There is no use-case now to start it again. + """ + self._is_stopped = True + self.stream.close() + if self._fallback: + await self._fallback.stop() + def stream_name(self) -> str: """Return the name of the stream. @@ -417,15 +433,40 @@ def stream_name(self) -> str: def _is_value_valid(self, value: QuantityT | None) -> bool: return not (value is None or value.isnan() or value.isinf()) + async def _fetch_from_fallback( + self, fallback_fetcher: FallbackMetricFetcher[QuantityT] + ) -> Sample[QuantityT] | None: + try: + return await fallback_fetcher.receive() + except ReceiverStoppedError: + if self._is_stopped: + _logger.debug( + "Stream for fallback metric fetcher %s closed.", + fallback_fetcher.name, + ) + else: + _logger.error( + "Failed to fetch next value from %s. Fallback stream closed.", + self._name, + ) + return None + except ReceiverError as err: + _logger.error( + "Failed to fetch next value from fallback stream %s: %s", + self._name, + err, + ) + return None + async def _synchronize_and_fetch_fallback( self, - primary_fetcher_sample: Sample[QuantityT], + primary_fetcher_value: Sample[QuantityT] | None, fallback_fetcher: FallbackMetricFetcher[QuantityT], ) -> Sample[QuantityT] | None: """Synchronize the fallback fetcher and return the fallback value. Args: - primary_fetcher_sample: The sample fetched from the primary fetcher. + primary_fetcher_value: The sample fetched from the primary fetcher. fallback_fetcher: The fallback metric fetcher. Returns: @@ -433,70 +474,28 @@ async def _synchronize_and_fetch_fallback( fetcher sample is older than the latest sample from the fallback fetcher or if the fallback fetcher fails to fetch the next value. """ - # fallback_fetcher was not used, yet. We need to fetch first value. + # We need to save value, because + # primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp + # In that case we should wait for our time window. if self._latest_fallback_sample is None: - try: - self._latest_fallback_sample = await fallback_fetcher.receive() - except ReceiverError[Any] as err: - _logger.error( - "Fallback metric fetcher %s failed to fetch next value: %s." - "Using primary metric fetcher.", - fallback_fetcher.name, - err, - ) - return None + self._latest_fallback_sample = await self._fetch_from_fallback( + fallback_fetcher + ) + + if primary_fetcher_value is None or self._latest_fallback_sample is None: + return self._latest_fallback_sample - if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp: + if primary_fetcher_value.timestamp < self._latest_fallback_sample.timestamp: return None # Synchronize the fallback fetcher with primary one - while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp: - try: - self._latest_fallback_sample = await fallback_fetcher.receive() - except ReceiverError[Any] as err: - _logger.error( - "Fallback metric fetcher %s failed to fetch next value: %s." - "Using primary metric fetcher.", - fallback_fetcher.name, - err, - ) - return None - - return self._latest_fallback_sample - - async def fetch_next_with_fallback( - self, fallback_fetcher: FallbackMetricFetcher[QuantityT] - ) -> Sample[QuantityT]: - """Fetch the next value from the primary and fallback streams. - - Return the value from the stream that returns a valid value. - If any stream raises an exception, then return the value from - the other stream. - - Args: - fallback_fetcher: The fallback metric fetcher. - - Returns: - The value fetched from either the primary or fallback stream. - """ - try: - primary = await self._stream.receive() - except ReceiverError[Any] as err: - _logger.error( - "Primary metric fetcher %s failed to fetch next value: %s." - "Using fallback metric fetcher.", - self._name, - err, + while primary_fetcher_value.timestamp > self._latest_fallback_sample.timestamp: + self._latest_fallback_sample = await self._fetch_from_fallback( + fallback_fetcher ) - return await fallback_fetcher.receive() - - fallback = await self._synchronize_and_fetch_fallback(primary, fallback_fetcher) - if fallback is None: - return primary - - if self._is_value_valid(primary.value): - return primary - return fallback + if self._latest_fallback_sample is None: + break + return self._latest_fallback_sample async def fetch_next(self) -> Sample[QuantityT] | None: """Fetch the next value from the stream. @@ -506,34 +505,62 @@ async def fetch_next(self) -> Sample[QuantityT] | None: Returns: The fetched Sample. """ + if self._is_stopped: + _logger.error( + "Metric fetcher %s stopped. Can't fetch new value.", self._name + ) + return None + self._next_value = await self._fetch_next() return self._next_value async def _fetch_next(self) -> Sample[QuantityT] | None: - if self._fallback is None: - return await self._stream.receive() - - if self._fallback.is_running: - return await self.fetch_next_with_fallback(self._fallback) - - next_value = None + # First fetch from primary stream + primary_value: Sample[QuantityT] | None = None try: - next_value = await self._stream.receive() - except ReceiverError[Any] as err: + primary_value = await self._stream.receive() + except ReceiverStoppedError: + if self._is_stopped: + _logger.debug("Stream for metric fetcher %s closed.", self._name) + return None + _logger.error( + "Failed to fetch next value from %s. Primary stream closed.", + self._name, + ) + except ReceiverError as err: _logger.error("Failed to fetch next value from %s: %s", self._name, err) - else: - if self._is_value_valid(next_value.value): - return next_value - _logger.warning( - "Primary metric %s is invalid. Running fallback metric fetcher: %s", - self._name, - self._fallback.name, + # We have no fallback, so we just return primary value even if it is not correct. + if self._fallback is None: + return primary_value + + is_primary_value_valid = primary_value is not None and self._is_value_valid( + primary_value.value ) - # start fallback formula but don't wait for it because it has to - # synchronize. Just return invalid value. - self._fallback.start() - return next_value + + if is_primary_value_valid: + # Primary stream is good again, so we can stop fallback and return primary_value. + if self._fallback.is_running: + _logger.info( + "Primary metric %s is good again, stopping fallback metric fetcher %s", + self._name, + self._fallback.name, + ) + await self._fallback.stop() + return primary_value + + if not self._fallback.is_running: + _logger.warning( + "Primary metric %s is invalid. Running fallback metric fetcher: %s", + self._name, + self._fallback.name, + ) + # We started fallback, but it has to subscribe. + # We will receive fallback values since the next time window. + self._fallback.start() + return primary_value + + return await self._synchronize_and_fetch_fallback(primary_value, self._fallback) @property def value(self) -> Sample[QuantityT] | None: diff --git a/tests/microgrid/test_grid.py b/tests/microgrid/test_grid.py index 8d815b942..3621c89fd 100644 --- a/tests/microgrid/test_grid.py +++ b/tests/microgrid/test_grid.py @@ -480,6 +480,8 @@ async def test_grid_fallback_formula_without_grid_meter(mocker: MockerFixture) - ([2000, 1000], [-200, 1000, None], Power.from_watts(3000)), # battery start working ([2000, 10], [-200, 1000, 100], Power.from_watts(2110)), + # No primary value, start fallback formula + ([2000, None], [-200, 1000, 100], None), ([2000, None], [-200, 1000, 100], Power.from_watts(2900)), ] # fmt: on diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index cdb4d3aa6..b19a5a08b 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -642,8 +642,9 @@ async def test_battery_power_fallback_formula( ([-1.0, None], [100.0, 100.0, None], Power.from_watts(200.0)), # Case 4: bat_inv_meter is available, ignore fallback inverters ([-1.0, 10], [100.0, 100.0, None], Power.from_watts(10.0)), - # Case 4: all components are unavailable (None). Return 0 according to the - # "nones-are-zero" rule. + # Case 4: all components are unavailable (None). Start fallback formula. + # Returned power = 0 according to the "nones-are-zero" rule. + ([-1.0, None], [None, None, None], None), ([-1.0, None], [None, None, None], Power.from_watts(0.0)), # Case 5: Components becomes available ([-1.0, None], [None, None, 100.0], Power.from_watts(100.0)), diff --git a/tests/timeseries/_formula_engine/test_formula_composition.py b/tests/timeseries/_formula_engine/test_formula_composition.py index d7b649cc2..83603bf6b 100644 --- a/tests/timeseries/_formula_engine/test_formula_composition.py +++ b/tests/timeseries/_formula_engine/test_formula_composition.py @@ -56,8 +56,7 @@ async def test_formula_composition( # pylint: disable=too-many-locals pv_power_recv = pv_pool.power.new_receiver() engine = (pv_pool.power + battery_pool.power).build("inv_power") - stack.push_async_callback(engine._stop) # pylint: disable=protected-access - + stack.push_async_callback(engine.stop) inv_calc_recv = engine.new_receiver() await mockgrid.mock_resampler.send_bat_inverter_power([10.0, 12.0, 14.0]) @@ -126,7 +125,7 @@ async def test_formula_composition_missing_pv(self, mocker: MockerFixture) -> No battery_power_recv = battery_pool.power.new_receiver() pv_power_recv = pv_pool.power.new_receiver() engine = (pv_pool.power + battery_pool.power).build("inv_power") - stack.push_async_callback(engine._stop) # pylint: disable=protected-access + stack.push_async_callback(engine.stop) inv_calc_recv = engine.new_receiver() @@ -168,7 +167,7 @@ async def test_formula_composition_missing_bat(self, mocker: MockerFixture) -> N battery_power_recv = battery_pool.power.new_receiver() pv_power_recv = pv_pool.power.new_receiver() engine = (pv_pool.power + battery_pool.power).build("inv_power") - stack.push_async_callback(engine._stop) # pylint: disable=protected-access + stack.push_async_callback(engine.stop) inv_calc_recv = engine.new_receiver() @@ -203,15 +202,11 @@ async def test_formula_composition_min_max(self, mocker: MockerFixture) -> None: stack.push_async_callback(grid.stop) engine_min = grid.power.min(logical_meter.chp_power).build("grid_power_min") - stack.push_async_callback( - engine_min._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_min.stop) engine_min_rx = engine_min.new_receiver() engine_max = grid.power.max(logical_meter.chp_power).build("grid_power_max") - stack.push_async_callback( - engine_max._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_max.stop) engine_max_rx = engine_max.new_receiver() await mockgrid.mock_resampler.send_meter_power([100.0, 200.0]) @@ -265,15 +260,11 @@ async def test_formula_composition_min_max_const( stack.push_async_callback(grid.stop) engine_min = grid.power.min(Power.zero()).build("grid_power_min") - stack.push_async_callback( - engine_min._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_min.stop) engine_min_rx = engine_min.new_receiver() engine_max = grid.power.max(Power.zero()).build("grid_power_max") - stack.push_async_callback( - engine_max._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_max.stop) engine_max_rx = engine_max.new_receiver() await mockgrid.mock_resampler.send_meter_power([100.0]) @@ -320,23 +311,15 @@ async def test_formula_composition_constant( # pylint: disable=too-many-locals engine_add = (grid.power + Power.from_watts(50)).build( "grid_power_addition" ) - stack.push_async_callback( - engine_add._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_add.stop) engine_sub = (grid.power - Power.from_watts(100)).build( "grid_power_subtraction" ) - stack.push_async_callback( - engine_sub._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_sub.stop) engine_mul = (grid.power * 2.0).build("grid_power_multiplication") - stack.push_async_callback( - engine_mul._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_mul.stop) engine_div = (grid.power / 2.0).build("grid_power_division") - stack.push_async_callback( - engine_div._stop # pylint: disable=protected-access - ) + stack.push_async_callback(engine_div.stop) engine_composite = ( ( @@ -435,7 +418,7 @@ async def test_3_phase_formulas(self, mocker: MockerFixture) -> None: engine = (grid.current_per_phase - ev_pool.current_per_phase).build( "net_current" ) - stack.push_async_callback(engine._stop) # pylint: disable=protected-access + stack.push_async_callback(engine.stop) net_current_recv = engine.new_receiver() for _ in range(10): diff --git a/tests/timeseries/test_consumer.py b/tests/timeseries/test_consumer.py index 063e6a8f1..6ce229665 100644 --- a/tests/timeseries/test_consumer.py +++ b/tests/timeseries/test_consumer.py @@ -113,7 +113,8 @@ async def test_consumer_power_fallback_formula_with_grid_meter( ([None, 100, -50], [100], [-200, -300], None), ([None, 200, -50], [100], [-200, -300], None), ([100, 100, -50], [100], [-200, -300], Power.from_watts(350)), - # Case 5: Only grid meter is working + # Case 5: Only grid meter is working, subscribe for fallback formula + ([100, None, None], [None], [None, None], None), ([100, None, None], [None], [None, None], Power.from_watts(100)), ([-500, None, None], [None], [None, None], Power.from_watts(-500)), # Case 6: Nothing is working diff --git a/tests/timeseries/test_formula_engine.py b/tests/timeseries/test_formula_engine.py index bf51c166e..0155a7bd9 100644 --- a/tests/timeseries/test_formula_engine.py +++ b/tests/timeseries/test_formula_engine.py @@ -95,7 +95,7 @@ async def run_test( # pylint: disable=too-many-locals and next_val.value.base_value == io_output ) tests_passed += 1 - await engine._stop() # pylint: disable=protected-access + await engine.stop() assert tests_passed == len(io_pairs) async def test_simple(self) -> None: @@ -384,7 +384,7 @@ async def run_test( # pylint: disable=too-many-locals and next_val.value.base_value == io_output ) tests_passed += 1 - await engine._stop() # pylint: disable=protected-access + await engine.stop() assert tests_passed == len(io_pairs) async def test_simple(self) -> None: diff --git a/tests/timeseries/test_producer.py b/tests/timeseries/test_producer.py index 83290636c..75028ec3e 100644 --- a/tests/timeseries/test_producer.py +++ b/tests/timeseries/test_producer.py @@ -148,9 +148,9 @@ async def test_producer_fallback_formula(self, mocker: MockerFixture) -> None: ([None, None], [None, None], [300.0], Power.from_watts(300.0)), ([-200.0, None], [None, -100.0], [50.0], Power.from_watts(-250.0)), ([-200.0, -200.0], [-10.0, -20.0], [50.0], Power.from_watts(-350.0)), - # Case 8: Meter is unavailable but we already subscribed for inverter - # So don't return None in this case. Just proper formula result. - ([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160.0)), + # Case 8: Meter is unavailable, start fallback formula. + ([None, -200.0], [-10.0, -100.0], [50.0], None), + ([None, -200.0], [-10.0, -100.0], [50.0], Power.from_watts(-160)), ] # fmt: on