Skip to content

Commit a1ce9b3

Browse files
authored
Make resampler infer the input sampling period and buffer length (#136)
The input sampling period is calculated by counting the total received samples, and dividing the total resampling time until the internal buffer is full by the total received samples. To store all the source properties (sampling period, sampling start, total received samples) a new class is used (SourceProperties) and this information is passed to the resampling function, so it can have more information about the source to perform a proper resampling. This calculation is performed once and then remains constant, but at some point we could add a timer to re-calculate it. This also improves slightly the documentation and validation of the ResamplingConfig class. The add_timeseries() methods now also takes a name as a way to identify different sources in log messages (the actor uses the channel name as timeseries name). We also update the resampling buffer length based on the input sampling period, resampling period, and max data age in periods, so it can hold all the past data without wasting any space. Fixes #55.
2 parents 8e7a7c8 + 23931fd commit a1ce9b3

File tree

14 files changed

+563
-83
lines changed

14 files changed

+563
-83
lines changed

RELEASE_NOTES.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,35 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* The resampler now takes a `name` argument for `add_timeseries()`. This is only used for logging purposes.
10+
11+
* The resampler and resampling actor now takes a `ResamplerConfig` object in the constructor instead of the individual values.
12+
13+
* The resampler and resampling actor can emit errors or warnings if the buffer needed to resample is too big. If it is bigger than `ResamplingConfig.max_buffer_len`, the buffer will be truncated to that length, so the resampling can lose accuracy.
14+
15+
* The `ResamplingFunction` now takes different arguments:
16+
17+
* `resampling_period_s` was removed.
18+
* `resampler_config` is the configuration of the resampler calling the resampling function.
19+
* `source_properties` is the properties of the source being resampled.
1020

1121
## New Features
1222

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
23+
* The resampler and resampling actor can now take a few new options via the new `ResamplerConfig` object:
24+
25+
* `warn_buffer_len`: The minimum length of the resampling buffer that will emit a warning.
26+
* `max_buffer_len`: The maximum length of the resampling buffer.
27+
28+
* The resampler now infers the input sampling rate of sources and use a buffer size according to it.
29+
30+
This information can be consulted via `resampler.get_source_properties(source)`. The buffer size is now calculated so it can store all the needed samples requested via the combination of `resampling_period_s`, `max_data_age_in_periods` and the calculated `input_sampling_period_s`.
31+
32+
If we are upsampling, one sample could be enough for back-filling, but we store `max_data_age_in_periods` using `input_sampling_period_s` as period, so the resampling functions can do more complex inter/extrapolation if they need to.
33+
34+
If we are downsampling, we want a buffer that can hold `max_data_age_in_periods * resampling_period_s` seconds of data, and we have one sample every `input_sampling_period_s`, so we use a buffer length of: `max_data_age_in_periods * resampling_period_s / input_sampling_period_s`
1435

1536
## Bug Fixes
1637

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
38+
* Fixed logger creationg for some modules.
39+
40+
Some modules didn't create the logger properly so there was no way to configure them using the standard logger configuration system. Because of this, it might have happened that some log messages were never showed, or some message that the user didn't want to get were emitted anyway.

benchmarks/timeseries/resampling.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,17 @@
88
from typing import Sequence
99

1010
from frequenz.sdk.timeseries import Sample
11-
from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper
11+
from frequenz.sdk.timeseries._resampling import (
12+
ResamplerConfig,
13+
SourceProperties,
14+
_ResamplingHelper,
15+
)
1216

1317

1418
def nop( # pylint: disable=unused-argument
15-
samples: Sequence[Sample], resampling_period_s: float
19+
samples: Sequence[Sample],
20+
resampler_config: ResamplerConfig,
21+
source_properties: SourceProperties,
1622
) -> float:
1723
"""Return 0.0."""
1824
return 0.0
@@ -21,12 +27,15 @@ def nop( # pylint: disable=unused-argument
2127
def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2228
"""Benchmark the resampling helper."""
2329
helper = _ResamplingHelper(
30+
"benchmark",
2431
ResamplerConfig(
2532
resampling_period_s=1.0,
2633
max_data_age_in_periods=3.0,
2734
resampling_function=nop,
2835
initial_buffer_len=samples * 3,
29-
)
36+
warn_buffer_len=samples * 3 + 2,
37+
max_buffer_len=samples * 3 + 3,
38+
),
3039
)
3140
now = datetime.now(timezone.utc)
3241

examples/resampling.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ async def run() -> None: # pylint: disable=too-many-locals
105105
average_chan = Broadcast[Sample]("average")
106106

107107
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
108-
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
108+
second_stage_resampler.add_timeseries(
109+
"avg", average_chan.new_receiver(), _print_sample
110+
)
109111

110112
average_sender = average_chan.new_sender()
111113
# Needed until channels Senders raises exceptions on errors

src/frequenz/sdk/_data_handling/time_series.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
Key = TypeVar("Key")
1616
Value = TypeVar("Value")
1717

18-
logger = logging.Logger(__name__)
18+
logger = logging.getLogger(__name__)
1919

2020
SYMBOL_SEGMENT_SEPARATOR = "_"
2121

src/frequenz/sdk/_data_ingestion/component_info.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from ..microgrid import ComponentGraph
1111
from ..microgrid.component import ComponentCategory
1212

13-
logger = logging.Logger(__name__)
13+
logger = logging.getLogger(__name__)
1414

1515

1616
@dataclass

src/frequenz/sdk/_data_ingestion/formula_calculator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
METRIC_PV_PROD,
3737
)
3838

39-
logger = logging.Logger(__name__)
39+
logger = logging.getLogger(__name__)
4040

4141

4242
@dataclass(frozen=True)

src/frequenz/sdk/_data_ingestion/load_historic_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import pyarrow.parquet as pq
2323
from tqdm import tqdm
2424

25-
logger = logging.Logger(__name__)
25+
logger = logging.getLogger(__name__)
2626

2727
# directory path to all component data of a particular site
2828
HISTDATA_DIR = "/data"

src/frequenz/sdk/_data_ingestion/microgrid_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from .formula_calculator import FormulaCalculator
2828
from .gen_component_receivers import gen_component_receivers
2929

30-
logger = logging.Logger(__name__)
30+
logger = logging.getLogger(__name__)
3131

3232
CONFIG_FILE_FORMULA_PREFIX = "formula_"
3333

src/frequenz/sdk/actor/_decorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import logging
1515
from typing import Any, Generic, Optional, Type, TypeVar
1616

17-
logger = logging.Logger(__name__)
17+
logger = logging.getLogger(__name__)
1818

1919
OT = TypeVar("OT")
2020

src/frequenz/sdk/actor/_resampling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from ._data_sourcing import ComponentMetricRequest
2020
from ._decorator import actor
2121

22-
logger = logging.Logger(__name__)
22+
logger = logging.getLogger(__name__)
2323

2424

2525
@actor
@@ -85,7 +85,7 @@ async def sink_adapter(sample: Sample) -> None:
8585
if not await sender.send(sample):
8686
raise Exception(f"Error while sending with sender {sender}", sender)
8787

88-
self._resampler.add_timeseries(receiver, sink_adapter)
88+
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)
8989

9090
async def _process_resampling_requests(self) -> None:
9191
"""Process resampling data requests."""

0 commit comments

Comments
 (0)