diff --git a/pyproject.toml b/pyproject.toml index 191826864..3168f4086 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "frequenz-channels >= 1.6.1, < 2.0.0", "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "networkx >= 2.8, < 4", - "numpy >= 2, < 3", + "numpy >= 2.1.0, < 3", "typing_extensions >= 4.13.0, < 5", "marshmallow >= 3.19.0, < 5", "marshmallow_dataclass >= 8.7.1, < 9", diff --git a/src/frequenz/sdk/_internal/_asyncio.py b/src/frequenz/sdk/_internal/_asyncio.py index b93aab3f4..385ccf89b 100644 --- a/src/frequenz/sdk/_internal/_asyncio.py +++ b/src/frequenz/sdk/_internal/_asyncio.py @@ -6,6 +6,7 @@ import asyncio import logging +import sys from abc import ABC from datetime import timedelta from typing import Any, Callable, Coroutine @@ -22,6 +23,9 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None: Args: task: The task to be cancelled and waited for. + + Raises: + asyncio.CancelledError: when our task was cancelled """ if task.done(): return @@ -29,7 +33,17 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None: try: await task except asyncio.CancelledError: - pass + if not task.cancelled(): + raise + + +def is_loop_running() -> bool: + """Check if the event loop is running.""" + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False async def run_forever( @@ -46,20 +60,10 @@ async def run_forever( while True: try: await async_callable() - except RuntimeError as exc: - if "no running event loop" in str(exc): - _logger.exception( - "Something went wrong, no running event loop, skipping execution of %s", - async_callable.__name__, - ) - return except Exception: # pylint: disable=broad-except - if not asyncio.get_event_loop().is_running(): - _logger.exception( - "Something went wrong, no running event loop, skipping execution of %s", - async_callable.__name__, - ) - return + if not is_loop_running(): + _logger.exception("There is no running event loop, aborting...") + sys.exit(-1) _logger.exception("Restarting after exception") await asyncio.sleep(interval_s) diff --git a/src/frequenz/sdk/actor/_actor.py b/src/frequenz/sdk/actor/_actor.py index 337b00613..cb62b0464 100644 --- a/src/frequenz/sdk/actor/_actor.py +++ b/src/frequenz/sdk/actor/_actor.py @@ -8,6 +8,7 @@ import logging from datetime import timedelta +from .._internal._asyncio import is_loop_running from ._background_service import BackgroundService _logger = logging.getLogger(__name__) @@ -105,6 +106,14 @@ async def _run_loop(self) -> None: _logger.info("Actor %s: Cancelled.", self) raise except Exception: # pylint: disable=broad-except + if not is_loop_running(): + _logger.exception( + "Something went wrong, no running event loop," + " not trying to restart %s again.", + self, + ) + raise + if self._is_cancelled: # If actor was cancelled, but any tasks have failed with an exception # other than asyncio.CancelledError, those exceptions are combined diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py index 027299492..ec3d4b992 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py @@ -36,6 +36,8 @@ ) from typing_extensions import override +from frequenz.sdk._internal._asyncio import run_forever + from ....actor._background_service import BackgroundService from ... import connection_manager from ._blocking_status import BlockingStatus @@ -262,7 +264,7 @@ async def _run( inverter = inverter_receiver set_power_result = set_power_result_receiver - while True: + async def _loop() -> None: try: async for selected in select( battery, @@ -317,6 +319,8 @@ async def _run( except Exception as err: # pylint: disable=broad-except _logger.exception("BatteryStatusTracker crashed with error: %s", err) + await run_forever(_loop) + def _get_current_status(self) -> ComponentStatusEnum: """Get current battery status. diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index bf8b489f2..457ea03f9 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -190,6 +190,5 @@ async def _send_request(self) -> None: "Phase-to-neutral 3-phase voltage streaming task cancelled: %s", self._source_component, ) - break - else: - await sender.send(msg) + raise + await sender.send(msg) diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py index 3a0dc47e7..ab84055a7 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py @@ -584,9 +584,8 @@ async def _run(self) -> None: ) except asyncio.CancelledError: _logger.debug("FormulaEngine task cancelled: %s", self._name) - break - else: - await sender.send(msg) + raise + await sender.send(msg) def new_receiver( self, name: str | None = None, max_size: int = 50