Skip to content

Commit c67a6aa

Browse files
committed
Make the resampling period a timedelta
The `ResamplerConfig` now takes the resampling period as a `timedelta`. The configuration was renamed from `resampling_period_s` to `resampling_period` accordingly. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 327db6a commit c67a6aa

File tree

15 files changed

+91
-74
lines changed

15 files changed

+91
-74
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Notice that the parameter `sampling_period` has been renamed to `input_sampling_period`
1919
to better distinguish it from the sampling period parameter in the resampler.
2020
* The serialization feature for the ringbuffer was made more flexible. The `dump` and `load` methods can now work directly with a ringbuffer instance.
21+
* The `ResamplerConfig` now takes the resampling period as a `timedelta`. The configuration was renamed from `resampling_period_s` to `resampling_period` accordingly.
2122

2223
## New Features
2324

benchmarks/power_distribution/power_distributor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import random
99
import timeit
1010
from dataclasses import dataclass
11+
from datetime import timedelta
1112
from typing import Any, Coroutine, Dict, List, Set # pylint: disable=unused-import
1213

1314
from frequenz.channels import Bidirectional, Broadcast
@@ -154,7 +155,9 @@ async def run() -> None:
154155
"""Create microgrid api and run tests."""
155156
# pylint: disable=protected-access
156157

157-
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
158+
await microgrid.initialize(
159+
HOST, PORT, ResamplerConfig(resampling_period=timedelta(seconds=1.0))
160+
)
158161

159162
all_batteries: Set[Component] = connection_manager.get().component_graph.components(
160163
component_category={ComponentCategory.BATTERY}

benchmarks/timeseries/resampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2929
helper = _ResamplingHelper(
3030
"benchmark",
3131
ResamplerConfig(
32-
resampling_period_s=1.0,
32+
resampling_period=timedelta(seconds=1.0),
3333
max_data_age_in_periods=3.0,
3434
resampling_function=nop,
3535
initial_buffer_len=samples * 3,

examples/battery_pool.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import asyncio
99
import logging
10+
from datetime import timedelta
1011
from typing import Any, Dict
1112

1213
from frequenz.channels import Receiver
@@ -25,7 +26,9 @@ async def main() -> None:
2526
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
2627
)
2728
await microgrid.initialize(
28-
host=HOST, port=PORT, resampler_config=ResamplerConfig(resampling_period_s=1.0)
29+
host=HOST,
30+
port=PORT,
31+
resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
2932
)
3033

3134
battery_pool = microgrid.battery_pool()

examples/battery_status.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import asyncio
1111
import logging
12+
from datetime import timedelta
1213

1314
from frequenz.channels import Broadcast
1415

@@ -31,7 +32,9 @@ async def main() -> None:
3132
logging.basicConfig(
3233
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
3334
)
34-
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
35+
await microgrid.initialize(
36+
HOST, PORT, ResamplerConfig(resampling_period=timedelta(seconds=1.0))
37+
)
3538
batteries = {
3639
bat.component_id
3740
for bat in connection_manager.get().component_graph.components(

examples/power_distribution.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
import asyncio
1414
import logging
15-
from datetime import datetime, timezone
15+
from datetime import datetime, timedelta, timezone
1616
from queue import Queue
1717
from typing import List, Optional, Set
1818

@@ -159,7 +159,9 @@ async def run() -> None:
159159
logging.basicConfig(
160160
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
161161
)
162-
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
162+
await microgrid.initialize(
163+
HOST, PORT, ResamplerConfig(resampling_period=timedelta(seconds=1.0))
164+
)
163165

164166
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
165167

@@ -180,7 +182,7 @@ async def run() -> None:
180182
channel_registry=channel_registry,
181183
data_sourcing_request_sender=data_source_request_channel.new_sender(),
182184
resampling_request_receiver=resampling_actor_request_channel.new_receiver(),
183-
config=ResamplerConfig(resampling_period_s=1.0),
185+
config=ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
184186
)
185187

186188
logical_meter = LogicalMeter(

examples/resampling.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""Frequenz Python SDK resampling example."""
55

66
import asyncio
7-
from datetime import datetime, timezone
7+
from datetime import datetime, timedelta, timezone
88

99
from frequenz.channels import Broadcast
1010
from frequenz.channels.util import Merge
@@ -43,7 +43,9 @@ async def _print_sample(sample: Sample) -> None:
4343

4444
async def run() -> None: # pylint: disable=too-many-locals
4545
"""Run main functions that initializes and creates everything."""
46-
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=0.2))
46+
await microgrid.initialize(
47+
HOST, PORT, ResamplerConfig(resampling_period=timedelta(seconds=0.2))
48+
)
4749

4850
channel_registry = ChannelRegistry(name="data-registry")
4951

@@ -66,7 +68,7 @@ async def run() -> None: # pylint: disable=too-many-locals
6668
channel_registry=channel_registry,
6769
data_sourcing_request_sender=data_source_request_sender,
6870
resampling_request_receiver=resampling_request_receiver,
69-
config=ResamplerConfig(resampling_period_s=1),
71+
config=ResamplerConfig(resampling_period=timedelta(seconds=1)),
7072
)
7173

7274
components = await connection_manager.get().api_client.components()
@@ -105,7 +107,9 @@ async def run() -> None: # pylint: disable=too-many-locals
105107
# Create a channel to calculate an average for all the data
106108
average_chan = Broadcast[Sample]("average")
107109

108-
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
110+
second_stage_resampler = Resampler(
111+
ResamplerConfig(resampling_period=timedelta(seconds=3.0))
112+
)
109113
second_stage_resampler.add_timeseries(
110114
"avg", average_chan.new_receiver(), _print_sample
111115
)

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import typing
1414
from collections import abc
1515
from dataclasses import dataclass
16-
from datetime import timedelta
1716

1817
from frequenz.channels import Bidirectional, Broadcast, Sender
1918

@@ -175,9 +174,7 @@ def battery_pool(
175174
batteries_status_receiver=self._battery_status_channel.new_receiver(
176175
maxsize=1
177176
),
178-
min_update_interval=timedelta(
179-
seconds=self._resampler_config.resampling_period_s
180-
),
177+
min_update_interval=self._resampler_config.resampling_period,
181178
batteries_id=battery_ids,
182179
)
183180

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,12 @@ def __init__( # pylint: disable=too-many-arguments
142142
self._resampler_task: asyncio.Task[None] | None = None
143143

144144
if resampler_config:
145-
resampling_period = timedelta(seconds=resampler_config.resampling_period_s)
146145
assert (
147-
resampling_period <= size
146+
resampler_config.resampling_period <= size
148147
), "The resampling period should be equal to or lower than the window size."
149148

150149
self._resampler = Resampler(resampler_config)
151-
sampling = resampling_period
150+
sampling = resampler_config.resampling_period
152151

153152
# Sampling period might not fit perfectly into the window size.
154153
num_samples = math.ceil(size.total_seconds() / sampling.total_seconds())

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@
7676
7777
A resampling function produces a new sample based on a list of pre-existing
7878
samples. It can do "upsampling" when there data rate of the `input_samples`
79-
period is smaller than the `resampling_period_s`, or "downsampling" if it is
79+
period is smaller than the `resampling_period`, or "downsampling" if it is
8080
bigger.
8181
82-
In general a resampling window is the same as the `resampling_period_s`, and
82+
In general a resampling window is the same as the `resampling_period`, and
8383
this function might receive input samples from multiple windows in the past to
8484
enable extrapolation, but no samples from the future (so the timestamp of the
8585
new sample that is going to be produced will always be bigger than the biggest
@@ -123,31 +123,31 @@ def average(
123123
class ResamplerConfig:
124124
"""Resampler configuration."""
125125

126-
resampling_period_s: float
127-
"""The resampling period in seconds.
126+
resampling_period: timedelta
127+
"""The resampling period.
128128
129129
This is the time it passes between resampled data should be calculated.
130130
131-
It must be a positive number.
131+
It must be a positive time span.
132132
"""
133133

134134
max_data_age_in_periods: float = 3.0
135135
"""The maximum age a sample can have to be considered *relevant* for resampling.
136136
137-
Expressed in number of periods, where period is the `resampling_period_s`
137+
Expressed in number of periods, where period is the `resampling_period`
138138
if we are downsampling (resampling period bigger than the input period) or
139139
the input sampling period if we are upsampling (input period bigger than
140140
the resampling period).
141141
142142
It must be bigger than 1.0.
143143
144144
Example:
145-
If `resampling_period_s` is 3, the input sampling period is
145+
If `resampling_period` is 3 seconds, the input sampling period is
146146
1 and `max_data_age_in_periods` is 2, then data older than 3*2
147147
= 6 seconds will be discarded when creating a new sample and never
148148
passed to the resampling function.
149149
150-
If `resampling_period_s` is 3, the input sampling period is
150+
If `resampling_period` is 3 seconds, the input sampling period is
151151
5 and `max_data_age_in_periods` is 2, then data older than 5*2
152152
= 10 seconds will be discarded when creating a new sample and never
153153
passed to the resampling function.
@@ -197,9 +197,9 @@ def __post_init__(self) -> None:
197197
Raises:
198198
ValueError: If any value is out of range.
199199
"""
200-
if self.resampling_period_s < 0.0:
200+
if self.resampling_period.total_seconds() < 0.0:
201201
raise ValueError(
202-
f"resampling_period_s ({self.resampling_period_s}) must be positive"
202+
f"resampling_period ({self.resampling_period}) must be positive"
203203
)
204204
if self.max_data_age_in_periods < 1.0:
205205
raise ValueError(
@@ -348,8 +348,8 @@ def __init__(self, config: ResamplerConfig) -> None:
348348
self._resamplers: dict[Source, _StreamingHelper] = {}
349349
"""A mapping between sources and the streaming helper handling that source."""
350350

351-
self._window_end: datetime = datetime.now(timezone.utc) + timedelta(
352-
seconds=self._config.resampling_period_s
351+
self._window_end: datetime = (
352+
datetime.now(timezone.utc) + self._config.resampling_period
353353
)
354354
"""The time in which the current window ends.
355355
@@ -449,9 +449,7 @@ async def resample(self, *, one_shot: bool = False) -> None:
449449
return_exceptions=True,
450450
)
451451

452-
self._window_end = self._window_end + timedelta(
453-
seconds=self._config.resampling_period_s
454-
)
452+
self._window_end = self._window_end + self._config.resampling_period
455453
exceptions = {
456454
source: results[i]
457455
for i, source in enumerate(self._resamplers)
@@ -476,25 +474,30 @@ async def _wait_for_next_resampling_period(self) -> None:
476474
sleep_for = self._window_end - now
477475
await asyncio.sleep(sleep_for.total_seconds())
478476

479-
timer_error_s = (now - self._window_end).total_seconds()
480-
if timer_error_s > (self._config.resampling_period_s / 10.0):
477+
timer_error = now - self._window_end
478+
# We use a tolerance of 10% of the resampling period
479+
tolerance = timedelta(
480+
seconds=self._config.resampling_period.total_seconds() / 10.0
481+
)
482+
if timer_error > tolerance:
481483
_logger.warning(
482-
"The resampling task woke up too late. Resampling should have started "
483-
"at %s, but it started at %s (%s seconds difference; resampling "
484-
"period is %s seconds)",
484+
"The resampling task woke up too late. Resampling should have "
485+
"started at %s, but it started at %s (tolerance: %s, "
486+
"difference: %s; resampling period: %s)",
485487
self._window_end,
486488
now,
487-
timer_error_s,
488-
self._config.resampling_period_s,
489+
tolerance,
490+
timer_error,
491+
self._config.resampling_period,
489492
)
490493

491494

492495
class _ResamplingHelper:
493496
"""Keeps track of *relevant* samples to pass them to the resampling function.
494497
495498
Samples are stored in an internal ring buffer. All collected samples that
496-
are newer than `max(resampling_period_s, input_period_s)
497-
* max_data_age_in_periods` seconds are considered *relevant* and are passed
499+
are newer than `max(resampling_period, input_period_s)
500+
* max_data_age_in_periods` are considered *relevant* and are passed
498501
to the provided `resampling_function` when calling the `resample()` method.
499502
All older samples are discarded.
500503
"""
@@ -552,7 +555,7 @@ def _update_source_sample_period(self, now: datetime) -> bool:
552555
props.sampling_period_s is not None
553556
or props.sampling_start is None
554557
or props.received_samples
555-
< config.resampling_period_s * config.max_data_age_in_periods
558+
< config.resampling_period.total_seconds() * config.max_data_age_in_periods
556559
or len(self._buffer) < self._buffer.maxlen
557560
# There might be a race between the first sample being received and
558561
# this function being called
@@ -589,14 +592,14 @@ def _update_buffer_len(self) -> bool:
589592
# If we are upsampling, one sample could be enough for back-filling, but
590593
# we store max_data_age_in_periods for input periods, so resampling
591594
# functions can do more complex inter/extrapolation if they need to.
592-
if input_sampling_period_s > config.resampling_period_s:
595+
if input_sampling_period_s > config.resampling_period.total_seconds():
593596
new_buffer_len = input_sampling_period_s * config.max_data_age_in_periods
594597
# If we are upsampling, we want a buffer that can hold
595-
# max_data_age_in_periods * resampling_period_s seconds of data, and we
598+
# max_data_age_in_periods * resampling_period of data, and we
596599
# one sample every input_sampling_period_s.
597600
else:
598601
new_buffer_len = (
599-
config.resampling_period_s
602+
config.resampling_period.total_seconds()
600603
/ input_sampling_period_s
601604
* config.max_data_age_in_periods
602605
)
@@ -652,9 +655,9 @@ def resample(self, timestamp: datetime) -> Sample:
652655
# To see which samples are relevant we need to consider if we are down
653656
# or upsampling.
654657
period = (
655-
max(conf.resampling_period_s, props.sampling_period_s)
658+
max(conf.resampling_period.total_seconds(), props.sampling_period_s)
656659
if props.sampling_period_s is not None
657-
else conf.resampling_period_s
660+
else conf.resampling_period.total_seconds()
658661
)
659662
minimum_relevant_timestamp = timestamp - timedelta(
660663
seconds=period * conf.max_data_age_in_periods

0 commit comments

Comments
 (0)