Skip to content

Commit c71ed5f

Browse files
committed
Move resampler classes to frequenz.sdk.timeseries.resampler
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent b47602a commit c71ed5f

File tree

6 files changed

+156
-189
lines changed

6 files changed

+156
-189
lines changed

src/frequenz/sdk/actor/resampling.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515

1616
from frequenz.channels import MergeNamed, Receiver, Select, Sender, Timer
1717

18-
from ..data_ingestion.resampling.component_metric_group_resampler import (
19-
ComponentMetricGroupResampler,
20-
)
21-
from ..data_ingestion.resampling.component_metric_resampler import ResamplingFunction
2218
from ..data_pipeline import ComponentMetricRequest, Sample
19+
from ..timeseries.resampler import ComponentMetricGroupResampler, ResamplingFunction
2320
from . import ChannelRegistry, actor
2421

2522
logger = logging.Logger(__name__)

src/frequenz/sdk/data_ingestion/resampling/component_metric_resampler.py

Lines changed: 0 additions & 97 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""
2+
Handling of timeseries streams.
3+
4+
A timeseries is a stream (normally an async iterator) of
5+
[samples][frequenz.sdk.timeseries.Sample].
6+
7+
This module provides tools to operate on timeseries.
8+
9+
Copyright
10+
Copyright © 2022 Frequenz Energy-as-a-Service GmbH
11+
12+
License
13+
MIT
14+
"""
15+
16+
from .resampler import (
17+
ComponentMetricGroupResampler,
18+
ComponentMetricResampler,
19+
ResamplingFunction,
20+
)
21+
22+
__all__ = [
23+
"ComponentMetricGroupResampler",
24+
"ComponentMetricResampler",
25+
"ResamplingFunction",
26+
]

src/frequenz/sdk/data_ingestion/resampling/component_metric_group_resampler.py renamed to src/frequenz/sdk/timeseries/resampler.py

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,103 @@
11
"""
2-
ComponentMetricGroupResampler class that delegates resampling to individual resamplers.
2+
Timeseries resampler.
33
44
Copyright
55
Copyright © 2022 Frequenz Energy-as-a-Service GmbH
66
77
License
88
MIT
99
"""
10+
1011
import logging
11-
from datetime import datetime
12-
from typing import Dict, Generator, Tuple
12+
from collections import deque
13+
from datetime import datetime, timedelta
14+
from typing import Callable, Deque, Dict, Generator, Optional, Sequence, Tuple
1315

1416
import pytz
1517

16-
from ...data_pipeline import Sample
17-
from .component_metric_resampler import ComponentMetricResampler, ResamplingFunction
18+
from ..data_pipeline import Sample
1819

1920
logger = logging.Logger(__name__)
2021

2122

23+
ResamplingFunction = Callable[[Sequence[Sample], float], float]
24+
25+
26+
class ComponentMetricResampler:
27+
"""Resampler for a single metric of a specific component, e.g. 123_active_power."""
28+
29+
def __init__(
30+
self,
31+
resampling_period_s: float,
32+
max_data_age_in_periods: float,
33+
resampling_function: ResamplingFunction,
34+
) -> None:
35+
"""Initialize the ComponentMetricResampler.
36+
37+
Args:
38+
resampling_period_s: value describing how often resampling should be
39+
performed, in seconds
40+
max_data_age_in_periods: max age that samples shouldn't exceed in order
41+
to be used in the resampling function
42+
resampling_function: function to be applied to a sequence of samples within
43+
a resampling period to produce a single output sample
44+
"""
45+
self._resampling_period_s = resampling_period_s
46+
self._max_data_age_in_periods: float = max_data_age_in_periods
47+
self._buffer: Deque[Sample] = deque()
48+
self._resampling_function: ResamplingFunction = resampling_function
49+
50+
def add_sample(self, sample: Sample) -> None:
51+
"""Add a new sample.
52+
53+
Args:
54+
sample: sample to be added to the buffer
55+
"""
56+
self._buffer.append(sample)
57+
58+
def remove_outdated_samples(self, threshold: datetime) -> None:
59+
"""Remove samples that are older than the provided time threshold.
60+
61+
It is assumed that items in the buffer are in a sorted order (ascending order
62+
by timestamp).
63+
64+
The removal works by traversing the buffer starting from the oldest sample
65+
(smallest timestamp) and comparing sample's timestamp with the threshold.
66+
If the sample's threshold is smaller than `threshold`, it means that the
67+
sample is outdated and it is removed from the buffer. This continues until
68+
the first sample that is with timestamp greater or equal to `threshold` is
69+
encountered, then buffer is considered up to date.
70+
71+
Args:
72+
threshold: samples whose timestamp is older than the threshold are
73+
considered outdated and should be remove from the buffer
74+
"""
75+
while self._buffer:
76+
sample: Sample = self._buffer[0]
77+
if sample.timestamp >= threshold:
78+
return
79+
80+
self._buffer.popleft()
81+
82+
def resample(self) -> Optional[float]:
83+
"""Resample samples from the buffer and produce a single sample.
84+
85+
Returns:
86+
Samples resampled into a single sample or `None` if the
87+
`resampling_function` cannot produce a valid Sample.
88+
"""
89+
# It might be better to provide `now` from the outside so that all
90+
# individual resamplers use the same `now`
91+
now = datetime.now(tz=pytz.UTC)
92+
threshold = now - timedelta(
93+
seconds=self._max_data_age_in_periods * self._resampling_period_s
94+
)
95+
self.remove_outdated_samples(threshold=threshold)
96+
if len(self._buffer) == 0:
97+
return None
98+
return self._resampling_function(self._buffer, self._resampling_period_s)
99+
100+
22101
class ComponentMetricGroupResampler:
23102
"""Class that delegates resampling to individual component metric resamplers."""
24103

tests/data_ingestion/test_group_timeseries_resampler.py

Lines changed: 0 additions & 81 deletions
This file was deleted.

tests/data_ingestion/test_timeseries_resampler.py renamed to tests/timeseries/test_resampling.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@
77
License
88
MIT
99
"""
10+
1011
from datetime import datetime, timedelta
1112
from typing import Sequence
1213

1314
import pytz
1415
import time_machine
1516

16-
from frequenz.sdk.data_ingestion.resampling.component_metric_resampler import (
17+
from frequenz.sdk.data_pipeline import Sample
18+
from frequenz.sdk.timeseries.resampler import (
19+
ComponentMetricGroupResampler,
1720
ComponentMetricResampler,
1821
)
19-
from frequenz.sdk.data_pipeline import Sample
2022

2123

2224
# pylint: disable=unused-argument
@@ -113,3 +115,44 @@ def test_component_metric_resampler_resample_with_outdated_samples() -> None:
113115
value = resampler.resample()
114116
assert value is not None
115117
assert value == sum([value1, value2])
118+
119+
120+
@time_machine.travel(datetime.now())
121+
def test_component_metric_group_resampler() -> None:
122+
"""Test if resampling is properly delegated to component metric resamplers."""
123+
resampler = ComponentMetricGroupResampler(
124+
resampling_period_s=0.2,
125+
max_data_age_in_periods=5.0,
126+
initial_resampling_function=resampling_function_sum,
127+
)
128+
129+
time_series_id_1 = "123_active_power"
130+
time_series_id_2 = "99_active_power"
131+
132+
resampler.add_time_series(time_series_id=time_series_id_1)
133+
resampler.add_time_series(time_series_id=time_series_id_2)
134+
135+
timestamp = datetime.now(tz=pytz.UTC)
136+
137+
value1 = 5.0
138+
value2 = 15.0
139+
value3 = 100.0
140+
value4 = 999.0
141+
142+
sample1 = Sample(timestamp - timedelta(seconds=0.5), value=value1)
143+
sample2 = Sample(timestamp - timedelta(seconds=0.7), value=value2)
144+
sample3 = Sample(timestamp - timedelta(seconds=5.05), value=value3)
145+
sample4 = Sample(timestamp - timedelta(seconds=0.99), value=value4)
146+
147+
resampler.add_sample(time_series_id=time_series_id_1, sample=sample1)
148+
resampler.add_sample(time_series_id=time_series_id_1, sample=sample2)
149+
resampler.add_sample(time_series_id=time_series_id_2, sample=sample3)
150+
resampler.add_sample(time_series_id=time_series_id_2, sample=sample4)
151+
152+
resampled_samples = dict(resampler.resample())
153+
154+
assert resampled_samples[time_series_id_1].timestamp >= timestamp
155+
assert resampled_samples[time_series_id_1].value == sum([value1, value2])
156+
157+
assert resampled_samples[time_series_id_2].timestamp >= timestamp
158+
assert resampled_samples[time_series_id_2].value == value4

0 commit comments

Comments
 (0)