Skip to content

Commit ffa451c

Browse files
committed
Refactor ComponentPoolStatusTracker's BatteryStatusTracker creation
This simplifies the `__init__` method and isolates the interaction with `BatteryStatusTracker` to a single function. It can be made generic so that it supports being substituted with `PVStatusTracker` or `EVChargerStatusTracker`, etc. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent f141e9c commit ffa451c

File tree

1 file changed

+39
-69
lines changed

1 file changed

+39
-69
lines changed

src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py

Lines changed: 39 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import asyncio
88
import logging
99
from collections import abc
10-
from dataclasses import dataclass
1110

1211
from frequenz.channels import Broadcast, Receiver, Sender
1312
from frequenz.channels.util import MergeNamed
@@ -19,26 +18,6 @@
1918
_logger = logging.getLogger(__name__)
2019

2120

22-
@dataclass
23-
class _ComponentStatusChannelHelper:
24-
"""Helper class to create component status channel.
25-
26-
Channel has only one receiver.
27-
Receiver has size 1, because we need only latest status.
28-
"""
29-
30-
component_id: int
31-
"""Id of the component for which we should create channel."""
32-
33-
def __post_init__(self) -> None:
34-
self.name: str = f"component-{self.component_id}-status"
35-
channel = Broadcast[ComponentStatusEnum](self.name)
36-
37-
receiver_name = f"{self.name}-receiver"
38-
self.receiver = channel.new_receiver(name=receiver_name, maxsize=1)
39-
self.sender = channel.new_sender()
40-
41-
4221
class ComponentPoolStatusTracker:
4322
"""Track status of components of a given category.
4423
@@ -70,39 +49,24 @@ def __init__( # noqa: DOC502 (RuntimeError raised from BatteryStatusTracker)
7049
RuntimeError: When managing batteries, if any battery has no adjacent
7150
inverter.
7251
"""
52+
self._component_ids = component_ids
53+
self._max_data_age_sec = max_data_age_sec
54+
self._max_blocking_duration_sec = max_blocking_duration_sec
55+
self._component_status_sender = component_status_sender
56+
7357
# At first no component is working, we will get notification when they start
7458
# working.
7559
self._current_status = ComponentPoolStatus(working=set(), uncertain=set())
7660

7761
# Channel for sending results of requests to the components.
78-
set_power_result_channel = Broadcast[SetPowerResult]("component_request_status")
79-
self._set_power_result_sender = set_power_result_channel.new_sender()
80-
81-
self._batteries: dict[str, BatteryStatusTracker] = {}
82-
83-
# Receivers for individual components statuses are needed to create a
84-
# `MergeNamed` object.
85-
receivers: dict[str, Receiver[ComponentStatusEnum]] = {}
86-
87-
for battery_id in component_ids:
88-
channel = _ComponentStatusChannelHelper(battery_id)
89-
receivers[channel.name] = channel.receiver
90-
91-
self._batteries[channel.name] = BatteryStatusTracker(
92-
battery_id=battery_id,
93-
max_data_age_sec=max_data_age_sec,
94-
max_blocking_duration_sec=max_blocking_duration_sec,
95-
status_sender=channel.sender,
96-
set_power_result_receiver=set_power_result_channel.new_receiver(
97-
f"battery_{battery_id}_request_status"
98-
),
99-
)
100-
101-
self._component_status_channel = MergeNamed[ComponentStatusEnum](
102-
**receivers,
62+
self._set_power_result_channel = Broadcast[SetPowerResult](
63+
"component_request_status"
10364
)
65+
self._set_power_result_sender = self._set_power_result_channel.new_sender()
66+
self._component_status_trackers: dict[str, BatteryStatusTracker] = {}
67+
self._merged_status_receiver = self._make_merged_status_receiver()
10468

105-
self._task = asyncio.create_task(self._run(component_status_sender))
69+
self._task = asyncio.create_task(self._run())
10670

10771
async def join(self) -> None:
10872
"""Wait and return when the instance's task completes.
@@ -114,40 +78,46 @@ async def join(self) -> None:
11478
async def stop(self) -> None:
11579
"""Stop tracking batteries status."""
11680
await cancel_and_await(self._task)
117-
11881
await asyncio.gather(
11982
*[
12083
tracker.stop() # pylint: disable=protected-access
121-
for tracker in self._batteries.values()
84+
for tracker in self._component_status_trackers.values()
12285
],
12386
)
124-
await self._component_status_channel.stop()
87+
await self._merged_status_receiver.stop()
12588

126-
async def _run(self, component_status_sender: Sender[ComponentPoolStatus]) -> None:
127-
"""Start tracking component status.
89+
def _make_merged_status_receiver(
90+
self,
91+
) -> MergeNamed[ComponentStatusEnum]:
92+
status_receivers: dict[str, Receiver[ComponentStatusEnum]] = {}
93+
94+
for component_id in self._component_ids:
95+
channel_name = f"component_{component_id}_status"
96+
channel: Broadcast[ComponentStatusEnum] = Broadcast(channel_name)
97+
tracker = BatteryStatusTracker(
98+
battery_id=component_id,
99+
max_data_age_sec=self._max_data_age_sec,
100+
max_blocking_duration_sec=self._max_blocking_duration_sec,
101+
status_sender=channel.new_sender(),
102+
set_power_result_receiver=self._set_power_result_channel.new_receiver(),
103+
)
104+
self._component_status_trackers[channel_name] = tracker
105+
status_receivers[channel_name] = channel.new_receiver()
106+
return MergeNamed(**status_receivers)
128107

129-
Args:
130-
component_status_sender: The sender used for sending the status of the
131-
components in the pool.
132-
"""
108+
async def _run(self) -> None:
109+
"""Start tracking component status."""
133110
while True:
134111
try:
135-
await self._update_status(component_status_sender)
112+
await self._update_status()
136113
except Exception as err: # pylint: disable=broad-except
137114
_logger.error(
138115
"ComponentPoolStatus failed with error: %s. Restarting.", err
139116
)
140117

141-
async def _update_status(
142-
self, component_status_sender: Sender[ComponentPoolStatus]
143-
) -> None:
144-
"""Wait for any component to change status and update status.
145-
146-
Args:
147-
component_status_sender: Sender to send the current status of components.
148-
"""
149-
async for channel_name, status in self._component_status_channel:
150-
component_id = self._batteries[channel_name].battery_id
118+
async def _update_status(self) -> None:
119+
async for channel_name, status in self._merged_status_receiver:
120+
component_id = self._component_status_trackers[channel_name].battery_id
151121
if status == ComponentStatusEnum.WORKING:
152122
self._current_status.working.add(component_id)
153123
self._current_status.uncertain.discard(component_id)
@@ -158,7 +128,7 @@ async def _update_status(
158128
self._current_status.working.discard(component_id)
159129
self._current_status.uncertain.discard(component_id)
160130

161-
await component_status_sender.send(self._current_status)
131+
await self._component_status_sender.send(self._current_status)
162132

163133
async def update_status(
164134
self, succeed_components: set[int], failed_components: set[int]
@@ -178,7 +148,7 @@ async def update_status(
178148
SetPowerResult(succeed_components, failed_components)
179149
)
180150

181-
def get_working_components(self, components: abc.Set[int]) -> set[int]:
151+
def get_working_components(self, components: abc.Set[int]) -> abc.Set[int]:
182152
"""From the given set of components, return only working ones.
183153
184154
Args:

0 commit comments

Comments
 (0)