Skip to content

Commit c51b8cf

Browse files
committed
Hold references to fetchers outside of the task
This would allow the fetchers to be stopped when the MetricAggregator is stopped. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 98c885a commit c51b8cf

File tree

1 file changed

+11
-11
lines changed

1 file changed

+11
-11
lines changed

src/frequenz/sdk/timeseries/battery_pool/_methods.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ def __init__(
112112
set()
113113
)
114114

115+
self._fetchers: dict[int, ComponentMetricFetcher] = {}
116+
115117
@classmethod
116118
def name(cls) -> str:
117119
"""Get name of the method.
@@ -167,27 +169,25 @@ async def stop(self) -> None:
167169
*[cancel_and_await(self._send_task), cancel_and_await(self._update_task)]
168170
)
169171

170-
async def _create_data_fetchers(self) -> dict[int, ComponentMetricFetcher]:
172+
async def _create_data_fetchers(self) -> None:
171173
fetchers: dict[int, ComponentMetricFetcher] = {
172174
cid: await LatestBatteryMetricsFetcher.async_new(cid, metrics)
173175
for cid, metrics in self._metric_calculator.battery_metrics.items()
174176
}
177+
self._fetchers.update(fetchers)
175178
inverter_fetchers = {
176179
cid: await LatestInverterMetricsFetcher.async_new(cid, metrics)
177180
for cid, metrics in self._metric_calculator.inverter_metrics.items()
178181
}
179-
fetchers.update(inverter_fetchers)
180-
return fetchers
182+
self._fetchers.update(inverter_fetchers)
181183

182-
def _remove_metric_fetcher(
183-
self, fetchers: dict[int, ComponentMetricFetcher], component_id: int
184-
) -> None:
184+
def _remove_metric_fetcher(self, component_id: int) -> None:
185185
_logger.error(
186186
"Removing component %d from the %s formula.",
187187
component_id,
188188
self._result_channel._name, # pylint: disable=protected-access
189189
)
190-
fetchers.pop(component_id)
190+
self._fetchers.pop(component_id)
191191

192192
def _metric_updated(self, new_metrics: ComponentMetricsData) -> bool:
193193
cid = new_metrics.component_id
@@ -197,11 +197,11 @@ def _metric_updated(self, new_metrics: ComponentMetricsData) -> bool:
197197

198198
async def _update_and_notify(self) -> None:
199199
"""Receive component metrics and send notification when they change."""
200-
fetchers = await self._create_data_fetchers()
200+
await self._create_data_fetchers()
201201

202202
self._pending_data_fetchers = {
203203
asyncio.create_task(fetcher.fetch_next(), name=str(cid))
204-
for cid, fetcher in fetchers.items()
204+
for cid, fetcher in self._fetchers.items()
205205
}
206206
while len(self._pending_data_fetchers) > 0:
207207
done, self._pending_data_fetchers = await asyncio.wait(
@@ -210,7 +210,7 @@ async def _update_and_notify(self) -> None:
210210
for item in done:
211211
metrics = item.result()
212212
if metrics is None:
213-
self._remove_metric_fetcher(fetchers, int(item.get_name()))
213+
self._remove_metric_fetcher(int(item.get_name()))
214214
continue
215215
if self._metric_updated(metrics):
216216
self._update_event.set()
@@ -220,7 +220,7 @@ async def _update_and_notify(self) -> None:
220220
self._cached_metrics[cid] = metrics
221221
# Add fetcher back to the processing list.
222222
self._pending_data_fetchers.add(
223-
asyncio.create_task(fetchers[cid].fetch_next(), name=str(cid))
223+
asyncio.create_task(self._fetchers[cid].fetch_next(), name=str(cid))
224224
)
225225

226226
async def _send_on_update(self, min_update_interval: timedelta) -> None:

0 commit comments

Comments
 (0)