Skip to content

Commit bc358fb

Browse files
committed
Add a generic _internal._asyncio.run_forever function
The function takes a callable that would return a corouting, and keep running it forever. This commit also replaces the custom `_run_forever` implementations with the generic `run_forever`. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent ed61541 commit bc358fb

File tree

6 files changed

+34
-64
lines changed

6 files changed

+34
-64
lines changed

src/frequenz/sdk/_internal/_asyncio.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55

66

77
import asyncio
8+
import logging
89
from abc import ABC
9-
from typing import Any
10+
from datetime import timedelta
11+
from typing import Any, Callable, Coroutine
12+
13+
_logger = logging.getLogger(__name__)
1014

1115

1216
async def cancel_and_await(task: asyncio.Task[Any]) -> None:
@@ -28,6 +32,25 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None:
2832
pass
2933

3034

35+
async def run_forever(
36+
async_callable: Callable[[], Coroutine[Any, Any, None]],
37+
interval: timedelta = timedelta(seconds=1),
38+
) -> None:
39+
"""Run a given function forever, restarting it after any exception.
40+
41+
Args:
42+
async_callable: The async callable to run.
43+
interval: The interval between restarts.
44+
"""
45+
interval_s = interval.total_seconds()
46+
while True:
47+
try:
48+
await async_callable()
49+
except Exception: # pylint: disable=broad-except
50+
_logger.exception("Restarting after exception")
51+
await asyncio.sleep(interval_s)
52+
53+
3154
class NotSyncConstructible(AssertionError):
3255
"""Raised when object with async constructor is created in sync way."""
3356

src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from typing_extensions import override
2626

27+
from ....._internal._asyncio import run_forever
2728
from ....._internal._math import is_close_to_zero
2829
from .....timeseries import Power, Sample3Phase, Voltage
2930
from .... import _data_pipeline, connection_manager
@@ -89,7 +90,7 @@ async def start(self) -> None:
8990
"""Start the ev charger data manager."""
9091
# Need to start a task only if there are EV chargers in the component graph.
9192
if self._ev_charger_ids:
92-
self._task = asyncio.create_task(self._run_forever())
93+
self._task = asyncio.create_task(run_forever(self._run))
9394

9495
@override
9596
async def distribute_power(self, request: Request) -> None:
@@ -217,15 +218,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]:
217218
)
218219
return {component_id: target_power}
219220

220-
async def _run_forever(self) -> None:
221-
"""Run the EV charger manager forever."""
222-
while True:
223-
try:
224-
await self._run()
225-
except Exception: # pylint: disable=broad-except
226-
_logger.exception("Recovering from an error in EV charger manager.")
227-
await asyncio.sleep(1.0)
228-
229221
async def _run(self) -> None: # pylint: disable=too-many-locals
230222
"""Run the main event loop of the EV charger manager."""
231223
api = connection_manager.get().api_client

src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from typing_extensions import override
1919

20+
from ...._internal._asyncio import run_forever
2021
from ....actor._background_service import BackgroundService
2122
from ... import connection_manager
2223
from ._blocking_status import BlockingStatus
@@ -80,7 +81,7 @@ def __init__( # pylint: disable=too-many-arguments
8081
@override
8182
def start(self) -> None:
8283
"""Start the status tracker."""
83-
self._tasks.add(asyncio.create_task(self._run_forever()))
84+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
8485

8586
def _is_working(self, ev_data: EVChargerData) -> bool:
8687
"""Return whether the given data indicates that the component is working."""
@@ -99,17 +100,6 @@ def _is_stale(self, ev_data: EVChargerData) -> bool:
99100
stale = now - ev_data.timestamp > self._max_data_age
100101
return stale
101102

102-
async def _run_forever(self) -> None:
103-
"""Run the status tracker forever."""
104-
while True:
105-
try:
106-
await self._run()
107-
except Exception: # pylint: disable=broad-except
108-
_logger.exception(
109-
"Restarting after exception in EVChargerStatusTracker.run()"
110-
)
111-
await asyncio.sleep(1.0)
112-
113103
def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum:
114104
"""Handle new EV charger data."""
115105
if self._is_stale(ev_data):

src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from frequenz.client.microgrid import InverterComponentState, InverterData
1313
from typing_extensions import override
1414

15+
from ...._internal._asyncio import run_forever
1516
from ....actor._background_service import BackgroundService
1617
from ... import connection_manager
1718
from ._blocking_status import BlockingStatus
@@ -76,7 +77,7 @@ def __init__( # pylint: disable=too-many-arguments
7677
@override
7778
def start(self) -> None:
7879
"""Start the status tracker."""
79-
self._tasks.add(asyncio.create_task(self._run_forever()))
80+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
8081

8182
def _is_working(self, pv_data: InverterData) -> bool:
8283
"""Return whether the given data indicates that the PV inverter is working."""
@@ -87,16 +88,6 @@ def _is_working(self, pv_data: InverterData) -> bool:
8788
InverterComponentState.STANDBY,
8889
)
8990

90-
async def _run_forever(self) -> None:
91-
while True:
92-
try:
93-
await self._run()
94-
except Exception: # pylint: disable=broad-except
95-
_logger.exception(
96-
"Restarting after exception in PVInverterStatusTracker.run()"
97-
)
98-
await asyncio.sleep(1.0)
99-
10091
def _is_stale(self, pv_data: InverterData) -> bool:
10192
"""Return whether the given data is stale."""
10293
now = datetime.now(tz=timezone.utc)

src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,18 @@
55

66

77
import asyncio
8-
import logging
98
from collections import abc
109

1110
from frequenz.channels import Receiver, Sender, merge, select, selected_from
1211
from frequenz.client.microgrid import EVChargerData
1312

13+
from ..._internal._asyncio import run_forever
1414
from ...actor import BackgroundService
1515
from ...microgrid import connection_manager
1616
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
1717
from .. import Power
1818
from .._base_types import Bounds, SystemBounds
1919

20-
_logger = logging.getLogger(__name__)
21-
2220

2321
class EVCSystemBoundsTracker(BackgroundService):
2422
"""Track the system bounds for the EV chargers.
@@ -55,7 +53,7 @@ def __init__(
5553

5654
def start(self) -> None:
5755
"""Start the EV charger system bounds tracker."""
58-
self._tasks.add(asyncio.create_task(self._run_forever()))
56+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
5957

6058
async def _send_bounds(self) -> None:
6159
"""Calculate and send the aggregate system bounds if they have changed."""
@@ -104,17 +102,6 @@ async def _send_bounds(self) -> None:
104102
)
105103
await self._bounds_sender.send(self._last_sent_bounds)
106104

107-
async def _run_forever(self) -> None:
108-
"""Run the status tracker forever."""
109-
while True:
110-
try:
111-
await self._run()
112-
except Exception: # pylint: disable=broad-except
113-
_logger.exception(
114-
"Restarting after exception in EVChargerSystemBoundsTracker.run()"
115-
)
116-
await asyncio.sleep(1.0)
117-
118105
async def _run(self) -> None:
119106
"""Run the system bounds tracker."""
120107
api_client = connection_manager.get().api_client

src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@
44
"""System bounds tracker for PV inverters."""
55

66
import asyncio
7-
import logging
87
from collections import abc
98

109
from frequenz.channels import Receiver, Sender, merge, select, selected_from
1110
from frequenz.client.microgrid import InverterData
1211

12+
from ..._internal._asyncio import run_forever
1313
from ...actor import BackgroundService
1414
from ...microgrid import connection_manager
1515
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
1616
from .._base_types import Bounds, SystemBounds
1717
from .._quantities import Power
1818

19-
_logger = logging.getLogger(__name__)
20-
2119

2220
class PVSystemBoundsTracker(BackgroundService):
2321
"""Track the system bounds for PV inverters.
@@ -54,7 +52,7 @@ def __init__(
5452

5553
def start(self) -> None:
5654
"""Start the PV inverter system bounds tracker."""
57-
self._tasks.add(asyncio.create_task(self._run_forever()))
55+
self._tasks.add(asyncio.create_task(run_forever(self._run)))
5856

5957
async def _send_bounds(self) -> None:
6058
"""Calculate and send the aggregate system bounds if they have changed."""
@@ -103,17 +101,6 @@ async def _send_bounds(self) -> None:
103101
)
104102
await self._bounds_sender.send(self._last_sent_bounds)
105103

106-
async def _run_forever(self) -> None:
107-
"""Run the system bounds tracker."""
108-
while True:
109-
try:
110-
await self._run()
111-
except Exception: # pylint: disable=broad-except
112-
_logger.exception(
113-
"Restarting after exception in PVSystemBoundsTracker.run()"
114-
)
115-
await asyncio.sleep(1.0)
116-
117104
async def _run(self) -> None:
118105
"""Run the system bounds tracker."""
119106
api_client = connection_manager.get().api_client

0 commit comments

Comments
 (0)