Skip to content

Commit 9f074e2

Browse files
authored
Improve resampler buffer and config (#131)
This add `typing_extensions` to the dependencies, even if it is not needed by this PR in particular. We also move the resampler configuration to its own class, as it will start growing in options and otherwise there is too much duplication of a huge list of arguments that need to be repeated in multiple classes. Finally we improve the performance of the resampling buffer (adding a benchmark to check it) and add a comment about the failed experiment to build our own custom resampling buffer.
2 parents 086d56b + 30a47b1 commit 9f074e2

File tree

10 files changed

+193
-143
lines changed

10 files changed

+193
-143
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Benchmark resampling."""
5+
6+
from datetime import datetime, timedelta, timezone
7+
from timeit import timeit
8+
from typing import Sequence
9+
10+
from frequenz.sdk.timeseries import Sample
11+
from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper
12+
13+
14+
def nop( # pylint: disable=unused-argument
15+
samples: Sequence[Sample], resampling_period_s: float
16+
) -> float:
17+
"""Return 0.0."""
18+
return 0.0
19+
20+
21+
def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
22+
"""Benchmark the resampling helper."""
23+
helper = _ResamplingHelper(
24+
ResamplerConfig(
25+
resampling_period_s=1.0,
26+
max_data_age_in_periods=3.0,
27+
resampling_function=nop,
28+
initial_buffer_len=samples * 3,
29+
)
30+
)
31+
now = datetime.now(timezone.utc)
32+
33+
def _do_work() -> None:
34+
nonlocal now
35+
for _n_resample in range(resamples):
36+
for _n_sample in range(samples):
37+
now = now + timedelta(seconds=1 / samples)
38+
helper.add_sample(Sample(now, 0.0))
39+
helper.resample(now)
40+
41+
print(timeit(_do_work, number=5))
42+
43+
44+
def _benchmark() -> None:
45+
for resamples in [10, 100, 1000]:
46+
for samples in [10, 100, 1000]:
47+
print(f"{resamples=} {samples=}")
48+
_benchmark_resampling_helper(resamples, samples)
49+
50+
51+
if __name__ == "__main__":
52+
_benchmark()

examples/resampling.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
)
1919
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
2020
from frequenz.sdk.timeseries import Sample
21-
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
21+
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source
2222

2323
HOST = "microgrid.sandbox.api.frequenz.io"
2424
PORT = 61060
@@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals
6565
channel_registry=channel_registry,
6666
data_sourcing_request_sender=data_source_request_sender,
6767
resampling_request_receiver=resampling_request_receiver,
68-
resampling_period_s=1,
68+
config=ResamplerConfig(resampling_period_s=1),
6969
)
7070

7171
components = await microgrid.get().api_client.components()
@@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals
104104
# Create a channel to calculate an average for all the data
105105
average_chan = Broadcast[Sample]("average")
106106

107-
second_stage_resampler = Resampler(resampling_period_s=3.0)
107+
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
108108
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
109109

110110
average_sender = average_chan.new_sender()

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies = [
3838
"sympy >= 1.10.1, < 2",
3939
"toml >= 0.10",
4040
"tqdm >= 4.38.0, < 5",
41+
"typing_extensions >= 4.4.0, < 5",
4142
"watchfiles >= 0.15.0",
4243
]
4344
dynamic = [ "version" ]

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
from ._config_managing import ConfigManagingActor
88
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
99
from ._decorator import actor
10-
from ._resampling import ComponentMetricsResamplingActor
10+
from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig
1111

1212
__all__ = [
1313
"ChannelRegistry",
1414
"ComponentMetricRequest",
1515
"ComponentMetricsResamplingActor",
1616
"ConfigManagingActor",
1717
"DataSourcingActor",
18+
"ResamplerConfig",
1819
"actor",
1920
]

src/frequenz/sdk/actor/_resampling.py

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
from frequenz.sdk.util.asyncio import cancel_and_await
1515

1616
from ..timeseries import Sample
17-
from ..timeseries._resampling import (
18-
Resampler,
19-
ResamplingError,
20-
ResamplingFunction,
21-
average,
22-
)
17+
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
2318
from ._channel_registry import ChannelRegistry
2419
from ._data_sourcing import ComponentMetricRequest
2520
from ._decorator import actor
@@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments
3732
channel_registry: ChannelRegistry,
3833
data_sourcing_request_sender: Sender[ComponentMetricRequest],
3934
resampling_request_receiver: Receiver[ComponentMetricRequest],
40-
resampling_period_s: float = 0.2,
41-
max_data_age_in_periods: float = 3.0,
42-
resampling_function: ResamplingFunction = average,
35+
config: ResamplerConfig,
4336
) -> None:
4437
"""Initialize an instance.
4538
@@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments
5144
to subscribe to component metrics.
5245
resampling_request_receiver: The receiver to use to receive new
5346
resampmling subscription requests.
54-
resampling_period_s: The time it passes between resampled data
55-
should be calculated (in seconds).
56-
max_data_age_in_periods: The maximum age a sample can have to be
57-
considered *relevant* for resampling purposes, expressed in the
58-
number of resampling periods. For exapmle is
59-
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
60-
then data older than `3*2 = 6` secods will be discarded when
61-
creating a new sample and never passed to the resampling
62-
function.
63-
resampling_function: The function to be applied to the sequence of
64-
*relevant* samples at a given time. The result of the function
65-
is what is sent as the resampled data.
47+
config: The configuration for the resampler.
6648
"""
6749
self._channel_registry: ChannelRegistry = channel_registry
68-
self._resampling_period_s: float = resampling_period_s
69-
self._max_data_age_in_periods: float = max_data_age_in_periods
70-
self._resampling_function: ResamplingFunction = resampling_function
7150
self._data_sourcing_request_sender: Sender[
7251
ComponentMetricRequest
7352
] = data_sourcing_request_sender
7453
self._resampling_request_receiver: Receiver[
7554
ComponentMetricRequest
7655
] = resampling_request_receiver
77-
self._resampler: Resampler = Resampler(
78-
resampling_period_s=resampling_period_s,
79-
max_data_age_in_periods=max_data_age_in_periods,
80-
resampling_function=resampling_function,
81-
)
56+
self._resampler: Resampler = Resampler(config)
8257
self._active_req_channels: set[str] = set()
8358

8459
async def _subscribe(self, request: ComponentMetricRequest) -> None:

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33

44
"""Timeseries basic types."""
55

6-
from dataclasses import dataclass
6+
from dataclasses import dataclass, field
77
from datetime import datetime
88
from typing import Optional
99

1010

11-
@dataclass(frozen=True)
11+
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
12+
# wanted. We are using this order now because usually we need to do binary
13+
# searches on sequences of samples, and the Python `bisect` module doesn't
14+
# support providing a key until Python 3.10.
15+
@dataclass(frozen=True, order=True)
1216
class Sample:
1317
"""A measurement taken at a particular point in time.
1418
@@ -17,5 +21,8 @@ class Sample:
1721
coherent view on a group of component metrics for a particular timestamp.
1822
"""
1923

20-
timestamp: datetime
21-
value: Optional[float] = None
24+
timestamp: datetime = field(compare=True)
25+
"""The time when this sample was generated."""
26+
27+
value: Optional[float] = field(compare=False, default=None)
28+
"""The value of this sample."""

0 commit comments

Comments
 (0)