diff --git a/pyproject.toml b/pyproject.toml
index 63600d7e..a0075ebb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -85,7 +85,7 @@ filterwarnings = [
 line-length = 88
 extend-include = ["*.ipynb"]
 extend-exclude = [
-    ".*", "__pycache__", "build", "dist", "install",
+    ".*", "__pycache__", "build", "dist", "install", "test-stream-processor.py",
 ]
 
 [tool.ruff.lint]
diff --git a/src/ess/estia/conversions.py b/src/ess/estia/conversions.py
index aff84e59..9c698a6c 100644
--- a/src/ess/estia/conversions.py
+++ b/src/ess/estia/conversions.py
@@ -64,6 +64,10 @@ def wavelength(
 ):
     "Converts event_time_offset to wavelength"
     # Use frame unwrapping from scippneutron
+    # Placeholder: assign every event a dummy 7.0 angstrom wavelength so the
+    # rest of the pipeline can run before frame unwrapping is wired in.
+    w = sc.bins_like(event_time_offset, sc.scalar(7.0, unit='angstrom'))
+    return w
     raise NotImplementedError()
 
 
@@ -76,7 +80,19 @@ def coordinate_transformation_graph() -> CoordTransformationGraph:
         "L1": lambda source_position, sample_position: sc.norm(
             sample_position - source_position
         ),  # + extra correction for guides?
-        "L2": lambda position, sample_position: sc.norm(position - sample_position),
+        "L2": lambda position, sample_position: sc.norm(
+            position - sample_position.to(unit=position.unit)
+        ),
+        "blade": lambda detector_number: sc.array(
+            dims=detector_number.dims, values=(detector_number.values - 1) // (32 * 64)
+        ),
+        "wire": lambda detector_number: sc.array(
+            dims=detector_number.dims, values=((detector_number.values - 1) // 64) % 32
+        ),
+        "strip": lambda detector_number: sc.array(
+            dims=detector_number.dims, values=(detector_number.values - 1) % 64
+        ),
+        "z_index": lambda wire, blade: 32 * blade + wire,
     }
 
 
diff --git a/src/ess/estia/corrections.py b/src/ess/estia/corrections.py
index db5f60c5..829a647d 100644
--- a/src/ess/estia/corrections.py
+++ b/src/ess/estia/corrections.py
@@ -12,9 +12,11 @@
     BeamDivergenceLimits,
     CoordTransformationGraph,
     DetectorData,
+    DetectorRotation,
     ProtonCurrent,
     ReducibleData,
     RunType,
+    SampleRotation,
     WavelengthBins,
     YIndexLimits,
     ZIndexLimits,
@@ -29,13 +31,19 @@ def add_coords_masks_and_apply_corrections(
     bdlim: BeamDivergenceLimits,
     wbins: WavelengthBins,
     proton_current: ProtonCurrent[RunType],
+    sample_rotation: SampleRotation[RunType],
+    detector_rotation: DetectorRotation[RunType],
     graph: CoordTransformationGraph,
 ) -> ReducibleData[RunType]:
     """
     Computes coordinates, masks and corrections that are the same
     for the sample measurement and the reference measurement.
     """
+    da = da.assign_coords(
+        sample_rotation=sample_rotation, detector_rotation=detector_rotation
+    )
     da = add_coords(da, graph)
+    da = da.transform_coords(('blade', 'wire', 'strip', 'z_index'), graph)
     da = add_masks(da, ylim, zlims, bdlim, wbins)
     da = correct_by_footprint(da)
 
diff --git a/src/ess/reflectometry/conversions.py b/src/ess/reflectometry/conversions.py
index 7c05b278..984e84f2 100644
--- a/src/ess/reflectometry/conversions.py
+++ b/src/ess/reflectometry/conversions.py
@@ -37,8 +37,11 @@ def add_proton_current_coord(
 ) -> sc.DataArray:
     """Find the proton current value for each event
     and adds it as a coord to the data array."""
+    # Anchor the integer time axis at the epoch so the lookup keys are
+    # datetime64 values comparable with the event timestamps.
+    time = pc.coords['time'].to(unit='ns', dtype='int64')
     pc_lookup = sc.lookup(
-        pc.assign_coords(time=pc.coords['time'].to(unit='ns')),
+        pc.assign_coords(time=time + sc.datetime(0, unit=time.unit)),
         dim='time',
         mode='previous',
         fill_value=sc.scalar(float('nan'), unit=pc.unit),
diff --git a/test-stream-processor.py b/test-stream-processor.py
new file mode 100644
index 00000000..79798276
--- /dev/null
+++ b/test-stream-processor.py
@@ -0,0 +1,167 @@
+# ruff: noqa
+from copy import deepcopy
+from typing import Any, TypeVar
+
+import numpy as np
+import scipp as sc
+from scippnexus import NXdetector
+
+import ess.estia as estia
+import ess.reflectometry as reflectometry
+from ess.amor.types import *
+from ess.reduce.streaming import Accumulator, EternalAccumulator, StreamProcessor
+from ess.reflectometry.types import *
+
+
+def add_dummy_value_to_all_transformations():
+    import h5py
+
+    with h5py.File('estia.hdf', 'r+') as f:
+
+        def visitor(name, g):
+            if (
+                isinstance(g, h5py.Group)
+                and g.attrs['NX_class'] == 'NXlog'
+                and 'transformation_type' in g.attrs
+            ):
+                print(g)
+
+                if len(g['time']) == 0:
+                    print('Fixed')
+
+                    attrs = dict(f[name + '/time'].attrs)
+                    del f[name + '/time']
+                    f.create_dataset(name + '/time', data=np.zeros(1))
+                    for k, v in attrs.items():
+                        f[name + '/time'].attrs[k] = v
+
+                    attrs = dict(f[name + '/value'].attrs)
+                    del f[name + '/value']
+                    f.create_dataset(name + '/value', data=np.zeros(1))
+                    for k, v in attrs.items():
+                        f[name + '/value'].attrs[k] = v
+
+        f.visititems(visitor)
+
+    with h5py.File('estia.hdf', 'r') as f:
+
+        def visitor(name, g):
+            if (
+                isinstance(g, h5py.Group)
+                and g.attrs['NX_class'] == 'NXlog'
+                and 'transformation_type' in g.attrs
+            ):
+                print(name, len(g['time']))
+
+        f.visititems(visitor)
+
+
+# add_dummy_value_to_all_transformations()
+
+
+def estia_stream_workflow() -> StreamProcessor:
+    workflow = estia.EstiaWorkflow()
+    workflow[SampleSize[SampleRun]] = sc.scalar(10.0, unit='mm')
+    workflow[SampleSize[ReferenceRun]] = sc.scalar(10.0, unit='mm')
+
+    workflow[ChopperPhase[ReferenceRun]] = sc.scalar(-7.5, unit='deg')
+    workflow[ChopperPhase[SampleRun]] = sc.scalar(-7.5, unit='deg')
+
+    workflow[QBins] = sc.geomspace(
+        dim='Q', start=0.005, stop=0.3, num=391, unit='1/angstrom'
+    )
+    workflow[WavelengthBins] = sc.geomspace(
+        'wavelength', 2.8, 12.5, 2001, unit='angstrom'
+    )
+
+    workflow[YIndexLimits] = sc.scalar(11), sc.scalar(41)
+    workflow[ZIndexLimits] = sc.scalar(80), sc.scalar(370)
+    workflow[BeamDivergenceLimits] = (
+        sc.scalar(-0.75, unit='deg'),
+        sc.scalar(0.75, unit='deg'),
+    )
+
+    workflow[SampleRotationOffset[SampleRun]] = sc.scalar(0.05, unit='deg')
+
+    workflow[SampleRotationOffset[ReferenceRun]] = sc.scalar(0.05, unit='deg')
+
+    workflow[Filename[SampleRun]] = './estia.hdf'
+    workflow[Filename[ReferenceRun]] = './estia.hdf'
+
+    workflow[ProtonCurrent[SampleRun]] = sc.DataArray(
+        sc.ones(dims=['time'], shape=(1,)),
+        coords={'time': sc.zeros(dims=['time'], shape=(1,), unit='s')},
+    )
+    workflow[ProtonCurrent[ReferenceRun]] = sc.DataArray(
+        sc.ones(dims=['time'], shape=(1,)),
+        coords={'time': sc.zeros(dims=['time'], shape=(1,), unit='s')},
+    )
+    workflow[DetectorSpatialResolution[SampleRun]] = 0.0025 * sc.units.m
+    workflow[DetectorSpatialResolution[ReferenceRun]] = 0.0025 * sc.units.m
+    import ess.reflectometry.supermirror as sm
+
+    workflow[sm.CriticalEdge] = 0.022 * sc.Unit("1/angstrom")
+    workflow[sm.Alpha] = sc.scalar(0.25 / 0.088, unit=sc.units.angstrom)
+    workflow[sm.MValue] = 5
+
+    workflow[SampleRotation[SampleRun]] = sc.scalar(1.0, unit='deg')
+    workflow[DetectorRotation[SampleRun]] = sc.scalar(2.0, unit='deg')
+    workflow[SampleRotation[ReferenceRun]] = sc.scalar(1.0, unit='deg')
+    workflow[DetectorRotation[ReferenceRun]] = sc.scalar(2.0, unit='deg')
+
+    workflow[DetectorData[ReferenceRun]] = workflow.compute(DetectorData[SampleRun])  # reuse sample data as reference for this smoke test
+
+    wf = workflow.copy()
+    return StreamProcessor(
+        wf,
+        dynamic_keys=(NeXusComponent[NXdetector, SampleRun],),
+        target_keys=(ReflectivityOverQ,),
+        accumulators={
+            Sample: EternalAccumulator,
+            Reference: LatestAccumulator,  # defined below; resolved at call time
+        },
+    )
+
+
+T = TypeVar('T')
+
+
+class LatestAccumulator(Accumulator[T]):
+    def __init__(self, **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+        self._value: T | None = None
+
+    @property
+    def is_empty(self) -> bool:
+        return self._value is None
+
+    def _get_value(self) -> T:
+        return deepcopy(self._value)
+
+    def _do_push(self, value: T) -> None:
+        self._value = deepcopy(value)
+
+    def clear(self) -> None:
+        """Clear the accumulated value."""
+        self._value = None
+
+
+fake = sc.DataGroup(
+    {
+        'data': sc.DataArray(
+            sc.ones(dims=['events'], shape=[10000]),
+            coords={
+                'event_id': sc.arange('events', 1, 10001),
+                'event_time_offset': sc.arange('events', 1, 10001, unit='us'),
+            },
+        )
+    }
+)
+
+s = estia_stream_workflow()
+s.accumulate(
+    {
+        NeXusComponent[NXdetector, SampleRun]: fake,
+    }
+)
+s.finalize()
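
Review notes (not part of the patch):

1. The blade/wire/strip lambdas in coordinate_transformation_graph encode a 1-based detector_number with 64 strips per wire and 32 wires per blade (the factors used in the diff). A minimal standalone check of that arithmetic, in plain numpy with illustrative names:

import numpy as np

detector_number = np.arange(1, 2 * 32 * 64 + 1)  # two blades' worth of pixels
blade = (detector_number - 1) // (32 * 64)
wire = ((detector_number - 1) // 64) % 32
strip = (detector_number - 1) % 64
z_index = 32 * blade + wire

assert blade[0] == 0 and blade[-1] == 1      # blade advances every 2048 pixels
assert wire[63] == 0 and wire[64] == 1       # wire advances every 64 pixels
assert strip[63] == 63 and strip[64] == 0    # strip cycles within a wire
assert z_index[-1] == 2 * 32 - 1             # last wire of the second blade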
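
2. The add_proton_current_coord change is subtle: sc.lookup needs the proton-current time axis as datetime64 so it is comparable with event timestamps, whereas a bare unit conversion to 'ns' keeps it a plain number. A minimal sketch of the conversion, assuming a log whose time coordinate is elapsed seconds (like the dummy ProtonCurrent in the test script):

import scipp as sc

pc = sc.DataArray(
    sc.ones(dims=['time'], shape=(3,), unit='mA'),
    coords={'time': sc.array(dims=['time'], values=[0, 10, 20], unit='s')},
)
time = pc.coords['time'].to(unit='ns', dtype='int64')
# int64 nanoseconds + epoch datetime -> datetime64 lookup keys
pc = pc.assign_coords(time=time + sc.datetime(0, unit=time.unit))
assert pc.coords['time'].dtype == sc.DType.datetime64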
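
3. LatestAccumulator keeps only the most recent pushed value, in contrast to EternalAccumulator, which accumulates over the whole stream; presumably this is why it backs Reference (replaced when re-delivered) while Sample uses EternalAccumulator (summed across chunks). A quick behavioural sketch, assuming the Accumulator base class exposes the usual push/value wrappers around _do_push/_get_value:

acc = LatestAccumulator()
acc.push(1)
acc.push(2)
assert acc.value == 2  # only the latest chunk survives
acc.clear()
assert acc.is_empty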