diff --git a/src/frequenz/sdk/_internal/_asyncio.py b/src/frequenz/sdk/_internal/_asyncio.py index fc71e56f8..b93aab3f4 100644 --- a/src/frequenz/sdk/_internal/_asyncio.py +++ b/src/frequenz/sdk/_internal/_asyncio.py @@ -46,9 +46,22 @@ async def run_forever( while True: try: await async_callable() + except RuntimeError as exc: + if "no running event loop" in str(exc): + _logger.exception( + "Something went wrong, no running event loop, skipping execution of %s", + async_callable.__name__, + ) + return except Exception: # pylint: disable=broad-except + if not asyncio.get_event_loop().is_running(): + _logger.exception( + "Something went wrong, no running event loop, skipping execution of %s", + async_callable.__name__, + ) + return _logger.exception("Restarting after exception") - await asyncio.sleep(interval_s) + await asyncio.sleep(interval_s) class NotSyncConstructible(AssertionError): diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_pool_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_pool_status_tracker.py index 8a0541f8e..d3d493a9c 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_pool_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_pool_status_tracker.py @@ -6,14 +6,13 @@ import asyncio import contextlib -import logging from collections import abc from datetime import timedelta from frequenz.channels import Broadcast, Merger, Receiver, Sender, merge from frequenz.client.common.microgrid.components import ComponentId -from ..._internal._asyncio import cancel_and_await +from ..._internal._asyncio import cancel_and_await, run_forever from ._component_status import ( ComponentPoolStatus, ComponentStatus, @@ -22,8 +21,6 @@ SetPowerResult, ) -_logger = logging.getLogger(__name__) - class ComponentPoolStatusTracker: """Track status of components of a given category. @@ -112,14 +109,7 @@ async def _run(self) -> None: async with contextlib.AsyncExitStack() as stack: for tracker in self._component_status_trackers: await stack.enter_async_context(tracker) - while True: - try: - await self._update_status() - except Exception as err: # pylint: disable=broad-except - _logger.error( - "ComponentPoolStatus failed with error: %s. Restarting.", err - ) - await asyncio.sleep(1.0) + await run_forever(self._update_status) async def _update_status(self) -> None: async for status in self._merged_status_receiver: diff --git a/tests/microgrid/power_distributing/_component_status/test_battery_pool_status.py b/tests/microgrid/power_distributing/_component_status/test_battery_pool_status.py index 95db1494b..744510042 100644 --- a/tests/microgrid/power_distributing/_component_status/test_battery_pool_status.py +++ b/tests/microgrid/power_distributing/_component_status/test_battery_pool_status.py @@ -3,6 +3,7 @@ """Tests for BatteryPoolStatus.""" import asyncio +from contextlib import AsyncExitStack from datetime import timedelta from frequenz.channels import Broadcast @@ -37,7 +38,8 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None: mock_microgrid = MockMicrogrid(grid_meter=True, mocker=mocker) mock_microgrid.add_batteries(3) - async with mock_microgrid: + async with AsyncExitStack() as stack: + await stack.enter_async_context(mock_microgrid) batteries = { battery.component_id for battery in mock_microgrid.mock_client.component_graph.components( @@ -55,6 +57,7 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None: max_blocking_duration=timedelta(seconds=30), component_status_tracker_type=BatteryStatusTracker, ) + stack.push_async_callback(batteries_status.stop) await asyncio.sleep(0.1) expected_working: set[ComponentId] = set() @@ -123,5 +126,3 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None: ComponentId(9), ComponentId(19), } - - await batteries_status.stop() diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index 48ce6fd69..3f6674b1b 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -8,6 +8,7 @@ import asyncio from collections.abc import Callable from datetime import datetime, timedelta, timezone +from types import TracebackType from typing import Coroutine from frequenz.client.common.microgrid.components import ComponentId @@ -622,6 +623,12 @@ async def __aenter__(self) -> MockMicrogrid: await self.start() return self - async def __aexit__(self, exc_type: None, exc_val: None, exc_tb: None) -> None: + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + /, + ) -> None: """Exit context manager.""" await self.cleanup()