Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/frequenz/sdk/_internal/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,22 @@ async def run_forever(
while True:
try:
await async_callable()
except RuntimeError as exc:
if "no running event loop" in str(exc):
_logger.exception(
"Something went wrong, no running event loop, skipping execution of %s",
async_callable.__name__,
)
return
except Exception: # pylint: disable=broad-except
if not asyncio.get_event_loop().is_running():
_logger.exception(
"Something went wrong, no running event loop, skipping execution of %s",
async_callable.__name__,
)
return
_logger.exception("Restarting after exception")
await asyncio.sleep(interval_s)
await asyncio.sleep(interval_s)


class NotSyncConstructible(AssertionError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@

import asyncio
import contextlib
import logging
from collections import abc
from datetime import timedelta

from frequenz.channels import Broadcast, Merger, Receiver, Sender, merge
from frequenz.client.common.microgrid.components import ComponentId

from ..._internal._asyncio import cancel_and_await
from ..._internal._asyncio import cancel_and_await, run_forever
from ._component_status import (
ComponentPoolStatus,
ComponentStatus,
Expand All @@ -22,8 +21,6 @@
SetPowerResult,
)

_logger = logging.getLogger(__name__)


class ComponentPoolStatusTracker:
"""Track status of components of a given category.
Expand Down Expand Up @@ -112,14 +109,7 @@ async def _run(self) -> None:
async with contextlib.AsyncExitStack() as stack:
for tracker in self._component_status_trackers:
await stack.enter_async_context(tracker)
while True:
try:
await self._update_status()
except Exception as err: # pylint: disable=broad-except
_logger.error(
"ComponentPoolStatus failed with error: %s. Restarting.", err
)
await asyncio.sleep(1.0)
await run_forever(self._update_status)

async def _update_status(self) -> None:
async for status in self._merged_status_receiver:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""Tests for BatteryPoolStatus."""

import asyncio
from contextlib import AsyncExitStack
from datetime import timedelta

from frequenz.channels import Broadcast
Expand Down Expand Up @@ -37,7 +38,8 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None:
mock_microgrid = MockMicrogrid(grid_meter=True, mocker=mocker)
mock_microgrid.add_batteries(3)

async with mock_microgrid:
async with AsyncExitStack() as stack:
await stack.enter_async_context(mock_microgrid)
batteries = {
battery.component_id
for battery in mock_microgrid.mock_client.component_graph.components(
Expand All @@ -55,6 +57,7 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None:
max_blocking_duration=timedelta(seconds=30),
component_status_tracker_type=BatteryStatusTracker,
)
stack.push_async_callback(batteries_status.stop)
await asyncio.sleep(0.1)

expected_working: set[ComponentId] = set()
Expand Down Expand Up @@ -123,5 +126,3 @@ async def test_batteries_status(self, mocker: MockerFixture) -> None:
ComponentId(9),
ComponentId(19),
}

await batteries_status.stop()
9 changes: 8 additions & 1 deletion tests/timeseries/mock_microgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import asyncio
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from types import TracebackType
from typing import Coroutine

from frequenz.client.common.microgrid.components import ComponentId
Expand Down Expand Up @@ -622,6 +623,12 @@ async def __aenter__(self) -> MockMicrogrid:
await self.start()
return self

async def __aexit__(self, exc_type: None, exc_val: None, exc_tb: None) -> None:
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
/,
) -> None:
"""Exit context manager."""
await self.cleanup()
Loading