Skip to content

Commit e8f0eab

Browse files
SimonHeybrock and claude committed
Fix WindowAggregatingExtractor to handle timing jitter robustly
With timing noise (e.g., frames at [0.0001, 1, 2, 3, 4, 5]), a 5-second window would incorrectly include 6 frames instead of 5 due to the inclusive lower bound in label-based slicing.

Solution:
- Estimate frame period from median interval between frames
- Shift cutoff by +0.5 × median_interval to place window boundary between frame slots, avoiding extra frames from timing jitter
- Clamp cutoff to latest_time for narrow windows (duration < median_interval)
- Continue using inclusive label-based slicing: data[time, cutoff:]

This automatically adapts to different frame rates and handles both timing jitter and narrow windows correctly.

Add comprehensive tests for timing jitter scenarios:
- test_handles_timing_jitter_at_window_start
- test_handles_timing_jitter_at_window_end
- test_consistent_frame_count_with_perfect_timing

Original prompt: Please think about a conceptual problem in WindowAggregatingExtractor:
- Data arrives in regular (but of course noisy) intervals, say once per second.
- User requests 5 second sliding window.
- Current extraction code will then often return too many frames. Example: Frames at [0.0001, 1, 2, 3, 4, 5] => we get 6 frames instead of 5. (I think even with 0.0 it is wrong, since label-based indexing used to extract windowed_data is inclusive on the left.)

The problem is that we can't simply reduce to, say, 4 seconds, or 5 frames, since frame rates can vary a lot. Can you think of a more stable approach?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 913d321 commit e8f0eab

File tree

2 files changed

+109
-1
lines changed

2 files changed

+109
-1
lines changed

src/ess/livedata/dashboard/extractors.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,24 @@ def extract(self, data: sc.DataArray) -> Any:
129129
duration = sc.scalar(self._window_duration_seconds, unit='s').to(
130130
unit=time_coord.unit
131131
)
132-
windowed_data = data[self._concat_dim, latest_time - duration :]
132+
133+
# Estimate frame period from median interval to handle timing noise.
134+
# Shift cutoff by half period to place boundary between frame slots,
135+
# avoiding inclusion of extra frames due to timing jitter.
136+
if len(time_coord) > 1:
137+
intervals = time_coord[1:] - time_coord[:-1]
138+
median_interval = sc.median(intervals)
139+
cutoff_time = latest_time - duration + 0.5 * median_interval
140+
# Clamp to ensure at least latest frame included
141+
# (handles narrow windows where duration < median_interval)
142+
if cutoff_time > latest_time:
143+
cutoff_time = latest_time
144+
else:
145+
# Single frame: use duration-based cutoff
146+
cutoff_time = latest_time - duration
147+
148+
# Use label-based slicing with inclusive lower bound
149+
windowed_data = data[self._concat_dim, cutoff_time:]
133150

134151
# Resolve and cache aggregator function on first call
135152
if self._aggregator is None:

tests/dashboard/extractors_test.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,97 @@ def test_extract_narrow_window(self):
413413
result.data, sc.array(dims=['x'], values=[5, 6], unit='counts')
414414
)
415415

416+
def test_handles_timing_jitter_at_window_start(self):
417+
"""Test that timing noise near window boundary doesn't include extra frames."""
418+
extractor = WindowAggregatingExtractor(
419+
window_duration_seconds=5.0,
420+
aggregation=WindowAggregation.nansum,
421+
concat_dim='time',
422+
)
423+
424+
# Regular 1 Hz data with timing jitter on first frame
425+
# Conceptually frames at t=[0, 1, 2, 3, 4, 5] but first has noise
426+
data = sc.DataArray(
427+
sc.array(
428+
dims=['time', 'x'],
429+
values=[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]],
430+
unit='counts',
431+
),
432+
coords={
433+
'time': sc.array(
434+
dims=['time'], values=[0.0001, 1.0, 2.0, 3.0, 4.0, 5.0], unit='s'
435+
),
436+
'x': sc.arange('x', 2, unit='m'),
437+
},
438+
)
439+
440+
result = extractor.extract(data)
441+
442+
# Cutoff = 5 - 5 + 0.5 * median_interval (1 s) = 0.5; the inclusive slice [0.5:] excludes the frame at 0.0001
443+
# Should include 5 frames [1, 2, 3, 4, 5], not all 6
444+
expected_sum = sc.array(dims=['x'], values=[35, 40], unit='counts')
445+
assert sc.allclose(result.data, expected_sum)
446+
447+
def test_handles_timing_jitter_at_window_end(self):
448+
"""Test that timing noise on latest frame doesn't affect frame count."""
449+
extractor = WindowAggregatingExtractor(
450+
window_duration_seconds=5.0,
451+
aggregation=WindowAggregation.nansum,
452+
concat_dim='time',
453+
)
454+
455+
# Regular 1 Hz data with timing jitter on last frame
456+
data = sc.DataArray(
457+
sc.array(
458+
dims=['time', 'x'],
459+
values=[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]],
460+
unit='counts',
461+
),
462+
coords={
463+
'time': sc.array(
464+
dims=['time'], values=[0.0, 1.0, 2.0, 3.0, 4.0, 5.0001], unit='s'
465+
),
466+
'x': sc.arange('x', 2, unit='m'),
467+
},
468+
)
469+
470+
result = extractor.extract(data)
471+
472+
# Cutoff = 5.0001 - 5 + 0.5 * median_interval (1 s) = 0.5001; the inclusive slice [0.5001:] excludes the frame at 0.0
473+
# Should include 5 frames [1, 2, 3, 4, 5.0001]
474+
expected_sum = sc.array(dims=['x'], values=[35, 40], unit='counts')
475+
assert sc.allclose(result.data, expected_sum)
476+
477+
def test_consistent_frame_count_with_perfect_timing(self):
478+
"""Test baseline: perfect timing gives expected frame count."""
479+
extractor = WindowAggregatingExtractor(
480+
window_duration_seconds=5.0,
481+
aggregation=WindowAggregation.nansum,
482+
concat_dim='time',
483+
)
484+
485+
# Perfect 1 Hz data at exactly [0, 1, 2, 3, 4, 5]
486+
data = sc.DataArray(
487+
sc.array(
488+
dims=['time', 'x'],
489+
values=[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]],
490+
unit='counts',
491+
),
492+
coords={
493+
'time': sc.array(
494+
dims=['time'], values=[0.0, 1.0, 2.0, 3.0, 4.0, 5.0], unit='s'
495+
),
496+
'x': sc.arange('x', 2, unit='m'),
497+
},
498+
)
499+
500+
result = extractor.extract(data)
501+
502+
# Cutoff = 5 - 5 + 0.5 * median_interval (1 s) = 0.5; the inclusive slice [0.5:] excludes the frame at exactly 0
503+
# Should include 5 frames [1, 2, 3, 4, 5]
504+
expected_sum = sc.array(dims=['x'], values=[35, 40], unit='counts')
505+
assert sc.allclose(result.data, expected_sum)
506+
416507

417508
class TestCreateExtractorsFromParams:
418509
"""Tests for create_extractors_from_params factory function."""

0 commit comments

Comments
 (0)