Skip to content

Commit a0f4657

Browse files
committed
Move resampler configuration to its own class
This makes user code a little bit more verbose but makes the code much more maintainable, as we avoid having to copy and passing around a lot of configuration variables that are only (or mostly) only relevant to the internal resampling class. It also removes a lot of documentation duplication that can get easily out of sync and cause confusion. And we'll add quite a few more configuration variables in subsequent commits, which will just exacerbate the above mentioned issues without a config class. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 5c110e5 commit a0f4657

File tree

7 files changed

+97
-109
lines changed

7 files changed

+97
-109
lines changed

examples/resampling.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
)
1919
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
2020
from frequenz.sdk.timeseries import Sample
21-
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
21+
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source
2222

2323
HOST = "microgrid.sandbox.api.frequenz.io"
2424
PORT = 61060
@@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals
6565
channel_registry=channel_registry,
6666
data_sourcing_request_sender=data_source_request_sender,
6767
resampling_request_receiver=resampling_request_receiver,
68-
resampling_period_s=1,
68+
config=ResamplerConfig(resampling_period_s=1),
6969
)
7070

7171
components = await microgrid.get().api_client.components()
@@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals
104104
# Create a channel to calculate an average for all the data
105105
average_chan = Broadcast[Sample]("average")
106106

107-
second_stage_resampler = Resampler(resampling_period_s=3.0)
107+
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
108108
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
109109

110110
average_sender = average_chan.new_sender()

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
from ._config_managing import ConfigManagingActor
88
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
99
from ._decorator import actor
10-
from ._resampling import ComponentMetricsResamplingActor
10+
from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig
1111

1212
__all__ = [
1313
"ChannelRegistry",
1414
"ComponentMetricRequest",
1515
"ComponentMetricsResamplingActor",
1616
"ConfigManagingActor",
1717
"DataSourcingActor",
18+
"ResamplerConfig",
1819
"actor",
1920
]

src/frequenz/sdk/actor/_resampling.py

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
from frequenz.sdk.util.asyncio import cancel_and_await
1515

1616
from ..timeseries import Sample
17-
from ..timeseries._resampling import (
18-
Resampler,
19-
ResamplingError,
20-
ResamplingFunction,
21-
average,
22-
)
17+
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
2318
from ._channel_registry import ChannelRegistry
2419
from ._data_sourcing import ComponentMetricRequest
2520
from ._decorator import actor
@@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments
3732
channel_registry: ChannelRegistry,
3833
data_sourcing_request_sender: Sender[ComponentMetricRequest],
3934
resampling_request_receiver: Receiver[ComponentMetricRequest],
40-
resampling_period_s: float = 0.2,
41-
max_data_age_in_periods: float = 3.0,
42-
resampling_function: ResamplingFunction = average,
35+
config: ResamplerConfig,
4336
) -> None:
4437
"""Initialize an instance.
4538
@@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments
5144
to subscribe to component metrics.
5245
resampling_request_receiver: The receiver to use to receive new
5346
resampmling subscription requests.
54-
resampling_period_s: The time it passes between resampled data
55-
should be calculated (in seconds).
56-
max_data_age_in_periods: The maximum age a sample can have to be
57-
considered *relevant* for resampling purposes, expressed in the
58-
number of resampling periods. For exapmle is
59-
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
60-
then data older than `3*2 = 6` secods will be discarded when
61-
creating a new sample and never passed to the resampling
62-
function.
63-
resampling_function: The function to be applied to the sequence of
64-
*relevant* samples at a given time. The result of the function
65-
is what is sent as the resampled data.
47+
config: The configuration for the resampler.
6648
"""
6749
self._channel_registry: ChannelRegistry = channel_registry
68-
self._resampling_period_s: float = resampling_period_s
69-
self._max_data_age_in_periods: float = max_data_age_in_periods
70-
self._resampling_function: ResamplingFunction = resampling_function
7150
self._data_sourcing_request_sender: Sender[
7251
ComponentMetricRequest
7352
] = data_sourcing_request_sender
7453
self._resampling_request_receiver: Receiver[
7554
ComponentMetricRequest
7655
] = resampling_request_receiver
77-
self._resampler: Resampler = Resampler(
78-
resampling_period_s=resampling_period_s,
79-
max_data_age_in_periods=max_data_age_in_periods,
80-
resampling_function=resampling_function,
81-
)
56+
self._resampler: Resampler = Resampler(config)
8257
self._active_req_channels: set[str] = set()
8358

8459
async def _subscribe(self, request: ComponentMetricRequest) -> None:

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
import math
1111
from collections import deque
12+
from dataclasses import dataclass
1213
from datetime import datetime, timedelta
1314
from typing import AsyncIterator, Callable, Coroutine, Sequence
1415

@@ -85,6 +86,34 @@ def average(samples: Sequence[Sample], resampling_period_s: float) -> float:
8586
return sum(values) / len(values)
8687

8788

89+
@dataclass(frozen=True)
90+
class ResamplerConfig:
91+
"""Resampler configuration."""
92+
93+
resampling_period_s: float
94+
"""The resapmling period in seconds.
95+
96+
This is the time it passes between resampled data should be calculated.
97+
"""
98+
99+
max_data_age_in_periods: float = 3.0
100+
"""The maximum age a sample can have to be considered *relevant* for resampling.
101+
102+
Expressed in number of resampling periods. For example if
103+
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data
104+
older than 3*2 = 6 secods will be discarded when creating a new sample and
105+
never passed to the resampling function.
106+
"""
107+
108+
resampling_function: ResamplingFunction = average
109+
"""The resampling function.
110+
111+
This function will be applied to the sequence of relevant samples at
112+
a given time. The result of the function is what is sent as the resampled
113+
value.
114+
"""
115+
116+
88117
class SourceStoppedError(RuntimeError):
89118
"""A timeseries stopped producing samples."""
90119

@@ -166,34 +195,24 @@ class Resampler:
166195
no way to produce meaningful samples with the available data.
167196
"""
168197

169-
def __init__(
170-
self,
171-
*,
172-
resampling_period_s: float,
173-
resampling_function: ResamplingFunction = average,
174-
max_data_age_in_periods: float = 3.0,
175-
) -> None:
198+
def __init__(self, config: ResamplerConfig) -> None:
176199
"""Initialize an instance.
177200
178201
Args:
179-
resampling_period_s: The time it passes between resampled data
180-
should be calculated (in seconds).
181-
max_data_age_in_periods: The maximum age a sample can have to be
182-
considered *relevant* for resampling purposes, expressed in the
183-
number of resampling periods. For exapmle is
184-
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
185-
then data older than `3*2 = 6` secods will be discarded when
186-
creating a new sample and never passed to the resampling
187-
function.
188-
resampling_function: The function to be applied to the sequence of
189-
*relevant* samples at a given time. The result of the function
190-
is what is sent as the resampled data.
202+
config: The configuration for the resampler.
191203
"""
192-
self._resampling_period_s = resampling_period_s
193-
self._max_data_age_in_periods: float = max_data_age_in_periods
194-
self._resampling_function: ResamplingFunction = resampling_function
204+
self._config = config
195205
self._resamplers: dict[Source, _StreamingHelper] = {}
196-
self._timer: Timer = Timer(self._resampling_period_s)
206+
self._timer: Timer = Timer(config.resampling_period_s)
207+
208+
@property
209+
def config(self) -> ResamplerConfig:
210+
"""Get the resampler configuration.
211+
212+
Returns:
213+
The resampler configuration.
214+
"""
215+
return self._config
197216

198217
async def stop(self) -> None:
199218
"""Cancel all receiving tasks."""
@@ -214,15 +233,7 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool:
214233
if source in self._resamplers:
215234
return False
216235

217-
resampler = _StreamingHelper(
218-
_ResamplingHelper(
219-
resampling_period_s=self._resampling_period_s,
220-
max_data_age_in_periods=self._max_data_age_in_periods,
221-
resampling_function=self._resampling_function,
222-
),
223-
source,
224-
sink,
225-
)
236+
resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink)
226237
self._resamplers[source] = resampler
227238
return True
228239

@@ -288,33 +299,14 @@ class _ResamplingHelper:
288299
when calling the `resample()` method. All older samples are discarded.
289300
"""
290301

291-
def __init__(
292-
self,
293-
*,
294-
resampling_period_s: float,
295-
max_data_age_in_periods: float,
296-
resampling_function: ResamplingFunction,
297-
) -> None:
302+
def __init__(self, config: ResamplerConfig) -> None:
298303
"""Initialize an instance.
299304
300305
Args:
301-
resampling_period_s: The time it passes between resampled data
302-
should be calculated (in seconds).
303-
max_data_age_in_periods: The maximum age a sample can have to be
304-
considered *relevant* for resampling purposes, expressed in the
305-
number of resampling periods. For exapmle is
306-
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
307-
then data older than 3*2 = 6 secods will be discarded when
308-
creating a new sample and never passed to the resampling
309-
function.
310-
resampling_function: The function to be applied to the sequence of
311-
relevant samples at a given time. The result of the function is
312-
what is sent as the resampled data.
306+
config: The configuration for the resampler.
313307
"""
314-
self._resampling_period_s = resampling_period_s
315-
self._max_data_age_in_periods: float = max_data_age_in_periods
308+
self._config = config
316309
self._buffer: deque[Sample] = deque()
317-
self._resampling_function: ResamplingFunction = resampling_function
318310

319311
def add_sample(self, sample: Sample) -> None:
320312
"""Add a new sample to the internal buffer.
@@ -361,14 +353,17 @@ def resample(self, timestamp: datetime) -> Sample:
361353
have `None` as `value`.
362354
"""
363355
threshold = timestamp - timedelta(
364-
seconds=self._max_data_age_in_periods * self._resampling_period_s
356+
seconds=self._config.max_data_age_in_periods
357+
* self._config.resampling_period_s
365358
)
366359
self._remove_outdated_samples(threshold=threshold)
367360

368361
value = (
369362
None
370363
if not self._buffer
371-
else self._resampling_function(self._buffer, self._resampling_period_s)
364+
else self._config.resampling_function(
365+
self._buffer, self._config.resampling_period_s
366+
)
372367
)
373368
return Sample(timestamp, value)
374369

tests/actor/test_resampling.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ChannelRegistry,
1717
ComponentMetricRequest,
1818
ComponentMetricsResamplingActor,
19+
ResamplerConfig,
1920
)
2021
from frequenz.sdk.microgrid.component import ComponentMetricId
2122
from frequenz.sdk.timeseries import Sample
@@ -123,8 +124,10 @@ async def test_single_request(
123124
channel_registry=channel_registry,
124125
data_sourcing_request_sender=data_source_req_chan.new_sender(),
125126
resampling_request_receiver=resampling_req_chan.new_receiver(),
126-
resampling_period_s=0.2,
127-
max_data_age_in_periods=2,
127+
config=ResamplerConfig(
128+
resampling_period_s=0.2,
129+
max_data_age_in_periods=2,
130+
),
128131
)
129132

130133
subs_req = ComponentMetricRequest(
@@ -167,8 +170,10 @@ async def test_duplicate_request(
167170
channel_registry=channel_registry,
168171
data_sourcing_request_sender=data_source_req_chan.new_sender(),
169172
resampling_request_receiver=resampling_req_chan.new_receiver(),
170-
resampling_period_s=0.2,
171-
max_data_age_in_periods=2,
173+
config=ResamplerConfig(
174+
resampling_period_s=0.2,
175+
max_data_age_in_periods=2,
176+
),
172177
)
173178

174179
subs_req = ComponentMetricRequest(

tests/timeseries/mock_microgrid.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ComponentMetricRequest,
2828
ComponentMetricsResamplingActor,
2929
DataSourcingActor,
30+
ResamplerConfig,
3031
)
3132
from tests.microgrid import mock_api
3233

@@ -236,7 +237,7 @@ async def _init_client_and_actors(
236237
channel_registry=channel_registry,
237238
data_sourcing_request_sender=data_source_request_sender,
238239
resampling_request_receiver=resampling_actor_request_receiver,
239-
resampling_period_s=0.1,
240+
config=ResamplerConfig(resampling_period_s=0.1),
240241
)
241242

242243
return (resampling_actor_request_sender, channel_registry)

0 commit comments

Comments
 (0)