Buffer history of results in DataService #533
base: main
Changes from all commits
```diff
@@ -2,32 +2,75 @@
 # Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
 from __future__ import annotations
 
-from collections import UserDict
-from collections.abc import Callable, Hashable
+from abc import ABC, abstractmethod
+from collections.abc import Callable, Hashable, Iterator, Mapping, MutableMapping
 from contextlib import contextmanager
-from typing import TypeVar
+from typing import Any, Generic, TypeVar
 
-from .data_subscriber import DataSubscriber
+from .extractors import LatestValueExtractor, UpdateExtractor
+from .temporal_buffer_manager import TemporalBufferManager
 
 K = TypeVar('K', bound=Hashable)
 V = TypeVar('V')
 
 
-class DataService(UserDict[K, V]):
+class DataServiceSubscriber(ABC, Generic[K]):
+    """Base class for data service subscribers with cached keys and extractors."""
+
+    def __init__(self) -> None:
+        """Initialize subscriber and cache keys from extractors."""
+        # Cache keys from extractors to avoid repeated computation
+        self._keys = set(self.extractors.keys())
+
+    @property
+    def keys(self) -> set[K]:
```
Member: Slightly unusual that we call …

Member (Author): I thought it makes sense, since properties typically indicate that they return something "static". We are different from …
```diff
+        """Return the set of data keys this subscriber depends on."""
+        return self._keys
+
+    @property
+    @abstractmethod
+    def extractors(self) -> Mapping[K, UpdateExtractor]:
+        """
+        Return extractors for obtaining data views.
+
+        Returns a mapping from key to the extractor to use for that key.
+        """
+
+    @abstractmethod
+    def trigger(self, store: dict[K, Any]) -> None:
+        """Trigger the subscriber with updated data."""
+
+
+class DataService(MutableMapping[K, V]):
     """
     A service for managing and retrieving data and derived data.
 
     New data is set from upstream Kafka topics. Subscribers are typically plots that
     provide a live view of the data.
+
+    Uses buffers internally for storage, but presents a dict-like interface
+    that returns the latest value for each key.
     """
 
-    def __init__(self) -> None:
-        super().__init__()
-        self._subscribers: list[DataSubscriber[K]] = []
-        self._key_change_subscribers: list[Callable[[set[K], set[K]], None]] = []
+    def __init__(
+        self,
+        buffer_manager: TemporalBufferManager | None = None,
+    ) -> None:
+        """
+        Initialize DataService.
+
+        Parameters
+        ----------
+        buffer_manager:
+            Manager for buffer sizing. If None, creates a new TemporalBufferManager.
+        """
+        if buffer_manager is None:
+            buffer_manager = TemporalBufferManager()
+        self._buffer_manager = buffer_manager
+        self._default_extractor = LatestValueExtractor()
+        self._subscribers: list[DataServiceSubscriber[K]] = []
+        self._update_callbacks: list[Callable[[set[K]], None]] = []
         self._pending_updates: set[K] = set()
-        self._pending_key_additions: set[K] = set()
-        self._pending_key_removals: set[K] = set()
         self._transaction_depth = 0
 
     @contextmanager
```
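As a reading aid, here is a minimal sketch of a concrete subscriber. It assumes only what the diff shows: a subclass supplies `extractors` and `trigger`, the base `__init__` caches `set(self.extractors.keys())`, `LatestValueExtractor()` takes no arguments (as in `DataService.__init__` above), and `LatestValueExtractor` is an `UpdateExtractor`. The import paths are hypothetical.

```python
from collections.abc import Mapping
from typing import Any

# Hypothetical import paths; the diff only shows package-relative imports.
from data_service import DataService, DataServiceSubscriber
from extractors import LatestValueExtractor, UpdateExtractor


class PrintingSubscriber(DataServiceSubscriber[str]):
    """Toy subscriber that prints the latest value of each key it watches."""

    def __init__(self, keys: list[str]) -> None:
        # The extractor mapping must exist before calling super().__init__(),
        # which caches set(self.extractors.keys()) into self._keys.
        self._extractors: dict[str, UpdateExtractor] = {
            key: LatestValueExtractor() for key in keys
        }
        super().__init__()

    @property
    def extractors(self) -> Mapping[str, UpdateExtractor]:
        return self._extractors

    def trigger(self, store: dict[str, Any]) -> None:
        for key, value in store.items():
            print(f'{key} -> {value}')
```

The ordering constraint in `__init__` (build the mapping, then call `super().__init__()`) follows directly from the base class caching the key set at construction time.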
```diff
@@ -48,30 +91,94 @@ def transaction(self):
     def _in_transaction(self) -> bool:
         return self._transaction_depth > 0
 
-    def register_subscriber(self, subscriber: DataSubscriber[K]) -> None:
-        """
-        Register a subscriber for updates.
-
-        Parameters
-        ----------
-        subscriber:
-            The subscriber to register. Must implement the DataSubscriber interface.
-        """
-        self._subscribers.append(subscriber)
-
-    def subscribe_to_changed_keys(
-        self, subscriber: Callable[[set[K], set[K]], None]
-    ) -> None:
-        """
-        Register a subscriber for key change updates (additions/removals).
-
-        Parameters
-        ----------
-        subscriber:
-            A callable that accepts two sets: added_keys and removed_keys.
-        """
-        self._key_change_subscribers.append(subscriber)
-        subscriber(set(self.data.keys()), set())
+    def _get_extractors(self, key: K) -> list[UpdateExtractor]:
+        """
+        Collect extractors for a key from all subscribers.
+
+        Examines all subscribers that need this key.
+
+        Parameters
+        ----------
+        key:
+            The key to collect extractors for.
+
+        Returns
+        -------
+        :
+            List of extractors from all subscribers for this key.
+        """
+        extractors = []
+
+        for subscriber in self._subscribers:
+            subscriber_extractors = subscriber.extractors
+            if key in subscriber_extractors:
+                extractor = subscriber_extractors[key]
+                extractors.append(extractor)
+
+        return extractors
+
+    def _build_subscriber_data(
+        self, subscriber: DataServiceSubscriber[K]
+    ) -> dict[K, Any]:
+        """
+        Extract data for a subscriber based on its extractors.
+
+        Parameters
+        ----------
+        subscriber:
+            The subscriber to extract data for.
+
+        Returns
+        -------
+        :
+            Dictionary mapping keys to extracted data (None values filtered out).
+        """
+        subscriber_data = {}
+
+        for key, extractor in subscriber.extractors.items():
+            buffered_data = self._buffer_manager.get_buffered_data(key)
+            if buffered_data is not None:
+                subscriber_data[key] = extractor.extract(buffered_data)
+
+        return subscriber_data
+
+    def register_subscriber(self, subscriber: DataServiceSubscriber[K]) -> None:
+        """
+        Register a subscriber for updates with extractor-based data access.
+
+        Triggers the subscriber immediately with existing data using its extractors.
+
+        Parameters
+        ----------
+        subscriber:
+            The subscriber to register.
+        """
+        self._subscribers.append(subscriber)
+
+        # Add extractors for keys this subscriber needs
+        for key in subscriber.keys:
+            if key in self._buffer_manager:
+                extractor = subscriber.extractors[key]
+                self._buffer_manager.add_extractor(key, extractor)
+
+        # Trigger immediately with existing data using subscriber's extractors
+        existing_data = self._build_subscriber_data(subscriber)
+        subscriber.trigger(existing_data)
```
Member: Does …

Member (Author): I think this is a potentially open problem: …

But all that being said, we will eventually want to support saving plot setups (such as a certain grid of plots users want to reuse) or creating plots before their data arrives. That will have to be handled somehow, so parts of the mechanism may need to be revisited. Right now I don't think it will be the same mechanism, since saved plots would be for specific workflows, not workflow runs (jobs). The subscription mechanism is for data of a specific job. In other words, either …

Member (Author): Come to think of it, I think we should call … further down even with empty data? Or else the plots might have no way of telling that data disappeared (even though we might need a better mechanism for that anyway).
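To make the registration semantics concrete, a hedged usage sketch (continuing the toy `PrintingSubscriber` from above; plain integers stand in for whatever the real buffers store): per the code above, `register_subscriber` always triggers once immediately, with an empty dict if none of the subscriber's keys have buffers yet.

```python
service = DataService()

# Register before any data exists: no buffers, so the immediate trigger
# receives an empty dict and prints nothing.
early = PrintingSubscriber(['monitor_counts'])
service.register_subscriber(early)

service['monitor_counts'] = 41   # creates the buffer and notifies 'early'
service['monitor_counts'] = 42   # updates the buffer and notifies again

# Register after data exists: triggered immediately with the extracted view.
late = PrintingSubscriber(['monitor_counts'])
service.register_subscriber(late)   # prints: monitor_counts -> 42
```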
```diff
+
+    def register_update_callback(self, callback: Callable[[set[K]], None]) -> None:
+        """
+        Register a callback for key update notifications.
+
+        Callback receives only the set of updated key names, not the data.
+        Use this for infrastructure that needs to know what changed but will
+        query data itself.
+
+        Parameters
+        ----------
+        callback:
+            Callable that accepts a set of updated keys.
+        """
+        self._update_callbacks.append(callback)
 
     def _notify_subscribers(self, updated_keys: set[K]) -> None:
         """
@@ -82,40 +189,46 @@ def _notify_subscribers(self, updated_keys: set[K]) -> None:
         updated_keys
             The set of data keys that were updated.
         """
+        # Notify extractor-based subscribers
         for subscriber in self._subscribers:
-            if not isinstance(subscriber, DataSubscriber):
-                subscriber(updated_keys)
-                continue
             if updated_keys & subscriber.keys:
-                # Pass only the data that the subscriber is interested in
-                subscriber_data = {
-                    key: self.data[key] for key in subscriber.keys if key in self.data
-                }
+                subscriber_data = self._build_subscriber_data(subscriber)
                 subscriber.trigger(subscriber_data)
 
-    def _notify_key_change_subscribers(self) -> None:
-        """Notify subscribers about key changes (additions/removals)."""
-        if not self._pending_key_additions and not self._pending_key_removals:
-            return
+        # Notify update callbacks with just key names
+        for callback in self._update_callbacks:
+            callback(updated_keys)
 
-        for subscriber in self._key_change_subscribers:
-            subscriber(
-                self._pending_key_additions.copy(), self._pending_key_removals.copy()
-            )
+    def __getitem__(self, key: K) -> V:
+        """Get the latest value for a key."""
+        buffered_data = self._buffer_manager.get_buffered_data(key)
+        if buffered_data is None:
+            raise KeyError(key)
+        return self._default_extractor.extract(buffered_data)
 
     def __setitem__(self, key: K, value: V) -> None:
-        if key not in self.data:
-            self._pending_key_additions.add(key)
-        super().__setitem__(key, value)
+        """Set a value, storing it in a buffer."""
+        if key not in self._buffer_manager:
+            extractors = self._get_extractors(key)
+            self._buffer_manager.create_buffer(key, extractors)
+        self._buffer_manager.update_buffer(key, value)
         self._pending_updates.add(key)
         self._notify_if_not_in_transaction()
 
     def __delitem__(self, key: K) -> None:
-        self._pending_key_removals.add(key)
-        super().__delitem__(key)
+        """Delete a key and its buffer."""
+        self._buffer_manager.delete_buffer(key)
         self._pending_updates.add(key)
         self._notify_if_not_in_transaction()
 
+    def __iter__(self) -> Iterator[K]:
+        """Iterate over keys."""
+        return iter(self._buffer_manager)
+
+    def __len__(self) -> int:
+        """Return the number of keys."""
+        return len(self._buffer_manager)
+
     def _notify_if_not_in_transaction(self) -> None:
         """Notify subscribers if not in a transaction."""
         if not self._in_transaction:
```
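For the mapping facade itself, a small sketch of the intended dict-like behaviour (again with plain values for illustration): reads go through the default `LatestValueExtractor`, so the mapping always shows the newest value, even though older values stay buffered for history-aware extractors.

```python
service = DataService()
service['detector_image'] = 1
service['detector_image'] = 2       # buffered; the mapping shows only the latest

print(service['detector_image'])    # latest value (2), via the default extractor
print(len(service))                 # 1
print(list(service))                # ['detector_image']
print('detector_image' in service)  # True; __contains__ comes from MutableMapping

del service['detector_image']           # drops the buffer and notifies subscribers
print(service.get('detector_image'))    # None; .get() also comes from MutableMapping
```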
```diff
@@ -127,6 +240,3 @@ def _notify(self) -> None:
         pending = set(self._pending_updates)
         self._pending_updates.clear()
         self._notify_subscribers(pending)
-        self._notify_key_change_subscribers()
-        self._pending_key_additions.clear()
-        self._pending_key_removals.clear()
```
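And since the hunk headers reference a `transaction()` context manager and `_notify` drains `_pending_updates` in one go, a sketch of batched updates, assuming `transaction()` defers notification until the outermost block exits (consistent with `_transaction_depth` and `_notify_if_not_in_transaction` above):

```python
service = DataService()
sub = PrintingSubscriber(['monitor_1', 'monitor_2'])
service.register_subscriber(sub)    # immediate trigger with {} (no data yet)

with service.transaction():
    service['monitor_1'] = 10       # no notification yet: _in_transaction is True
    service['monitor_2'] = 20
# On exit, _notify() collects both pending keys and calls the subscriber's
# trigger() once, with extracted data for both monitors.
```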
Member: Is the repeated computation that expensive? In other words, is the caching of keys really necessary? I guess it depends on what 'computation' represents (how much work is actually done).

Member (Author): Well, say we expect O(100) results coming in per second, it adds up, so why not cache?