Skip to content

Commit 40f351c

Browse files
SimonHeybrockclaude
andcommitted
Add time coordinate to MonitorStreamProcessor current output
Track the start time of each accumulation period and add it as a time coordinate to the current result, matching the pattern used in DetectorView. This enables time-aware plotting and analysis of monitor data windows. The time coordinate represents the start_time of the first data batch in each accumulation period (between finalize calls). Original prompt: Please add a `'time'` coord to `current` output of MonitorStreamProcessor. Look at DetectorView to see how to do it. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent e8f0eab commit 40f351c

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

src/ess/livedata/handlers/monitor_data_handler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, edges: sc.Variable) -> None:
2020
self._event_edges = edges.to(unit='ns').values
2121
self._cumulative: sc.DataArray | None = None
2222
self._current: sc.DataArray | None = None
23+
self._current_start_time: int | None = None
2324

2425
@staticmethod
2526
def create_workflow(params: MonitorDataParams) -> Workflow:
@@ -35,6 +36,11 @@ def accumulate(
3536
) -> None:
3637
if len(data) != 1:
3738
raise ValueError("MonitorStreamProcessor expects exactly one data item.")
39+
40+
# Track start time of first data since last finalize
41+
if self._current_start_time is None:
42+
self._current_start_time = start_time
43+
3844
raw = next(iter(data.values()))
3945
# Note: In theory we should consider rebinning/histogramming only in finalize(),
4046
# but the current plan is to accumulate before/during preprocessing, i.e.,
@@ -64,17 +70,29 @@ def accumulate(
6470
def finalize(self) -> dict[Hashable, sc.DataArray]:
6571
if self._current is None:
6672
raise ValueError("No data has been added")
73+
if self._current_start_time is None:
74+
raise RuntimeError(
75+
"finalize called without any data accumulated via accumulate"
76+
)
77+
6778
current = self._current
6879
if self._cumulative is None:
6980
self._cumulative = current
7081
else:
7182
self._cumulative += current
7283
self._current = sc.zeros_like(current)
84+
85+
# Add time coord to current result
86+
time_coord = sc.scalar(self._current_start_time, unit='ns')
87+
current = current.assign_coords(time=time_coord)
88+
self._current_start_time = None
89+
7390
return {'cumulative': self._cumulative, 'current': current}
7491

7592
def clear(self) -> None:
7693
self._cumulative = None
7794
self._current = None
95+
self._current_start_time = None
7896

7997

8098
class MonitorHandlerFactory(

tests/handlers/monitor_data_handler_test.py

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,13 @@ def test_finalize_first_time(self, processor):
143143

144144
assert "cumulative" in result
145145
assert "current" in result
146-
assert_identical(result["cumulative"], result["current"])
147-
148-
# After finalize, we can finalize again without new data, since empty batches
149-
# will be committed.
150-
empty_result = processor.finalize()
151-
assert empty_result["current"].sum().value == 0
152-
assert (
153-
empty_result["cumulative"].sum().value == result["cumulative"].sum().value
154-
)
146+
# Check cumulative data (excluding time coord which current has)
147+
assert_identical(result["cumulative"], result["current"].drop_coords("time"))
148+
149+
# Verify time coordinate is present
150+
assert "time" in result["current"].coords
151+
assert result["current"].coords["time"].value == 1000
152+
assert result["current"].coords["time"].unit == "ns"
155153

156154
def test_finalize_subsequent_calls(self, processor):
157155
"""Test finalize accumulates over multiple calls."""
@@ -178,6 +176,37 @@ def test_finalize_without_data(self, processor):
178176
with pytest.raises(ValueError, match="No data has been added"):
179177
processor.finalize()
180178

179+
def test_finalize_without_accumulate(self, processor):
180+
"""Test finalize raises error without accumulate since last finalize."""
181+
processor.accumulate(
182+
{"det1": np.array([10e6, 25e6])}, start_time=1000, end_time=2000
183+
)
184+
processor.finalize()
185+
186+
# After finalize, calling finalize again without accumulate should fail
187+
with pytest.raises(
188+
RuntimeError,
189+
match="finalize called without any data accumulated via accumulate",
190+
):
191+
processor.finalize()
192+
193+
def test_time_coordinate_tracks_first_accumulate(self, processor):
194+
"""Test time coordinate uses start_time of the first accumulate call."""
195+
# First accumulate with start_time=1000
196+
processor.accumulate({"det1": np.array([10e6])}, start_time=1000, end_time=2000)
197+
# Second accumulate with start_time=3000 (should be ignored)
198+
processor.accumulate({"det1": np.array([20e6])}, start_time=3000, end_time=4000)
199+
200+
result = processor.finalize()
201+
202+
# Time coordinate should use the first start_time
203+
assert result["current"].coords["time"].value == 1000
204+
205+
# After finalize, the next accumulate should set a new start_time
206+
processor.accumulate({"det1": np.array([30e6])}, start_time=5000, end_time=6000)
207+
result2 = processor.finalize()
208+
assert result2["current"].coords["time"].value == 5000
209+
181210
def test_clear(self, processor):
182211
"""Test clear method resets processor state."""
183212
processor.accumulate(

0 commit comments

Comments
 (0)