Skip to content

Commit 06f5256

Browse files
Add stop method to MetricFetcher
Stop fallback metric fetcher in the `stop` method.
1 parent 60ad95d commit 06f5256

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ async def stop(self) -> None:
117117
if self._task is None:
118118
return
119119
await cancel_and_await(self._task)
120+
120121
_, fetchers = self._builder.finalize()
121122
for fetcher in fetchers.values():
122-
fetcher.stream.close()
123+
await fetcher.stop()
123124

124125
@classmethod
125126
def from_receiver(

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,14 @@ def consume(self) -> Sample[QuantityT]:
7777
), f"Fallback metric fetcher: {self.name} was not started"
7878

7979
return self._receiver.consume()
80+
81+
async def stop(self) -> None:
82+
if self._formula_engine is None:
83+
return
84+
await self._formula_engine.stop()
85+
self._formula_engine = None
86+
87+
if self._receiver is None:
88+
return
89+
self._receiver.close()
90+
self._receiver = None

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ def is_running(self) -> bool:
368368
def start(self) -> None:
369369
"""Initialize the metric fetcher and start fetching samples."""
370370

371+
@abstractmethod
372+
async def stop(self) -> None:
373+
"""Stope the fetcher if is running."""
374+
371375

372376
class MetricFetcher(Generic[QuantityT], FormulaStep):
373377
"""A formula step for fetching a value from a metric Receiver."""
@@ -406,6 +410,11 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
406410
"""
407411
return self._stream
408412

413+
async def stop(self) -> None:
414+
self.stream.close()
415+
if self._fallback:
416+
await self._fallback.stop()
417+
409418
def stream_name(self) -> str:
410419
"""Return the name of the stream.
411420

0 commit comments

Comments
 (0)