Buffer history of results in DataService
#533
Conversation
Implement a service to maintain historical buffers of DataService data with configurable time/size limits. This enables features like sliding-window plots, timeseries plots, and correlation histograms that need access to historical data.

Key components:
- BufferStrategy protocol with three implementations (see the sketch below):
  - TimeWindowBuffer: Maintains data within a time window
  - FixedSizeCircularBuffer: Fixed capacity with circular indexing
  - GrowingBuffer: Starts small, doubles capacity up to max limit
- BufferConfig with type-based defaults and pluggable type detection
- HistoryBufferService: Subscribes to DataService, maintains buffers per key
- BufferSubscriber protocol: Supports configurable views (full/delta/window)

Features:
- Selective buffering with opt-in registration per key
- Memory management with automatic eviction
- Thread-safe buffer operations
- Flexible subscription with configurable views
- Transaction batching for efficient updates

All tests passing (30 new tests, 336 total dashboard tests).

Original prompt: I need you to think about implementing a HistoryBufferService in the frontend. This will subscribe to DataService and keep a backlog (limited in size and/or time). There will be multiple users of this: sliding-window plot support, which needs to always see the last N values or last T seconds; timeseries plots (much the same, but typically infinite length); correlation histograms, which are mainly interested in updates, but may need to get the entire backlog on initial creation. Please think about this and investigate. Ask me one question at a time until you have developed a complete picture of what we need.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
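A minimal sketch of what the BufferStrategy protocol could look like, inferred from the component list above; everything beyond the class names named in the commit is an assumption:

```python
from typing import Protocol, TypeVar

T = TypeVar('T')


class BufferStrategy(Protocol[T]):
    """Protocol for history buffers; implemented by TimeWindowBuffer,
    FixedSizeCircularBuffer, and GrowingBuffer (method names assumed)."""

    def append(self, data: T) -> None:
        """Add new data, evicting old entries per the strategy's policy."""
        ...

    def get_all(self) -> T | None:
        """Return the buffered history, or None if the buffer is empty."""
        ...
```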
Replace isinstance check with duck typing in DataService._notify_subscribers. Simplify _InternalDataSubscriber by removing unnecessary StreamAssembler inheritance and DataSubscriber wrapper with _NullPipe.

Changes:
- Add SubscriberProtocol to define the duck-typed interface
- Update DataService to use hasattr checks instead of isinstance
- Remove StreamAssembler inheritance from _InternalDataSubscriber
- Remove DataSubscriber wrapper and _NullPipe fake
- Simplify _InternalDataSubscriber to just implement keys + trigger

This removes multiple layers of abstraction that were only used to satisfy an overly strict isinstance check, making the code more Pythonic and easier to understand.

Original prompt: Consider _InternalDataSubscriber - is it overcomplicating things? See DataService._notify_subscribers, it seems it can just take any callable?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
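For illustration, a sketch of the duck-typed dispatch this commit describes; the helper name and exact attribute checks are assumptions, not the actual DataService code:

```python
def notify_subscribers(subscribers, updated_keys: set, build_data) -> None:
    # Duck typing: structured subscribers expose `keys` and `trigger`;
    # anything else is treated as a plain callable.
    for sub in subscribers:
        if hasattr(sub, 'keys') and hasattr(sub, 'trigger'):
            if updated_keys & sub.keys:  # Only notify if it cares.
                sub.trigger(build_data(sub))
        else:
            sub(updated_keys)
```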
Replace naive O(n²) concatenation pattern with pre-allocated buffers using in-place numpy-level writes, achieving O(n·m) amortized complexity for n appends of size m each.

Key fixes:
- Fix data-added-twice bug in _ensure_capacity
- Fix coordinate corruption from padding slices
- Fix GrowingStorage multiplication bug (DataArray * int)
- Implement proper pre-allocation with doubling capacity
- Handle coordinates correctly (constant vs dimension-dependent)
- Properly preserve masks across appends

Implementation now follows the pattern from to_nxlog.py:
- Pre-allocate buffer with 2x capacity
- Use numpy array indexing for O(1) in-place writes
- Only concatenate when capacity exceeded (amortized cost)

Added 34 comprehensive tests covering:
- Single and multiple batch appends
- Multi-dimensional data with various coordinate types
- Mask preservation
- Edge cases (large batches, trimming, overflow)
- Behavior comparison between strategies

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
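The pre-allocation pattern in toy form, assuming plain 1-D numpy data; the class name is illustrative, not one of the actual buffer classes:

```python
import numpy as np


class PreallocatedBuffer:
    """Append in O(1) amortized time via doubling, avoiding O(n²) concat."""

    def __init__(self, initial_capacity: int = 16) -> None:
        self._buf = np.empty(initial_capacity)
        self._size = 0

    def append(self, chunk: np.ndarray) -> None:
        needed = self._size + len(chunk)
        if needed > len(self._buf):
            # Double capacity until the chunk fits (amortized O(1)/element).
            new_buf = np.empty(max(needed, 2 * len(self._buf)))
            new_buf[: self._size] = self._buf[: self._size]
            self._buf = new_buf
        # In-place write instead of concatenating on every append.
        self._buf[self._size : needed] = chunk
        self._size = needed

    def get_all(self) -> np.ndarray:
        return self._buf[: self._size]
```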
Replace SlidingWindowStorage and GrowingStorage with unified BufferStorage that works against a generic BufferInterface protocol. This separates the storage logic (growth, sliding window, shift-on-overflow) from the complexity of handling DataArray internals (coords, masks, allocation).

Key changes:
- Add BufferInterface protocol defining minimal operations
- Implement VariableBuffer for simple Variable handling
- Implement generic BufferStorage working against the interface
- Remove old SlidingWindowStorage and GrowingStorage implementations
- Add comprehensive tests using VariableBuffer

BufferStorage is now completely agnostic about buffer types and can be tested independently with simple Variables. All DataArray complexity will be encapsulated in a future DataArrayBuffer implementation.

Original prompt: "I am considering merging two storage implementations. Help me think through this: If we hit the maximum in GrowingStorage we have to have a fallback strategy. Falling back to a sliding window seems like a good choice."

Follow-up discussion led to the realization that BufferStorage should work against a simple interface, with all DataArray complexity isolated in the buffer implementation rather than mixed into storage logic.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
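A sketch of the BufferInterface idea; the method names follow later commits in this PR, but the exact signatures are assumptions:

```python
from typing import Protocol, TypeVar

T = TypeVar('T')


class BufferInterface(Protocol[T]):
    """Minimal primitives the generic storage logic needs; all DataArray
    complexity (coords, masks) hides behind an implementation of this."""

    def allocate(self, template: T, capacity: int) -> T: ...
    def write_slice(self, buffer: T, start: int, data: T) -> None: ...
    def get_view(self, buffer: T, start: int, end: int) -> T: ...
    def get_size(self, data: T) -> int: ...
```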
Simplifies the buffer architecture by removing unnecessary abstraction layers:
1. Remove StorageStrategy ABC
- Only one implementation existed (BufferStorage)
- YAGNI principle - no plans for alternative storage strategies
- BufferInterface[T] protocol already provides needed extensibility
2. Rename BufferStorage → Buffer and merge with old wrapper class
- Old buffer.py wrapper provided minimal value
- Merged get_window() and memory_mb property into Buffer
- get_window() now uses get_view() directly for efficiency
3. Implement DataArrayBuffer using TDD
- Handles DataArray complexity: coords, masks, multi-dimensional data
- Complements VariableBuffer for simpler Variable handling
- 17 comprehensive tests in test_dataarray_buffer.py
- Ports coord/mask handling logic from old GrowingStorage/SlidingWindowStorage
4. Fix history_buffer_service.py
- Remove broken imports (GrowingStorage, SlidingWindowStorage)
- Update _create_buffer_for_key() to use Buffer + DataArrayBuffer API
- Configure overallocation factors based on extractor type
New architecture:
Buffer[T] (generic storage with growth/sliding window)
↓
BufferInterface[T] (protocol)
↓
├─ VariableBuffer (for sc.Variable)
└─ DataArrayBuffer (for sc.DataArray with coords/masks)
All 332 dashboard tests pass.
Original prompt: "Consider @src/ess/livedata/dashboard/buffer_strategy.py and
wider context, do we still need StorageStrategy base? We are not planning
other implementations now."
Follow-up: "Let us merge them, we can still split later? Should HistoryBuffer
be a better name?" → Decided on Buffer as more accurate.
Follow-up: "Use a subagent to implement DataArrayBuffer using TDD."
Follow-up: "In Buffer.get_window, get_view should be used?" → Fixed to use
get_view directly instead of get_all() + slicing.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
Addresses code review feedback to enhance the history buffer implementation:

1. Fix critical bug: get_buffer() → get_all() in extractors
   - FullHistoryExtractor was calling non-existent method
   - Would have caused AttributeError at runtime
2. Remove incomplete DeltaExtractor
   - Was unimplemented (just returned full buffer)
   - Simplified _create_buffer_for_key logic
   - Can be added back when needed
3. Add WindowExtractor.window_size property
   - Exposes window size through public API
   - Updated history_buffer_service to use property instead of _size
   - Better encapsulation
4. Cache HistorySubscriber.keys property
   - Use @cached_property to avoid rebuilding set on every access
   - Addresses performance TODO
   - Documented cache behavior in docstring
5. Document magic numbers with class constants
   - Added DEFAULT_WINDOW_SIZE, DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_SIZE
   - Makes buffer configuration explicit and changeable
   - Improves code readability
6. Add concat_dim validation in buffer implementations
   - Both DataArrayBuffer and VariableBuffer now validate concat_dim exists
   - Provides clear error message with available dimensions
   - Catches configuration errors early

All 332 dashboard tests pass.

Original prompt: "Great, let's do this: Remove DeltaExtractor, leave DataArrayBuffer unchanged - it is probably wrong but we deal with this later, and finally address all your other comments."

Previous context: Code review identified critical bug (get_buffer() method doesn't exist), incomplete DeltaExtractor, private attribute access, uncached property causing performance issues, magic numbers, and missing validation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…nterface

Refactored HistoryBufferService to decouple from DataService:
- Made data_service parameter optional (defaults to None)
- Added public add_data(data: dict) method for generic data injection
- Renamed internal process_data_service_update() to call public add_data()
- DataService subscription is now conditional on initialization

Benefits:
- Improved testability: tests can inject data directly without mocking DataService
- Increased flexibility: service can be used standalone or with DataService
- Cleaner interface: single public add_data() method for all data ingestion
- Better decoupling: service logic independent of DataService implementation

Added comprehensive test suite (13 tests) covering:
- Single/multiple key data addition
- Window vs full history extraction behavior
- Subscriber lifecycle (registration, unregistration)
- Key tracking and selective notification
- Memory usage monitoring
- Lazy buffer initialization
- Multi-subscriber independence

All 345 dashboard tests pass, confirming backward compatibility.

Original request: Do we have tests for HistoryBufferService?

Follow-up: Why does it need to be so dependent on DataService? Make it optional.
Streamlined buffer creation logic:
- Made WindowExtractor.window_size required (no None default)
  - Removes DEFAULT_WINDOW_SIZE constant
  - Cleaner API: users must specify size explicitly or use FullHistoryExtractor
- Added concat_dim parameter to HistoryBufferService.__init__ (defaults to "time")
  - Removes need for data-based dimension detection in _create_buffer_for_key
  - Fixes issue where length-1 slices might lack concat dimension
  - Explicit and testable
- Removed DEFAULT_INITIAL_CAPACITY constant
  - Buffer now handles initial allocation intelligently (uses defaults from Buffer class)
  - Simplifies buffer creation logic

Result:
- _create_buffer_for_key reduced from ~45 lines to ~20 lines
  - No data parameter needed anymore
  - No fallback logic
  - Straightforward: check extractor type, set max_size accordingly
- Service initialization clearer: concat_dim parameter makes behavior explicit
- All 345 tests pass without changes

Original discussion: _create_buffer_for_key should be much simpler
Removes get_memory_usage() from HistoryBufferService, removes estimate_memory() and memory_mb property from Buffer classes, and removes the estimate_memory() method from BufferInterface protocol. Updates tests to verify functionality without relying on memory tracking.

Original prompt: Consider @src/ess/livedata/dashboard/history_buffer_service.py and classes it uses. Remove all the methods and functions related to memory usage. Update tests. Commit.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Removes implicit FullHistoryExtractor defaults. Subscribers now provide exactly the extractors they need via the extractors property. Also removes SimpleSubscriber.keys property override since keys are now always set(self.extractors.keys()).

This makes the design clearer: subscribers explicitly declare what they need rather than relying on implicit defaults.

Original prompt: That sounds like a horrible design choice. Let us simplify: subclasses only provide extractors, no defaults!

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
The test was checking trivial behavior (no notifications on registration without data) that doesn't represent a meaningful design decision. It either duplicates notification behavior already covered by test_add_data_single_key, or it checks an implementation detail rather than user-facing behavior. Removing it pending clarity on whether subscribers should receive initial buffered data on registration.

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
Remove .copy() calls from DataArrayBuffer and VariableBuffer get_view() implementations. This establishes an "ephemeral view" contract where data passed to subscribers is a view that shares memory with internal buffers.

Benefits:
- Eliminates wasteful double-copy in _grow_buffer()
- Zero-copy when subscribers immediately use data (e.g., plotting)
- Subscribers control when copies are needed (e.g., long-term storage)

Contract: Data passed to on_update() is valid only during the callback. Subscribers must use immediately or copy if retaining. Do not modify.

Updated documentation in BufferInterface protocol and HistorySubscriber to clearly document the ephemeral view contract and usage requirements. Updated test subscriber to properly copy DataArrays when storing for later assertions, demonstrating correct usage pattern.

Original prompt: I am unhappy about the `copy()` call in `get_view` in the buffers used by HistoryBufferService. That defeats the purpose of a cheap view! Please ultrathink about alternatives (I can think of at least 2 or 3). What are the tradeoffs? Is worrying about this now premature?

Follow-up: I think we have control over subscriber implementations. Can you implement B so we see how it looks and get some tests going/passing? Commit when done, I'll have a look later.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
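A minimal subscriber honoring the ephemeral-view contract, for illustration (the class name is hypothetical; on_update is the callback named in the commit):

```python
class RetainingSubscriber:
    """Demonstrates the contract: use data immediately, or copy it."""

    def __init__(self) -> None:
        self._history = []

    def on_update(self, data) -> None:
        # Plotting could use `data` directly (zero-copy), but retaining it
        # past the callback requires an explicit copy, since the view
        # shares memory with the internal buffer.
        self._history.append(data.copy())
```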
The buffer implementations (DataArrayBuffer and VariableBuffer) now automatically handle data that doesn't have the concat dimension by treating each item as a single frame to be stacked. This enables:
- 0D scalars → 1D timeseries
- 1D arrays → 2D stacks
- 2D images → 3D stacks (e.g., detector image sequences)

Implementation:
- Modified allocate() to add concat dimension to buffer if not in template
- Modified write_slice() to handle single-frame writes without concat dim
- Modified get_size() to return 1 for data without concat dimension
- Added comprehensive tests at all levels (VariableBuffer, DataArrayBuffer, HistoryBufferService)

This matches the actual usage pattern where individual snapshots arrive without a time dimension and need to be accumulated into a history buffer.

Original prompt: "Please consider HistoryBufferService, all the way down to the buffer implementations: I think they currently assume that each new data has a 'time' dimension (at least by default). However, in practice the items do not have the dim at all - they are length 1 and get stacked together along a new dim. Think creating a stack of image frames, etc. Please use TDD to address this. This has to work with, e.g., 0D "scalars" turned into 1D timeseries, or 2D images turned into a sequence of images, etc. Make sure to update also tests in lower level components. Commit when done."

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
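A toy demonstration of the stacking behavior with scipp, assuming a 0-D snapshot; the real buffers do this via pre-allocated storage rather than concat:

```python
import scipp as sc

# A 0-D "scalar" snapshot arriving without a time dimension.
frame = sc.DataArray(sc.scalar(1.0, unit='counts'))

# Stacking n such frames yields a 1-D timeseries along a new 'time' dim.
stacked = sc.concat([frame, frame, frame], dim='time')
assert stacked.dims == ('time',) and stacked.sizes['time'] == 3
```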
Implement a new plotter that supports sliding time window summation:
- Accepts 2D data (time + 1 spatial dim) and produces 1D line plots
- Accepts 3D data (time + 2 spatial dims) and produces 2D image plots
- Provides interactive slider for window length (1s to configurable max)
- Sums data over the last N seconds along the time dimension (see the sketch below)
- Validates time dimension presence and handles edge coordinates

The implementation includes:
- PlotParamsSlidingWindow: Pydantic model with max_window_length, time_dim, and separate scale options for 1D and 2D outputs
- SlidingWindowPlotter: Plotter class with kdims for interactive slider
- Registry integration supporting both 2D and 3D input data
- Comprehensive test suite with 17 tests covering all functionality

Original prompt: Please think through how plotters are created and registered - @src/ess/livedata/dashboard/plotting.py is a good starting point. Your task is to create a new 'SlidingWindow' plotter. It will be conceptually similar to the SlicerPlot since it, too, will need a slider to control the current window length. It needs to support 1D and 2D plots with 2D and 3D input data, respectively. The extra dim will be the 'time' dim. The plotter input params need to allow a maximum window length (seconds) - the slider should then have a range between 1s and the max. The plotting method of the plotter will sum over the last N (defined by slider input stream) seconds along the 'time' dim of the data. Make sure to write tests.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
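The core windowed-sum operation in sketch form; the function name is hypothetical, and in the actual plotter the window length comes from the slider stream:

```python
import scipp as sc


def sum_last_seconds(
    da: sc.DataArray, seconds: float, time_dim: str = 'time'
) -> sc.DataArray:
    """Sum the trailing `seconds` of data along `time_dim`."""
    latest = da.coords[time_dim][-1]
    cutoff = latest - sc.scalar(seconds, unit=latest.unit)
    # Label-based slicing selects the trailing window; summing over the
    # time dim reduces 2D -> 1D or 3D -> 2D.
    return da[time_dim, cutoff:].sum(time_dim)
```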
UpdateExtractor and related classes now explicitly work with sc.DataArray instead of being generic over type T. This simplification is warranted because:
- In practice, T is always sc.DataArray in production code
- TemporalBuffer is hardcoded to sc.DataArray
- All extractor operations (indexing, .dims, .coords, etc.) assume scipp
- DataService in production always uses DataService[ResultKey, sc.DataArray]

Changes:
- Remove Generic[T] from UpdateExtractor, LatestValueExtractor, FullHistoryExtractor, and WindowAggregatingExtractor
- Change extract() signature to accept sc.DataArray | None
- Remove dead code: list handling in LatestValueExtractor (lines 73-75)
- Simplify TemporalBufferManager from Generic[K, T] to Generic[K] with T fixed to sc.DataArray
- Keep defensive hasattr checks for robustness (test compatibility)

Original prompt: "Consider @src/ess/livedata/dashboard/extractors.py - I don't think in practice T can be anything but sc.DataArray. Can we change this to simplify and cleanup? Investigate and think!"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
This change simplifies the UpdateExtractor interface and implementations by moving None checks from individual extractors to call sites:

1. Fix DataService.__getitem__ bug:
   - Now uses get_buffered_data() instead of direct buffer access
   - Raises KeyError for both missing keys and empty buffers
   - Fixes type safety (signature now correctly returns V, not V | None)
2. Simplify extractors:
   - Remove defensive None checks from all extract() methods
   - Change signature from extract(data: DataArray | None) to extract(data: DataArray)
   - Remove None handling from LatestValueExtractor, FullHistoryExtractor, and WindowAggregatingExtractor
3. Update _build_subscriber_data:
   - Check for None before calling extract()
   - Single check at call site replaces repetitive checks in each extractor

Benefits:
- DRY: One check instead of N checks across extractors
- Clearer contract: Extractors only process valid data
- Type safety: Proper dict-like semantics for __getitem__
- Fail fast: Invalid access raises KeyError as expected

Also removed unused WindowAggregation enum values (last, max) that were not used in practice.

Original prompt: currently @src/ess/livedata/dashboard/extractors.py handle None in their `extract`. Can this be avoided if we do a tiny rewrite of DataService._build_subscriber_data (and potentially others)? Would that be a good change?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Cache aggregator function directly (sc.sum, sc.nansum, etc.) to avoid
  repeated unit checks and dict lookups on every extract() call (see the
  sketch below)
- Use scipp free functions (sc.sum, sc.nansum, etc.) instead of lambdas
  for cleaner code
- Simplify auto mode to check for 'counts' unit instead of dimensionless
- Update get_required_timespan() to always return float (0.0 for latest,
  float('inf') for full history) instead of Optional[float]

Original prompt: Can we use a dict in WindowAggregatingExtractor to speed up
the aggregator lookup? Or maybe it could/should be cached (unit cannot change
during stream).

Follow-up: I suggest you change to Callable[[sc.DataArray, str], sc.DataArray]
and remove the lambdas?
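A sketch of the cached-aggregator idea; the mapping and the 'auto' rule are paraphrased from the commit, and the exact nan-variant chosen in auto mode is an assumption:

```python
import scipp as sc

# Free functions usable as Callable[[sc.DataArray, str], sc.DataArray].
_AGGREGATORS = {
    'sum': sc.sum,
    'nansum': sc.nansum,
    'mean': sc.mean,
    'nanmean': sc.nanmean,
}


def resolve_aggregator(method: str, data: sc.DataArray):
    """Resolve once and cache the result; the unit cannot change
    mid-stream, so per-extract() lookups are unnecessary."""
    if method == 'auto':
        # Counts are summed; everything else is averaged.
        method = 'nansum' if data.unit == 'counts' else 'nanmean'
    return _AGGREGATORS[method]
```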
This fixes two related issues with buffer timespan management:

1. Simplify _update_buffer_requirements to work with the updated get_required_timespan() contract (now returns float, not float | None). This prevents stale timespan requirements from persisting when all extractors with timespan requirements are removed.
2. Fix TemporalBuffer overflow when timespan=0.0 by properly dropping all existing data to make room for new values. Previously, the buffer would fail with "exceeds buffer capacity even after trimming" because trimming did nothing when timespan <= 0.

Added test_timespan_zero_trims_all_old_data_on_overflow to verify the fix using a TDD approach.

Original prompt: Consider a potential bug in _update_buffer_requirements: If an extractor was removed and only LatestValueExtractor remains, then timespans will be empty, so an old requirement for a timespan will not be cleared? Can you write a test trying to reproduce this?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Implements 30 tests covering all extractor classes and the factory function:
- LatestValueExtractor: 5 tests for latest value extraction with various data shapes
- FullHistoryExtractor: 3 tests for complete buffer history extraction
- WindowAggregatingExtractor: 11 tests for windowed aggregation with different aggregation methods (sum, nansum, mean, nanmean, auto), time units, and edge cases
- create_extractors_from_params: 8 tests for the factory function with various configurations and plotter specs
- UpdateExtractor interface: 3 tests verifying abstract interface compliance

All tests follow project conventions with NumPy-style docstrings, no private field access, and proper use of scipp data structures. Tests verify observable behavior through public interfaces only.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>

---

Original prompt: Please implement tests for all the extractors in @src/ess/livedata/dashboard/extractors.py
With timing noise (e.g., frames at [0.0001, 1, 2, 3, 4, 5]), a 5-second window would incorrectly include 6 frames instead of 5 due to the inclusive lower bound in label-based slicing.

Solution:
- Estimate frame period from the median interval between frames
- Shift the cutoff by +0.5 × median_interval to place the window boundary between frame slots, avoiding extra frames from timing jitter
- Clamp the cutoff to latest_time for narrow windows (duration < median_interval)
- Continue using inclusive label-based slicing: data[time, cutoff:]

This automatically adapts to different frame rates and handles both timing jitter and narrow windows correctly.

Add comprehensive tests for timing jitter scenarios:
- test_handles_timing_jitter_at_window_start
- test_handles_timing_jitter_at_window_end
- test_consistent_frame_count_with_perfect_timing

Original prompt: Please think about a conceptual problem in WindowAggregatingExtractor:
- Data arrives in regular (but of course noisy) intervals, say once per second.
- User requests a 5 second sliding window.
- Current extraction code will then often return too many frames. Example: Frames at [0.0001, 1, 2, 3, 4, 5] => we get 6 frames instead of 5. (I think even with 0.0 it is wrong, since the label-based indexing used to extract windowed_data is inclusive on the left.)

The problem is that we can't simply reduce to, say, 4 seconds, or 5 frames, since frame rates can vary a lot. Can you think of a more stable approach?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
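A worked sketch of the jitter-tolerant cutoff, assuming plain float time coordinates (the datetime64 case is handled in a later commit below):

```python
import numpy as np


def window_cutoff(times: np.ndarray, duration: float) -> float:
    """Cutoff for an inclusive trailing window, robust to timing jitter."""
    latest = times[-1]
    median_interval = float(np.median(np.diff(times)))
    # Place the boundary half a frame period inside the nominal window so
    # inclusive slicing cannot pick up an extra frame.
    cutoff = latest - duration + 0.5 * median_interval
    # Clamp for narrow windows (duration < median_interval): the latest
    # frame is always included.
    return min(cutoff, latest)


times = np.array([0.0001, 1, 2, 3, 4, 5])
# A 5 s window now selects exactly 5 frames: [1, 2, 3, 4, 5].
assert np.sum(times >= window_cutoff(times, 5.0)) == 5
```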
Track the start time of each accumulation period and add it as a time coordinate to the current result, matching the pattern used in DetectorView. This enables time-aware plotting and analysis of monitor data windows.

The time coordinate represents the start_time of the first data batch in each accumulation period (between finalize calls).

Original prompt: Please add a `'time'` coord to `current` output of MonitorStreamProcessor. Look at DetectorView to see how to do it.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Modified _allocate_buffer() in VariableBuffer to pass the with_variances parameter to sc.empty(), ensuring variances are preserved when the buffer expands. Added a regression test to verify the fix works correctly.

Original prompt: "I just fixed a bug in VariableBuffer, which would previously raise if data had variances. Can you write a test to demonstrate this fix was necessary and prevent regression?"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
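The essence of the fix, sketched with scipp (buffer shape illustrative):

```python
import scipp as sc

data = sc.array(dims=['time'], values=[1.0, 2.0], variances=[0.1, 0.2])
# Propagate the variance flag when allocating the expanded buffer;
# previously the flag was dropped, so writing variance data raised.
buf = sc.empty(
    dims=['time'], shape=[16], dtype=data.dtype,
    with_variances=data.variances is not None,
)
assert buf.variances is not None
```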
…e redundant wrapper

- Move create_extractors_from_params() from extractors.py to plot_params.py, where the parameter types are defined. This eliminates an awkward local import of WindowMode and improves cohesion.
- Move TestCreateExtractorsFromParams tests to a new plot_params_test.py file
- Inline PlottingController._create_extractors() as it's a thin wrapper with a single call site; the 2-line function call is self-explanatory
- Remove unused UpdateExtractor import from plotting_controller.py
- Move UpdateExtractor from TYPE_CHECKING to a regular import in temporal_buffer_manager.py as it's used at runtime

Original request: create_extractors_from_params might belong to a different file - see the awkward import handling. Can you find a better place and move it as well as its tests?

Follow-up: The PlottingController._create_extractors method feels redundant, just inline?

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
The cleanup commit (88e7c72) simplified LatestValueExtractor to assume all input data are scipp DataArrays, removing the defensive hasattr check. However, several tests were passing non-DataArray types (plain ints/strings), causing AttributeErrors.

Rather than revert the cleanup, fix the tests to respect the extractor's type contract by storing proper scipp DataArrays:
- data_service_test.py: Updated 38 tests to use the make_test_data() helper to create DataArrays instead of plain integers. Fixed assertions to extract scalar values (e.g., value.value instead of value).
- helpers_test.py: Updated 4 integration tests to use the make_test_result() helper for storing result data as DataArrays with time coordinates.
- temporal_buffer_manager_test.py: Updated a test to check for KeyError without regex pattern matching.

All 1451 tests now pass.

Original request: Please look into the failing tests. Did my latest cleanup commit cause this?

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
…ceSubscriber

Replace the complex DataSubscriber pattern with the simpler DataServiceSubscriber:
- Remove unnecessary DataSubscriber subclassing
- Eliminate the FakeDataAssembler and FakePipe machinery
- Directly implement the extractors property and trigger method
- Reduce boilerplate from ~15-20 lines to ~8-10 lines per test

The tests still validate all critical behavior (cascading updates, transactions, circular dependencies) but are now much clearer and match actual production usage patterns, where DataServiceSubscriber is used with callbacks rather than DataSubscriber subclasses.

All 38 tests pass.

---

Original prompt: Please investigate if TestDataServiceUpdatingSubscribers has any value. It may have been written for a mechanism that is no longer in use. Please ultrathink about why it uses the DataSubscriber and assembler mechanism. What is really under test here? Do we have (or can we write) simpler but equivalent tests using a plain DataServiceSubscriber?

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Adds a public API for replacing all extractors on a buffer, enabling reconfiguration when subscribers are removed. This allows buffers to optimize from TemporalBuffer to SingleValueBuffer when temporal extractors are no longer needed.

Implementation:
- Add set_extractors() method that replaces the entire extractor list
- Extract common buffer reconfiguration logic to _reconfigure_buffer_if_needed()
- Refactor add_extractor() to use the helper method

Test changes:
- Rename test_add_extractor_switches_to_single_value_buffer to test_set_extractors_switches_to_single_value_buffer
- Remove private field access (state.extractors.clear()) in the test
- Use the new public set_extractors() API instead

This prepares for the DataService.unregister_subscriber() implementation, which will need to recalculate extractors when subscribers are removed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Change type hints from list to Sequence for extractor parameters
- Add a public get_required_timespan() method to BufferProtocol and implementations
- Refactor tests to use the public API instead of accessing private fields
- Fix test data to use proper DataArrays instead of scalar Variables
- Use sc.identical() for DataArray comparisons instead of ==

Original task: Fix issues in the TemporalBufferManager tests - private field access and type hint complaints about passing extractors (list vs Sequence).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…nd helpers

- Add fixtures: single_slice_2element and thick_slice_2x2 for common test data patterns
- Add helper functions: make_single_slice(), make_thick_slice(), assert_buffer_has_time_data()
- Parametrize test_add_data_creates_time_dimension to cover single and thick slices
- Remove private state testing from TestTemporalBuffer:
  - Delete test_set_required_timespan (was testing _required_timespan)
  - Delete test_set_max_memory (was testing _max_memory)
  - Delete test_max_memory_limits_capacity (was testing _data_buffer.max_capacity)
  - Rename the remaining setter tests to reflect behavior-based testing intent
- Simplify all buffer creation code using the make_single_slice and make_thick_slice helpers
- Reduce the test file from 730 to 576 lines while maintaining full coverage

Tests now focus on observable behavior (data being trimmed, concatenated, etc.) rather than inspecting implementation details. This makes tests more resilient to internal refactoring.

🤖 Generated with Claude Code

Co-Authored-By: Claude <[email protected]>
WindowAggregatingExtractor.extract() would raise DTypeError when time coordinates were datetime64 instead of float64. The issue occurred because datetime64 arithmetic in scipp requires int64 operands, not float64.

The fix converts the duration and half_median values to int64 when the time coordinate dtype is datetime64, while preserving float64 for regular numeric time coordinates.

Added tests to verify extraction works correctly with datetime64 coordinates in both multi-frame and single-frame scenarios.

Original prompt: "We have seen WindowAggregatingExtractor.extract raise because for some reason we got datetime64 latest_time but float64 self._duration. I think a datetime64 'time' coord should be supported, but it only supports arithmetic with int64 (not float64). Can you write some test to reproduce the issue before we think about a fix?"

Follow-up: "Ok, is it fixed if we convert _duration to int64 iff(!) the time coord is datetime64?"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
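A sketch of the conditional conversion (the function name is hypothetical):

```python
import scipp as sc


def make_duration(time_coord: sc.Variable, seconds: float) -> sc.Variable:
    """Return a window duration compatible with the time coord's dtype."""
    duration = sc.scalar(seconds, unit='s')
    if time_coord.dtype == sc.DType.datetime64:
        # datetime64 arithmetic in scipp requires int64 operands.
        return duration.to(unit=time_coord.unit, dtype='int64')
    return duration.to(unit=time_coord.unit)
```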
nvaytet left a comment
I have to admit I ran out of steam and didn't really look at the tests.
But they look comprehensive enough. If you were happy with the tests, then so am I.
```python
self._keys = set(self.extractors.keys())

@property
def keys(self) -> set[K]:
```
Slightly unusual that we call subscriber.keys when we are used to subscriber.keys() in Python.
Consider turning this property into a simple function?
I thought it makes sense since properties typically indicate that they return something "static". We are different from dict.keys() anyway since we are returning a set, and the subscriber is not dict-like.
```python
def __init__(self) -> None:
    """Initialize subscriber and cache keys from extractors."""
    # Cache keys from extractors to avoid repeated computation
```
Is the repeated computation that expensive? In other words, is the caching of keys really necessary?
I guess it depends on what 'computation' represents (how much work is actually done).
Well, say we expect O(100) results coming in per second; it adds up, so why not cache?
```python
extractors = subscriber.extractors

for key in subscriber.keys:
    extractor = extractors[key]
```
No need to get the extractor if the buffered_data is None? -> move inside the if statement?
Rewriting to a simple loop over extractors.items(); the None-data case is exceptional, so no need to optimize for it.
| """ | ||
| self._key_change_subscribers.append(subscriber) | ||
| subscriber(set(self.data.keys()), set()) | ||
| subscriber(set(self._buffer_manager.keys()), set()) |
Using keyword args here would help with readability (which one is for added and which one for removed).
Turns out the last user of this method was removed 2 weeks ago, removing.
```python
# Trigger immediately with existing data using subscriber's extractors
existing_data = self._build_subscriber_data(subscriber)
subscriber.trigger(existing_data)
```
Does trigger handle empty existing_data?
Further down you have an if that only calls trigger if the data is not empty.
I think this is a potentially open problem:
- Currently we only subscribe if there is data (or else you would not get the widget that lets you select that data).
- The immediate callback is needed for the pipe creation.
- Downstream does not like empty data. The plot setup usually gets messed up a bit (sizing/scaling not working, etc.).
But all that being said, we will eventually want to support saving plot setups (such as a certain grid of plots users want to reuse) or creating plots before their data arrives. That will have to be handled somehow, so parts of the mechanism may need to be revisited. Right now I don't think it will be the same mechanism, since saved plots would be for specific workflows, not workflow runs (jobs). The subscription mechanism is for data of a specific job. In other words, either DataService or an additional mechanism has to watch for arriving job-data matching a given workflow before setting up the actual subscription.
Come to think of it, I think we should call trigger further down even with empty data? Or else the plots might have no way of telling that data disappeared (even though we might need a better mechanism for that anyway).
```python
if self._pipe is None:
    # First trigger - create pipe with correctly extracted data
    self._pipe = self._pipe_factory(assembled_data)
```
Should we also pipe.send(...) after the pipe was created?
Creating the pipe (this is hv.streams.Pipe) sends the data passed in this init call. We don't want to send it twice. There is no way to init without data; that is why we had to move it into the subscriber and create it on first use.
```python
if self._aggregator is None:
    raise ValueError(f"Unknown aggregation method: {self._aggregation}")

return self._aggregator(windowed_data, self._concat_dim)
```
I don't know if the new aggregators are opening up this possibility, or if this would be (is already?) handled in a different way, but I remember talking to Celine and coming to the conclusion that it would be useful if you could 'freeze' (or persist) a curve (on a 1d plot) (like on the superplot).
You could look at a reduced spectrum, think that looks ok, persist the curve, and then fiddle a bit further with the settings on the live view, so you can easily see if what you changed made things better or worse.
In principle one could, but I don't think this mechanism should be used for it as it is, because it would keep the entire backlog between now and then. There is a memory limit, so eventually it would get kicked out. But there are multiple other solutions:
- Put the value to keep into `DataService` under a new name.
- Do it directly in the plot (since it does not need updating any more).
- Add an additional storage type in addition to the `TemporalBuffer` that can handle a sparse history for extractors that require specific slices. But this would be inefficient since (a) the buffer gets more complex to handle and update and (b) it would re-send the static data with every update.
Do it directly in the plot (since it does not need updating any more).
Sounds like the best option?
```python
nansum = 'nansum'
nanmean = 'nanmean'
sum = 'sum'
mean = 'mean'
```
Don't know if you also want to add median and nanmedian (not sure how useful that would be?)
Can always add more later. :)
```python
from .extractors import LatestValueExtractor

if isinstance(keys, dict):
    # Dict provided: keys are dict keys, extractors are dict values
```
The comment is misleading, the extractors are a dict, not the values.
Hmm, not sure what you mean, isn't it (conceptually) dict[Key, Extractor]?
When I read it, I thought there was a bug: because of the comment I expected that below we should have extractors = keys.values().
Then I read further and saw that in the else case we make a dict for the extractors.
…ture

Removes the unused subscribe_to_changed_keys method from DataService, which was last called in CorrelationHistogramController before being removed in commit 0a7493a (Nov 3, 2025). The method and its supporting infrastructure (_key_change_subscribers, _pending_key_additions, _pending_key_removals, and _notify_key_change_subscribers) were never used after that cleanup.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>

Original request: DataService.subscribe_to_changed_keys seems unused. Please find out what it was meant for? When did we remove the last user?
Fix two issues with CorrelationHistogramController after the change to use
TemporalBufferManager with extractors:
1. Update _is_timeseries to identify 0D scalars with time coords
- DataService now returns latest values via LatestValueExtractor (0D scalars)
- Old check for dims==('time',) failed for these 0D scalars
- New check: da.ndim == 0 and 'time' in da.coords (see the sketch after this commit message)
2. Use FullHistoryExtractor in add_correlation_processor
- Correlation processors need complete timeseries history, not just latest
- Pass extractors parameter to DataSubscriber with FullHistoryExtractor
- Create proper pipe factory function for DataSubscriber initialization
Add comprehensive tests in correlation_histogram_test.py:
- test_get_timeseries_with_individual_0d_points: Tests realistic scenario with
individual 0D timeseries points
- test_get_timeseries_identifies_buffered_timeseries: Tests with pre-concatenated
1D timeseries data
- test_processor_receives_full_history_from_0d_points: Tests that processors
receive concatenated history (1D) from individual 0D points via
FullHistoryExtractor
- test_processor_receives_full_history_not_latest: Tests processors receive full
history with pre-concatenated data
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
---
Original conversation:
User: On this branch we have changed DataService to not store timeseries directly,
but have a special FullHistoryExtractor use a DataServiceSubscriber if a timeseries
is needed. I believe this breaks CorrelationHistogramController, since its check
for _is_timeseries will (I think) only see the latest value, i.e., the check will
always be false.
Please ultrathink to understand the problem. Then think whether this suggestion
would solve it:
1. Modify _is_timeseries to only check for a time coord, and whether the data is
0D (or 1D with length-1 time coord).
2. Setup the subscriber correctly to use a FullHistoryExtractor.
[After analysis and discussion, we determined the check should be for 0D with time
coord only, and properly configured the subscriber with FullHistoryExtractor]
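A sketch of the new check from item 1 above (shown as a standalone function for illustration; the real code is a method on CorrelationHistogramController):

```python
import scipp as sc


def is_timeseries(da: sc.DataArray) -> bool:
    # Latest values now arrive as 0-D scalars carrying a 'time' coord.
    return da.ndim == 0 and 'time' in da.coords
```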
@nvaytet Found a bug (broke correlation histogram mechanism) because we didn't have existing tests... fixed (and added tests) now. See latest commit.
```python
}
controller.add_correlation_processor(processor, items)

# Processor triggered immediately with existing data (1 point each)
```
What happens if there was more than one 0D point in the data before accumulation started? Does it receive all of them? The current test would pass if it either gets all points or just the last point.
I can't figure out exactly if this is covered by the test just below (test_processor_receives_full_history_not_latest). If you had multiple 0D points before accumulation begins, would it look like the data created by make_timeseries_data?
If some subscriber already requested a history, you would get multiple points here, yes. I am not sure right now whether we should try to keep a history even before it was requested. It would be useful, but we'd somehow have to limit it to small data. I'd like to leave that decision for later.
Context
Please see issues linked below.
Key changes
Backend
- Results now include a `time` coord. This is required so sliding-window plots can work.
- Timeseries (the `timeseries` service, or workflows using the `Timeseries` accumulator) now only return the latest value. This fixes #497 (Timeseries: Only include latest value in message), but I kept the changes minimal as it is still unclear how we want to address some of the complications about restoring history (discussed there). This might be complex and I want to avoid making this PR bigger than it already is.

Dashboard
- `DataService` now uses `TemporalBufferManager` to store data. By default it keeps only the latest message, replicating existing behavior.
- A new subscriber mechanism (`DataServiceSubscriber`) lets subscribers specify if they need a time window or full history. The manager then uses a new buffer to store this.
- Plotter params now have a `window` field, allowing users to specify an aggregation period. Currently this cannot be changed once the plot is created, but this is definitely possible to extend in the future. Fixes #517 (User configuration for moving window length in time (or number of pulses)) (probably, if we read between the lines).

I think I'll also declare that this fixes #459. It is not true from the point of view of a latest-value subscriber. However, by subscribing differently it is possible to get the full log via the temporal buffer.