Skip to content

Commit 97e59b7

Browse files
committed
Update ComponentStatusTrackers to be BackgroundServices
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 66b4cf0 commit 97e59b7

File tree

4 files changed

+37
-54
lines changed

4 files changed

+37
-54
lines changed

src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66

77
import asyncio
8+
import contextlib
89
import logging
910
from collections import abc
1011
from datetime import timedelta
@@ -83,9 +84,6 @@ async def join(self) -> None:
8384
async def stop(self) -> None:
8485
"""Stop the ComponentPoolStatusTracker instance."""
8586
await cancel_and_await(self._task)
86-
await asyncio.gather(
87-
*[tracker.stop() for tracker in self._component_status_trackers],
88-
)
8987
await self._merged_status_receiver.stop()
9088

9189
def _make_merged_status_receiver(
@@ -110,14 +108,17 @@ def _make_merged_status_receiver(
110108

111109
async def _run(self) -> None:
112110
"""Start tracking component status."""
113-
while True:
114-
try:
115-
await self._update_status()
116-
except Exception as err: # pylint: disable=broad-except
117-
_logger.error(
118-
"ComponentPoolStatus failed with error: %s. Restarting.", err
119-
)
120-
await asyncio.sleep(1.0)
111+
async with contextlib.AsyncExitStack() as stack:
112+
for tracker in self._component_status_trackers:
113+
await stack.enter_async_context(tracker)
114+
while True:
115+
try:
116+
await self._update_status()
117+
except Exception as err: # pylint: disable=broad-except
118+
_logger.error(
119+
"ComponentPoolStatus failed with error: %s. Restarting.", err
120+
)
121+
await asyncio.sleep(1.0)
121122

122123
async def _update_status(self) -> None:
123124
async for status in self._merged_status_receiver:

src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
from frequenz.channels.util import Timer, select, selected_from
2121
from typing_extensions import override
2222

23-
from ...._internal._asyncio import cancel_and_await
2423
from ....microgrid import connection_manager
2524
from ....microgrid.component import (
2625
BatteryData,
2726
ComponentCategory,
2827
ComponentData,
2928
InverterData,
3029
)
30+
from ..._background_service import BackgroundService
3131
from ._component_status import (
3232
ComponentStatus,
3333
ComponentStatusEnum,
@@ -126,7 +126,7 @@ def is_blocked(self) -> bool:
126126
return self.blocked_until > datetime.now(tz=timezone.utc)
127127

128128

129-
class BatteryStatusTracker(ComponentStatusTracker):
129+
class BatteryStatusTracker(ComponentStatusTracker, BackgroundService):
130130
"""Class for tracking if battery is working.
131131
132132
Status updates are sent out only when there is a status change.
@@ -186,7 +186,11 @@ def __init__( # pylint: disable=too-many-arguments
186186
Raises:
187187
RuntimeError: If battery has no adjacent inverter.
188188
"""
189+
BackgroundService.__init__(self, name=f"BatteryStatusTracker({component_id})")
189190
self._max_data_age = max_data_age
191+
self._status_sender = status_sender
192+
self._set_power_result_receiver = set_power_result_receiver
193+
190194
# First battery is considered as not working.
191195
# Change status after first messages are received.
192196
self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING
@@ -212,8 +216,13 @@ def __init__( # pylint: disable=too-many-arguments
212216

213217
# Select needs receivers that can be get in async way only.
214218

215-
self._task: asyncio.Task[None] = asyncio.create_task(
216-
self._run(status_sender, set_power_result_receiver)
219+
@override
220+
def start(self) -> None:
221+
"""Start the BatteryStatusTracker instance."""
222+
self._tasks.add(
223+
asyncio.create_task(
224+
self._run(self._status_sender, self._set_power_result_receiver)
225+
)
217226
)
218227

219228
@property
@@ -225,10 +234,6 @@ def battery_id(self) -> int:
225234
"""
226235
return self._battery.component_id
227236

228-
async def stop(self) -> None:
229-
"""Stop tracking battery status."""
230-
await cancel_and_await(self._task)
231-
232237
def _handle_status_battery(self, bat_data: BatteryData) -> None:
233238
self._battery.last_msg_correct = (
234239
self._is_message_reliable(bat_data)

src/frequenz/sdk/actor/power_distributing/_component_status/_component_status.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
from frequenz.channels import Receiver, Sender
1414

15+
from ..._background_service import BackgroundService
16+
1517

1618
@dataclass
1719
class ComponentPoolStatus:
@@ -77,11 +79,11 @@ class SetPowerResult:
7779
"""Component IDs for which the last set power command failed."""
7880

7981

80-
class ComponentStatusTracker(ABC):
82+
class ComponentStatusTracker(BackgroundService, ABC):
8183
"""Interface for specialized component status trackers to implement."""
8284

8385
@abstractmethod
84-
def __init__( # pylint: disable=too-many-arguments
86+
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
8587
self,
8688
component_id: int,
8789
max_data_age: timedelta,
@@ -102,7 +104,3 @@ def __init__( # pylint: disable=too-many-arguments
102104
set_power_result_receiver: Channel to receive results of the requests to the
103105
components.
104106
"""
105-
106-
@abstractmethod
107-
async def stop(self) -> None:
108-
"""Stop the ComponentStatusTracker instance."""

tests/actor/test_battery_status.py

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77
import asyncio
88
import math
99
from collections.abc import AsyncIterator, Iterable
10-
from contextlib import asynccontextmanager
1110
from dataclasses import dataclass
1211
from datetime import datetime, timedelta, timezone
13-
from typing import Any, Generic, TypeVar
12+
from typing import Generic, TypeVar
1413

1514
import pytest
1615

@@ -144,26 +143,6 @@ async def recv_timeout(recv: Receiver[T], timeout: float = 0.1) -> T | type[_Tim
144143
return _Timeout
145144

146145

147-
@asynccontextmanager
148-
async def battery_status_tracker(
149-
*args: Any, **kwargs: Any
150-
) -> AsyncIterator[BatteryStatusTracker]:
151-
"""Create BatteryStatusTracker with given arguments.
152-
153-
Args:
154-
*args: Arguments for BatteryStatusTracker.
155-
**kwargs: Arguments for BatteryStatusTracker.
156-
157-
Yields:
158-
BatteryStatusTracker with given arguments.
159-
"""
160-
tracker = BatteryStatusTracker(*args, **kwargs)
161-
try:
162-
yield tracker
163-
finally:
164-
await tracker.stop()
165-
166-
167146
# pylint: disable=protected-access, unused-argument
168147
class TestBatteryStatus:
169148
"""Tests BatteryStatusTracker."""
@@ -186,7 +165,7 @@ async def test_sync_update_status_with_messages(
186165
status_channel = Broadcast[ComponentStatus]("battery_status")
187166
set_power_result_channel = Broadcast[SetPowerResult]("set_power_result")
188167

189-
async with mock_microgrid, battery_status_tracker(
168+
async with mock_microgrid, BatteryStatusTracker(
190169
BATTERY_ID,
191170
max_data_age=timedelta(seconds=5),
192171
max_blocking_duration=timedelta(seconds=30),
@@ -354,7 +333,7 @@ async def test_sync_blocking_feature(self, mocker: MockerFixture) -> None:
354333
status_channel = Broadcast[ComponentStatus]("battery_status")
355334
set_power_result_channel = Broadcast[SetPowerResult]("set_power_result")
356335

357-
async with mock_microgrid, battery_status_tracker(
336+
async with mock_microgrid, BatteryStatusTracker(
358337
# increase max_data_age_sec for blocking tests.
359338
# Otherwise it will block blocking.
360339
BATTERY_ID,
@@ -492,7 +471,7 @@ async def test_sync_blocking_interrupted_with_with_max_data(
492471
status_channel = Broadcast[ComponentStatus]("battery_status")
493472
set_power_result_channel = Broadcast[SetPowerResult]("set_power_result")
494473

495-
async with mock_microgrid, battery_status_tracker(
474+
async with mock_microgrid, BatteryStatusTracker(
496475
BATTERY_ID,
497476
max_data_age=timedelta(seconds=5),
498477
max_blocking_duration=timedelta(seconds=30),
@@ -540,7 +519,7 @@ async def test_sync_blocking_interrupted_with_invalid_message(
540519
status_channel = Broadcast[ComponentStatus]("battery_status")
541520
set_power_result_channel = Broadcast[SetPowerResult]("set_power_result")
542521

543-
async with mock_microgrid, battery_status_tracker(
522+
async with mock_microgrid, BatteryStatusTracker(
544523
BATTERY_ID,
545524
max_data_age=timedelta(seconds=5),
546525
max_blocking_duration=timedelta(seconds=30),
@@ -601,7 +580,7 @@ async def test_timers(
601580
status_channel = Broadcast[ComponentStatus]("battery_status")
602581
set_power_result_channel = Broadcast[SetPowerResult]("set_power_result")
603582

604-
async with mock_microgrid, battery_status_tracker(
583+
async with mock_microgrid, BatteryStatusTracker(
605584
BATTERY_ID,
606585
max_data_age=timedelta(seconds=5),
607586
max_blocking_duration=timedelta(seconds=30),
@@ -667,7 +646,7 @@ async def test_async_battery_status(self, mocker: MockerFixture) -> None:
667646
status_receiver = status_channel.new_receiver()
668647
set_power_result_sender = set_power_result_channel.new_sender()
669648

670-
async with mock_microgrid, battery_status_tracker(
649+
async with mock_microgrid, BatteryStatusTracker(
671650
BATTERY_ID,
672651
max_data_age=timedelta(seconds=5),
673652
max_blocking_duration=timedelta(seconds=30),
@@ -751,7 +730,7 @@ async def setup_tracker(
751730

752731
status_receiver = status_channel.new_receiver()
753732

754-
async with mock_microgrid, battery_status_tracker(
733+
async with mock_microgrid, BatteryStatusTracker(
755734
BATTERY_ID,
756735
max_data_age=timedelta(seconds=0.1),
757736
max_blocking_duration=timedelta(seconds=1),

0 commit comments

Comments
 (0)