Skip to content

Commit 00485f7

Browse files
committed
Use the timestamp provided by the timer receiver
The resampling class now can take an optional timestamp to be used to produce the new samples and the resampling actor now passes the timestamp produced by the timer instead of calling `now()` manually. This also adds a more explicit check to make sure the timer receiver is not closed. Fixes #54. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 639961d commit 00485f7

File tree

3 files changed

+33
-12
lines changed

3 files changed

+33
-12
lines changed

src/frequenz/sdk/actor/resampling.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,12 @@ async def run(self) -> None:
208208
component_data_receiver=MergeNamed(**self._input_receivers),
209209
)
210210
while await select.ready():
211-
if _ := select.resampling_timer:
211+
if msg := select.resampling_timer:
212+
assert msg.inner is not None, "The timer should never be 'closed'"
213+
timestamp = msg.inner
212214
awaitables = [
213215
self._output_senders[channel_name].send(sample)
214-
for channel_name, sample in self._resampler.resample()
216+
for channel_name, sample in self._resampler.resample(timestamp)
215217
]
216218
await asyncio.gather(*awaitables)
217219
if msg := select.component_data_receiver:

src/frequenz/sdk/timeseries/resampler.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,22 @@ def add_sample(self, time_series_id: str, sample: Sample) -> None:
179179
f"No resampler for time series {time_series_id} found!"
180180
) from err
181181

182-
def resample(self) -> Generator[Tuple[str, Sample], None, None]:
182+
def resample(
183+
self, timestamp: Optional[datetime] = None
184+
) -> Generator[Tuple[str, Sample], None, None]:
183185
"""Resample samples for all time series.
184186
187+
Args:
188+
timestamp: the timestamp to use to emit the new samples (and to
189+
consider stored samples relevant for resampling. If `None`,
190+
the current datetime (in UTC) will be used.
191+
185192
Yields:
186193
iterator of time series ids and their newly resampled samples
187194
"""
188-
now = datetime.now(timezone.utc)
195+
if timestamp is None:
196+
timestamp = datetime.now(timezone.utc)
189197
for time_series_id, resampler in self._resamplers.items():
190-
yield time_series_id, Sample(timestamp=now, value=resampler.resample(now))
198+
yield time_series_id, Sample(
199+
timestamp=timestamp, value=resampler.resample(timestamp)
200+
)

tests/timeseries/test_resampling.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,17 @@ def test_component_metric_group_resampler() -> None:
143143
resampler.add_time_series(time_series_id=time_series_id_1)
144144
resampler.add_time_series(time_series_id=time_series_id_2)
145145

146-
timestamp = datetime.now(timezone.utc)
146+
now = datetime.now(timezone.utc)
147147

148148
value11 = 5.0
149149
value12 = 15.0
150150
value21 = 100.0
151151
value22 = 999.0
152152

153-
sample11 = Sample(timestamp - timedelta(seconds=0.7), value=value11)
154-
sample12 = Sample(timestamp - timedelta(seconds=0.5), value=value12)
155-
sample21 = Sample(timestamp - timedelta(seconds=5.05), value=value21)
156-
sample22 = Sample(timestamp - timedelta(seconds=0.99), value=value22)
153+
sample11 = Sample(now - timedelta(seconds=0.7), value=value11)
154+
sample12 = Sample(now - timedelta(seconds=0.5), value=value12)
155+
sample21 = Sample(now - timedelta(seconds=5.05), value=value21)
156+
sample22 = Sample(now - timedelta(seconds=0.99), value=value22)
157157

158158
resampler.add_sample(time_series_id=time_series_id_1, sample=sample11)
159159
resampler.add_sample(time_series_id=time_series_id_1, sample=sample12)
@@ -162,8 +162,17 @@ def test_component_metric_group_resampler() -> None:
162162

163163
resampled_samples = dict(resampler.resample())
164164

165-
assert resampled_samples[time_series_id_1].timestamp >= timestamp
165+
assert resampled_samples[time_series_id_1].timestamp >= now
166166
assert resampled_samples[time_series_id_1].value == sum([value11, value12])
167167

168-
assert resampled_samples[time_series_id_2].timestamp >= timestamp
168+
assert resampled_samples[time_series_id_2].timestamp >= now
169169
assert resampled_samples[time_series_id_2].value == value22
170+
171+
timestamp = now + timedelta(seconds=0.5)
172+
resampled_samples = dict(resampler.resample(timestamp))
173+
174+
assert resampled_samples[time_series_id_1].timestamp == timestamp
175+
assert resampled_samples[time_series_id_1].value == value12
176+
177+
assert resampled_samples[time_series_id_2].timestamp == timestamp
178+
assert resampled_samples[time_series_id_2].value is None

0 commit comments

Comments
 (0)