Skip to content

Commit a29e600

Browse files
authored
Some component status cleanup (#839)
- Refactor component status tracking into its own package. - Use `timedelta`s instead of `float`s for duration in component status trackers. - Convert `ComponentStatusTracker`s into `BackgroundService`s
2 parents 5f3cd29 + 97e59b7 commit a29e600

File tree

7 files changed

+145
-143
lines changed

7 files changed

+145
-143
lines changed

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
from ....microgrid import connection_manager
2020
from ....microgrid.component import BatteryData, ComponentCategory, InverterData
2121
from ....timeseries._quantities import Power
22-
from .._battery_status_tracker import BatteryStatusTracker
2322
from .._component_pool_status_tracker import ComponentPoolStatusTracker
24-
from .._component_status import ComponentPoolStatus
23+
from .._component_status import BatteryStatusTracker, ComponentPoolStatus
2524
from .._distribution_algorithm import (
2625
AggregatedBatteryData,
2726
BatteryDistributionAlgorithm,
@@ -146,8 +145,8 @@ def __init__(
146145
self._component_pool_status_tracker = ComponentPoolStatusTracker(
147146
component_ids=set(self._battery_ids),
148147
component_status_sender=component_pool_status_sender,
149-
max_blocking_duration_sec=30.0,
150-
max_data_age_sec=10.0,
148+
max_blocking_duration=timedelta(seconds=30.0),
149+
max_data_age=timedelta(seconds=10.0),
151150
component_status_tracker_type=BatteryStatusTracker,
152151
)
153152

src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66

77
import asyncio
8+
import contextlib
89
import logging
910
from collections import abc
11+
from datetime import timedelta
1012

1113
from frequenz.channels import Broadcast, Receiver, Sender
1214
from frequenz.channels.util import Merge
@@ -32,10 +34,10 @@ class ComponentPoolStatusTracker:
3234

3335
def __init__( # pylint: disable=too-many-arguments
3436
self,
35-
component_ids: set[int],
37+
component_ids: abc.Set[int],
3638
component_status_sender: Sender[ComponentPoolStatus],
37-
max_data_age_sec: float,
38-
max_blocking_duration_sec: float,
39+
max_data_age: timedelta,
40+
max_blocking_duration: timedelta,
3941
component_status_tracker_type: type[ComponentStatusTracker],
4042
) -> None:
4143
"""Create ComponentPoolStatusTracker instance.
@@ -44,18 +46,17 @@ def __init__( # pylint: disable=too-many-arguments
4446
component_ids: set of component ids whose status is to be tracked.
4547
component_status_sender: The sender used for sending the status of the
4648
tracked components.
47-
max_data_age_sec: If a component stops sending data, then this is the
48-
maximum time for which its last message should be considered as
49-
valid. After that time, the component won't be used until it starts
50-
sending data.
51-
max_blocking_duration_sec: This value tell what should be the maximum
52-
timeout used for blocking failing component.
53-
component_status_tracker_type: component status tracker to use
54-
for tracking the status of the components.
49+
max_data_age: If a component stops sending data, then this is the maximum
50+
time for which its last message should be considered as valid. After
51+
that time, the component won't be used until it starts sending data.
52+
max_blocking_duration: This value tell what should be the maximum timeout
53+
used for blocking failing component.
54+
component_status_tracker_type: component status tracker to use for tracking
55+
the status of the components.
5556
"""
5657
self._component_ids = component_ids
57-
self._max_data_age_sec = max_data_age_sec
58-
self._max_blocking_duration_sec = max_blocking_duration_sec
58+
self._max_data_age = max_data_age
59+
self._max_blocking_duration = max_blocking_duration
5960
self._component_status_sender = component_status_sender
6061
self._component_status_tracker_type = component_status_tracker_type
6162

@@ -83,9 +84,6 @@ async def join(self) -> None:
8384
async def stop(self) -> None:
8485
"""Stop the ComponentPoolStatusTracker instance."""
8586
await cancel_and_await(self._task)
86-
await asyncio.gather(
87-
*[tracker.stop() for tracker in self._component_status_trackers],
88-
)
8987
await self._merged_status_receiver.stop()
9088

9189
def _make_merged_status_receiver(
@@ -99,8 +97,8 @@ def _make_merged_status_receiver(
9997
)
10098
tracker = self._component_status_tracker_type(
10199
component_id=component_id,
102-
max_data_age_sec=self._max_data_age_sec,
103-
max_blocking_duration_sec=self._max_blocking_duration_sec,
100+
max_data_age=self._max_data_age,
101+
max_blocking_duration=self._max_blocking_duration,
104102
status_sender=channel.new_sender(),
105103
set_power_result_receiver=self._set_power_result_channel.new_receiver(),
106104
)
@@ -110,14 +108,17 @@ def _make_merged_status_receiver(
110108

111109
async def _run(self) -> None:
112110
"""Start tracking component status."""
113-
while True:
114-
try:
115-
await self._update_status()
116-
except Exception as err: # pylint: disable=broad-except
117-
_logger.error(
118-
"ComponentPoolStatus failed with error: %s. Restarting.", err
119-
)
120-
await asyncio.sleep(1.0)
111+
async with contextlib.AsyncExitStack() as stack:
112+
for tracker in self._component_status_trackers:
113+
await stack.enter_async_context(tracker)
114+
while True:
115+
try:
116+
await self._update_status()
117+
except Exception as err: # pylint: disable=broad-except
118+
_logger.error(
119+
"ComponentPoolStatus failed with error: %s. Restarting.", err
120+
)
121+
await asyncio.sleep(1.0)
121122

122123
async def _update_status(self) -> None:
123124
async for status in self._merged_status_receiver:
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Status tracking for components."""
5+
6+
from ._battery_status_tracker import BatteryStatusTracker
7+
from ._component_status import (
8+
ComponentPoolStatus,
9+
ComponentStatus,
10+
ComponentStatusEnum,
11+
ComponentStatusTracker,
12+
SetPowerResult,
13+
)
14+
15+
__all__ = [
16+
"BatteryStatusTracker",
17+
"ComponentPoolStatus",
18+
"ComponentStatus",
19+
"ComponentStatusEnum",
20+
"ComponentStatusTracker",
21+
"SetPowerResult",
22+
]

src/frequenz/sdk/actor/power_distributing/_battery_status_tracker.py renamed to src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py

Lines changed: 52 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
from frequenz.channels.util import Timer, select, selected_from
2121
from typing_extensions import override
2222

23-
from ..._internal._asyncio import cancel_and_await
24-
from ...microgrid import connection_manager
25-
from ...microgrid.component import (
23+
from ....microgrid import connection_manager
24+
from ....microgrid.component import (
2625
BatteryData,
2726
ComponentCategory,
2827
ComponentData,
2928
InverterData,
3029
)
30+
from ..._background_service import BackgroundService
3131
from ._component_status import (
3232
ComponentStatus,
3333
ComponentStatusEnum,
@@ -55,55 +55,54 @@ class _ComponentStreamStatus:
5555

5656
@dataclass
5757
class _BlockingStatus:
58-
min_duration_sec: float
59-
"""The minimum blocking duration (in seconds)."""
58+
min_duration: timedelta
59+
"""The minimum blocking duration."""
6060

61-
max_duration_sec: float
62-
"""The maximum blocking duration (in seconds)."""
61+
max_duration: timedelta
62+
"""The maximum blocking duration."""
6363

64-
last_blocking_duration_sec: float = 0.0
65-
"""Last blocking duration (in seconds)."""
64+
last_blocking_duration: timedelta = timedelta(seconds=0.0)
65+
"""Last blocking duration."""
6666

6767
blocked_until: datetime | None = None
6868
"""Until when the battery is blocked."""
6969

7070
def __post_init__(self) -> None:
71-
assert self.min_duration_sec <= self.max_duration_sec, (
72-
f"Minimum blocking duration ({self.min_duration_sec}) cannot be greater "
73-
f"than maximum blocking duration ({self.max_duration_sec})"
71+
assert self.min_duration <= self.max_duration, (
72+
f"Minimum blocking duration ({self.min_duration}) cannot be greater "
73+
f"than maximum blocking duration ({self.max_duration})"
7474
)
75-
self.last_blocking_duration_sec = self.min_duration_sec
75+
self.last_blocking_duration = self.min_duration
76+
self._timedelta_zero = timedelta(seconds=0.0)
7677

77-
def block(self) -> float:
78+
def block(self) -> timedelta:
7879
"""Block battery.
7980
8081
Battery can be unblocked using `self.unblock()` method.
8182
8283
Returns:
83-
For how long (in seconds) the battery is blocked.
84+
The duration for which the battery is blocked.
8485
"""
8586
now = datetime.now(tz=timezone.utc)
8687

8788
# If is not blocked
8889
if self.blocked_until is None:
89-
self.last_blocking_duration_sec = self.min_duration_sec
90-
self.blocked_until = now + timedelta(
91-
seconds=self.last_blocking_duration_sec
92-
)
93-
return self.last_blocking_duration_sec
90+
self.last_blocking_duration = self.min_duration
91+
self.blocked_until = now + self.last_blocking_duration
92+
return self.last_blocking_duration
9493

9594
# If still blocked, then do nothing
9695
if self.blocked_until > now:
97-
return 0.0
96+
return self._timedelta_zero
9897

9998
# If previous blocking time expired, then blocked it once again.
10099
# Increase last blocking time, unless it reach the maximum.
101-
self.last_blocking_duration_sec = min(
102-
2 * self.last_blocking_duration_sec, self.max_duration_sec
100+
self.last_blocking_duration = min(
101+
2 * self.last_blocking_duration, self.max_duration
103102
)
104-
self.blocked_until = now + timedelta(seconds=self.last_blocking_duration_sec)
103+
self.blocked_until = now + self.last_blocking_duration
105104

106-
return self.last_blocking_duration_sec
105+
return self.last_blocking_duration
107106

108107
def unblock(self) -> None:
109108
"""Unblock battery.
@@ -127,7 +126,7 @@ def is_blocked(self) -> bool:
127126
return self.blocked_until > datetime.now(tz=timezone.utc)
128127

129128

130-
class BatteryStatusTracker(ComponentStatusTracker):
129+
class BatteryStatusTracker(ComponentStatusTracker, BackgroundService):
131130
"""Class for tracking if battery is working.
132131
133132
Status updates are sent out only when there is a status change.
@@ -166,20 +165,19 @@ class BatteryStatusTracker(ComponentStatusTracker):
166165
def __init__( # pylint: disable=too-many-arguments
167166
self,
168167
component_id: int,
169-
max_data_age_sec: float,
170-
max_blocking_duration_sec: float,
168+
max_data_age: timedelta,
169+
max_blocking_duration: timedelta,
171170
status_sender: Sender[ComponentStatus],
172171
set_power_result_receiver: Receiver[SetPowerResult],
173172
) -> None:
174173
"""Create class instance.
175174
176175
Args:
177176
component_id: Id of this battery
178-
max_data_age_sec: If component stopped sending data, then
179-
this is the maximum time when its last message should be considered as
180-
valid. After that time, component won't be used until it starts sending
181-
data.
182-
max_blocking_duration_sec: This value tell what should be the maximum
177+
max_data_age: If component stopped sending data, then this is the maximum
178+
time when its last message should be considered as valid. After that
179+
time, component won't be used until it starts sending data.
180+
max_blocking_duration: This value tell what should be the maximum
183181
timeout used for blocking failing component.
184182
status_sender: Channel to send status updates.
185183
set_power_result_receiver: Channel to receive results of the requests to the
@@ -188,13 +186,18 @@ def __init__( # pylint: disable=too-many-arguments
188186
Raises:
189187
RuntimeError: If battery has no adjacent inverter.
190188
"""
191-
self._max_data_age = max_data_age_sec
189+
BackgroundService.__init__(self, name=f"BatteryStatusTracker({component_id})")
190+
self._max_data_age = max_data_age
191+
self._status_sender = status_sender
192+
self._set_power_result_receiver = set_power_result_receiver
193+
192194
# First battery is considered as not working.
193195
# Change status after first messages are received.
194196
self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING
195197
self._blocking_status: _BlockingStatus = _BlockingStatus(
196-
1.0, max_blocking_duration_sec
198+
timedelta(seconds=1.0), max_blocking_duration
197199
)
200+
self._timedelta_zero = timedelta(seconds=0.0)
198201

199202
inverter_id = self._find_adjacent_inverter_id(component_id)
200203
if inverter_id is None:
@@ -204,17 +207,22 @@ def __init__( # pylint: disable=too-many-arguments
204207

205208
self._battery: _ComponentStreamStatus = _ComponentStreamStatus(
206209
component_id,
207-
data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)),
210+
data_recv_timer=Timer.timeout(max_data_age),
208211
)
209212
self._inverter: _ComponentStreamStatus = _ComponentStreamStatus(
210213
inverter_id,
211-
data_recv_timer=Timer.timeout(timedelta(seconds=max_data_age_sec)),
214+
data_recv_timer=Timer.timeout(max_data_age),
212215
)
213216

214217
# Select needs receivers that can be get in async way only.
215218

216-
self._task: asyncio.Task[None] = asyncio.create_task(
217-
self._run(status_sender, set_power_result_receiver)
219+
@override
220+
def start(self) -> None:
221+
"""Start the BatteryStatusTracker instance."""
222+
self._tasks.add(
223+
asyncio.create_task(
224+
self._run(self._status_sender, self._set_power_result_receiver)
225+
)
218226
)
219227

220228
@property
@@ -226,10 +234,6 @@ def battery_id(self) -> int:
226234
"""
227235
return self._battery.component_id
228236

229-
async def stop(self) -> None:
230-
"""Stop tracking battery status."""
231-
await cancel_and_await(self._task)
232-
233237
def _handle_status_battery(self, bat_data: BatteryData) -> None:
234238
self._battery.last_msg_correct = (
235239
self._is_message_reliable(bat_data)
@@ -259,9 +263,9 @@ def _handle_status_set_power_result(self, result: SetPowerResult) -> None:
259263
):
260264
duration = self._blocking_status.block()
261265

262-
if duration > 0:
266+
if duration > self._timedelta_zero:
263267
_logger.warning(
264-
"battery %d failed last response. block it for %f sec",
268+
"battery %d failed last response. block it for %s",
265269
self.battery_id,
266270
duration,
267271
)
@@ -345,7 +349,7 @@ async def _run(
345349
if (
346350
datetime.now(tz=timezone.utc)
347351
- self._battery.last_msg_timestamp
348-
) < timedelta(seconds=self._max_data_age):
352+
) < self._max_data_age:
349353
# This means that we have received data from the battery
350354
# since the timer triggered, but the timer event arrived
351355
# late, so we can ignore it.
@@ -356,7 +360,7 @@ async def _run(
356360
if (
357361
datetime.now(tz=timezone.utc)
358362
- self._inverter.last_msg_timestamp
359-
) < timedelta(seconds=self._max_data_age):
363+
) < self._max_data_age:
360364
# This means that we have received data from the inverter
361365
# since the timer triggered, but the timer event arrived
362366
# late, so we can ignore it.
@@ -505,7 +509,7 @@ def _is_timestamp_outdated(self, timestamp: datetime) -> bool:
505509
_True if timestamp is to old, False otherwise
506510
"""
507511
now = datetime.now(tz=timezone.utc)
508-
diff = (now - timestamp).total_seconds()
512+
diff = now - timestamp
509513
return diff > self._max_data_age
510514

511515
def _is_message_reliable(self, message: ComponentData) -> bool:

0 commit comments

Comments
 (0)