diff --git a/src/ess/livedata/config/instrument.py b/src/ess/livedata/config/instrument.py index 73f2b42aa..3026f95f7 100644 --- a/src/ess/livedata/config/instrument.py +++ b/src/ess/livedata/config/instrument.py @@ -59,11 +59,13 @@ class Instrument: active_namespace: str | None = None _detector_group_names: dict[str, str] = field(default_factory=dict) _monitor_workflow_handle: SpecHandle | None = field(default=None, init=False) + _monitor_ratemeter_handle: SpecHandle | None = field(default=None, init=False) _timeseries_workflow_handle: SpecHandle | None = field(default=None, init=False) def __post_init__(self) -> None: """Auto-register standard workflow specs based on instrument metadata.""" from ess.livedata.handlers.monitor_workflow_specs import ( + register_monitor_ratemeter_spec, register_monitor_workflow_specs, ) from ess.livedata.handlers.timeseries_workflow_specs import ( @@ -74,6 +76,17 @@ def __post_init__(self) -> None: instrument=self, source_names=self.monitors ) + # Only register monitor interval timeseries if nexus file is available + # (required by the workflow to create GenericTofWorkflow) + try: + _ = self.nexus_file # Cache in _nexus_file if successful + except ValueError: + pass # nexus file not available for this instrument + else: + self._monitor_ratemeter_handle = register_monitor_ratemeter_spec( + instrument=self, source_names=self.monitors + ) + timeseries_names = list(self.f144_attribute_registry.keys()) self._timeseries_workflow_handle = register_timeseries_workflow_specs( instrument=self, source_names=timeseries_names @@ -241,6 +254,14 @@ def load_factories(self) -> None: MonitorStreamProcessor.create_workflow ) + if self._monitor_ratemeter_handle is not None: + from ess.livedata.handlers.monitor_data_handler import ( + create_monitor_ratemeter_factory, + ) + + factory = create_monitor_ratemeter_factory(self) + self._monitor_ratemeter_handle.attach_factory()(factory) + if self._timeseries_workflow_handle is not None: from ess.livedata.handlers.timeseries_handler import ( TimeseriesStreamProcessor, diff --git a/src/ess/livedata/config/workflows.py b/src/ess/livedata/config/workflows.py index 4c24dd310..64b312f35 100644 --- a/src/ess/livedata/config/workflows.py +++ b/src/ess/livedata/config/workflows.py @@ -4,62 +4,13 @@ from __future__ import annotations -from typing import Any, NewType +from typing import Any -import pydantic -import sciline import scipp as sc -from ess.livedata import parameter_models -from ess.livedata.config import Instrument -from ess.livedata.config.workflow_spec import WorkflowOutputsBase from ess.livedata.handlers.accumulators import LogData -from ess.livedata.handlers.stream_processor_workflow import StreamProcessorWorkflow from ess.livedata.handlers.to_nxlog import ToNXlog from ess.reduce import streaming -from ess.reduce.nexus.types import Filename, MonitorData, NeXusData, NeXusName -from ess.reduce.time_of_flight import GenericTofWorkflow - - -class MonitorTimeseriesParams(pydantic.BaseModel): - """Parameters for the monitor timeseries workflow.""" - - toa_range: parameter_models.TOARange = pydantic.Field( - title="Time of Arrival Range", - description="Time of arrival range to include.", - default=parameter_models.TOARange(), - ) - - -class MonitorTimeseriesOutputs(WorkflowOutputsBase): - """Outputs for the monitor timeseries workflow.""" - - monitor_counts: sc.DataArray = pydantic.Field( - title="Monitor Counts", - description="Timeseries of monitor counts within the specified TOA range.", - ) - - -CustomMonitor = NewType('CustomMonitor', int) -CurrentRun = NewType('CurrentRun', int) -MonitorCountsInInterval = NewType('MonitorCountsInInterval', sc.DataArray) - - -def _get_interval( - data: MonitorData[CurrentRun, CustomMonitor], range: parameter_models.TOARange -) -> MonitorCountsInInterval: - start, stop = range.range_ns - if data.bins is not None: - counts = data.bins['event_time_offset', start:stop].sum() - counts.coords['time'] = data.coords['event_time_zero'][0] - else: - # Include the full time(of arrival) bin at start and stop. Do we need more - # precision here? - # Note the current ECDC convention: time is the time offset w.r.t. the frame, - # i.e., the pulse, frame_time is the absolute time (since epoch). - counts = data['time', start:stop].sum() - counts.coords['time'] = data.coords['frame_time'][0] - return MonitorCountsInInterval(counts) class TimeseriesAccumulator(streaming.Accumulator[sc.DataArray]): @@ -98,46 +49,3 @@ def _do_push(self, value: sc.DataArray) -> None: def clear(self) -> None: if self._to_nxlog is not None: self._to_nxlog.clear() - - -def _prepare_workflow(instrument: Instrument, monitor_name: str) -> sciline.Pipeline: - workflow = GenericTofWorkflow(run_types=[CurrentRun], monitor_types=[CustomMonitor]) - workflow[Filename[CurrentRun]] = instrument.nexus_file - workflow[NeXusName[CustomMonitor]] = monitor_name - workflow.insert(_get_interval) - return workflow - - -def register_monitor_timeseries_workflows( - instrument: Instrument, source_names: list[str] -) -> None: - """Register monitor timeseries workflows for the given instrument and source names. - - Parameters - ---------- - instrument - The instrument for which to register the workflows. - source_names - The source names (monitor names) for which to register the workflows. - """ - - @instrument.register_workflow( - name='monitor_interval_timeseries', - version=1, - title='Monitor Interval Timeseries', - description='Timeseries of counts in a monitor within a specified ' - 'time-of-arrival range.', - source_names=source_names, - outputs=MonitorTimeseriesOutputs, - ) - def monitor_timeseries_workflow( - source_name: str, params: MonitorTimeseriesParams - ) -> StreamProcessorWorkflow: - wf = _prepare_workflow(instrument, monitor_name=source_name) - wf[parameter_models.TOARange] = params.toa_range - return StreamProcessorWorkflow( - base_workflow=wf, - dynamic_keys={source_name: NeXusData[CustomMonitor, CurrentRun]}, - target_keys={'monitor_counts': MonitorCountsInInterval}, - accumulators={MonitorCountsInInterval: TimeseriesAccumulator}, - ) diff --git a/src/ess/livedata/handlers/monitor_data_handler.py b/src/ess/livedata/handlers/monitor_data_handler.py index 6e2980980..5a6597af3 100644 --- a/src/ess/livedata/handlers/monitor_data_handler.py +++ b/src/ess/livedata/handlers/monitor_data_handler.py @@ -3,16 +3,28 @@ from __future__ import annotations from collections.abc import Hashable +from typing import NewType import numpy as np import scipp as sc +from ess.reduce.nexus.types import Filename, NeXusData, NeXusName +from ess.reduce.time_of_flight import GenericTofWorkflow + +from .. import parameter_models +from ..config.workflows import TimeseriesAccumulator from ..core.handler import JobBasedPreprocessorFactoryBase from ..core.message import StreamId, StreamKind from .accumulators import Accumulator, CollectTOA, Cumulative, MonitorEvents -from .monitor_workflow_specs import MonitorDataParams +from .monitor_workflow_specs import MonitorDataParams, MonitorRatemeterParams +from .stream_processor_workflow import StreamProcessorWorkflow from .workflow_factory import Workflow +# Type aliases for monitor ratemeter workflow +CustomMonitor = NewType('CustomMonitor', int) +CurrentRun = NewType('CurrentRun', int) +MonitorCountsInInterval = NewType('MonitorCountsInInterval', sc.DataArray) + class MonitorStreamProcessor(Workflow): def __init__(self, edges: sc.Variable) -> None: @@ -106,3 +118,69 @@ def make_preprocessor(self, key: StreamId) -> Accumulator | None: return CollectTOA() case _: return None + + +def _get_interval( + data: NeXusData[CustomMonitor, CurrentRun], + range: parameter_models.TOARange, +) -> MonitorCountsInInterval: + """Extract monitor counts within a time-of-arrival interval.""" + start, stop = range.range_ns + if data.bins is not None: + counts = data.bins['event_time_offset', start:stop].sum() + counts.coords['time'] = data.coords['event_time_zero'][0] + else: + counts = data['time', start:stop].sum() + counts.coords['time'] = data.coords['frame_time'][0] + return MonitorCountsInInterval(counts) + + +def create_monitor_ratemeter_factory(instrument): + """ + Create factory function for monitor ratemeter workflow. + + This is generic workflow logic that can be used by any instrument. + Auto-attached by Instrument.load_factories(). + + Parameters + ---------- + instrument + Instrument instance + + Returns + ------- + : + Factory function that can be attached to the spec handle + """ + + def factory( + source_name: str, + params: MonitorRatemeterParams, + ) -> StreamProcessorWorkflow: + """Factory function for monitor interval timeseries workflow. + + Parameters + ---------- + source_name: + Monitor source name + params: + MonitorTimeseriesParams with toa_range configuration + + Returns + ------- + : + StreamProcessorWorkflow instance + """ + wf = GenericTofWorkflow(run_types=[CurrentRun], monitor_types=[CustomMonitor]) + wf[Filename[CurrentRun]] = instrument.nexus_file + wf[NeXusName[CustomMonitor]] = source_name + wf[parameter_models.TOARange] = params.toa_range + wf.insert(_get_interval) + return StreamProcessorWorkflow( + base_workflow=wf, + dynamic_keys={source_name: NeXusData[CustomMonitor, CurrentRun]}, + target_keys={'monitor_counts': MonitorCountsInInterval}, + accumulators={MonitorCountsInInterval: TimeseriesAccumulator}, + ) + + return factory diff --git a/src/ess/livedata/handlers/monitor_workflow_specs.py b/src/ess/livedata/handlers/monitor_workflow_specs.py index 5e8c9900c..625188cd9 100644 --- a/src/ess/livedata/handlers/monitor_workflow_specs.py +++ b/src/ess/livedata/handlers/monitor_workflow_specs.py @@ -5,9 +5,11 @@ from __future__ import annotations import pydantic +import scipp as sc from .. import parameter_models from ..config.instrument import Instrument +from ..config.workflow_spec import WorkflowOutputsBase from ..handlers.workflow_factory import SpecHandle @@ -26,17 +28,31 @@ class MonitorDataParams(pydantic.BaseModel): ) +class MonitorRatemeterParams(pydantic.BaseModel): + """Parameters for monitor ratemeter workflow.""" + + toa_range: parameter_models.TOARange = pydantic.Field( + title="Time of Arrival Range", + description="Time of arrival range to include.", + default=parameter_models.TOARange(), + ) + + +class MonitorRatemeterOutputs(WorkflowOutputsBase): + """Outputs for monitor ratemeter workflow.""" + + monitor_counts: sc.DataArray = pydantic.Field( + title="Monitor Counts", + description="Monitor counts within the specified TOA range.", + ) + + def register_monitor_workflow_specs( instrument: Instrument, source_names: list[str] ) -> SpecHandle | None: """ Register monitor workflow specs (lightweight, no heavy dependencies). - This is the first phase of two-phase registration. - - If the workflow is already registered (e.g., auto-registered in - Instrument.__post_init__()), returns the existing handle. - Parameters ---------- instrument @@ -52,17 +68,6 @@ def register_monitor_workflow_specs( if not source_names: return None - from ..config.workflow_spec import WorkflowId - - workflow_id = WorkflowId( - instrument=instrument.name, - namespace='monitor_data', - name='monitor_histogram', - version=1, - ) - if workflow_id in instrument.workflow_factory._workflow_specs: - return SpecHandle(workflow_id=workflow_id, _factory=instrument.workflow_factory) - return instrument.register_spec( namespace='monitor_data', name='monitor_histogram', @@ -73,3 +78,36 @@ def register_monitor_workflow_specs( source_names=source_names, params=MonitorDataParams, ) + + +def register_monitor_ratemeter_spec( + instrument: Instrument, source_names: list[str] +) -> SpecHandle | None: + """ + Register monitor ratemeter workflow spec. + + Parameters + ---------- + instrument + The instrument to register the workflow spec for. + source_names + List of monitor names (source names) for which to register the workflow. + If empty, returns None without registering. + + Returns + ------- + SpecHandle for later factory attachment, or None if no monitors. + """ + if not source_names: + return None + + return instrument.register_spec( + namespace='monitor_data', + name='monitor_ratemeter', + version=1, + title='Monitor Ratemeter', + description='Monitor counts within a specified time-of-arrival range.', + source_names=source_names, + params=MonitorRatemeterParams, + outputs=MonitorRatemeterOutputs, + ) diff --git a/src/ess/livedata/handlers/timeseries_workflow_specs.py b/src/ess/livedata/handlers/timeseries_workflow_specs.py index 521974508..61023d896 100644 --- a/src/ess/livedata/handlers/timeseries_workflow_specs.py +++ b/src/ess/livedata/handlers/timeseries_workflow_specs.py @@ -14,12 +14,6 @@ def register_timeseries_workflow_specs( """ Register timeseries workflow specs (lightweight, no heavy dependencies). - This is the first phase of two-phase registration. Call this from - instrument specs.py modules. - - If the workflow is already registered (e.g., auto-registered in - Instrument.__post_init__()), returns the existing handle. - Parameters ---------- instrument @@ -35,19 +29,6 @@ def register_timeseries_workflow_specs( if not source_names: return None - from ..config.workflow_spec import WorkflowId - - workflow_id = WorkflowId( - instrument=instrument.name, - namespace='timeseries', - name='timeseries_data', - version=1, - ) - if workflow_id in instrument.workflow_factory._workflow_specs: - from ..handlers.workflow_factory import SpecHandle - - return SpecHandle(workflow_id=workflow_id, _factory=instrument.workflow_factory) - return instrument.register_spec( namespace='timeseries', name='timeseries_data',