Skip to content

Commit 7c49d6c

Browse files
Merge pull request #533 from scipp/history-buffer-service
Buffer history of results in `DataService`
2 parents e766396 + 015c706 commit 7c49d6c

36 files changed

+4639
-543
lines changed

src/ess/livedata/config/workflows.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,10 @@ def is_empty(self) -> bool:
7676
def _get_value(self) -> sc.DataArray:
7777
if self._to_nxlog is None:
7878
raise ValueError("No data accumulated")
79-
return self._to_nxlog.get()
79+
# Return latest value. Will be aggregated into a timeseries in frontend (if a
80+
# plot requests it). This accumulator may be fully replaced once it is clear how
81+
# we want to handle obtaining the full history (e.g., after frontend restarts).
82+
return self._to_nxlog.get()[-1]
8083

8184
def _do_push(self, value: sc.DataArray) -> None:
8285
if self._to_nxlog is None:

src/ess/livedata/core/job.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,11 @@ def add(self, data: JobData) -> JobReply:
184184
remapped_aux_data[field_name] = value
185185

186186
# Pass data to workflow with field names (not stream names)
187-
self._processor.accumulate({**data.primary_data, **remapped_aux_data})
187+
self._processor.accumulate(
188+
{**data.primary_data, **remapped_aux_data},
189+
start_time=data.start_time,
190+
end_time=data.end_time,
191+
)
188192
if data.is_active():
189193
if self._start_time is None:
190194
self._start_time = data.start_time

src/ess/livedata/dashboard/correlation_histogram.py

Lines changed: 33 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -392,11 +392,25 @@ def add_correlation_processor(
392392
items: dict[ResultKey, sc.DataArray],
393393
) -> None:
394394
"""Add a correlation histogram processor with DataService subscription."""
395+
from .extractors import FullHistoryExtractor
396+
395397
self._processors.append(processor)
396398

397-
# Create subscriber that merges data and sends to processor
399+
# Create subscriber that merges data and sends to processor.
400+
# Use FullHistoryExtractor to get complete timeseries history needed for
401+
# correlation histogram computation.
402+
# TODO We should update the plotter to operate more efficiently by simply
403+
# subscribing to the changes. This will likely require a new extractor type as
404+
# well as changes in the plotter, so we defer this for now.
398405
assembler = MergingStreamAssembler(set(items))
399-
subscriber = DataSubscriber(assembler, processor)
406+
extractors = {key: FullHistoryExtractor() for key in items}
407+
408+
# Create factory that sends initial data to processor and returns it
409+
def processor_pipe_factory(data: dict[ResultKey, sc.DataArray]):
410+
processor.send(data)
411+
return processor
412+
413+
subscriber = DataSubscriber(assembler, processor_pipe_factory, extractors)
400414
self._data_service.register_subscriber(subscriber)
401415

402416
def get_timeseries(self) -> list[ResultKey]:
@@ -412,7 +426,23 @@ def create_2d_config(self) -> CorrelationHistogramConfigurationAdapter:
412426

413427

414428
def _is_timeseries(da: sc.DataArray) -> bool:
415-
return da.dims == ('time',) and 'time' in da.coords
429+
"""Check if data represents a timeseries.
430+
431+
When DataService uses LatestValueExtractor (default), it returns the latest value
432+
from a timeseries buffer as a 0D scalar with a time coordinate. This function
433+
identifies such values as originating from a timeseries.
434+
435+
Parameters
436+
----------
437+
da:
438+
DataArray to check.
439+
440+
Returns
441+
-------
442+
:
443+
True if the data is a 0D scalar with a time coordinate.
444+
"""
445+
return da.ndim == 0 and 'time' in da.coords
416446

417447

418448
class CorrelationHistogramProcessor:

src/ess/livedata/dashboard/data_service.py

Lines changed: 155 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -2,32 +2,75 @@
22
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
33
from __future__ import annotations
44

5-
from collections import UserDict
6-
from collections.abc import Callable, Hashable
5+
from abc import ABC, abstractmethod
6+
from collections.abc import Callable, Hashable, Iterator, Mapping, MutableMapping
77
from contextlib import contextmanager
8-
from typing import TypeVar
8+
from typing import Any, Generic, TypeVar
99

10-
from .data_subscriber import DataSubscriber
10+
from .extractors import LatestValueExtractor, UpdateExtractor
11+
from .temporal_buffer_manager import TemporalBufferManager
1112

1213
K = TypeVar('K', bound=Hashable)
1314
V = TypeVar('V')
1415

1516

16-
class DataService(UserDict[K, V]):
17+
class DataServiceSubscriber(ABC, Generic[K]):
18+
"""Base class for data service subscribers with cached keys and extractors."""
19+
20+
def __init__(self) -> None:
21+
"""Initialize subscriber and cache keys from extractors."""
22+
# Cache keys from extractors to avoid repeated computation
23+
self._keys = set(self.extractors.keys())
24+
25+
@property
26+
def keys(self) -> set[K]:
27+
"""Return the set of data keys this subscriber depends on."""
28+
return self._keys
29+
30+
@property
31+
@abstractmethod
32+
def extractors(self) -> Mapping[K, UpdateExtractor]:
33+
"""
34+
Return extractors for obtaining data views.
35+
36+
Returns a mapping from key to the extractor to use for that key.
37+
"""
38+
39+
@abstractmethod
40+
def trigger(self, store: dict[K, Any]) -> None:
41+
"""Trigger the subscriber with updated data."""
42+
43+
44+
class DataService(MutableMapping[K, V]):
1745
"""
1846
A service for managing and retrieving data and derived data.
1947
2048
New data is set from upstream Kafka topics. Subscribers are typically plots that
2149
provide a live view of the data.
50+
51+
Uses buffers internally for storage, but presents a dict-like interface
52+
that returns the latest value for each key.
2253
"""
2354

24-
def __init__(self) -> None:
25-
super().__init__()
26-
self._subscribers: list[DataSubscriber[K]] = []
27-
self._key_change_subscribers: list[Callable[[set[K], set[K]], None]] = []
55+
def __init__(
56+
self,
57+
buffer_manager: TemporalBufferManager | None = None,
58+
) -> None:
59+
"""
60+
Initialize DataService.
61+
62+
Parameters
63+
----------
64+
buffer_manager:
65+
Manager for buffer sizing. If None, creates a new TemporalBufferManager.
66+
"""
67+
if buffer_manager is None:
68+
buffer_manager = TemporalBufferManager()
69+
self._buffer_manager = buffer_manager
70+
self._default_extractor = LatestValueExtractor()
71+
self._subscribers: list[DataServiceSubscriber[K]] = []
72+
self._update_callbacks: list[Callable[[set[K]], None]] = []
2873
self._pending_updates: set[K] = set()
29-
self._pending_key_additions: set[K] = set()
30-
self._pending_key_removals: set[K] = set()
3174
self._transaction_depth = 0
3275

3376
@contextmanager
@@ -48,30 +91,94 @@ def transaction(self):
4891
def _in_transaction(self) -> bool:
4992
return self._transaction_depth > 0
5093

51-
def register_subscriber(self, subscriber: DataSubscriber[K]) -> None:
94+
def _get_extractors(self, key: K) -> list[UpdateExtractor]:
5295
"""
53-
Register a subscriber for updates.
96+
Collect extractors for a key from all subscribers.
97+
98+
Examines all subscribers that need this key.
99+
100+
Parameters
101+
----------
102+
key:
103+
The key to collect extractors for.
104+
105+
Returns
106+
-------
107+
:
108+
List of extractors from all subscribers for this key.
109+
"""
110+
extractors = []
111+
112+
for subscriber in self._subscribers:
113+
subscriber_extractors = subscriber.extractors
114+
if key in subscriber_extractors:
115+
extractor = subscriber_extractors[key]
116+
extractors.append(extractor)
117+
118+
return extractors
119+
120+
def _build_subscriber_data(
121+
self, subscriber: DataServiceSubscriber[K]
122+
) -> dict[K, Any]:
123+
"""
124+
Extract data for a subscriber based on its extractors.
54125
55126
Parameters
56127
----------
57128
subscriber:
58-
The subscriber to register. Must implement the DataSubscriber interface.
129+
The subscriber to extract data for.
130+
131+
Returns
132+
-------
133+
:
134+
Dictionary mapping keys to extracted data (None values filtered out).
59135
"""
60-
self._subscribers.append(subscriber)
136+
subscriber_data = {}
61137

62-
def subscribe_to_changed_keys(
63-
self, subscriber: Callable[[set[K], set[K]], None]
64-
) -> None:
138+
for key, extractor in subscriber.extractors.items():
139+
buffered_data = self._buffer_manager.get_buffered_data(key)
140+
if buffered_data is not None:
141+
subscriber_data[key] = extractor.extract(buffered_data)
142+
143+
return subscriber_data
144+
145+
def register_subscriber(self, subscriber: DataServiceSubscriber[K]) -> None:
65146
"""
66-
Register a subscriber for key change updates (additions/removals).
147+
Register a subscriber for updates with extractor-based data access.
148+
149+
Triggers the subscriber immediately with existing data using its extractors.
67150
68151
Parameters
69152
----------
70153
subscriber:
71-
A callable that accepts two sets: added_keys and removed_keys.
154+
The subscriber to register.
72155
"""
73-
self._key_change_subscribers.append(subscriber)
74-
subscriber(set(self.data.keys()), set())
156+
self._subscribers.append(subscriber)
157+
158+
# Add extractors for keys this subscriber needs
159+
for key in subscriber.keys:
160+
if key in self._buffer_manager:
161+
extractor = subscriber.extractors[key]
162+
self._buffer_manager.add_extractor(key, extractor)
163+
164+
# Trigger immediately with existing data using subscriber's extractors
165+
existing_data = self._build_subscriber_data(subscriber)
166+
subscriber.trigger(existing_data)
167+
168+
def register_update_callback(self, callback: Callable[[set[K]], None]) -> None:
169+
"""
170+
Register a callback for key update notifications.
171+
172+
Callback receives only the set of updated key names, not the data.
173+
Use this for infrastructure that needs to know what changed but will
174+
query data itself.
175+
176+
Parameters
177+
----------
178+
callback:
179+
Callable that accepts a set of updated keys.
180+
"""
181+
self._update_callbacks.append(callback)
75182

76183
def _notify_subscribers(self, updated_keys: set[K]) -> None:
77184
"""
@@ -82,40 +189,46 @@ def _notify_subscribers(self, updated_keys: set[K]) -> None:
82189
updated_keys
83190
The set of data keys that were updated.
84191
"""
192+
# Notify extractor-based subscribers
85193
for subscriber in self._subscribers:
86-
if not isinstance(subscriber, DataSubscriber):
87-
subscriber(updated_keys)
88-
continue
89194
if updated_keys & subscriber.keys:
90-
# Pass only the data that the subscriber is interested in
91-
subscriber_data = {
92-
key: self.data[key] for key in subscriber.keys if key in self.data
93-
}
195+
subscriber_data = self._build_subscriber_data(subscriber)
94196
subscriber.trigger(subscriber_data)
95197

96-
def _notify_key_change_subscribers(self) -> None:
97-
"""Notify subscribers about key changes (additions/removals)."""
98-
if not self._pending_key_additions and not self._pending_key_removals:
99-
return
198+
# Notify update callbacks with just key names
199+
for callback in self._update_callbacks:
200+
callback(updated_keys)
100201

101-
for subscriber in self._key_change_subscribers:
102-
subscriber(
103-
self._pending_key_additions.copy(), self._pending_key_removals.copy()
104-
)
202+
def __getitem__(self, key: K) -> V:
203+
"""Get the latest value for a key."""
204+
buffered_data = self._buffer_manager.get_buffered_data(key)
205+
if buffered_data is None:
206+
raise KeyError(key)
207+
return self._default_extractor.extract(buffered_data)
105208

106209
def __setitem__(self, key: K, value: V) -> None:
107-
if key not in self.data:
108-
self._pending_key_additions.add(key)
109-
super().__setitem__(key, value)
210+
"""Set a value, storing it in a buffer."""
211+
if key not in self._buffer_manager:
212+
extractors = self._get_extractors(key)
213+
self._buffer_manager.create_buffer(key, extractors)
214+
self._buffer_manager.update_buffer(key, value)
110215
self._pending_updates.add(key)
111216
self._notify_if_not_in_transaction()
112217

113218
def __delitem__(self, key: K) -> None:
114-
self._pending_key_removals.add(key)
115-
super().__delitem__(key)
219+
"""Delete a key and its buffer."""
220+
self._buffer_manager.delete_buffer(key)
116221
self._pending_updates.add(key)
117222
self._notify_if_not_in_transaction()
118223

224+
def __iter__(self) -> Iterator[K]:
225+
"""Iterate over keys."""
226+
return iter(self._buffer_manager)
227+
228+
def __len__(self) -> int:
229+
"""Return the number of keys."""
230+
return len(self._buffer_manager)
231+
119232
def _notify_if_not_in_transaction(self) -> None:
120233
"""Notify subscribers if not in a transaction."""
121234
if not self._in_transaction:
@@ -127,6 +240,3 @@ def _notify(self) -> None:
127240
pending = set(self._pending_updates)
128241
self._pending_updates.clear()
129242
self._notify_subscribers(pending)
130-
self._notify_key_change_subscribers()
131-
self._pending_key_additions.clear()
132-
self._pending_key_removals.clear()

0 commit comments

Comments
 (0)