Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0",
"networkx >= 2.8, < 4",
"numpy >= 2, < 3",
"numpy >= 2.1.0, < 3",
"typing_extensions >= 4.13.0, < 5",
"marshmallow >= 3.19.0, < 5",
"marshmallow_dataclass >= 8.7.1, < 9",
Expand Down
32 changes: 18 additions & 14 deletions src/frequenz/sdk/_internal/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncio
import logging
import sys
from abc import ABC
from datetime import timedelta
from typing import Any, Callable, Coroutine
Expand All @@ -22,14 +23,27 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None:

Args:
task: The task to be cancelled and waited for.

Raises:
asyncio.CancelledError: when our task was cancelled
"""
if task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if not task.cancelled():
raise


def is_loop_running() -> bool:
"""Check if the event loop is running."""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False


async def run_forever(
Expand All @@ -46,20 +60,10 @@ async def run_forever(
while True:
try:
await async_callable()
except RuntimeError as exc:
if "no running event loop" in str(exc):
_logger.exception(
"Something went wrong, no running event loop, skipping execution of %s",
async_callable.__name__,
)
return
except Exception: # pylint: disable=broad-except
if not asyncio.get_event_loop().is_running():
_logger.exception(
"Something went wrong, no running event loop, skipping execution of %s",
async_callable.__name__,
)
return
if not is_loop_running():
_logger.exception("There is no running event loop, aborting...")
sys.exit(-1)
_logger.exception("Restarting after exception")
await asyncio.sleep(interval_s)

Expand Down
9 changes: 9 additions & 0 deletions src/frequenz/sdk/actor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
from datetime import timedelta

from .._internal._asyncio import is_loop_running
from ._background_service import BackgroundService

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -105,6 +106,14 @@ async def _run_loop(self) -> None:
_logger.info("Actor %s: Cancelled.", self)
raise
except Exception: # pylint: disable=broad-except
if not is_loop_running():
_logger.exception(
"Something went wrong, no running event loop,"
" not trying to restart %s again.",
self,
)
raise

if self._is_cancelled:
# If actor was cancelled, but any tasks have failed with an exception
# other than asyncio.CancelledError, those exceptions are combined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
)
from typing_extensions import override

from frequenz.sdk._internal._asyncio import run_forever

from ....actor._background_service import BackgroundService
from ... import connection_manager
from ._blocking_status import BlockingStatus
Expand Down Expand Up @@ -262,7 +264,7 @@ async def _run(
inverter = inverter_receiver
set_power_result = set_power_result_receiver

while True:
async def _loop() -> None:
try:
async for selected in select(
battery,
Expand Down Expand Up @@ -317,6 +319,8 @@ async def _run(
except Exception as err: # pylint: disable=broad-except
_logger.exception("BatteryStatusTracker crashed with error: %s", err)

await run_forever(_loop)

def _get_current_status(self) -> ComponentStatusEnum:
"""Get current battery status.

Expand Down
5 changes: 2 additions & 3 deletions src/frequenz/sdk/timeseries/_voltage_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,5 @@ async def _send_request(self) -> None:
"Phase-to-neutral 3-phase voltage streaming task cancelled: %s",
self._source_component,
)
break
else:
await sender.send(msg)
raise
await sender.send(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,8 @@ async def _run(self) -> None:
)
except asyncio.CancelledError:
_logger.debug("FormulaEngine task cancelled: %s", self._name)
break
else:
await sender.send(msg)
raise
await sender.send(msg)

def new_receiver(
self, name: str | None = None, max_size: int = 50
Expand Down
Loading