Skip to content

Commit 5503c83

Browse files
committed
Don't use Sample in the resampler buffer and function
`Sample` can have `None` as value, but the resampler buffer (and function) never get any `None` value stored/passed. The reason is saving a sample with no value provides no extra information and would make the resampling function implementation more complicated, as it would need to check values for `None`. Currently, the resampling function will never receive a `None` value, but it still needs to check for them for type-checking reasons. This commit uses a `tuple[datetime, Quantity]` to store samples in the buffer and pass it to the resampling function. This way it is trivially clear that values can't be `None` in this context. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent c8c0f63 commit 5503c83

File tree

4 files changed

+136
-41
lines changed

4 files changed

+136
-41
lines changed

benchmarks/timeseries/resampling.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99

1010
from frequenz.quantities import Quantity
1111

12-
from frequenz.sdk.timeseries import ResamplerConfig, Sample
12+
from frequenz.sdk.timeseries import ResamplerConfig
1313
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
1414
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper
1515

1616

1717
def nop( # pylint: disable=unused-argument
18-
samples: Sequence[Sample[Quantity]],
18+
samples: Sequence[tuple[datetime, Quantity]],
1919
resampler_config: ResamplerConfig,
2020
source_properties: SourceProperties,
2121
) -> float:
@@ -43,7 +43,7 @@ def _do_work() -> None:
4343
for _n_resample in range(resamples):
4444
for _n_sample in range(samples):
4545
now = now + timedelta(seconds=1 / samples)
46-
helper.add_sample(Sample(now, Quantity(0.0)))
46+
helper.add_sample((now, Quantity(0.0)))
4747
helper.resample(now)
4848

4949
print(timeit(_do_work, number=5))

src/frequenz/sdk/timeseries/_resampling/_config.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from frequenz.quantities import Quantity
1515

16-
from .._base_types import UNIX_EPOCH, QuantityT, Sample
16+
from .._base_types import UNIX_EPOCH, QuantityT
1717
from ._base_types import SourceProperties
1818

1919
_logger = logging.getLogger(__name__)
@@ -60,16 +60,17 @@ class ResamplingFunction(Protocol):
6060

6161
def __call__(
6262
self,
63-
input_samples: Sequence[Sample[Quantity]],
63+
input_samples: Sequence[tuple[datetime, Quantity]],
6464
resampler_config: ResamplerConfig,
6565
source_properties: SourceProperties,
6666
/,
6767
) -> float:
6868
"""Call the resampling function.
6969
7070
Args:
71-
input_samples: The sequence of pre-existing samples. It must be
72-
non-empty.
71+
input_samples: The sequence of pre-existing samples, where the first item is
72+
the timestamp of the sample, and the second is the value of the sample.
73+
The sequence must be non-empty.
7374
resampler_config: The configuration of the resampler calling this
7475
function.
7576
source_properties: The properties of the source being resampled.
@@ -81,7 +82,7 @@ def __call__(
8182

8283

8384
def average(
84-
samples: Sequence[Sample[QuantityT]],
85+
samples: Sequence[tuple[datetime, QuantityT]],
8586
resampler_config: ResamplerConfig, # pylint: disable=unused-argument
8687
source_properties: SourceProperties, # pylint: disable=unused-argument
8788
) -> float:
@@ -97,9 +98,7 @@ def average(
9798
The average of all `samples` values.
9899
"""
99100
assert len(samples) > 0, "Average cannot be given an empty list of samples"
100-
values = list(
101-
sample.value.base_value for sample in samples if sample.value is not None
102-
)
101+
values = list(sample[1].base_value for sample in samples)
103102
return sum(values) / len(values)
104103

105104

src/frequenz/sdk/timeseries/_resampling/_resampler.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,9 @@ def __init__(self, name: str, config: ResamplerConfig) -> None:
258258
"""
259259
self._name = name
260260
self._config = config
261-
self._buffer: deque[Sample[Quantity]] = deque(maxlen=config.initial_buffer_len)
261+
self._buffer: deque[tuple[datetime, Quantity]] = deque(
262+
maxlen=config.initial_buffer_len
263+
)
262264
self._source_properties: SourceProperties = SourceProperties()
263265

264266
@property
@@ -270,15 +272,15 @@ def source_properties(self) -> SourceProperties:
270272
"""
271273
return self._source_properties
272274

273-
def add_sample(self, sample: Sample[Quantity]) -> None:
275+
def add_sample(self, sample: tuple[datetime, Quantity]) -> None:
274276
"""Add a new sample to the internal buffer.
275277
276278
Args:
277279
sample: The sample to be added to the buffer.
278280
"""
279281
self._buffer.append(sample)
280282
if self._source_properties.sampling_start is None:
281-
self._source_properties.sampling_start = sample.timestamp
283+
self._source_properties.sampling_start = sample[0]
282284
self._source_properties.received_samples += 1
283285

284286
def _update_source_sample_period(self, now: datetime) -> bool:
@@ -416,9 +418,9 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
416418
min_index = bisect(
417419
self._buffer,
418420
minimum_relevant_timestamp,
419-
key=lambda s: s.timestamp,
421+
key=lambda s: s[0],
420422
)
421-
max_index = bisect(self._buffer, timestamp, key=lambda s: s.timestamp)
423+
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
422424
# Using itertools for slicing doesn't look very efficient, but
423425
# experiments with a custom (ring) buffer that can slice showed that
424426
# it is not that bad. See:
@@ -450,8 +452,8 @@ def _log_no_relevant_samples(
450452

451453
if self._buffer:
452454
buffer_info = (
453-
f"{self._buffer[0].timestamp} - "
454-
f"{self._buffer[-1].timestamp} ({len(self._buffer)} samples)"
455+
f"{self._buffer[0][0]} - "
456+
f"{self._buffer[-1][0]} ({len(self._buffer)} samples)"
455457
)
456458
else:
457459
buffer_info = "Empty"
@@ -509,7 +511,7 @@ async def _receive_samples(self) -> None:
509511
"""
510512
async for sample in self._source:
511513
if sample.value is not None and not sample.value.isnan():
512-
self._helper.add_sample(sample)
514+
self._helper.add_sample((sample.timestamp, sample.value))
513515

514516
# We need the noqa because pydoclint can't figure out that `recv_exception` is an
515517
# `Exception` instance.

0 commit comments

Comments
 (0)