Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/timeseries/resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

from frequenz.quantities import Quantity

from frequenz.sdk.timeseries import ResamplerConfig, Sample
from frequenz.sdk.timeseries import ResamplerConfig
from frequenz.sdk.timeseries._resampling._base_types import SourceProperties
from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper


def nop( # pylint: disable=unused-argument
samples: Sequence[Sample[Quantity]],
samples: Sequence[tuple[datetime, Quantity]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if a NoNoneSample type or so might be a nice abstraction here. I can imagine there are actually many places that would enjoy not having to check for None after they initially received the data. I can certainly use it in FCR as well.

For now its fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in any case here we want to get rid of even the Quantity, we shouldn't really use Sample either, at least until we can use Sample[float].

If we go with the None/not None sample, maybe it can be OptionalSample (None values allowed) and Sample (can't have a None value) to follow Python's old Optional as an alias of | None. Another option, which I'm not sure if it is doable or not, is Sample[Power] vs Sample[Power | None]. I have the feeling we tried to make that work and failed, but maybe I'm just remembering wrongly.

resampler_config: ResamplerConfig,
source_properties: SourceProperties,
) -> float:
Expand Down Expand Up @@ -43,7 +43,7 @@ def _do_work() -> None:
for _n_resample in range(resamples):
for _n_sample in range(samples):
now = now + timedelta(seconds=1 / samples)
helper.add_sample(Sample(now, Quantity(0.0)))
helper.add_sample((now, Quantity(0.0)))
helper.resample(now)

print(timeit(_do_work, number=5))
Expand Down
15 changes: 7 additions & 8 deletions src/frequenz/sdk/timeseries/_resampling/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from frequenz.quantities import Quantity

from .._base_types import UNIX_EPOCH, QuantityT, Sample
from .._base_types import UNIX_EPOCH, QuantityT
from ._base_types import SourceProperties

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,16 +60,17 @@ class ResamplingFunction(Protocol):

def __call__(
self,
input_samples: Sequence[Sample[Quantity]],
input_samples: Sequence[tuple[datetime, Quantity]],
resampler_config: ResamplerConfig,
source_properties: SourceProperties,
/,
) -> float:
"""Call the resampling function.

Args:
input_samples: The sequence of pre-existing samples. It must be
non-empty.
input_samples: The sequence of pre-existing samples, where the first item is
the timestamp of the sample, and the second is the value of the sample.
The sequence must be non-empty.
resampler_config: The configuration of the resampler calling this
function.
source_properties: The properties of the source being resampled.
Expand All @@ -81,7 +82,7 @@ def __call__(


def average(
samples: Sequence[Sample[QuantityT]],
samples: Sequence[tuple[datetime, QuantityT]],
resampler_config: ResamplerConfig, # pylint: disable=unused-argument
source_properties: SourceProperties, # pylint: disable=unused-argument
) -> float:
Expand All @@ -97,9 +98,7 @@ def average(
The average of all `samples` values.
"""
assert len(samples) > 0, "Average cannot be given an empty list of samples"
values = list(
sample.value.base_value for sample in samples if sample.value is not None
)
values = list(sample[1].base_value for sample in samples)
return sum(values) / len(values)


Expand Down
18 changes: 10 additions & 8 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def __init__(self, name: str, config: ResamplerConfig) -> None:
"""
self._name = name
self._config = config
self._buffer: deque[Sample[Quantity]] = deque(maxlen=config.initial_buffer_len)
self._buffer: deque[tuple[datetime, Quantity]] = deque(
maxlen=config.initial_buffer_len
)
self._source_properties: SourceProperties = SourceProperties()

@property
Expand All @@ -270,15 +272,15 @@ def source_properties(self) -> SourceProperties:
"""
return self._source_properties

def add_sample(self, sample: Sample[Quantity]) -> None:
def add_sample(self, sample: tuple[datetime, Quantity]) -> None:
"""Add a new sample to the internal buffer.

Args:
sample: The sample to be added to the buffer.
"""
self._buffer.append(sample)
if self._source_properties.sampling_start is None:
self._source_properties.sampling_start = sample.timestamp
self._source_properties.sampling_start = sample[0]
self._source_properties.received_samples += 1

def _update_source_sample_period(self, now: datetime) -> bool:
Expand Down Expand Up @@ -416,9 +418,9 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]:
min_index = bisect(
self._buffer,
minimum_relevant_timestamp,
key=lambda s: s.timestamp,
key=lambda s: s[0],
)
max_index = bisect(self._buffer, timestamp, key=lambda s: s.timestamp)
max_index = bisect(self._buffer, timestamp, key=lambda s: s[0])
# Using itertools for slicing doesn't look very efficient, but
# experiments with a custom (ring) buffer that can slice showed that
# it is not that bad. See:
Expand Down Expand Up @@ -450,8 +452,8 @@ def _log_no_relevant_samples(

if self._buffer:
buffer_info = (
f"{self._buffer[0].timestamp} - "
f"{self._buffer[-1].timestamp} ({len(self._buffer)} samples)"
f"{self._buffer[0][0]} - "
f"{self._buffer[-1][0]} ({len(self._buffer)} samples)"
)
else:
buffer_info = "Empty"
Expand Down Expand Up @@ -509,7 +511,7 @@ async def _receive_samples(self) -> None:
"""
async for sample in self._source:
if sample.value is not None and not sample.value.isnan():
self._helper.add_sample(sample)
self._helper.add_sample((sample.timestamp, sample.value))

# We need the noqa because pydoclint can't figure out that `recv_exception` is an
# `Exception` instance.
Expand Down
Loading