1313from typing import SupportsIndex , overload
1414
1515import numpy as np
16- from frequenz .channels import Receiver
16+ from frequenz .channels import Broadcast , Receiver , Sender
1717from numpy .typing import ArrayLike
1818
1919from .._internal .asyncio import cancel_and_await
2020from . import Sample
21+ from ._resampling import Resampler , ResamplerConfig
2122from ._ringbuffer import OrderedRingBuffer
2223
2324log = logging .getLogger (__name__ )
@@ -46,6 +47,13 @@ class MovingWindow:
4647 [`OrderedRingBuffer`][frequenz.sdk.timeseries._ringbuffer.OrderedRingBuffer]
4748 documentation.
4849
50+ Resampling might be required to reduce the number of samples to store, and
51+ it can be set by specifying the resampler config parameter so that the user
52+ can control the granularity of the samples to be stored in the underlying
53+ buffer.
54+
55+ If resampling is not required, the resampler config parameter can be
56+ set to None in which case the MovingWindow will not perform any resampling.
4957
5058 **Example1** (calculating the mean of a time interval):
5159
@@ -86,11 +94,12 @@ class MovingWindow:
8694 ```
8795 """
8896
89- def __init__ (
97+ def __init__ ( # pylint: disable=too-many-arguments
9098 self ,
9199 size : timedelta ,
92100 resampled_data_recv : Receiver [Sample ],
93- sampling_period : timedelta ,
101+ input_sampling_period : timedelta ,
102+ resampler_config : ResamplerConfig | None = None ,
94103 window_alignment : datetime = datetime (1 , 1 , 1 ),
95104 ) -> None :
96105 """
@@ -104,7 +113,8 @@ def __init__(
104113 size: The time span of the moving window over which samples will be stored.
105114 resampled_data_recv: A receiver that delivers samples with a
106115 given sampling period.
107- sampling_period: The sampling period.
116+ input_sampling_period: The time interval between consecutive input samples.
117+ resampler_config: The resampler configuration in case resampling is required.
108118 window_alignment: A datetime object that defines a point in time to which
109119 the window is aligned to modulo window size.
110120 (default is midnight 01.01.01)
@@ -114,26 +124,42 @@ def __init__(
114124 asyncio.CancelledError: when the task gets cancelled.
115125 """
116126 assert (
117- sampling_period .total_seconds () > 0
118- ), "The sampling period should be greater than zero."
127+ input_sampling_period .total_seconds () > 0
128+ ), "The input sampling period should be greater than zero."
119129 assert (
120- sampling_period <= size
121- ), "The sampling period should be equal to or lower than the window size."
130+ input_sampling_period <= size
131+ ), "The input sampling period should be equal to or lower than the window size."
132+
133+ sampling = input_sampling_period
134+ self ._resampler : Resampler | None = None
135+ self ._resampler_sender : Sender [Sample ] | None = None
136+ self ._resampler_task : asyncio .Task [None ] | None = None
137+
138+ if resampler_config :
139+ resampling_period = timedelta (seconds = resampler_config .resampling_period_s )
140+ assert (
141+ resampling_period <= size
142+ ), "The resampling period should be equal to or lower than the window size."
143+
144+ self ._resampler = Resampler (resampler_config )
145+ sampling = resampling_period
122146
123147 # Sampling period might not fit perfectly into the window size.
124- num_samples = math .ceil (size / sampling_period )
148+ num_samples = math .ceil (size . total_seconds () / sampling . total_seconds () )
125149
126150 self ._resampled_data_recv = resampled_data_recv
127151 self ._buffer = OrderedRingBuffer (
128152 np .empty (shape = num_samples , dtype = float ),
129- sampling_period = sampling_period ,
153+ sampling_period = sampling ,
130154 time_index_alignment = window_alignment ,
131155 )
132156
157+ if self ._resampler :
158+ self ._configure_resampler ()
159+
133160 self ._update_window_task : asyncio .Task [None ] = asyncio .create_task (
134161 self ._run_impl ()
135162 )
136- log .debug ("Cancelling MovingWindow task: %s" , __name__ )
137163
138164 async def _run_impl (self ) -> None :
139165 """Awaits samples from the receiver and updates the underlying ringbuffer.
@@ -144,16 +170,37 @@ async def _run_impl(self) -> None:
144170 try :
145171 async for sample in self ._resampled_data_recv :
146172 log .debug ("Received new sample: %s" , sample )
147- self ._buffer .update (sample )
173+ if self ._resampler and self ._resampler_sender :
174+ await self ._resampler_sender .send (sample )
175+ else :
176+ self ._buffer .update (sample )
177+
148178 except asyncio .CancelledError :
149179 log .info ("MovingWindow task has been cancelled." )
150180 raise
151181
152182 log .error ("Channel has been closed" )
153183
154184 async def stop (self ) -> None :
155- """Cancel the running task and stop the MovingWindow."""
185+ """Cancel the running tasks and stop the MovingWindow."""
156186 await cancel_and_await (self ._update_window_task )
187+ if self ._resampler_task :
188+ await cancel_and_await (self ._resampler_task )
189+
190+ def _configure_resampler (self ) -> None :
191+ """Configure the components needed to run the resampler."""
192+ assert self ._resampler is not None
193+
194+ async def sink_buffer (sample : Sample ) -> None :
195+ if sample .value is not None :
196+ self ._buffer .update (sample )
197+
198+ resampler_channel = Broadcast [Sample ]("average" )
199+ self ._resampler_sender = resampler_channel .new_sender ()
200+ self ._resampler .add_timeseries (
201+ "avg" , resampler_channel .new_receiver (), sink_buffer
202+ )
203+ self ._resampler_task = asyncio .create_task (self ._resampler .resample ())
157204
158205 def __len__ (self ) -> int :
159206 """
0 commit comments