Changes from 108 commits
112 commits
66b3c81
Add HistoryBufferService for dashboard data buffering
SimonHeybrock Nov 5, 2025
469afe5
Simplify DataService subscriber protocol using duck typing
SimonHeybrock Nov 5, 2025
887eeee
Redo part 1
SimonHeybrock Nov 5, 2025
8839323
Continue redo
SimonHeybrock Nov 5, 2025
df2f54d
Storage
SimonHeybrock Nov 5, 2025
0c81007
More...
SimonHeybrock Nov 5, 2025
ccb2a44
Remove outdated tests
SimonHeybrock Nov 5, 2025
c4c7fb9
Rename
SimonHeybrock Nov 5, 2025
2958752
Fix buffer strategy implementations and add comprehensive tests.
SimonHeybrock Nov 5, 2025
12649c1
Refactor buffer storage with clean interface separation
SimonHeybrock Nov 6, 2025
6bdb94d
Remove StorageStrategy ABC and unify buffer classes
SimonHeybrock Nov 6, 2025
d2d6f11
Improve history buffer service design and robustness
SimonHeybrock Nov 6, 2025
77bab84
Make HistoryBufferService DataService-optional with public add_data i…
SimonHeybrock Nov 6, 2025
04b836f
Simplify HistoryBufferService: remove magic constants and auto-detection
SimonHeybrock Nov 6, 2025
2de28d1
Remove all memory usage tracking functionality
SimonHeybrock Nov 6, 2025
c9166cc
Simplify HistorySubscriber design: remove default extractor behavior
SimonHeybrock Nov 6, 2025
0f555b6
Remove test_no_notification_on_registration
SimonHeybrock Nov 6, 2025
966a109
Replace defensive copy with zero-copy views in buffer get_view()
SimonHeybrock Nov 6, 2025
414960b
Support stacking data without concat dimension in buffers
SimonHeybrock Nov 7, 2025
ae5f2cb
Add SlidingWindow plotter for time-windowed data visualization
SimonHeybrock Nov 7, 2025
e9b8ee0
Add plan
SimonHeybrock Nov 10, 2025
4b156dd
WIP integrate buffer+extractor in DataService
SimonHeybrock Nov 10, 2025
60c0553
Cleanup
SimonHeybrock Nov 10, 2025
3d3edb3
Append to existing buffers in DataService instead of replacing them
SimonHeybrock Nov 10, 2025
0309929
Integrate buffering with extractors into DataService
SimonHeybrock Nov 10, 2025
eef909e
Optimize Buffer for max_size==1 with simple value replacement
SimonHeybrock Nov 10, 2025
22dbbd0
Fix test file names
SimonHeybrock Nov 10, 2025
5d1ca6e
Replace SubscriberProtocol with concrete Subscriber ABC base class
SimonHeybrock Nov 10, 2025
bf371f4
Add extractor specification to plotter registration system
SimonHeybrock Nov 10, 2025
941d70d
Fix LinePlotter receiving 0D data for timeseries plotter
SimonHeybrock Nov 10, 2025
a7602d5
Refactor pipe creation to eliminate extraction hack
SimonHeybrock Nov 11, 2025
a2c4f2b
Update tests to use pipe_factory API in DataSubscriber
SimonHeybrock Nov 11, 2025
706441f
Extract duplicate data extraction logic in DataService into _build_su…
SimonHeybrock Nov 11, 2025
f12a72b
Fix plotters receiving raw data instead of extracted data from pipe
SimonHeybrock Nov 11, 2025
c24cbb9
Cleanup
SimonHeybrock Nov 11, 2025
6de842c
Cleanup
SimonHeybrock Nov 11, 2025
27ebb0f
Extract extractors from data_service into separate module
SimonHeybrock Nov 11, 2025
04ee116
Refactor Buffer into composable storage classes for clarity
SimonHeybrock Nov 11, 2025
06ebf51
Unify Buffer storage creation with _create_storage() helper
SimonHeybrock Nov 11, 2025
e5ab3e6
Move data incompatibility handling from DataService to Buffer storage
SimonHeybrock Nov 11, 2025
5671b5f
Remove unused _drop_concat_coord mechanism from buffer strategy
SimonHeybrock Nov 11, 2025
97d78e2
Add configurable window mode to plotters with aggregation support
SimonHeybrock Nov 11, 2025
fd2da5b
Remove unused SlidingWindowPlotter and PlotParamsSlidingWindow
SimonHeybrock Nov 11, 2025
ceeeb31
Remove implementation doc
SimonHeybrock Nov 11, 2025
53c80ca
Remove auto-generated concat dimension coordinates from DataArrayBuffer
SimonHeybrock Nov 11, 2025
d812bc0
Replace frame-based window sizing with time-based duration
SimonHeybrock Nov 11, 2025
5fe23d1
Use actual time coordinates for duration-based window extraction
SimonHeybrock Nov 11, 2025
ae63507
Simplify get_window_by_duration using scipp label-based indexing
SimonHeybrock Nov 11, 2025
349864a
Use proper scipp label-based indexing in get_window_by_duration
SimonHeybrock Nov 11, 2025
1c5f84e
Add start_time and end_time parameters to Workflow.accumulate()
SimonHeybrock Nov 11, 2025
319ca92
Add time coordinate to ROI current results in DetectorView
SimonHeybrock Nov 11, 2025
159a351
Fix ROI spectrum plot to respect window configuration
SimonHeybrock Nov 11, 2025
433d165
Remove unused WindowExtractor class
SimonHeybrock Nov 11, 2025
0a22e9f
Implement temporal requirement-based buffer management architecture
SimonHeybrock Nov 11, 2025
aead925
Fix BufferManager to resize before append to prevent data loss
SimonHeybrock Nov 11, 2025
404ff83
Refactor BufferManager to own buffers and implement Mapping interface
SimonHeybrock Nov 11, 2025
a071bf9
Simplify BufferManager by removing initial size calculation
SimonHeybrock Nov 11, 2025
cf931fd
Simplify BufferManager growth logic using needs_growth flag
SimonHeybrock Nov 11, 2025
bca67ca
Simplify CompleteHistory requirement semantics
SimonHeybrock Nov 11, 2025
78de8ce
Simplify buffer requirement validation logic
SimonHeybrock Nov 11, 2025
e55dad8
Remove redundant BufferManager methods (get_buffer, has_buffer)
SimonHeybrock Nov 12, 2025
0c8e0c0
Remove redundant comments
SimonHeybrock Nov 12, 2025
a1c9751
Add DataService benchmarks for subscriber notifications
SimonHeybrock Nov 12, 2025
3c68d09
Move Buffer and BufferFactory classes to new buffer.py module
SimonHeybrock Nov 12, 2025
554e0e9
Remove buffer management design doc
SimonHeybrock Nov 12, 2025
5e0f1b2
Refactor buffer extraction to remove leaky abstraction
SimonHeybrock Nov 12, 2025
c42b342
Simplify SingleValueStorage by removing frame extraction logic
SimonHeybrock Nov 12, 2025
561f571
Make UpdateExtractor generic to properly bind type parameter
SimonHeybrock Nov 12, 2025
9985e66
Replace frame-based buffer limits with memory-based limits
SimonHeybrock Nov 12, 2025
43eb90e
Fix stream_manager tests to properly compare scipp DataArrays
SimonHeybrock Nov 12, 2025
3fc5026
Implement temporal buffer system with extractor-based type selection
SimonHeybrock Nov 12, 2025
ffb85a5
Update DataService to use TemporalBufferManager
SimonHeybrock Nov 12, 2025
29ef0a0
Fix DataService tests to use DataArrays with time coordinates
SimonHeybrock Nov 12, 2025
6b345e8
Simplify scipp imports in temporal_buffers.py
SimonHeybrock Nov 12, 2025
d03112a
Refactor TemporalBuffer to use VariableBuffer for efficient appending
SimonHeybrock Nov 12, 2025
823ef18
Implement capacity management for TemporalBuffer
SimonHeybrock Nov 12, 2025
8817dfd
Remove old buffer system classes and tests
SimonHeybrock Nov 12, 2025
9401d6e
Preserve data when switching buffer types in TemporalBufferManager
SimonHeybrock Nov 12, 2025
e873dd8
Refactor TemporalBufferManager to return data directly via Mapping in…
SimonHeybrock Nov 12, 2025
6e48614
Remove unused temporal_requirements.py
SimonHeybrock Nov 12, 2025
dffcbc5
Fix KeyError when selecting plotters in job plotter selection modal
SimonHeybrock Nov 13, 2025
711a6fc
Change TemporalBufferManager to map keys to buffers instead of data
SimonHeybrock Nov 13, 2025
eb51b52
Improve WindowAggregatingExtractor with StrEnum and auto aggregation …
SimonHeybrock Nov 13, 2025
f371697
Refactor create_extractors_from_params to accept WindowParams directly
SimonHeybrock Nov 13, 2025
a7f89d4
Clarify description
SimonHeybrock Nov 13, 2025
5a855cb
Clarify title
SimonHeybrock Nov 13, 2025
3d26023
Clarify class name
SimonHeybrock Nov 13, 2025
1527ddf
Refactor UpdateExtractor interface: remove dead code and clarify cont…
SimonHeybrock Nov 13, 2025
b3ffcf4
Use WindowAggregation enum in WindowAggregatingExtractor for type safety
SimonHeybrock Nov 13, 2025
2ebdb5d
Simplify extractors by removing generic type parameter
SimonHeybrock Nov 13, 2025
9eedaaa
Simplify extractor contract by removing None handling
SimonHeybrock Nov 13, 2025
6380966
Optimize WindowAggregatingExtractor with caching and dict lookup
SimonHeybrock Nov 13, 2025
a04044c
Fix buffer timespan requirement updates and overflow handling
SimonHeybrock Nov 13, 2025
e09796e
Remove redundant default args
SimonHeybrock Nov 13, 2025
913d321
Add comprehensive tests for dashboard extractors
SimonHeybrock Nov 13, 2025
e8f0eab
Fix WindowAggregatingExtractor to handle timing jitter robustly
SimonHeybrock Nov 13, 2025
40f351c
Add time coordinate to MonitorStreamProcessor current output
SimonHeybrock Nov 13, 2025
f4a23f7
Update backend timeseries publish
SimonHeybrock Nov 13, 2025
3aa1329
Fix VariableBuffer to preserve variances on buffer expansion
SimonHeybrock Nov 13, 2025
12aa4e8
Refactor: Move create_extractors_from_params to plot_params and inlin…
SimonHeybrock Nov 13, 2025
0557d3e
Cleanup
SimonHeybrock Nov 13, 2025
f4233a3
Minor cleanup
SimonHeybrock Nov 13, 2025
4ef481a
Fix tests that violated extractor type constraints
SimonHeybrock Nov 13, 2025
6e180a3
Refactor: Simplify TestDataServiceUpdatingSubscribers using DataServi…
SimonHeybrock Nov 13, 2025
dc65e6f
Add set_extractors() to TemporalBufferManager for buffer reconfiguration
SimonHeybrock Nov 13, 2025
7b148c2
Fix TemporalBufferManager test and API issues
SimonHeybrock Nov 13, 2025
7ab91f5
Refactor temporal_buffers_test.py: reduce duplication with fixtures a…
SimonHeybrock Nov 13, 2025
c8165ac
Fix WindowAggregatingExtractor to support datetime64 time coordinates
SimonHeybrock Nov 13, 2025
07e5ce9
Remove unused subscribe_to_changed_keys method and related infrastruc…
SimonHeybrock Nov 14, 2025
8785ca1
Simplify
SimonHeybrock Nov 14, 2025
cb2cd57
Notify even if no data
SimonHeybrock Nov 14, 2025
66589a6
Fix CorrelationHistogramController for buffered DataService
SimonHeybrock Nov 14, 2025
5 changes: 4 additions & 1 deletion src/ess/livedata/config/workflows.py
@@ -76,7 +76,10 @@ def is_empty(self) -> bool:
     def _get_value(self) -> sc.DataArray:
         if self._to_nxlog is None:
             raise ValueError("No data accumulated")
-        return self._to_nxlog.get()
+        # Return latest value. Will be aggregated into a timeseries in frontend (if a
+        # plot requests it). This accumulator may be fully replaced once it is clear how
+        # we want to handle obtaining the full history (e.g., after frontend restarts).
+        return self._to_nxlog.get()[-1]

     def _do_push(self, value: sc.DataArray) -> None:
         if self._to_nxlog is None:
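The change above makes the backend publish only the newest sample and leaves history assembly to the frontend. A minimal sketch of that split, using plain `(time, value)` tuples instead of `sc.DataArray`. `LatestOnlyAccumulator` and `FrontendHistory` are hypothetical names for illustration, not classes from this PR:

```python
from __future__ import annotations


class LatestOnlyAccumulator:
    """Backend side (hypothetical): collects samples, publishes only the newest."""

    def __init__(self) -> None:
        self._samples: list[tuple[float, float]] = []  # (time, value)

    def push(self, time: float, value: float) -> None:
        self._samples.append((time, value))

    def get_value(self) -> tuple[float, float]:
        if not self._samples:
            raise ValueError("No data accumulated")
        return self._samples[-1]  # mirrors `get()[-1]` in the diff


class FrontendHistory:
    """Frontend side (hypothetical): rebuilds a timeseries from published samples."""

    def __init__(self) -> None:
        self.times: list[float] = []
        self.values: list[float] = []

    def add(self, sample: tuple[float, float]) -> None:
        time, value = sample
        self.times.append(time)
        self.values.append(value)


acc = LatestOnlyAccumulator()
history = FrontendHistory()
for t, v in [(0.0, 1.5), (1.0, 2.5), (2.0, 3.5)]:
    acc.push(t, v)
    history.add(acc.get_value())
```

In this sketch the history lives only in the frontend object, so it would be lost on a frontend restart, which is the open question the code comment in the diff mentions.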
6 changes: 5 additions & 1 deletion src/ess/livedata/core/job.py
@@ -184,7 +184,11 @@ def add(self, data: JobData) -> JobReply:
                 remapped_aux_data[field_name] = value

         # Pass data to workflow with field names (not stream names)
-        self._processor.accumulate({**data.primary_data, **remapped_aux_data})
+        self._processor.accumulate(
+            {**data.primary_data, **remapped_aux_data},
+            start_time=data.start_time,
+            end_time=data.end_time,
+        )
         if data.is_active():
             if self._start_time is None:
                 self._start_time = data.start_time
192 changes: 169 additions & 23 deletions src/ess/livedata/dashboard/data_service.py
@@ -2,28 +2,74 @@
 # Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
 from __future__ import annotations

-from collections import UserDict
-from collections.abc import Callable, Hashable
+from abc import ABC, abstractmethod
+from collections.abc import Callable, Hashable, Iterator, Mapping, MutableMapping
 from contextlib import contextmanager
-from typing import TypeVar
+from typing import Any, Generic, TypeVar

-from .data_subscriber import DataSubscriber
+from .extractors import LatestValueExtractor, UpdateExtractor
+from .temporal_buffer_manager import TemporalBufferManager

 K = TypeVar('K', bound=Hashable)
 V = TypeVar('V')


-class DataService(UserDict[K, V]):
+class DataServiceSubscriber(ABC, Generic[K]):
+    """Base class for data service subscribers with cached keys and extractors."""
+
+    def __init__(self) -> None:
+        """Initialize subscriber and cache keys from extractors."""
+        # Cache keys from extractors to avoid repeated computation
Member

Is the repeated computation that expensive? In other words, is the caching of keys really necessary?
I guess it depends on what 'computation' represents (how much work is actually done).

Member Author

Well, if we expect O(100) results coming in per second, it adds up, so why not cache?
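To illustrate the caching being discussed, here is a minimal sketch under stated assumptions: `SubscriberBase` and `CountingSubscriber` are hypothetical stand-ins for `DataServiceSubscriber` and a concrete subscriber, and the counter exists only to show how often the `extractors` property is read.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Hashable, Mapping
from typing import Any


class SubscriberBase(ABC):
    """Hypothetical stand-in for DataServiceSubscriber."""

    def __init__(self) -> None:
        # Build the key set once. Subclasses must set up their extractors
        # before calling super().__init__().
        self._keys = set(self.extractors.keys())

    @property
    def keys(self) -> set[Hashable]:
        return self._keys  # same object on every access, no rebuild

    @property
    @abstractmethod
    def extractors(self) -> Mapping[Hashable, Any]: ...


class CountingSubscriber(SubscriberBase):
    """Counts reads of `extractors` to make the effect of caching visible."""

    def __init__(self, extractors: Mapping[Hashable, Any]) -> None:
        self._extractors = extractors
        self.extractor_reads = 0
        super().__init__()

    @property
    def extractors(self) -> Mapping[Hashable, Any]:
        self.extractor_reads += 1
        return self._extractors


sub = CountingSubscriber({"a": object(), "b": object()})
for _ in range(1000):
    _ = {"a"} & sub.keys  # the per-update intersection check done by the service
```

With the cache, the 1000 simulated updates read `extractors` only once, during construction. Whether the saved work matters in practice depends on the real update rate, as the thread notes.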

+        self._keys = set(self.extractors.keys())
+
+    @property
+    def keys(self) -> set[K]:
Member

Slightly unusual that we call subscriber.keys when we are used to subscriber.keys() in Python.
Consider turning this property into a simple function?

Member Author

I thought it makes sense since properties typically indicate that they return something "static". We differ from dict.keys() anyway since we return a set, and the subscriber is not dict-like.

+        """Return the set of data keys this subscriber depends on."""
+        return self._keys
+
+    @property
+    @abstractmethod
+    def extractors(self) -> Mapping[K, UpdateExtractor]:
+        """
+        Return extractors for obtaining data views.
+
+        Returns a mapping from key to the extractor to use for that key.
+        """
+
+    @abstractmethod
+    def trigger(self, store: dict[K, Any]) -> None:
+        """Trigger the subscriber with updated data."""
+
+
+class DataService(MutableMapping[K, V]):
     """
     A service for managing and retrieving data and derived data.

     New data is set from upstream Kafka topics. Subscribers are typically plots that
     provide a live view of the data.
+
+    Uses buffers internally for storage, but presents a dict-like interface
+    that returns the latest value for each key.
     """

-    def __init__(self) -> None:
-        super().__init__()
-        self._subscribers: list[DataSubscriber[K]] = []
+    def __init__(
+        self,
+        buffer_manager: TemporalBufferManager | None = None,
+    ) -> None:
+        """
+        Initialize DataService.
+
+        Parameters
+        ----------
+        buffer_manager:
+            Manager for buffer sizing. If None, creates a new TemporalBufferManager.
+        """
+        if buffer_manager is None:
+            buffer_manager = TemporalBufferManager()
+        self._buffer_manager = buffer_manager
+        self._default_extractor = LatestValueExtractor()
+        self._subscribers: list[DataServiceSubscriber[K]] = []
+        self._update_callbacks: list[Callable[[set[K]], None]] = []
         self._key_change_subscribers: list[Callable[[set[K], set[K]], None]] = []
         self._pending_updates: set[K] = set()
         self._pending_key_additions: set[K] = set()
@@ -48,17 +94,98 @@ def transaction(self):
     def _in_transaction(self) -> bool:
         return self._transaction_depth > 0

-    def register_subscriber(self, subscriber: DataSubscriber[K]) -> None:
+    def _get_extractors(self, key: K) -> list[UpdateExtractor]:
         """
-        Register a subscriber for updates.
+        Collect extractors for a key from all subscribers.
+
+        Examines all subscribers that need this key.
+
+        Parameters
+        ----------
+        key:
+            The key to collect extractors for.
+
+        Returns
+        -------
+        :
+            List of extractors from all subscribers for this key.
+        """
+        extractors = []
+
+        for subscriber in self._subscribers:
+            subscriber_extractors = subscriber.extractors
+            if key in subscriber_extractors:
+                extractor = subscriber_extractors[key]
+                extractors.append(extractor)
+
+        return extractors
+
+    def _build_subscriber_data(
+        self, subscriber: DataServiceSubscriber[K]
+    ) -> dict[K, Any]:
+        """
+        Extract data for a subscriber based on its extractors.
+
+        Parameters
+        ----------
+        subscriber:
+            The subscriber to extract data for.
+
+        Returns
+        -------
+        :
+            Dictionary mapping keys to extracted data (None values filtered out).
+        """
+        subscriber_data = {}
+        extractors = subscriber.extractors
+
+        for key in subscriber.keys:
+            extractor = extractors[key]
Member

No need to get the extractor if the buffered_data is None? -> move inside the if statement?

Member Author

Rewriting to simple loop over extractors.items(), the None-data case is an exception, no need to optimize for that.
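The rewrite mentioned in the reply could look roughly like the following sketch. It is not the code that landed in the PR: plain lists stand in for the buffers, and `LatestValue`, `FullHistory`, and `build_subscriber_data` are hypothetical names used only for illustration.

```python
from __future__ import annotations

from typing import Any


class LatestValue:
    """Toy extractor: returns the last buffered element."""

    def extract(self, buffered: list[Any]) -> Any:
        return buffered[-1]


class FullHistory:
    """Toy extractor: returns a copy of everything buffered."""

    def extract(self, buffered: list[Any]) -> Any:
        return list(buffered)


def build_subscriber_data(
    extractors: dict[str, Any], buffers: dict[str, list[Any]]
) -> dict[str, Any]:
    """Apply each extractor to its buffer, skipping keys with no data yet."""
    result = {}
    for key, extractor in extractors.items():
        buffered = buffers.get(key)  # None when nothing has arrived for this key
        if buffered is not None:
            result[key] = extractor.extract(buffered)
    return result


buffers = {"counts": [1, 2, 3], "temperature": [290.0, 291.5]}
extractors = {
    "counts": LatestValue(),
    "temperature": FullHistory(),
    "missing": LatestValue(),
}
data = build_subscriber_data(extractors, buffers)
```

Iterating over `items()` removes the separate key lookup, and the missing-data branch stays a plain skip rather than something to optimize.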

+            buffered_data = self._buffer_manager.get_buffered_data(key)
+            if buffered_data is not None:
+                data = extractor.extract(buffered_data)
+                subscriber_data[key] = data
+
+        return subscriber_data
+
+    def register_subscriber(self, subscriber: DataServiceSubscriber[K]) -> None:
+        """
+        Register a subscriber for updates with extractor-based data access.
+
+        Triggers the subscriber immediately with existing data using its extractors.

         Parameters
         ----------
         subscriber:
-            The subscriber to register. Must implement the DataSubscriber interface.
+            The subscriber to register.
         """
         self._subscribers.append(subscriber)
+
+        # Add extractors for keys this subscriber needs
+        for key in subscriber.keys:
+            if key in self._buffer_manager:
+                extractor = subscriber.extractors[key]
+                self._buffer_manager.add_extractor(key, extractor)
+
+        # Trigger immediately with existing data using subscriber's extractors
+        existing_data = self._build_subscriber_data(subscriber)
+        subscriber.trigger(existing_data)
Member

Does trigger handle empty existing_data?
Further down you have an if that only calls trigger if the data is not empty.

Member Author (SimonHeybrock, Nov 14, 2025)

I think this is a potentially open problem:

  • Currently we only subscribe if there is data (or else you would not get the widget that lets you select that data).
  • The immediate callback is needed for the pipe creation.
  • Downstream does not like empty data. The plot setup usually gets messed up a bit (sizing/scaling not working, etc.).

But all that being said, we will eventually want to support saving plot setups (such as a certain grid of plots users want to reuse) or creating plots before their data arrives. That will have to be handled somehow, so parts of the mechanism may need to be revisited. Right now I don't think it will be the same mechanism, since saved plots would be for specific workflows, not workflow runs (jobs). The subscription mechanism is for data of a specific job. In other words, either DataService or an additional mechanism has to watch for arriving job-data matching a given workflow before setting up the actual subscription.

Member Author

Come to think of it, I think we should call further down even with empty data? Or else the plots might have no way of telling that data disappeared (even though we might need a better mechanism for that anyway).
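A minimal sketch of the behavior proposed in this reply, assuming a plain dict as the store. `RecordingSubscriber` and `notify` are hypothetical names; the point is only that dropping the emptiness guard lets a subscriber observe that its data disappeared.

```python
from __future__ import annotations

from typing import Any


class RecordingSubscriber:
    """Hypothetical subscriber that records every payload it receives."""

    def __init__(self, keys: set[str]) -> None:
        self.keys = keys
        self.received: list[dict[str, Any]] = []

    def trigger(self, store: dict[str, Any]) -> None:
        self.received.append(store)


def notify(
    subscribers: list[RecordingSubscriber],
    store: dict[str, Any],
    updated_keys: set[str],
) -> None:
    for subscriber in subscribers:
        if updated_keys & subscriber.keys:
            payload = {k: store[k] for k in subscriber.keys if k in store}
            # No `if payload:` guard: an empty dict tells the subscriber
            # that its data is gone.
            subscriber.trigger(payload)


store = {"roi": 42}
sub = RecordingSubscriber({"roi"})
notify([sub], store, {"roi"})
del store["roi"]
notify([sub], store, {"roi"})  # a deletion also counts as an update
```

Subscribers would then need to tolerate an empty payload, which is the downstream concern raised in the bullet list above.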


+    def register_update_callback(self, callback: Callable[[set[K]], None]) -> None:
+        """
+        Register a callback for key update notifications.
+
+        Callback receives only the set of updated key names, not the data.
+        Use this for infrastructure that needs to know what changed but will
+        query data itself.
+
+        Parameters
+        ----------
+        callback:
+            Callable that accepts a set of updated keys.
+        """
+        self._update_callbacks.append(callback)

     def subscribe_to_changed_keys(
         self, subscriber: Callable[[set[K], set[K]], None]
     ) -> None:
@@ -71,7 +198,7 @@ def subscribe_to_changed_keys(
             A callable that accepts two sets: added_keys and removed_keys.
         """
         self._key_change_subscribers.append(subscriber)
-        subscriber(set(self.data.keys()), set())
+        subscriber(set(self._buffer_manager.keys()), set())
Member

Using keyword args here would help with readability (which one is for added and which one for removed).

Member Author

Turns out the last user of this method was removed 2 weeks ago, removing.


     def _notify_subscribers(self, updated_keys: set[K]) -> None:
         """
@@ -82,16 +209,16 @@ def _notify_subscribers(self, updated_keys: set[K]) -> None:
         updated_keys
             The set of data keys that were updated.
         """
+        # Notify extractor-based subscribers
         for subscriber in self._subscribers:
-            if not isinstance(subscriber, DataSubscriber):
-                subscriber(updated_keys)
-                continue
             if updated_keys & subscriber.keys:
-                # Pass only the data that the subscriber is interested in
-                subscriber_data = {
-                    key: self.data[key] for key in subscriber.keys if key in self.data
-                }
-                subscriber.trigger(subscriber_data)
+                subscriber_data = self._build_subscriber_data(subscriber)
+                if subscriber_data:
+                    subscriber.trigger(subscriber_data)
+
+        # Notify update callbacks with just key names
+        for callback in self._update_callbacks:
+            callback(updated_keys)

     def _notify_key_change_subscribers(self) -> None:
         """Notify subscribers about key changes (additions/removals)."""
@@ -103,19 +230,38 @@ def _notify_key_change_subscribers(self) -> None:
                 self._pending_key_additions.copy(), self._pending_key_removals.copy()
             )

+    def __getitem__(self, key: K) -> V:
+        """Get the latest value for a key."""
+        buffered_data = self._buffer_manager.get_buffered_data(key)
+        if buffered_data is None:
+            raise KeyError(key)
+        return self._default_extractor.extract(buffered_data)
+
     def __setitem__(self, key: K, value: V) -> None:
-        if key not in self.data:
+        """Set a value, storing it in a buffer."""
+        if key not in self._buffer_manager:
             self._pending_key_additions.add(key)
-        super().__setitem__(key, value)
+            extractors = self._get_extractors(key)
+            self._buffer_manager.create_buffer(key, extractors)
+        self._buffer_manager.update_buffer(key, value)
         self._pending_updates.add(key)
         self._notify_if_not_in_transaction()

     def __delitem__(self, key: K) -> None:
+        """Delete a key and its buffer."""
         self._pending_key_removals.add(key)
-        super().__delitem__(key)
+        self._buffer_manager.delete_buffer(key)
         self._pending_updates.add(key)
         self._notify_if_not_in_transaction()

+    def __iter__(self) -> Iterator[K]:
+        """Iterate over keys."""
+        return iter(self._buffer_manager)
+
+    def __len__(self) -> int:
+        """Return the number of keys."""
+        return len(self._buffer_manager)
+
     def _notify_if_not_in_transaction(self) -> None:
         """Notify subscribers if not in a transaction."""
         if not self._in_transaction:
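The move from `UserDict` to `MutableMapping` in this file means only the five abstract methods need to be written; the rest of the dict-like interface comes from the mixin. A minimal sketch of the same pattern, assuming plain lists as buffers. `LatestValueStore` and its `history` method are hypothetical and much simpler than `DataService` with its `TemporalBufferManager`:

```python
from __future__ import annotations

from collections.abc import Iterator, MutableMapping
from typing import Any


class LatestValueStore(MutableMapping):
    """Hypothetical mapping that buffers every write but reads back the latest."""

    def __init__(self) -> None:
        self._buffers: dict[str, list[Any]] = {}

    def __getitem__(self, key: str) -> Any:
        return self._buffers[key][-1]  # KeyError propagates for unknown keys

    def __setitem__(self, key: str, value: Any) -> None:
        self._buffers.setdefault(key, []).append(value)

    def __delitem__(self, key: str) -> None:
        del self._buffers[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._buffers)

    def __len__(self) -> int:
        return len(self._buffers)

    def history(self, key: str) -> list[Any]:
        """Return a copy of everything written for a key."""
        return list(self._buffers[key])


store = LatestValueStore()
store["counts"] = 1
store["counts"] = 2
store.update({"temperature": 290.0})  # mixin method provided by MutableMapping
```

Callers that only need the current value keep using `store[key]`, while history-aware consumers go through a separate accessor, which is roughly the role extractors play in the diff.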
49 changes: 38 additions & 11 deletions src/ess/livedata/dashboard/data_subscriber.py
@@ -3,10 +3,12 @@
 from __future__ import annotations

 from abc import ABC, abstractmethod
-from collections.abc import Hashable
+from collections.abc import Callable, Hashable, Mapping
 from typing import Any, Generic, Protocol, TypeVar

 from ess.livedata.config.workflow_spec import ResultKey
+from ess.livedata.dashboard.data_service import DataServiceSubscriber
+from ess.livedata.dashboard.extractors import UpdateExtractor


 class PipeBase(Protocol):
@@ -40,6 +42,7 @@ def __init__(self, data: Any) -> None:


 Key = TypeVar('Key', bound=Hashable)
+P = TypeVar('P', bound=PipeBase)


 class StreamAssembler(ABC, Generic[Key]):
@@ -85,27 +88,45 @@ def assemble(self, data: dict[Key, Any]) -> Any:
         """


-class DataSubscriber(Generic[Key]):
+class DataSubscriber(DataServiceSubscriber[Key], Generic[Key, P]):
     """Unified subscriber that uses a StreamAssembler to process data."""

-    def __init__(self, assembler: StreamAssembler[Key], pipe: PipeBase) -> None:
+    def __init__(
+        self,
+        assembler: StreamAssembler[Key],
+        pipe_factory: Callable[[dict[Key, Any]], P],
+        extractors: Mapping[Key, UpdateExtractor],
+    ) -> None:
         """
-        Initialize the subscriber with an assembler and pipe.
+        Initialize the subscriber with an assembler and pipe factory.

         Parameters
         ----------
         assembler:
             The assembler responsible for processing the data.
-        pipe:
-            The pipe to send assembled data to.
+        pipe_factory:
+            Factory function to create the pipe on first trigger.
+        extractors:
+            Mapping from keys to their UpdateExtractor instances.
         """
         self._assembler = assembler
-        self._pipe = pipe
+        self._pipe_factory = pipe_factory
+        self._pipe: P | None = None
+        self._extractors = extractors
+        # Initialize parent class to cache keys
+        super().__init__()

     @property
-    def keys(self) -> set[Key]:
-        """Return the set of data keys this subscriber depends on."""
-        return self._assembler.keys
+    def extractors(self) -> Mapping[Key, UpdateExtractor]:
+        """Return extractors for obtaining data views."""
+        return self._extractors
+
+    @property
+    def pipe(self) -> P:
+        """Return the pipe (must be created by first trigger)."""
+        if self._pipe is None:
+            raise RuntimeError("Pipe not yet initialized - subscriber not triggered")
+        return self._pipe

     def trigger(self, store: dict[Key, Any]) -> None:
         """
@@ -118,7 +139,13 @@ def trigger(self, store: dict[Key, Any]) -> None:
         """
         data = {key: store[key] for key in self.keys if key in store}
         assembled_data = self._assembler.assemble(data)
-        self._pipe.send(assembled_data)
+
+        if self._pipe is None:
+            # First trigger - create pipe with correctly extracted data
+            self._pipe = self._pipe_factory(assembled_data)
Member

Should we also pipe.send(...) after the pipe was created?

Member Author

Creating the pipe (this is hv.streams.Pipe) sends the data passed in this init call. We don't want to send it twice. There is no way to init without data, which is why we had to move it into the subscriber and create it on first use.
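The create-on-first-trigger pattern described in this reply can be sketched without HoloViews. `FakePipe` is a hypothetical stand-in that assumes, as stated above, that constructing the pipe already delivers its initial data; `LazyPipeSubscriber` is likewise illustrative and omits the assembler and extractors.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class FakePipe:
    """Stand-in for a pipe whose constructor already delivers its data."""

    def __init__(self, data: Any) -> None:
        self.sent = [data]  # constructing counts as the first send

    def send(self, data: Any) -> None:
        self.sent.append(data)


class LazyPipeSubscriber:
    """Hypothetical subscriber that defers pipe creation to the first trigger."""

    def __init__(self, pipe_factory: Callable[[Any], FakePipe]) -> None:
        self._pipe_factory = pipe_factory
        self._pipe: FakePipe | None = None

    @property
    def pipe(self) -> FakePipe:
        if self._pipe is None:
            raise RuntimeError("Pipe not yet initialized")
        return self._pipe

    def trigger(self, data: Any) -> None:
        if self._pipe is None:
            self._pipe = self._pipe_factory(data)  # first payload, delivered once
        else:
            self._pipe.send(data)


sub = LazyPipeSubscriber(FakePipe)
sub.trigger("first")
sub.trigger("second")
```

Calling `send` right after construction would deliver the first payload twice, which is why the `else` branch is needed.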

+        else:
+            # Subsequent triggers - send to existing pipe
+            self._pipe.send(assembled_data)


 class MergingStreamAssembler(StreamAssembler):