Skip to content

Commit 5eb4655

Browse files
committed
Split the resampling module
Future changes require adding more code to this module, which is already quite big. This commit makes it a package and splits the module into a few sub-modules to make it more manageable. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 1318ce4 commit 5eb4655

File tree

11 files changed

+392
-345
lines changed

11 files changed

+392
-345
lines changed

benchmarks/timeseries/resampling.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,9 @@
99

1010
from frequenz.quantities import Quantity
1111

12-
from frequenz.sdk.timeseries import Sample
13-
from frequenz.sdk.timeseries._resampling import (
14-
ResamplerConfig,
15-
SourceProperties,
16-
_ResamplingHelper,
17-
)
12+
from frequenz.sdk.timeseries import ResamplerConfig, Sample
13+
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
14+
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper
1815

1916

2017
def nop( # pylint: disable=unused-argument

src/frequenz/sdk/actor/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ async def main() -> None: # (6)!
597597
[_run]: #the-_run-method
598598
"""
599599

600-
from ..timeseries._resampling import ResamplerConfig
600+
from ..timeseries._resampling._config import ResamplerConfig
601601
from ._actor import Actor
602602
from ._background_service import BackgroundService
603603
from ._run_utils import run

src/frequenz/sdk/microgrid/_resampling.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
from .._internal._asyncio import cancel_and_await
1515
from .._internal._channels import ChannelRegistry
16-
from ..actor import Actor
16+
from ..actor._actor import Actor
1717
from ..timeseries import Sample
18-
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
18+
from ..timeseries._resampling._config import ResamplerConfig
19+
from ..timeseries._resampling._exceptions import ResamplingError
20+
from ..timeseries._resampling._resampler import Resampler
1921
from ._data_sourcing import ComponentMetricRequest
2022

2123
_logger = logging.getLogger(__name__)

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from ._fuse import Fuse
4141
from ._moving_window import MovingWindow
4242
from ._periodic_feature_extractor import PeriodicFeatureExtractor
43-
from ._resampling import ResamplerConfig
43+
from ._resampling._config import ResamplerConfig
4444

4545
__all__ = [
4646
"Bounds",

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
from ..actor._background_service import BackgroundService
2020
from ._base_types import UNIX_EPOCH, Sample
21-
from ._resampling import Resampler, ResamplerConfig
21+
from ._resampling._config import ResamplerConfig
22+
from ._resampling._resampler import Resampler
2223
from ._ringbuffer import OrderedRingBuffer
2324

2425
_logger = logging.getLogger(__name__)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Timeseries resampling package."""
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Resampler base types."""
5+
6+
from collections.abc import AsyncIterator, Callable, Coroutine
7+
from dataclasses import dataclass
8+
from datetime import datetime, timedelta
9+
10+
from frequenz.quantities import Quantity
11+
12+
from .._base_types import Sample
13+
14+
Source = AsyncIterator[Sample[Quantity]]
"""A source for a timeseries.

A timeseries can be received sample by sample in a streaming way
using a source.
"""

Sink = Callable[[Sample[Quantity]], Coroutine[None, None, None]]
"""A sink for a timeseries.

A new timeseries can be generated by sending samples to a sink.

This should be an `async` callable, for example:

```python
async def some_sink(sample: Sample[Quantity]) -> None:
    ...
```

Args:
    sample (Sample): A sample to be sent out.
"""
36+
37+
38+
@dataclass
39+
class SourceProperties:
40+
"""Properties of a resampling source."""
41+
42+
sampling_start: datetime | None = None
43+
"""The time when resampling started for this source.
44+
45+
`None` means it didn't started yet.
46+
"""
47+
48+
received_samples: int = 0
49+
"""Total samples received by this source so far."""
50+
51+
sampling_period: timedelta | None = None
52+
"""The sampling period of this source.
53+
54+
This means we receive (on average) one sample for this source every
55+
`sampling_period` time.
56+
57+
`None` means it is unknown.
58+
"""
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Resampler configuration."""
5+
6+
from __future__ import annotations
7+
8+
import logging
9+
from collections.abc import Callable, Sequence
10+
from dataclasses import dataclass
11+
from datetime import datetime, timedelta
12+
13+
from frequenz.quantities import Quantity
14+
15+
from .._base_types import UNIX_EPOCH, QuantityT, Sample
16+
from ._base_types import SourceProperties
17+
18+
_logger = logging.getLogger(__name__)
19+
20+
21+
DEFAULT_BUFFER_LEN_INIT = 16
"""Default initial buffer length.

Buffers start out with this length. They may later grow or shrink depending
on the source properties (for example the sampling rate), so that all the
requested past sampling periods can be stored.
"""


DEFAULT_BUFFER_LEN_MAX = 1024
"""Default maximum allowed buffer length.

A buffer that would need to grow beyond this length is truncated to it.
"""


DEFAULT_BUFFER_LEN_WARN = 128
"""Default minimum buffer length that will produce a warning.

A warning is logged when a buffer would get bigger than this length.
"""
43+
44+
45+
ResamplingFunction = Callable[
    [Sequence[Sample[Quantity]], "ResamplerConfig", "SourceProperties"], float
]
"""Resampling function type.

A resampling function produces a new sample based on a list of pre-existing
samples. It can do "upsampling" when the sampling period of the
`input_samples` is bigger than the `resampling_period`, or "downsampling" if
it is smaller.

In general a resampling window is the same as the `resampling_period`, and
this function might receive input samples from multiple windows in the past to
enable extrapolation, but no samples from the future (so the timestamp of the
new sample that is going to be produced will always be bigger than the biggest
timestamp in the input data).

Args:
    input_samples (Sequence[Sample]): The sequence of pre-existing samples.
    resampler_config (ResamplerConfig): The configuration of the resampler
        calling this function.
    source_properties (SourceProperties): The properties of the source being
        resampled.

Returns:
    new_sample (float): The value of the new sample produced after the
        resampling.
"""
71+
72+
73+
# pylint: disable=unused-argument
def average(
    samples: Sequence[Sample[QuantityT]],
    resampler_config: ResamplerConfig,
    source_properties: SourceProperties,
) -> float:
    """Calculate the average of all the provided values.

    Samples whose value is `None` are ignored.

    Args:
        samples: The samples to apply the average to. It must be non-empty and
            contain at least one sample with a value.
        resampler_config: The configuration of the resampler calling this
            function. Not used by this function.
        source_properties: The properties of the source being resampled. Not
            used by this function.

    Returns:
        The average of the base values of all `samples` that have a value.

    Raises:
        ValueError: If none of the samples has a value.
    """
    assert len(samples) > 0, "Average cannot be given an empty list of samples"
    values = [
        sample.value.base_value for sample in samples if sample.value is not None
    ]
    if not values:
        # Without this guard the division below would fail with an opaque
        # ZeroDivisionError when every sample carries a `None` value.
        raise ValueError("Average cannot be calculated, all samples have no value")
    return sum(values) / len(values)
95+
96+
97+
@dataclass(frozen=True)
class ResamplerConfig:
    """Resampler configuration.

    All values are validated when the instance is created, see
    `__post_init__` for the exact rules.
    """

    resampling_period: timedelta
    """The resampling period.

    This is the time that passes between the calculation of two consecutive
    resampled values.

    It must be a positive time span.
    """

    max_data_age_in_periods: float = 3.0
    """The maximum age a sample can have to be considered *relevant* for resampling.

    Expressed in number of periods, where period is the `resampling_period`
    if we are downsampling (resampling period bigger than the input period) or
    the *input sampling period* if we are upsampling (input period bigger than
    the resampling period).

    It must be at least 1.0.

    Example:
        If `resampling_period` is 3 seconds, the input sampling period is
        1 and `max_data_age_in_periods` is 2, then data older than 3*2
        = 6 seconds will be discarded when creating a new sample and never
        passed to the resampling function.

        If `resampling_period` is 3 seconds, the input sampling period is
        5 and `max_data_age_in_periods` is 2, then data older than 5*2
        = 10 seconds will be discarded when creating a new sample and never
        passed to the resampling function.
    """

    resampling_function: ResamplingFunction = average
    """The resampling function.

    This function will be applied to the sequence of relevant samples at
    a given time. The result of the function is what is sent as the resampled
    value.
    """

    initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
    """The initial length of the resampling buffer.

    The buffer could grow or shrink depending on the source properties,
    like sampling rate, to make sure all the requested past sampling periods
    can be stored.

    It must be at least 1 and at most `max_buffer_len`.
    """

    warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
    """The minimum length of the resampling buffer that will emit a warning.

    If a buffer grows bigger than this value, it will emit a warning in the
    logs, so buffers don't grow too big inadvertently.

    It must be at least 1 and less than `max_buffer_len`.
    """

    max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
    """The maximum length of the resampling buffer.

    Buffers won't be allowed to grow beyond this point even if it would be
    needed to keep all the requested past sampling periods. An error will be
    emitted in the logs if the buffer length needs to be truncated to this
    value.

    It must be bigger than `warn_buffer_len`.
    """

    align_to: datetime | None = UNIX_EPOCH
    """The time to align the resampling period to.

    The resampling period will be aligned to this time, so the first resampled
    sample will be at the first multiple of `resampling_period` starting from
    `align_to`. It must be an aware datetime and can be in the future too.

    If `align_to` is `None`, the resampling period will be aligned to the
    time the resampler is created.
    """

    def __post_init__(self) -> None:
        """Check that config values are valid.

        A warning is logged (but no error is raised) when
        `initial_buffer_len` is bigger than `warn_buffer_len`.

        Raises:
            ValueError: If any value is out of range.
        """
        # A zero period is not a positive time span, so it is rejected too.
        if self.resampling_period.total_seconds() <= 0.0:
            raise ValueError(
                f"resampling_period ({self.resampling_period}) must be positive"
            )
        if self.max_data_age_in_periods < 1.0:
            raise ValueError(
                f"max_data_age_in_periods ({self.max_data_age_in_periods}) "
                "should be at least 1.0"
            )
        if self.warn_buffer_len < 1:
            raise ValueError(
                f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1"
            )
        if self.max_buffer_len <= self.warn_buffer_len:
            raise ValueError(
                f"max_buffer_len ({self.max_buffer_len}) should "
                f"be bigger than warn_buffer_len ({self.warn_buffer_len})"
            )

        if self.initial_buffer_len < 1:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) should be at least 1"
            )
        if self.initial_buffer_len > self.max_buffer_len:
            raise ValueError(
                f"initial_buffer_len ({self.initial_buffer_len}) is bigger "
                f"than max_buffer_len ({self.max_buffer_len}), use a smaller "
                "initial_buffer_len or a bigger max_buffer_len"
            )
        # Starting above the warning threshold is allowed, but worth flagging.
        if self.initial_buffer_len > self.warn_buffer_len:
            _logger.warning(
                "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
                self.initial_buffer_len,
                self.warn_buffer_len,
            )
        # Naive datetimes are rejected; only timezone-aware values are valid.
        if self.align_to is not None and self.align_to.tzinfo is None:
            raise ValueError(
                f"align_to ({self.align_to}) should be a timezone aware datetime"
            )

0 commit comments

Comments
 (0)