-
Couldn't load subscription status.
- Fork 20
Add a wait_for_samples method to the MovingWindow
#1159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
329738e
49b8738
a78106a
85dda90
32e27cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -190,6 +190,8 @@ def __init__( # pylint: disable=too-many-arguments | |
| align_to=align_to, | ||
| ) | ||
|
|
||
| self._condition_new_sample = asyncio.Condition() | ||
|
|
||
| def start(self) -> None: | ||
| """Start the MovingWindow. | ||
|
|
||
|
|
@@ -318,6 +320,44 @@ def window( | |
| start, end, force_copy=force_copy, fill_value=fill_value | ||
| ) | ||
|
|
||
| async def wait_for_samples(self, n: int) -> None: | ||
| """Wait until the next `n` samples are available in the MovingWindow. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's interesting. valid means that any data was received. If a component is missing data, resampler will send If a component is sending only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've updated it to return after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see the confusion, I understood that it triggers when This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it would wait until there are It does consider the timestamps when the samples are received. It expects The current tests only cover cases where there is no resampling in the moving window. I'll rectify that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to clarify that it returns after |
||
|
|
||
| This function returns after `n` new samples are available in the MovingWindow, | ||
| without considering whether the new samples are valid. The validity of the | ||
| samples can be verified by calling the | ||
| [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method. | ||
|
|
||
| Args: | ||
| n: The number of samples to wait for. | ||
|
|
||
| Raises: | ||
| ValueError: If `n` is less than or equal to 0 or greater than the capacity | ||
| of the MovingWindow. | ||
| """ | ||
| if n == 0: | ||
| return | ||
| if n < 0: | ||
| raise ValueError("The number of samples to wait for must be 0 or greater.") | ||
| if n > self.capacity: | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| raise ValueError( | ||
| "The number of samples to wait for must be less than or equal to the " | ||
| + f"capacity of the MovingWindow ({self.capacity})." | ||
| ) | ||
|
Comment on lines
+343
to
+346
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could also just silently wait for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought of it that if n > capacity the n would still be respected, but calculated in terms of time steps since this was triggered. But I think the different understanding here shows already that this could be confusing and we can limit it until we have a case where we need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, true, best to start with a safe approach, for my behaviour you can still easily get is via |
||
| start_timestamp = ( | ||
| # Start from the next expected timestamp. | ||
| self.newest_timestamp + self.sampling_period | ||
| if self.newest_timestamp is not None | ||
| else None | ||
| ) | ||
| while True: | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| async with self._condition_new_sample: | ||
| # Every time a new sample is received, this condition gets notified and | ||
| # will wake up. | ||
| _ = await self._condition_new_sample.wait() | ||
|
Comment on lines
+354
to
+357
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just thinking out loud, and I don't even think we need to think about it for this PR, but maybe this could be done more efficiently by reversing the logic, and only set the condition when the counter is set (it could be even be a simple There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The most common case is also with n=1 and resampling_interval=15 minutes. |
||
| if self.count_covered(since=start_timestamp) >= n: | ||
| return | ||
|
|
||
| async def _run_impl(self) -> None: | ||
| """Awaits samples from the receiver and updates the underlying ring buffer. | ||
|
|
||
|
|
@@ -331,6 +371,9 @@ async def _run_impl(self) -> None: | |
| await self._resampler_sender.send(sample) | ||
| else: | ||
| self._buffer.update(sample) | ||
| async with self._condition_new_sample: | ||
| # Wake up all coroutines waiting for new samples. | ||
| self._condition_new_sample.notify_all() | ||
|
|
||
| except asyncio.CancelledError: | ||
| _logger.info("MovingWindow task has been cancelled.") | ||
|
|
@@ -343,8 +386,10 @@ def _configure_resampler(self) -> None: | |
| assert self._resampler is not None | ||
|
|
||
| async def sink_buffer(sample: Sample[Quantity]) -> None: | ||
| if sample.value is not None: | ||
| self._buffer.update(sample) | ||
| self._buffer.update(sample) | ||
| async with self._condition_new_sample: | ||
| # Wake up all coroutines waiting for new samples. | ||
| self._condition_new_sample.notify_all() | ||
|
|
||
| resampler_channel = Broadcast[Sample[Quantity]](name="average") | ||
| self._resampler_sender = resampler_channel.new_sender() | ||
|
|
@@ -355,23 +400,44 @@ async def sink_buffer(sample: Sample[Quantity]) -> None: | |
| asyncio.create_task(self._resampler.resample(), name="resample") | ||
| ) | ||
|
|
||
| def count_valid(self) -> int: | ||
| """ | ||
| Count the number of valid samples in this `MovingWindow`. | ||
| def count_valid( | ||
| self, *, since: datetime | None = None, until: datetime | None = None | ||
| ) -> int: | ||
| """Count the number of valid samples in this `MovingWindow`. | ||
|
|
||
| If `since` and `until` are provided, the count is limited to the samples between | ||
| (and including) the given timestamps. | ||
|
|
||
| Args: | ||
| since: The timestamp from which to start counting. If `None`, the oldest | ||
| timestamp of the buffer is used. | ||
| until: The timestamp until (and including) which to count. If `None`, the | ||
| newest timestamp of the buffer is used. | ||
|
|
||
| Returns: | ||
| The number of valid samples in this `MovingWindow`. | ||
| """ | ||
| return self._buffer.count_valid() | ||
| return self._buffer.count_valid(since=since, until=until) | ||
|
|
||
| def count_covered(self) -> int: | ||
| def count_covered( | ||
| self, *, since: datetime | None = None, until: datetime | None = None | ||
| ) -> int: | ||
| """Count the number of samples that are covered by the oldest and newest valid samples. | ||
|
|
||
| If `since` and `until` are provided, the count is limited to the samples between | ||
| (and including) the given timestamps. | ||
|
|
||
| Args: | ||
| since: The timestamp from which to start counting. If `None`, the oldest | ||
| timestamp of the buffer is used. | ||
| until: The timestamp until (and including) which to count. If `None`, the | ||
| newest timestamp of the buffer is used. | ||
|
|
||
| Returns: | ||
| The count of samples between the oldest and newest (inclusive) valid samples | ||
| or 0 if there are is no time range covered. | ||
| """ | ||
| return self._buffer.count_covered() | ||
| return self._buffer.count_covered(since=since, until=until) | ||
|
|
||
| @overload | ||
| def __getitem__(self, key: SupportsIndex) -> float: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.