Remove callback and expose the Resampler #107

@leandro-lucarella-frequenz

Description

What's needed?

The Resampler was redesigned using a Sink (a callback) as the basic concept for timeseries, but we probably don't want to let users pass callbacks, because callbacks were abused in the past.

Proposed solution

Rewrite the Resampler in a way that doesn't use a callback, but returns the resample window with all samples on each iteration.
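
As a rough illustration of the idea only (not the SDK code), the sketch below shows a resampler written as an async generator that yields one window of values per period. `ToyResampler`, `push()` and the string source names are hypothetical stand-ins, and the average is used as a placeholder resampling function.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


class ToyResampler:
    """Hypothetical stand-in for the Resampler; not the SDK class."""

    def __init__(self, period_s: float) -> None:
        self._period_s = period_s
        # Raw values received per source since the last resampling period.
        self._buffers: dict[str, list[float]] = {}

    def add_timeseries(self, name: str) -> None:
        self._buffers[name] = []

    def push(self, name: str, value: float) -> None:
        self._buffers[name].append(value)

    async def resample(self) -> AsyncIterator[dict[str, float]]:
        """Yield one resampled value per source on every period."""
        while True:
            await asyncio.sleep(self._period_s)
            # Average is only a placeholder resampling function.
            window = {
                name: sum(values) / len(values)
                for name, values in self._buffers.items()
                if values
            }
            for values in self._buffers.values():
                values.clear()
            yield window


async def main() -> dict[str, float]:
    resampler = ToyResampler(period_s=0.01)
    resampler.add_timeseries("power")
    resampler.push("power", 1.0)
    resampler.push("power", 3.0)
    async for window in resampler.resample():
        return window  # The caller decides what to do with each window.
    return {}
```

With this shape the resampler never calls back into user code: the consumer drives the loop and routes each window itself.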

From #101 (comment):

Resampler being used by the resampling actor
    async def _resample(self) -> None:
        """Do the resampling."""
        while True:
            try:
                async for samples in self._resampler.resample():
                    await asyncio.gather(
                        *[
                            self._senders[source].send(sample)
                            for source, sample in samples.items()
                        ]
                    )
            except ResamplingError as err:
                # Send whatever we got
                if err.samples:
                    await asyncio.gather(
                        *[
                            self._senders[source].send(sample)
                            for source, sample in err.samples.items()
                        ]
                    )
                # Remove failing timeseries
                for source, source_error in err.exceptions.items():
                    logger.exception(
                        "Error resampling source %s, removing source...",
                        source,
                        exc_info=source_error,
                    )
                    removed = self._resampler.remove_timeseries(source)
                    popped = self._senders.pop(source, None)
                    if not removed or not popped:
                        logger.warning(
                            "Got an exception from an unknown source: "
                            "source=%r, exception=%r resampler=%s, senders=%s",
                            source,
                            source_error,
                            removed,
                            popped,
                            exc_info=source_error,
                        )

    async def run(self) -> None:
        """Resample known component metrics and process resampling requests.

        If there is a resampling error while resampling some component metric,
        then that metric will be discarded and not resampled any more. Any
        other error will be propagated (most likely ending in the actor being
        restarted).

        Raises:
            RuntimeError: If there is some unexpected error while resampling or
                handling requests.

        # noqa: DAR401 error
        """
        tasks_to_cancel: set[asyncio.Task] = set()
        try:
            subscriptions_task = asyncio.create_task(
                self._process_resampling_requests()
            )
            tasks_to_cancel.add(subscriptions_task)

            while True:
                resampling_task = asyncio.create_task(self._resample())
                tasks_to_cancel.add(resampling_task)
                done, _ = await asyncio.wait(
                    [resampling_task, subscriptions_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if subscriptions_task in done:
                    tasks_to_cancel.remove(subscriptions_task)
                    raise RuntimeError(
                        "There was a problem with the subscriptions channel."
                    )

                if resampling_task in done:
                    tasks_to_cancel.remove(resampling_task)
                    # The resampler shouldn't end without an exception
                    error = resampling_task.exception()
                    assert (
                        error is not None
                    ), "The resample() function shouldn't exit without an exception."

                    # We don't know what to do with whatever error comes that
                    # wasn't handled in self._resample(), so propagate the
                    # exception if that is the case.
                    raise error
                    # The resampling_task will be re-created if we reached this point
        finally:
            await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
Diff of the changes (not updating all the tests which also need a lot of rewriting)
diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py
index 10cd77b..c18e4aa 100644
--- a/src/frequenz/sdk/actor/_resampling.py
+++ b/src/frequenz/sdk/actor/_resampling.py
@@ -3,10 +3,12 @@
 
 """ComponentMetricsResamplingActor used to subscribe for resampled component metrics."""
 
+from __future__ import annotations
+
 import asyncio
 import dataclasses
 import logging
-from typing import Sequence, Set
+from collections.abc import AsyncIterator, Sequence
 
 from frequenz.channels import Receiver, Sender
 
@@ -142,11 +144,12 @@ class ComponentMetricsResamplingActor:
         self._resampling_period_s = resampling_period_s
         self._max_data_age_in_periods: float = max_data_age_in_periods
         self._resampling_function: ResamplingFunction = resampling_function
-        self._resampler = Resampler(
+        self._resampler: Resampler = Resampler(
             resampling_period_s=resampling_period_s,
             max_data_age_in_periods=max_data_age_in_periods,
             resampling_function=resampling_function,
         )
+        self._senders: dict[AsyncIterator[Sample], Sender[Sample]] = {}
 
     async def _subscribe(self, request: ComponentMetricRequest) -> None:
         """Subscribe for data for a specific time series.
@@ -160,22 +163,56 @@ class ComponentMetricsResamplingActor:
         data_source_channel_name = data_source_request.get_channel_name()
         await self._subscription_sender.send(data_source_request)
         receiver = self._channel_registry.new_receiver(data_source_channel_name)
-
-        # This is a temporary hack until the Sender implementation uses
-        # exceptions to report errors.
         sender = self._channel_registry.new_sender(request.get_channel_name())
 
-        async def sink_adapter(sample: Sample) -> None:
-            if not await sender.send(sample):
-                raise Exception(f"Error while sending with sender {sender}", sender)
-
-        self._resampler.add_timeseries(receiver, sink_adapter)
+        self._senders[receiver] = sender
+        self._resampler.add_timeseries(receiver)
 
     async def _process_resampling_requests(self) -> None:
         """Process resampling data requests."""
         async for request in self._subscription_receiver:
             await self._subscribe(request)
 
+    async def _resample(self) -> None:
+        """Do the resampling."""
+        while True:
+            try:
+                async for samples in self._resampler.resample():
+                    await asyncio.gather(
+                        *[
+                            self._senders[source].send(sample)
+                            for source, sample in samples.items()
+                        ]
+                    )
+            except ResamplingError as err:
+                # Send whatever we got
+                if err.samples:
+                    await asyncio.gather(
+                        *[
+                            self._senders[source].send(sample)
+                            for source, sample in err.samples.items()
+                        ]
+                    )
+                # Remove failing timeseries
+                for source, source_error in err.exceptions.items():
+                    logger.exception(
+                        "Error resampling source %s, removing source...",
+                        source,
+                        exc_info=source_error,
+                    )
+                    removed = self._resampler.remove_timeseries(source)
+                    popped = self._senders.pop(source, None)
+                    if not removed or not popped:
+                        logger.warning(
+                            "Got an exception from an unknown source: "
+                            "source=%r, exception=%r resampler=%s, senders=%s",
+                            source,
+                            source_error,
+                            removed,
+                            popped,
+                            exc_info=source_error,
+                        )
+
     async def run(self) -> None:
         """Resample known component metrics and process resampling requests.
 
@@ -190,7 +227,7 @@ class ComponentMetricsResamplingActor:
 
         # noqa: DAR401 error
         """
-        tasks_to_cancel: Set[asyncio.Task] = set()
+        tasks_to_cancel: set[asyncio.Task] = set()
         try:
             subscriptions_task = asyncio.create_task(
                 self._process_resampling_requests()
@@ -198,7 +235,7 @@ class ComponentMetricsResamplingActor:
             tasks_to_cancel.add(subscriptions_task)
 
             while True:
-                resampling_task = asyncio.create_task(self._resampler.resample())
+                resampling_task = asyncio.create_task(self._resample())
                 tasks_to_cancel.add(resampling_task)
                 done, _ = await asyncio.wait(
                     [resampling_task, subscriptions_task],
@@ -217,28 +254,12 @@ class ComponentMetricsResamplingActor:
                     error = resampling_task.exception()
                     assert (
                         error is not None
-                    ), "The resample() function shouldn't exit normally."
-
-                    # We don't know what to do with something other than
-                    # ResamplingError, so propagate the exception if that is the
-                    # case.
-                    if not isinstance(error, ResamplingError):
-                        raise error
-                    for source, source_error in error.exceptions.items():
-                        logger.exception(
-                            "Error resampling source %s, removing source...",
-                            source,
-                            exc_info=source_error,
-                        )
-                        removed = self._resampler.remove_timeseries(source)
-                        if not removed:
-                            logger.warning(
-                                "Got an exception from an unknown source: "
-                                "source=%r, exception=%r",
-                                source,
-                                source_error,
-                                exc_info=source_error,
-                            )
+                    ), "The resample() function shouldn't exit without an exception."
+
+                    # We don't know what to do with whatever error comes that
+                    # wasn't handled in self._resample(), so propagate the
+                    # exception if that is the case.
+                    raise error
                     # The resampling_task will be re-created if we reached this point
         finally:
             await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
diff --git a/src/frequenz/sdk/timeseries/resampling.py b/src/frequenz/sdk/timeseries/resampling.py
index 8c830da..88c6c4b 100644
--- a/src/frequenz/sdk/timeseries/resampling.py
+++ b/src/frequenz/sdk/timeseries/resampling.py
@@ -3,17 +3,20 @@
 
 """Timeseries resampler."""
 
+from __future__ import annotations
+
 import asyncio
 import logging
 import math
 from collections import deque
+from collections.abc import AsyncIterator
 from datetime import datetime, timedelta
-from typing import Callable, Deque, Dict, Sequence, Union, cast
+from typing import Callable, Deque, Dict, Sequence, Union
 
 from frequenz.channels.util import Timer
 
 from ..util.asyncio import cancel_and_await
-from . import Sample, Sink, Source
+from . import Sample
 
 _logger = logging.Logger(__name__)
 
@@ -45,7 +48,7 @@ Returns:
 class SourceStoppedError(RuntimeError):
     """A timeseries stopped producing samples."""
 
-    def __init__(self, source: Source) -> None:
+    def __init__(self, source: AsyncIterator[Sample]) -> None:
         """Create an instance.
 
         Args:
@@ -68,36 +71,36 @@ class SourceStoppedError(RuntimeError):
 class ResamplingError(RuntimeError):
     """An Error ocurred while resampling.
 
-    This error is a container for errors raised by the underlying sources and
-    or sinks.
+    This error provides information on which samples were successfully produced
+    and which failed. Successful samples are stored in `samples` and failing
+    timeseries have their exceptions reported in `exceptions`.
     """
 
     def __init__(
-        self, exceptions: Dict[Source, Union[Exception, asyncio.CancelledError]]
+        self,
+        samples: Dict[AsyncIterator[Sample], Sample],
+        exceptions: Dict[
+            AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+        ],
     ) -> None:
         """Create an instance.
 
         Args:
-            exceptions: A mapping of timeseries source and the exception
-                encountered while resampling that timeseries. Note that the
-                error could be raised by the sink, while trying to send
-                a resampled data for this timeseries, the source key is only
-                used to identify the timeseries with the issue, it doesn't
-                necessarily mean that the error was raised by the source. The
-                underlying exception should provide information about what was
+            samples: The samples from the timeseries that could be successfully
+                calculated.
+            exceptions: The exceptions raised by failing timeseries.
+                encountered while resampling that timeseries.
                 the actual source of the exception.
         """
         super().__init__(f"Some error were found while resampling: {exceptions}")
-        self.exceptions = exceptions
-        """A mapping of timeseries source and the exception encountered.
-
-        Note that the error could be raised by the sink, while trying to send
-        a resampled data for this timeseries, the source key is only used to
-        identify the timeseries with the issue, it doesn't necessarily mean
-        that the error was raised by the source. The underlying exception
-        should provide information about what was the actual source of the
-        exception.
-        """
+
+        self.samples: Dict[AsyncIterator[Sample], Sample] = samples
+        """The samples from the timeseries that could be successfully calculated."""
+
+        self.exceptions: Dict[
+            AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+        ] = exceptions
+        """The exceptions raised by failing timeseries."""
 
     def __repr__(self) -> str:
         """Return the representation of the instance.
@@ -105,15 +108,15 @@ class ResamplingError(RuntimeError):
         Returns:
             The representation of the instance.
         """
-        return f"{self.__class__.__name__}({self.exceptions=})"
+        return f"{self.__class__.__name__}({self.samples=}, {self.exceptions=})"
 
 
 class Resampler:
     """A timeseries resampler.
 
-    In general timeseries [`Source`][frequenz.sdk.timeseries.Source]s don't
-    necessarily come at periodic intervals. You can use this class to normalize
-    timeseries to produce `Sample`s at regular periodic intervals.
+    In general timeseries don't necessarily come at periodic intervals. You can
+    use this class to normalize timeseries to produce `Sample`s at regular
+    periodic intervals.
 
     This class uses
     a [`ResamplingFunction`][frequenz.sdk.timeseries.ResamplingFunction] to
@@ -149,14 +152,14 @@ class Resampler:
         self._resampling_period_s = resampling_period_s
         self._max_data_age_in_periods: float = max_data_age_in_periods
         self._resampling_function: ResamplingFunction = resampling_function
-        self._resamplers: Dict[Source, _StreamingHelper] = {}
+        self._resamplers: Dict[AsyncIterator[Sample], _StreamingHelper] = {}
         self._timer: Timer = Timer(self._resampling_period_s)
 
     async def stop(self) -> None:
         """Cancel all receiving tasks."""
         await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()])
 
-    def add_timeseries(self, source: Source, sink: Sink) -> bool:
+    def add_timeseries(self, source: AsyncIterator[Sample]) -> bool:
         """Start resampling a new timeseries.
 
         Args:
@@ -178,12 +181,11 @@ class Resampler:
                 resampling_function=self._resampling_function,
             ),
             source,
-            sink,
         )
         self._resamplers[source] = resampler
         return True
 
-    def remove_timeseries(self, source: Source) -> bool:
+    def remove_timeseries(self, source: AsyncIterator[Sample]) -> bool:
         """Stop resampling the timeseries produced by `source`.
 
         Args:
@@ -200,16 +202,12 @@ class Resampler:
             return False
         return True
 
-    async def resample(self, *, one_shot: bool = False) -> None:
+    async def resample(self) -> AsyncIterator[Dict[AsyncIterator[Sample], Sample]]:
         """Start resampling all known timeseries.
 
         This method will run forever unless there is an error while receiving
         from a source or sending to a sink (or `one_shot` is used).
 
-        Args:
-            one_shot: Wether the resampling should run only for one resampling
-                period.
-
         Raises:
             ResamplingError: If some timseries source or sink encounters any
                 errors while receiving or sending samples. In this case the
@@ -223,23 +221,28 @@ class Resampler:
                 *[r.resample(timer_timestamp) for r in self._resamplers.values()],
                 return_exceptions=True,
             )
-            # We need to cast this because Python is not smart enough to figure
-            # out the key value can't be None (not even adding another
-            # condition checking for None in the dict expression).
-            exceptions = cast(
-                Dict[Source, Union[Exception, asyncio.CancelledError]],
-                {
-                    source: results[i]
-                    for i, source in enumerate(self._resamplers)
-                    # CancelledError inherits from BaseException, but we don't want
-                    # to catch *all* BaseExceptions here.
-                    if isinstance(results[i], (Exception, asyncio.CancelledError))
-                },
-            )
+
+            samples: Dict[AsyncIterator[Sample], Sample] = {}
+            exceptions: Dict[
+                AsyncIterator[Sample], Union[Exception, asyncio.CancelledError]
+            ] = {}
+            for i, source in enumerate(self._resamplers):
+                if isinstance(results[i], (Exception, asyncio.CancelledError)):
+                    exceptions[source] = results[i]
+                elif isinstance(results[i], Sample):
+                    samples[source] = results[i]
+                elif isinstance(results[i], BaseException):
+                    raise results[i]
+                else:
+                    assert False, (
+                        f"Resampling result for source {source!r} "
+                        f"is not an exception or Sample: {results[i]!r}"
+                    )
+
             if exceptions:
-                raise ResamplingError(exceptions)
-            if one_shot:
-                break
+                raise ResamplingError(samples, exceptions)
+
+            yield samples
 
 
 class _ResamplingHelper:
@@ -342,8 +345,7 @@ class _StreamingHelper:
     def __init__(
         self,
         helper: _ResamplingHelper,
-        source: Source,
-        sink: Sink,
+        source: AsyncIterator[Sample],
     ) -> None:
         """Initialize an instance.
 
@@ -353,8 +355,7 @@ class _StreamingHelper:
             sink: The sink to use to send the resampled data.
         """
         self._helper: _ResamplingHelper = helper
-        self._source: Source = source
-        self._sink: Sink = sink
+        self._source: AsyncIterator[Sample] = source
         self._receiving_task: asyncio.Task = asyncio.create_task(
             self._receive_samples()
         )
@@ -373,7 +374,7 @@ class _StreamingHelper:
             if sample.value is not None and not math.isnan(sample.value):
                 self._helper.add_sample(sample)
 
-    async def resample(self, timestamp: datetime) -> None:
+    async def resample(self, timestamp: datetime) -> Sample:
         """Calculate a new sample for the passed `timestamp` and send it.
 
         The helper is used to calculate the new sample and the sender is used
@@ -402,4 +403,4 @@ class _StreamingHelper:
                 raise recv_exception
             raise SourceStoppedError(self._source)
 
-        await self._sink(self._helper.resample(timestamp))
+        return self._helper.resample(timestamp)
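
The partitioning step in `Resampler.resample()` relies on `asyncio.gather(..., return_exceptions=True)`, which only accepts awaitables, so each per-source call has to be a coroutine. A minimal, self-contained sketch of that pattern follows; `PartialFailure`, `_resample_one()` and the string source names are hypothetical and only mirror the shape of `ResamplingError`.

```python
from __future__ import annotations

import asyncio


class PartialFailure(RuntimeError):
    """Hypothetical analogue of ResamplingError: carries both outcomes."""

    def __init__(
        self,
        samples: dict[str, float],
        exceptions: dict[str, BaseException],
    ) -> None:
        super().__init__(f"Some sources failed: {exceptions}")
        self.samples = samples
        self.exceptions = exceptions


async def _resample_one(source: str) -> float:
    # Placeholder for the per-source resampling coroutine.
    if source == "broken":
        raise ValueError("source stopped")
    return 42.0


async def resample_all(sources: list[str]) -> dict[str, float]:
    # gather() needs awaitables; exceptions come back as plain results.
    results = await asyncio.gather(
        *(_resample_one(s) for s in sources), return_exceptions=True
    )
    samples: dict[str, float] = {}
    exceptions: dict[str, BaseException] = {}
    for source, result in zip(sources, results):
        if isinstance(result, (Exception, asyncio.CancelledError)):
            exceptions[source] = result
        elif isinstance(result, BaseException):
            raise result  # e.g. KeyboardInterrupt: don't swallow it.
        else:
            samples[source] = result
    if exceptions:
        raise PartialFailure(samples, exceptions)
    return samples


async def demo() -> tuple[dict[str, float], list[str]]:
    try:
        await resample_all(["ok", "broken"])
    except PartialFailure as err:
        return err.samples, sorted(err.exceptions)
    return {}, []
```

Carrying the successful samples inside the error lets the caller still forward them before dropping the failing sources.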

Use cases

No response

Alternatives and workarounds

From #101 (comment):

t1 = resampler.add_timeseries(...)
t2 = resampler.add_timeseries(...)

while True:
    if err := await resampler.resample():
        print(err)
        continue
    t1.consume()
    t2.consume()
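
To make the pseudocode above concrete, here is one possible runnable reading of it. Everything in it is an assumption for illustration: `Handle`, `PullResampler`, the `raw` and `latest` attributes, and returning the error from `resample()` instead of raising it are not part of the SDK.

```python
from __future__ import annotations

import asyncio


class Handle:
    """Hypothetical per-timeseries handle returned by add_timeseries()."""

    def __init__(self) -> None:
        self.raw: list[float] = []
        self.latest: float | None = None

    def consume(self) -> float | None:
        """Return the latest resampled value and clear it."""
        value, self.latest = self.latest, None
        return value


class PullResampler:
    """Hypothetical resampler whose results are pulled via handles."""

    def __init__(self) -> None:
        self._handles: list[Handle] = []

    def add_timeseries(self) -> Handle:
        handle = Handle()
        self._handles.append(handle)
        return handle

    async def resample(self) -> Exception | None:
        """Resample once; return an error instead of raising it."""
        await asyncio.sleep(0)  # Stand-in for waiting on the resampling timer.
        empty = [h for h in self._handles if not h.raw]
        if empty:
            return RuntimeError(f"{len(empty)} timeseries produced no samples")
        for handle in self._handles:
            handle.latest = sum(handle.raw) / len(handle.raw)
            handle.raw.clear()
        return None


async def demo() -> tuple[float | None, float | None]:
    resampler = PullResampler()
    t1 = resampler.add_timeseries()
    t2 = resampler.add_timeseries()
    t1.raw.extend([1.0, 2.0])
    t2.raw.append(10.0)
    if err := await resampler.resample():
        raise err
    return t1.consume(), t2.consume()
```

In this variant the caller must remember which handle belongs to which output, which is the bookkeeping the callback design was trying to avoid.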

Additional context

Also from @sahas-subramanian-frequenz in #101 (comment):


I spent quite some time on this, and I'm not sure the individual-tasks design makes any sense at all if we really want to avoid callbacks at all costs, as it introduces a lot of unnecessary complexity. For example, one of the niceties of this approach is that the resampling actor becomes really simple, as it doesn't need to keep track of senders or receivers itself. Without the callbacks, it needs to start keeping track of the senders again, and once we do that, I'm not sure it brings any value compared to the previous approach of keeping track of both. On the contrary, it introduces some asynchrony, because now half is handled by the actor and half is handled deep in the Resampler class.

The whole point of having independent tasks handling each time series is to avoid the interaction that is being proposed if we want an "active driver" that needs to keep track of everything.

Removing the Sink is sadly turning into another whole rewrite, as it was at the heart of the new design. To be honest, I'm also still not convinced that such a strong no-callback policy is justified just because callbacks were misused in the past, but I guess that is a broader topic.

Since I have also started doing some other work based on this PR, I suggest merging it as is (if there are no other comments) and maybe doing another rewrite as a follow-up once I'm done with the work I based on top of it.
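
The trade-off described in this comment can be sketched side by side. The two classes below are hypothetical toys, not the SDK API: `SinkResampler` stores a callback per timeseries, while `WindowResampler` hands the window back, so the driver has to keep its own source-to-output mapping.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Hypothetical sink signature: an async callback taking one resampled value.
Sink = Callable[[float], Awaitable[None]]


class SinkResampler:
    """Callback style: each timeseries carries its own sink."""

    def __init__(self) -> None:
        self._sinks: dict[str, Sink] = {}

    def add_timeseries(self, source: str, sink: Sink) -> None:
        self._sinks[source] = sink

    async def resample_once(self, values: dict[str, float]) -> None:
        await asyncio.gather(*(self._sinks[s](v) for s, v in values.items()))


class WindowResampler:
    """Callback-free style: the caller receives the window and routes it."""

    def __init__(self) -> None:
        self._sources: set[str] = set()

    def add_timeseries(self, source: str) -> None:
        self._sources.add(source)

    async def resample_once(self, values: dict[str, float]) -> dict[str, float]:
        return {s: v for s, v in values.items() if s in self._sources}


async def compare() -> tuple[list[float], list[float]]:
    via_sink: list[float] = []
    via_driver: list[float] = []

    async def sink(value: float) -> None:
        via_sink.append(value)

    a = SinkResampler()
    a.add_timeseries("power", sink)
    await a.resample_once({"power": 5.0})

    # Without a sink, the driver keeps its own source -> output mapping.
    outputs = {"power": via_driver}
    b = WindowResampler()
    b.add_timeseries("power")
    for source, value in (await b.resample_once({"power": 5.0})).items():
        outputs[source].append(value)
    return via_sink, via_driver
```

Both variants deliver the same values; the difference is only in who owns the routing state.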

Metadata

Assignees

No one assigned

    Labels

    part:data-pipeline (Affects the data pipeline), type:enhancement (New feature or enhancement visible to users)

    Projects

    Status

    To do
