Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
66b3c81
Add HistoryBufferService for dashboard data buffering
SimonHeybrock Nov 5, 2025
469afe5
Simplify DataService subscriber protocol using duck typing
SimonHeybrock Nov 5, 2025
887eeee
Redo part 1
SimonHeybrock Nov 5, 2025
8839323
Continue redo
SimonHeybrock Nov 5, 2025
df2f54d
Storage
SimonHeybrock Nov 5, 2025
0c81007
More...
SimonHeybrock Nov 5, 2025
ccb2a44
Remove outdated tests
SimonHeybrock Nov 5, 2025
c4c7fb9
Rename
SimonHeybrock Nov 5, 2025
2958752
Fix buffer strategy implementations and add comprehensive tests.
SimonHeybrock Nov 5, 2025
12649c1
Refactor buffer storage with clean interface separation
SimonHeybrock Nov 6, 2025
6bdb94d
Remove StorageStrategy ABC and unify buffer classes
SimonHeybrock Nov 6, 2025
d2d6f11
Improve history buffer service design and robustness
SimonHeybrock Nov 6, 2025
77bab84
Make HistoryBufferService DataService-optional with public add_data i…
SimonHeybrock Nov 6, 2025
04b836f
Simplify HistoryBufferService: remove magic constants and auto-detection
SimonHeybrock Nov 6, 2025
2de28d1
Remove all memory usage tracking functionality
SimonHeybrock Nov 6, 2025
c9166cc
Simplify HistorySubscriber design: remove default extractor behavior
SimonHeybrock Nov 6, 2025
0f555b6
Remove test_no_notification_on_registration
SimonHeybrock Nov 6, 2025
966a109
Replace defensive copy with zero-copy views in buffer get_view()
SimonHeybrock Nov 6, 2025
414960b
Support stacking data without concat dimension in buffers
SimonHeybrock Nov 7, 2025
ae5f2cb
Add SlidingWindow plotter for time-windowed data visualization
SimonHeybrock Nov 7, 2025
e9b8ee0
Add plan
SimonHeybrock Nov 10, 2025
4b156dd
WIP integrate buffer+extractor in DataService
SimonHeybrock Nov 10, 2025
60c0553
Cleanup
SimonHeybrock Nov 10, 2025
3d3edb3
Append to existing buffers in DataService instead of replacing them
SimonHeybrock Nov 10, 2025
0309929
Integrate buffering with extractors into DataService
SimonHeybrock Nov 10, 2025
eef909e
Optimize Buffer for max_size==1 with simple value replacement
SimonHeybrock Nov 10, 2025
22dbbd0
Fix test file names
SimonHeybrock Nov 10, 2025
5d1ca6e
Replace SubscriberProtocol with concrete Subscriber ABC base class
SimonHeybrock Nov 10, 2025
bf371f4
Add extractor specification to plotter registration system
SimonHeybrock Nov 10, 2025
941d70d
Fix LinePlotter receiving 0D data for timeseries plotter
SimonHeybrock Nov 10, 2025
a7602d5
Refactor pipe creation to eliminate extraction hack
SimonHeybrock Nov 11, 2025
a2c4f2b
Update tests to use pipe_factory API in DataSubscriber
SimonHeybrock Nov 11, 2025
706441f
Extract duplicate data extraction logic in DataService into _build_su…
SimonHeybrock Nov 11, 2025
f12a72b
Fix plotters receiving raw data instead of extracted data from pipe
SimonHeybrock Nov 11, 2025
c24cbb9
Cleanup
SimonHeybrock Nov 11, 2025
6de842c
Cleanup
SimonHeybrock Nov 11, 2025
27ebb0f
Extract extractors from data_service into separate module
SimonHeybrock Nov 11, 2025
04ee116
Refactor Buffer into composable storage classes for clarity
SimonHeybrock Nov 11, 2025
06ebf51
Unify Buffer storage creation with _create_storage() helper
SimonHeybrock Nov 11, 2025
e5ab3e6
Move data incompatibility handling from DataService to Buffer storage
SimonHeybrock Nov 11, 2025
5671b5f
Remove unused _drop_concat_coord mechanism from buffer strategy
SimonHeybrock Nov 11, 2025
97d78e2
Add configurable window mode to plotters with aggregation support
SimonHeybrock Nov 11, 2025
fd2da5b
Remove unused SlidingWindowPlotter and PlotParamsSlidingWindow
SimonHeybrock Nov 11, 2025
ceeeb31
Remove implementation doc
SimonHeybrock Nov 11, 2025
53c80ca
Remove auto-generated concat dimension coordinates from DataArrayBuffer
SimonHeybrock Nov 11, 2025
d812bc0
Replace frame-based window sizing with time-based duration
SimonHeybrock Nov 11, 2025
5fe23d1
Use actual time coordinates for duration-based window extraction
SimonHeybrock Nov 11, 2025
ae63507
Simplify get_window_by_duration using scipp label-based indexing
SimonHeybrock Nov 11, 2025
349864a
Use proper scipp label-based indexing in get_window_by_duration
SimonHeybrock Nov 11, 2025
1c5f84e
Add start_time and end_time parameters to Workflow.accumulate()
SimonHeybrock Nov 11, 2025
319ca92
Add time coordinate to ROI current results in DetectorView
SimonHeybrock Nov 11, 2025
159a351
Fix ROI spectrum plot to respect window configuration
SimonHeybrock Nov 11, 2025
433d165
Remove unused WindowExtractor class
SimonHeybrock Nov 11, 2025
0a22e9f
Implement temporal requirement-based buffer management architecture
SimonHeybrock Nov 11, 2025
aead925
Fix BufferManager to resize before append to prevent data loss
SimonHeybrock Nov 11, 2025
404ff83
Refactor BufferManager to own buffers and implement Mapping interface
SimonHeybrock Nov 11, 2025
a071bf9
Simplify BufferManager by removing initial size calculation
SimonHeybrock Nov 11, 2025
cf931fd
Simplify BufferManager growth logic using needs_growth flag
SimonHeybrock Nov 11, 2025
bca67ca
Simplify CompleteHistory requirement semantics
SimonHeybrock Nov 11, 2025
78de8ce
Simplify buffer requirement validation logic
SimonHeybrock Nov 11, 2025
e55dad8
Remove redundant BufferManager methods (get_buffer, has_buffer)
SimonHeybrock Nov 12, 2025
0c8e0c0
Remove redundant comments
SimonHeybrock Nov 12, 2025
a1c9751
Add DataService benchmarks for subscriber notifications
SimonHeybrock Nov 12, 2025
3c68d09
Move Buffer and BufferFactory classes to new buffer.py module
SimonHeybrock Nov 12, 2025
554e0e9
Remove buffer management design doc
SimonHeybrock Nov 12, 2025
5e0f1b2
Refactor buffer extraction to remove leaky abstraction
SimonHeybrock Nov 12, 2025
c42b342
Simplify SingleValueStorage by removing frame extraction logic
SimonHeybrock Nov 12, 2025
561f571
Make UpdateExtractor generic to properly bind type parameter
SimonHeybrock Nov 12, 2025
9985e66
Replace frame-based buffer limits with memory-based limits
SimonHeybrock Nov 12, 2025
43eb90e
Fix stream_manager tests to properly compare scipp DataArrays
SimonHeybrock Nov 12, 2025
3fc5026
Implement temporal buffer system with extractor-based type selection
SimonHeybrock Nov 12, 2025
ffb85a5
Update DataService to use TemporalBufferManager
SimonHeybrock Nov 12, 2025
29ef0a0
Fix DataService tests to use DataArrays with time coordinates
SimonHeybrock Nov 12, 2025
6b345e8
Simplify scipp imports in temporal_buffers.py
SimonHeybrock Nov 12, 2025
d03112a
Refactor TemporalBuffer to use VariableBuffer for efficient appending
SimonHeybrock Nov 12, 2025
823ef18
Implement capacity management for TemporalBuffer
SimonHeybrock Nov 12, 2025
8817dfd
Remove old buffer system classes and tests
SimonHeybrock Nov 12, 2025
9401d6e
Preserve data when switching buffer types in TemporalBufferManager
SimonHeybrock Nov 12, 2025
e873dd8
Refactor TemporalBufferManager to return data directly via Mapping in…
SimonHeybrock Nov 12, 2025
6e48614
Remove unused temporal_requirements.py
SimonHeybrock Nov 12, 2025
dffcbc5
Fix KeyError when selecting plotters in job plotter selection modal
SimonHeybrock Nov 13, 2025
711a6fc
Change TemporalBufferManager to map keys to buffers instead of data
SimonHeybrock Nov 13, 2025
eb51b52
Improve WindowAggregatingExtractor with StrEnum and auto aggregation …
SimonHeybrock Nov 13, 2025
f371697
Refactor create_extractors_from_params to accept WindowParams directly
SimonHeybrock Nov 13, 2025
a7f89d4
Clarify description
SimonHeybrock Nov 13, 2025
5a855cb
Clarify title
SimonHeybrock Nov 13, 2025
3d26023
Clarify class name
SimonHeybrock Nov 13, 2025
1527ddf
Refactor UpdateExtractor interface: remove dead code and clarify cont…
SimonHeybrock Nov 13, 2025
b3ffcf4
Use WindowAggregation enum in WindowAggregatingExtractor for type safety
SimonHeybrock Nov 13, 2025
2ebdb5d
Simplify extractors by removing generic type parameter
SimonHeybrock Nov 13, 2025
9eedaaa
Simplify extractor contract by removing None handling
SimonHeybrock Nov 13, 2025
6380966
Optimize WindowAggregatingExtractor with caching and dict lookup
SimonHeybrock Nov 13, 2025
a04044c
Fix buffer timespan requirement updates and overflow handling
SimonHeybrock Nov 13, 2025
e09796e
Remove redundant default args
SimonHeybrock Nov 13, 2025
913d321
Add comprehensive tests for dashboard extractors
SimonHeybrock Nov 13, 2025
e8f0eab
Fix WindowAggregatingExtractor to handle timing jitter robustly
SimonHeybrock Nov 13, 2025
40f351c
Add time coordinate to MonitorStreamProcessor current output
SimonHeybrock Nov 13, 2025
f4a23f7
Update backend timeseries publish
SimonHeybrock Nov 13, 2025
3aa1329
Fix VariableBuffer to preserve variances on buffer expansion
SimonHeybrock Nov 13, 2025
12aa4e8
Refactor: Move create_extractors_from_params to plot_params and inlin…
SimonHeybrock Nov 13, 2025
0557d3e
Cleanup
SimonHeybrock Nov 13, 2025
f4233a3
Minor cleanup
SimonHeybrock Nov 13, 2025
4ef481a
Fix tests that violated extractor type constraints
SimonHeybrock Nov 13, 2025
6e180a3
Refactor: Simplify TestDataServiceUpdatingSubscribers using DataServi…
SimonHeybrock Nov 13, 2025
dc65e6f
Add set_extractors() to TemporalBufferManager for buffer reconfiguration
SimonHeybrock Nov 13, 2025
7b148c2
Fix TemporalBufferManager test and API issues
SimonHeybrock Nov 13, 2025
7ab91f5
Refactor temporal_buffers_test.py: reduce duplication with fixtures a…
SimonHeybrock Nov 13, 2025
c8165ac
Fix WindowAggregatingExtractor to support datetime64 time coordinates
SimonHeybrock Nov 13, 2025
07e5ce9
Remove unused subscribe_to_changed_keys method and related infrastruc…
SimonHeybrock Nov 14, 2025
8785ca1
Simplify
SimonHeybrock Nov 14, 2025
cb2cd57
Notify even if no data
SimonHeybrock Nov 14, 2025
66589a6
Fix CorrelationHistogramController for buffered DataService
SimonHeybrock Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions src/ess/livedata/dashboard/correlation_histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,25 @@ def add_correlation_processor(
items: dict[ResultKey, sc.DataArray],
) -> None:
"""Add a correlation histogram processor with DataService subscription."""
from .extractors import FullHistoryExtractor

self._processors.append(processor)

# Create subscriber that merges data and sends to processor
# Create subscriber that merges data and sends to processor.
# Use FullHistoryExtractor to get complete timeseries history needed for
# correlation histogram computation.
# TODO We should update the plotter to operate more efficiently by simply
# subscribing to the changes. This will likely require a new extractor type as
# well as changes in the plotter, so we defer this for now.
assembler = MergingStreamAssembler(set(items))
subscriber = DataSubscriber(assembler, processor)
extractors = {key: FullHistoryExtractor() for key in items}

# Create factory that sends initial data to processor and returns it
def processor_pipe_factory(data: dict[ResultKey, sc.DataArray]):
processor.send(data)
return processor

subscriber = DataSubscriber(assembler, processor_pipe_factory, extractors)
self._data_service.register_subscriber(subscriber)

def get_timeseries(self) -> list[ResultKey]:
Expand All @@ -412,7 +426,23 @@ def create_2d_config(self) -> CorrelationHistogramConfigurationAdapter:


def _is_timeseries(da: sc.DataArray) -> bool:
return da.dims == ('time',) and 'time' in da.coords
"""Check if data represents a timeseries.

When DataService uses LatestValueExtractor (default), it returns the latest value
from a timeseries buffer as a 0D scalar with a time coordinate. This function
identifies such values as originating from a timeseries.

Parameters
----------
da:
DataArray to check.

Returns
-------
:
True if the data is a 0D scalar with a time coordinate.
"""
return da.ndim == 0 and 'time' in da.coords


class CorrelationHistogramProcessor:
Expand Down
42 changes: 3 additions & 39 deletions src/ess/livedata/dashboard/data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ def __init__(
self._default_extractor = LatestValueExtractor()
self._subscribers: list[DataServiceSubscriber[K]] = []
self._update_callbacks: list[Callable[[set[K]], None]] = []
self._key_change_subscribers: list[Callable[[set[K], set[K]], None]] = []
self._pending_updates: set[K] = set()
self._pending_key_additions: set[K] = set()
self._pending_key_removals: set[K] = set()
self._transaction_depth = 0

@contextmanager
Expand Down Expand Up @@ -137,14 +134,11 @@ def _build_subscriber_data(
Dictionary mapping keys to extracted data (None values filtered out).
"""
subscriber_data = {}
extractors = subscriber.extractors

for key in subscriber.keys:
extractor = extractors[key]
for key, extractor in subscriber.extractors.items():
buffered_data = self._buffer_manager.get_buffered_data(key)
if buffered_data is not None:
data = extractor.extract(buffered_data)
subscriber_data[key] = data
subscriber_data[key] = extractor.extract(buffered_data)

return subscriber_data

Expand Down Expand Up @@ -186,20 +180,6 @@ def register_update_callback(self, callback: Callable[[set[K]], None]) -> None:
"""
self._update_callbacks.append(callback)

def subscribe_to_changed_keys(
self, subscriber: Callable[[set[K], set[K]], None]
) -> None:
"""
Register a subscriber for key change updates (additions/removals).

Parameters
----------
subscriber:
A callable that accepts two sets: added_keys and removed_keys.
"""
self._key_change_subscribers.append(subscriber)
subscriber(set(self._buffer_manager.keys()), set())

def _notify_subscribers(self, updated_keys: set[K]) -> None:
"""
Notify relevant subscribers about data updates.
Expand All @@ -213,23 +193,12 @@ def _notify_subscribers(self, updated_keys: set[K]) -> None:
for subscriber in self._subscribers:
if updated_keys & subscriber.keys:
subscriber_data = self._build_subscriber_data(subscriber)
if subscriber_data:
subscriber.trigger(subscriber_data)
subscriber.trigger(subscriber_data)

# Notify update callbacks with just key names
for callback in self._update_callbacks:
callback(updated_keys)

def _notify_key_change_subscribers(self) -> None:
"""Notify subscribers about key changes (additions/removals)."""
if not self._pending_key_additions and not self._pending_key_removals:
return

for subscriber in self._key_change_subscribers:
subscriber(
self._pending_key_additions.copy(), self._pending_key_removals.copy()
)

def __getitem__(self, key: K) -> V:
"""Get the latest value for a key."""
buffered_data = self._buffer_manager.get_buffered_data(key)
Expand All @@ -240,7 +209,6 @@ def __getitem__(self, key: K) -> V:
def __setitem__(self, key: K, value: V) -> None:
"""Set a value, storing it in a buffer."""
if key not in self._buffer_manager:
self._pending_key_additions.add(key)
extractors = self._get_extractors(key)
self._buffer_manager.create_buffer(key, extractors)
self._buffer_manager.update_buffer(key, value)
Expand All @@ -249,7 +217,6 @@ def __setitem__(self, key: K, value: V) -> None:

def __delitem__(self, key: K) -> None:
"""Delete a key and its buffer."""
self._pending_key_removals.add(key)
self._buffer_manager.delete_buffer(key)
self._pending_updates.add(key)
self._notify_if_not_in_transaction()
Expand All @@ -273,6 +240,3 @@ def _notify(self) -> None:
pending = set(self._pending_updates)
self._pending_updates.clear()
self._notify_subscribers(pending)
self._notify_key_change_subscribers()
self._pending_key_additions.clear()
self._pending_key_removals.clear()
Loading
Loading