Skip to content

Commit 48ac318

Browse files
Add stop method to MetricFetcher
Stop fallback metric fetcher in the `stop` method.
1 parent 60ad95d commit 48ac318

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ async def stop(self) -> None:
117117
if self._task is None:
118118
return
119119
await cancel_and_await(self._task)
120+
120121
_, fetchers = self._builder.finalize()
121122
for fetcher in fetchers.values():
122-
fetcher.stream.close()
123+
await fetcher.stop()
123124

124125
@classmethod
125126
def from_receiver(

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,15 @@ def consume(self) -> Sample[QuantityT]:
7777
), f"Fallback metric fetcher: {self.name} was not started"
7878

7979
return self._receiver.consume()
80+
81+
async def stop(self) -> None:
82+
"""Stop fallback metric fetcher, by closing the connection."""
83+
if self._formula_engine is None:
84+
return
85+
await self._formula_engine.stop()
86+
self._formula_engine = None
87+
88+
if self._receiver is None:
89+
return
90+
self._receiver.close()
91+
self._receiver = None

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ def is_running(self) -> bool:
368368
def start(self) -> None:
369369
"""Initialize the metric fetcher and start fetching samples."""
370370

371+
@abstractmethod
372+
async def stop(self) -> None:
373+
"""Stope the fetcher if is running."""
374+
371375

372376
class MetricFetcher(Generic[QuantityT], FormulaStep):
373377
"""A formula step for fetching a value from a metric Receiver."""
@@ -406,6 +410,12 @@ def stream(self) -> Receiver[Sample[QuantityT]]:
406410
"""
407411
return self._stream
408412

413+
async def stop(self) -> None:
414+
"""Stop metric fetcher."""
415+
self.stream.close()
416+
if self._fallback:
417+
await self._fallback.stop()
418+
409419
def stream_name(self) -> str:
410420
"""Return the name of the stream.
411421

0 commit comments

Comments
 (0)