Skip to content

Commit e3c7dc0

Browse files
Add stop method to FormulaEvaluator
1 parent 06f5256 commit e3c7dc0

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ async def _run(self) -> None:
314314
try:
315315
msg = await evaluator.apply()
316316
except asyncio.CancelledError:
317-
_logger.debug("FormulaEngine task cancelled: %s", self._name)
317+
_logger.debug("FormulaEngine has been cancelled, stopping evaluator")
318+
await evaluator.stop()
318319
raise
319320
except Exception as err: # pylint: disable=broad-except
320321
_logger.warning(

src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
"""A post-fix formula evaluator that operates on `Sample` receivers."""
55

66
import asyncio
7-
from collections.abc import Callable
7+
from collections.abc import Callable, Awaitable
88
from datetime import datetime
99
from math import isinf, isnan
10-
from typing import Generic
10+
from typing import Generic, Any
1111

1212
from .._base_types import QuantityT, Sample
1313
from ._formula_steps import FormulaStep, MetricFetcher
14+
from frequenz.sdk._internal._asyncio import cancel_and_await
1415

1516

1617
class FormulaEvaluator(Generic[QuantityT]):
@@ -38,6 +39,7 @@ def __init__(
3839
self._metric_fetchers: dict[str, MetricFetcher[QuantityT]] = metric_fetchers
3940
self._first_run = True
4041
self._create_method: Callable[[float], QuantityT] = create_method
42+
self._pending_fetch_tasks: list[asyncio.Task[Any]] = []
4143

4244
async def _synchronize_metric_timestamps(
4345
self, metrics: set[asyncio.Task[Sample[QuantityT] | None]]
@@ -87,6 +89,14 @@ async def _synchronize_metric_timestamps(
8789
self._first_run = False
8890
return latest_ts
8991

92+
async def stop(self) -> None:
93+
tasks_to_cancel = [
94+
asyncio.create_task(cancel_and_await(task))
95+
for task in self._pending_fetch_tasks
96+
]
97+
await asyncio.wait(tasks_to_cancel, return_when=asyncio.ALL_COMPLETED)
98+
self._pending_fetch_tasks = []
99+
90100
async def apply(self) -> Sample[QuantityT]:
91101
"""Fetch the latest metrics, apply the formula once and return the result.
92102
@@ -98,11 +108,13 @@ async def apply(self) -> Sample[QuantityT]:
98108
failed.
99109
"""
100110
eval_stack: list[float] = []
111+
self._pending_fetch_tasks = [
112+
asyncio.create_task(fetcher.fetch_next(), name=name)
113+
for name, fetcher in self._metric_fetchers.items()
114+
]
115+
101116
ready_metrics, pending = await asyncio.wait(
102-
[
103-
asyncio.create_task(fetcher.fetch_next(), name=name)
104-
for name, fetcher in self._metric_fetchers.items()
105-
],
117+
self._pending_fetch_tasks,
106118
return_when=asyncio.ALL_COMPLETED,
107119
)
108120

0 commit comments

Comments
 (0)