21 changes: 21 additions & 0 deletions src/ess/livedata/config/instrument.py
@@ -59,11 +59,13 @@ class Instrument:
active_namespace: str | None = None
_detector_group_names: dict[str, str] = field(default_factory=dict)
_monitor_workflow_handle: SpecHandle | None = field(default=None, init=False)
_monitor_ratemeter_handle: SpecHandle | None = field(default=None, init=False)
_timeseries_workflow_handle: SpecHandle | None = field(default=None, init=False)

def __post_init__(self) -> None:
"""Auto-register standard workflow specs based on instrument metadata."""
from ess.livedata.handlers.monitor_workflow_specs import (
register_monitor_ratemeter_spec,
register_monitor_workflow_specs,
)
from ess.livedata.handlers.timeseries_workflow_specs import (
@@ -74,6 +76,17 @@ def __post_init__(self) -> None:
instrument=self, source_names=self.monitors
)

# Only register the monitor ratemeter workflow if a nexus file is available
# (required by the workflow to create GenericTofWorkflow)
try:
_ = self.nexus_file # Cache in _nexus_file if successful
except ValueError:
pass # nexus file not available for this instrument
else:
self._monitor_ratemeter_handle = register_monitor_ratemeter_spec(
instrument=self, source_names=self.monitors
)

timeseries_names = list(self.f144_attribute_registry.keys())
self._timeseries_workflow_handle = register_timeseries_workflow_specs(
instrument=self, source_names=timeseries_names
@@ -241,6 +254,14 @@ def load_factories(self) -> None:
MonitorStreamProcessor.create_workflow
)

if self._monitor_ratemeter_handle is not None:
from ess.livedata.handlers.monitor_data_handler import (
create_monitor_ratemeter_factory,
)

factory = create_monitor_ratemeter_factory(self)
self._monitor_ratemeter_handle.attach_factory()(factory)

if self._timeseries_workflow_handle is not None:
from ess.livedata.handlers.timeseries_handler import (
TimeseriesStreamProcessor,
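Note on the guard in `__post_init__` above: only the `self.nexus_file` lookup sits inside the `try`, and registration runs in the `else` branch so its own errors are not swallowed. A minimal standalone sketch of that pattern (hypothetical helper, not part of this PR):

```python
def maybe_register(check_resource, register):
    """Register an optional feature only if a required resource resolves.

    Mirrors the try/except/else guard in Instrument.__post_init__: only the
    resource lookup is guarded, so errors raised by register() itself still
    propagate instead of being treated as "resource missing".
    """
    try:
        check_resource()  # e.g. a property access that may raise ValueError
    except ValueError:
        return None  # resource unavailable for this instrument: skip
    else:
        return register()


# Usage sketch with stand-in callables:
handle = maybe_register(lambda: "instrument.nxs", lambda: "spec-handle")
```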
94 changes: 1 addition & 93 deletions src/ess/livedata/config/workflows.py
@@ -4,62 +4,13 @@

from __future__ import annotations

from typing import Any, NewType
from typing import Any

import pydantic
import sciline
import scipp as sc

from ess.livedata import parameter_models
from ess.livedata.config import Instrument
from ess.livedata.config.workflow_spec import WorkflowOutputsBase
from ess.livedata.handlers.accumulators import LogData
from ess.livedata.handlers.stream_processor_workflow import StreamProcessorWorkflow
from ess.livedata.handlers.to_nxlog import ToNXlog
from ess.reduce import streaming
from ess.reduce.nexus.types import Filename, MonitorData, NeXusData, NeXusName
from ess.reduce.time_of_flight import GenericTofWorkflow


class MonitorTimeseriesParams(pydantic.BaseModel):
"""Parameters for the monitor timeseries workflow."""

toa_range: parameter_models.TOARange = pydantic.Field(
title="Time of Arrival Range",
description="Time of arrival range to include.",
default=parameter_models.TOARange(),
)


class MonitorTimeseriesOutputs(WorkflowOutputsBase):
"""Outputs for the monitor timeseries workflow."""

monitor_counts: sc.DataArray = pydantic.Field(
title="Monitor Counts",
description="Timeseries of monitor counts within the specified TOA range.",
)


CustomMonitor = NewType('CustomMonitor', int)
CurrentRun = NewType('CurrentRun', int)
MonitorCountsInInterval = NewType('MonitorCountsInInterval', sc.DataArray)


def _get_interval(
data: MonitorData[CurrentRun, CustomMonitor], range: parameter_models.TOARange
) -> MonitorCountsInInterval:
start, stop = range.range_ns
if data.bins is not None:
counts = data.bins['event_time_offset', start:stop].sum()
counts.coords['time'] = data.coords['event_time_zero'][0]
else:
# Include the full time(of arrival) bin at start and stop. Do we need more
# precision here?
# Note the current ECDC convention: time is the time offset w.r.t. the frame,
# i.e., the pulse, frame_time is the absolute time (since epoch).
counts = data['time', start:stop].sum()
counts.coords['time'] = data.coords['frame_time'][0]
return MonitorCountsInInterval(counts)


class TimeseriesAccumulator(streaming.Accumulator[sc.DataArray]):
@@ -95,46 +46,3 @@ def _do_push(self, value: sc.DataArray) -> None:
def clear(self) -> None:
if self._to_nxlog is not None:
self._to_nxlog.clear()


def _prepare_workflow(instrument: Instrument, monitor_name: str) -> sciline.Pipeline:
workflow = GenericTofWorkflow(run_types=[CurrentRun], monitor_types=[CustomMonitor])
workflow[Filename[CurrentRun]] = instrument.nexus_file
workflow[NeXusName[CustomMonitor]] = monitor_name
workflow.insert(_get_interval)
return workflow


def register_monitor_timeseries_workflows(
instrument: Instrument, source_names: list[str]
) -> None:
"""Register monitor timeseries workflows for the given instrument and source names.

Parameters
----------
instrument
The instrument for which to register the workflows.
source_names
The source names (monitor names) for which to register the workflows.
"""

@instrument.register_workflow(
name='monitor_interval_timeseries',
version=1,
title='Monitor Interval Timeseries',
description='Timeseries of counts in a monitor within a specified '
'time-of-arrival range.',
source_names=source_names,
outputs=MonitorTimeseriesOutputs,
)
def monitor_timeseries_workflow(
source_name: str, params: MonitorTimeseriesParams
) -> StreamProcessorWorkflow:
wf = _prepare_workflow(instrument, monitor_name=source_name)
wf[parameter_models.TOARange] = params.toa_range
return StreamProcessorWorkflow(
base_workflow=wf,
dynamic_keys={source_name: NeXusData[CustomMonitor, CurrentRun]},
target_keys={'monitor_counts': MonitorCountsInInterval},
accumulators={MonitorCountsInInterval: TimeseriesAccumulator},
)
80 changes: 79 additions & 1 deletion src/ess/livedata/handlers/monitor_data_handler.py
@@ -3,16 +3,28 @@
from __future__ import annotations

from collections.abc import Hashable
from typing import NewType

import numpy as np
import scipp as sc

from ess.reduce.nexus.types import Filename, NeXusData, NeXusName
from ess.reduce.time_of_flight import GenericTofWorkflow

from .. import parameter_models
from ..config.workflows import TimeseriesAccumulator
from ..core.handler import JobBasedPreprocessorFactoryBase
from ..core.message import StreamId, StreamKind
from .accumulators import Accumulator, CollectTOA, Cumulative, MonitorEvents
from .monitor_workflow_specs import MonitorDataParams
from .monitor_workflow_specs import MonitorDataParams, MonitorRatemeterParams
from .stream_processor_workflow import StreamProcessorWorkflow
from .workflow_factory import Workflow

# Type aliases for monitor ratemeter workflow
CustomMonitor = NewType('CustomMonitor', int)
CurrentRun = NewType('CurrentRun', int)
MonitorCountsInInterval = NewType('MonitorCountsInInterval', sc.DataArray)


class MonitorStreamProcessor(Workflow):
def __init__(self, edges: sc.Variable) -> None:
@@ -82,3 +94,69 @@ def make_preprocessor(self, key: StreamId) -> Accumulator | None:
return CollectTOA()
case _:
return None


def _get_interval(
data: NeXusData[CustomMonitor, CurrentRun],
range: parameter_models.TOARange,
) -> MonitorCountsInInterval:
"""Extract monitor counts within a time-of-arrival interval."""
start, stop = range.range_ns
if data.bins is not None:
counts = data.bins['event_time_offset', start:stop].sum()
counts.coords['time'] = data.coords['event_time_zero'][0]
else:
counts = data['time', start:stop].sum()
counts.coords['time'] = data.coords['frame_time'][0]
return MonitorCountsInInterval(counts)


def create_monitor_ratemeter_factory(instrument):
"""
Create factory function for monitor ratemeter workflow.

This is generic workflow logic that can be used by any instrument.
Auto-attached by Instrument.load_factories().

Parameters
----------
instrument
Instrument instance

Returns
-------
:
Factory function that can be attached to the spec handle
"""

def factory(
source_name: str,
params: MonitorRatemeterParams,
) -> StreamProcessorWorkflow:
"""Factory function for monitor interval timeseries workflow.

Parameters
----------
source_name:
Monitor source name
params:
MonitorRatemeterParams with toa_range configuration

Returns
-------
:
StreamProcessorWorkflow instance
"""
wf = GenericTofWorkflow(run_types=[CurrentRun], monitor_types=[CustomMonitor])
wf[Filename[CurrentRun]] = instrument.nexus_file
wf[NeXusName[CustomMonitor]] = source_name
wf[parameter_models.TOARange] = params.toa_range
wf.insert(_get_interval)
return StreamProcessorWorkflow(
base_workflow=wf,
dynamic_keys={source_name: NeXusData[CustomMonitor, CurrentRun]},
target_keys={'monitor_counts': MonitorCountsInInterval},
accumulators={MonitorCountsInInterval: TimeseriesAccumulator},
)

return factory
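The dense branch of `_get_interval` relies on scipp's label-based slicing over the `time` coordinate. A minimal sketch with synthetic data (values, units, and bin edges invented for illustration, not taken from this PR):

```python
import scipp as sc

# Synthetic, already-histogrammed monitor data: counts per time-of-arrival bin.
toa_edges = sc.linspace('time', 0.0, 70.0, num=8, unit='ms')
data = sc.DataArray(
    sc.array(dims=['time'], values=[5.0, 9.0, 12.0, 7.0, 3.0, 2.0, 1.0], unit='counts'),
    coords={'time': toa_edges},
)

# Label-based slicing selects the bins covering the requested TOA range
# (including the bins at the boundaries), then sum() reduces them to a
# single value, which is the shape of result the dense branch produces.
start = sc.scalar(10.0, unit='ms')
stop = sc.scalar(50.0, unit='ms')
counts_in_range = data['time', start:stop].sum()
print(counts_in_range.value)
```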
70 changes: 54 additions & 16 deletions src/ess/livedata/handlers/monitor_workflow_specs.py
@@ -5,9 +5,11 @@
from __future__ import annotations

import pydantic
import scipp as sc

from .. import parameter_models
from ..config.instrument import Instrument
from ..config.workflow_spec import WorkflowOutputsBase
from ..handlers.workflow_factory import SpecHandle


@@ -26,17 +28,31 @@ class MonitorDataParams(pydantic.BaseModel):
)


class MonitorRatemeterParams(pydantic.BaseModel):
"""Parameters for monitor ratemeter workflow."""

toa_range: parameter_models.TOARange = pydantic.Field(
title="Time of Arrival Range",
description="Time of arrival range to include.",
default=parameter_models.TOARange(),
)


class MonitorRatemeterOutputs(WorkflowOutputsBase):
"""Outputs for monitor ratemeter workflow."""

monitor_counts: sc.DataArray = pydantic.Field(
title="Monitor Counts",
description="Monitor counts within the specified TOA range.",
)


def register_monitor_workflow_specs(
instrument: Instrument, source_names: list[str]
) -> SpecHandle | None:
"""
Register monitor workflow specs (lightweight, no heavy dependencies).

This is the first phase of two-phase registration.

If the workflow is already registered (e.g., auto-registered in
Instrument.__post_init__()), returns the existing handle.

Parameters
----------
instrument
@@ -52,17 +68,6 @@ def register_monitor_workflow_specs(
if not source_names:
return None

from ..config.workflow_spec import WorkflowId

workflow_id = WorkflowId(
instrument=instrument.name,
namespace='monitor_data',
name='monitor_histogram',
version=1,
)
if workflow_id in instrument.workflow_factory._workflow_specs:
return SpecHandle(workflow_id=workflow_id, _factory=instrument.workflow_factory)

return instrument.register_spec(
namespace='monitor_data',
name='monitor_histogram',
@@ -73,3 +78,36 @@
source_names=source_names,
params=MonitorDataParams,
)


def register_monitor_ratemeter_spec(
instrument: Instrument, source_names: list[str]
) -> SpecHandle | None:
"""
Register monitor ratemeter workflow spec.

Parameters
----------
instrument
The instrument to register the workflow spec for.
source_names
List of monitor names (source names) for which to register the workflow.
If empty, returns None without registering.

Returns
-------
SpecHandle for later factory attachment, or None if no monitors.
"""
if not source_names:
return None

return instrument.register_spec(
namespace='monitor_data',
name='monitor_ratemeter',
version=1,
title='Monitor Ratemeter',
description='Monitor counts within a specified time-of-arrival range.',
source_names=source_names,
params=MonitorRatemeterParams,
outputs=MonitorRatemeterOutputs,
)
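A hedged illustration of the params pattern introduced above, using a stand-in for `parameter_models.TOARange` (its real fields are not shown in this diff, so the ones below are assumptions):

```python
import pydantic


class TOARangeStub(pydantic.BaseModel):
    """Hypothetical stand-in for parameter_models.TOARange; fields are assumed."""
    enabled: bool = False
    low_ns: float = 0.0
    high_ns: float = 71_000_000.0


class RatemeterParamsSketch(pydantic.BaseModel):
    """Same shape as MonitorRatemeterParams: a nested model with a default instance."""
    toa_range: TOARangeStub = pydantic.Field(
        title="Time of Arrival Range",
        description="Time of arrival range to include.",
        default=TOARangeStub(),
    )


params = RatemeterParamsSketch()  # all defaults
custom = RatemeterParamsSketch(
    toa_range=TOARangeStub(enabled=True, low_ns=1.0e6, high_ns=5.0e7)
)
```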
19 changes: 0 additions & 19 deletions src/ess/livedata/handlers/timeseries_workflow_specs.py
@@ -14,12 +14,6 @@ def register_timeseries_workflow_specs(
"""
Register timeseries workflow specs (lightweight, no heavy dependencies).

This is the first phase of two-phase registration. Call this from
instrument specs.py modules.

If the workflow is already registered (e.g., auto-registered in
Instrument.__post_init__()), returns the existing handle.

Parameters
----------
instrument
@@ -35,19 +29,6 @@
if not source_names:
return None

from ..config.workflow_spec import WorkflowId

workflow_id = WorkflowId(
instrument=instrument.name,
namespace='timeseries',
name='timeseries_data',
version=1,
)
if workflow_id in instrument.workflow_factory._workflow_specs:
from ..handlers.workflow_factory import SpecHandle

return SpecHandle(workflow_id=workflow_id, _factory=instrument.workflow_factory)

return instrument.register_spec(
namespace='timeseries',
name='timeseries_data',
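The spec modules touched here all follow the same two-phase registration: a lightweight spec is registered first and returns a handle, and the factory (with its heavier imports) is attached later via `handle.attach_factory()(factory)`, as `Instrument.load_factories()` does. A standalone analogue of that pattern, with all names invented for illustration:

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Registry:
    """Toy stand-in for the workflow factory; not the project's API."""
    specs: dict[str, dict[str, Any]] = field(default_factory=dict)
    factories: dict[str, Callable] = field(default_factory=dict)

    def register_spec(self, name: str, **spec: Any) -> 'Handle':
        # Phase 1: cheap, import-light metadata registration.
        self.specs[name] = spec
        return Handle(name=name, registry=self)


@dataclass
class Handle:
    """Returned by register_spec(); used later to attach the factory."""
    name: str
    registry: Registry

    def attach_factory(self) -> Callable[[Callable], Callable]:
        def decorator(factory: Callable) -> Callable:
            # Phase 2: the factory is attached only when load_factories() runs.
            self.registry.factories[self.name] = factory
            return factory
        return decorator


registry = Registry()
handle = registry.register_spec('monitor_ratemeter', version=1)
handle.attach_factory()(lambda source_name, params: f'workflow for {source_name}')
```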