Skip to content

Commit dc65e6f

Browse files
SimonHeybrockclaude
andcommitted
Add set_extractors() to TemporalBufferManager for buffer reconfiguration
Adds public API for replacing all extractors on a buffer, enabling reconfiguration when subscribers are removed. This allows buffers to optimize from TemporalBuffer to SingleValueBuffer when temporal extractors are no longer needed. Implementation: - Add set_extractors() method that replaces entire extractor list - Extract common buffer reconfiguration logic to _reconfigure_buffer_if_needed() - Refactor add_extractor() to use the helper method Test changes: - Rename test_add_extractor_switches_to_single_value_buffer to test_set_extractors_switches_to_single_value_buffer - Remove private field access (state.extractors.clear()) in test - Use new public set_extractors() API instead This prepares for DataService.unregister_subscriber() implementation, which will need to recalculate extractors when subscribers are removed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 6e180a3 commit dc65e6f

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

src/ess/livedata/dashboard/temporal_buffer_manager.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,43 @@ def add_extractor(self, key: K, extractor: UpdateExtractor) -> None:
131131
"""
132132
state = self._states[key]
133133
state.extractors.append(extractor)
134+
self._reconfigure_buffer_if_needed(key, state)
134135

136+
def set_extractors(self, key: K, extractors: list[UpdateExtractor]) -> None:
137+
"""
138+
Replace all extractors for an existing buffer.
139+
140+
May trigger buffer type switch with data migration:
141+
- Single→Temporal: Existing data is copied to the new buffer
142+
- Temporal→Single: Last time slice is copied to the new buffer
143+
- Other transitions: Data is discarded
144+
145+
Useful for reconfiguring buffers when subscribers change, e.g., when
146+
a subscriber is removed and we need to downgrade from Temporal to
147+
SingleValue buffer.
148+
149+
Parameters
150+
----------
151+
key:
152+
Key identifying the buffer to update.
153+
extractors:
154+
New list of extractors that will use this buffer.
155+
"""
156+
state = self._states[key]
157+
state.extractors = list(extractors)
158+
self._reconfigure_buffer_if_needed(key, state)
159+
160+
def _reconfigure_buffer_if_needed(self, key: K, state: _BufferState) -> None:
161+
"""
162+
Check if buffer type needs to change and handle migration.
163+
164+
Parameters
165+
----------
166+
key:
167+
Key identifying the buffer.
168+
state:
169+
Buffer state to reconfigure.
170+
"""
135171
# Check if we need to switch buffer type
136172
new_buffer = self._create_buffer_for_extractors(state.extractors)
137173
if not isinstance(new_buffer, type(state.buffer)):

tests/dashboard/temporal_buffer_manager_test.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def test_add_extractor_switches_to_temporal_buffer(self):
128128
# Verify the data values are preserved
129129
assert sc.allclose(result['time', 0].data, data.data)
130130

131-
def test_add_extractor_switches_to_single_value_buffer(self):
131+
def test_set_extractors_switches_to_single_value_buffer(self):
132132
"""Test that switching buffer types preserves latest data."""
133133
manager = TemporalBufferManager()
134134
extractors = [WindowAggregatingExtractor(window_duration_seconds=1.0)]
@@ -152,12 +152,8 @@ def test_add_extractor_switches_to_single_value_buffer(self):
152152
assert 'time' in result.dims
153153
assert result.sizes['time'] == 3
154154

155-
# Manually clear extractors to simulate reconfiguration
156-
state = manager._states['test']
157-
state.extractors.clear()
158-
159-
# Add LatestValueExtractor - this should trigger buffer type switch
160-
manager.add_extractor('test', LatestValueExtractor())
155+
# Replace extractors - this should trigger buffer type switch
156+
manager.set_extractors('test', [LatestValueExtractor()])
161157

162158
# Verify the latest time slice is preserved after transition
163159
result = manager.get_buffered_data('test')

0 commit comments

Comments
 (0)