Skip to content

Commit 0fcb11f

Browse files
committed
Add a stop method for LatestValueCache
And update users of `LatestValueCache` to call `stop` from their `stop` methods. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent cc4246c commit 0fcb11f

File tree

4 files changed

+17
-3
lines changed

4 files changed

+17
-3
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
from frequenz.channels import Receiver
1111

12+
from ._asyncio import cancel_and_await
13+
1214
T_co = typing.TypeVar("T_co", covariant=True)
1315

1416

@@ -72,3 +74,7 @@ def has_value(self) -> bool:
7274
async def _run(self) -> None:
7375
async for value in self._receiver:
7476
self._latest_value = value
77+
78+
async def stop(self) -> None:
79+
"""Stop the cache."""
80+
await cancel_and_await(self._task)

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ async def start(self) -> None:
188188
@override
189189
async def stop(self) -> None:
190190
"""Stop the battery data manager."""
191+
for bat_cache in self._battery_caches.values():
192+
await bat_cache.stop()
193+
for inv_cache in self._inverter_caches.values():
194+
await inv_cache.stop()
191195
await self._component_pool_status_tracker.stop()
192196

193197
@override

src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ async def distribute_power(self, request: Request) -> None:
9393
@override
9494
async def stop(self) -> None:
9595
"""Stop the ev charger manager."""
96+
await self._voltage_cache.stop()
9697
await self._component_pool_status_tracker.stop()
9798

9899
def _get_ev_charger_ids(self) -> collections.abc.Set[int]:

src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from frequenz.channels import Broadcast, Sender, select, selected_from
1212
from frequenz.channels.timer import SkipMissedAndDrift, Timer
13-
from frequenz.client.microgrid import ComponentCategory
13+
from frequenz.client.microgrid import ComponentCategory, MeterData
1414

1515
from ..._internal._asyncio import cancel_and_await
1616
from ..._internal._channels import LatestValueCache
@@ -55,6 +55,7 @@ def __init__(self, repeat_interval: timedelta) -> None:
5555
)
5656
self._bounds_rx = self._bounds_chan.new_receiver()
5757
self._bounds_tx = self._bounds_chan.new_sender()
58+
self._meter_data_cache: LatestValueCache[MeterData] | None = None
5859

5960
async def set(self, component_id: int, max_amps: float) -> None:
6061
"""Send the given current limit to the microgrid for the given component id.
@@ -75,6 +76,8 @@ def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]:
7576

7677
async def stop(self) -> None:
7778
"""Stop the BoundsSetter."""
79+
if self._meter_data_cache is not None:
80+
await self._meter_data_cache.stop()
7881
await self._bounds_chan.close()
7982
await cancel_and_await(self._task)
8083

@@ -96,7 +99,7 @@ async def _run(self) -> None:
9699
_logger.error(err)
97100
raise RuntimeError(err)
98101

99-
meter_data = LatestValueCache(
102+
self._meter_data_cache = LatestValueCache(
100103
await api_client.meter_data(next(iter(meters)).component_id)
101104
)
102105
latest_bound: dict[int, ComponentCurrentLimit] = {}
@@ -107,7 +110,7 @@ async def _run(self) -> None:
107110
)
108111

109112
async for selected in select(bound_chan, timer):
110-
meter = meter_data.get()
113+
meter = self._meter_data_cache.get()
111114
if meter is None:
112115
raise ValueError("Meter channel closed.")
113116

0 commit comments

Comments
 (0)