Skip to content

Commit 3513f90

Browse files
committed
Calculate the input sampling period
The input sampling period is calculated by counting the total received samples, and dividing the total resampling time until the internal buffer is full by the total received samples. To store all the source properties (sampling period, sampling start, total received samples) a new class is used (SourceProperties) and this information is passed to the resampling function, so it can have more information about the source to perform a proper resampling. This calculation is performed once and then remains constant, but at some point we could add a timer to re-calculate it. This also improves slightly the documentation and validation of the ResamplingConfig class. The add_timeseries() methods now also takes a name as a way to identify different sources in log messages (the actor uses the channel name as timeseries name). Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 34a1dd5 commit 3513f90

File tree

5 files changed

+293
-82
lines changed

5 files changed

+293
-82
lines changed

benchmarks/timeseries/resampling.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,17 @@
88
from typing import Sequence
99

1010
from frequenz.sdk.timeseries import Sample
11-
from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper
11+
from frequenz.sdk.timeseries._resampling import (
12+
ResamplerConfig,
13+
SourceProperties,
14+
_ResamplingHelper,
15+
)
1216

1317

1418
def nop( # pylint: disable=unused-argument
15-
samples: Sequence[Sample], resampler_config: ResamplerConfig
19+
samples: Sequence[Sample],
20+
resampler_config: ResamplerConfig,
21+
source_properties: SourceProperties,
1622
) -> float:
1723
"""Return 0.0."""
1824
return 0.0
@@ -21,14 +27,15 @@ def nop( # pylint: disable=unused-argument
2127
def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2228
"""Benchmark the resampling helper."""
2329
helper = _ResamplingHelper(
30+
"benchmark",
2431
ResamplerConfig(
2532
resampling_period_s=1.0,
2633
max_data_age_in_periods=3.0,
2734
resampling_function=nop,
2835
initial_buffer_len=samples * 3,
29-
max_buffer_len=samples * 3,
30-
warn_buffer_len=samples * 3,
31-
)
36+
warn_buffer_len=samples * 3 + 2,
37+
max_buffer_len=samples * 3 + 3,
38+
),
3239
)
3340
now = datetime.now(timezone.utc)
3441

examples/resampling.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ async def run() -> None: # pylint: disable=too-many-locals
105105
average_chan = Broadcast[Sample]("average")
106106

107107
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
108-
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
108+
second_stage_resampler.add_timeseries(
109+
"avg", average_chan.new_receiver(), _print_sample
110+
)
109111

110112
average_sender = average_chan.new_sender()
111113
# Needed until channels Senders raises exceptions on errors

src/frequenz/sdk/actor/_resampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async def sink_adapter(sample: Sample) -> None:
8585
if not await sender.send(sample):
8686
raise Exception(f"Error while sending with sender {sender}", sender)
8787

88-
self._resampler.add_timeseries(receiver, sink_adapter)
88+
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)
8989

9090
async def _process_resampling_requests(self) -> None:
9191
"""Process resampling data requests."""

0 commit comments

Comments
 (0)