diff --git a/docs/api-reference/index.md b/docs/api-reference/index.md index 50d92e43..59650582 100644 --- a/docs/api-reference/index.md +++ b/docs/api-reference/index.md @@ -30,6 +30,7 @@ live logging nexus + normalization streaming time_of_flight ui diff --git a/src/ess/reduce/__init__.py b/src/ess/reduce/__init__.py index a9175c8b..d65bb3aa 100644 --- a/src/ess/reduce/__init__.py +++ b/src/ess/reduce/__init__.py @@ -4,7 +4,7 @@ import importlib.metadata -from . import nexus, time_of_flight, uncertainty +from . import nexus, normalization, time_of_flight, uncertainty try: __version__ = importlib.metadata.version("essreduce") @@ -13,4 +13,4 @@ del importlib -__all__ = ["nexus", "time_of_flight", "uncertainty"] +__all__ = ["nexus", "normalization", "time_of_flight", "uncertainty"] diff --git a/src/ess/reduce/normalization.py b/src/ess/reduce/normalization.py new file mode 100644 index 00000000..46e5e502 --- /dev/null +++ b/src/ess/reduce/normalization.py @@ -0,0 +1,200 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Normalization routines for neutron data reduction.""" + +import functools + +import scipp as sc + +from .uncertainty import UncertaintyBroadcastMode, broadcast_uncertainties + + +def normalize_by_monitor_histogram( + detector: sc.DataArray, + *, + monitor: sc.DataArray, + uncertainty_broadcast_mode: UncertaintyBroadcastMode, +) -> sc.DataArray: + """Normalize detector data by a normalized histogrammed monitor. + + This normalization accounts for both the (wavelength) profile of the incident beam + and the integrated neutron flux, meaning measurement duration and source strength. + + - For *event* detectors, the monitor values are mapped to the detector + using :func:`scipp.lookup`. That is, for detector event :math:`d_i`, + :math:`m_i` is the monitor bin value at the same coordinate. + - For *histogram* detectors, the monitor is rebinned using to the detector + binning using :func:`scipp.rebin`. Thus, detector value :math:`d_i` and + monitor value :math:`m_i` correspond to the same bin. + + In both cases, let :math:`x_i` be the lower bound of monitor bin :math:`i` + and let :math:`\\Delta x_i = x_{i+1} - x_i` be the width of that bin. + + The detector is normalized according to + + .. math:: + + d_i^\\text{Norm} = \\frac{d_i}{m_i} \\Delta x_i + + Parameters + ---------- + detector: + Input detector data. + Must have a coordinate named ``monitor.dim``, that is, the single + dimension name of the **monitor**. + monitor: + A histogrammed monitor. + Must be one-dimensional and have a dimension coordinate, typically "wavelength". + uncertainty_broadcast_mode: + Choose how uncertainties of the monitor are broadcast to the sample data. + + Returns + ------- + : + ``detector`` normalized by ``monitor``. + If the monitor has masks or contains non-finite values, the output has a mask + called '_monitor_mask' constructed from the monitor masks and non-finite values. + + See also + -------- + normalize_by_monitor_integrated: + Normalize by an integrated monitor. + """ + _check_monitor_range_contains_detector(monitor=monitor, detector=detector) + + dim = monitor.dim + + if detector.bins is None: + monitor = monitor.rebin({dim: detector.coords[dim]}) + detector = _mask_detector_for_norm(detector=detector, monitor=monitor) + coord = monitor.coords[dim] + delta_w = sc.DataArray(coord[1:] - coord[:-1], masks=monitor.masks) + norm = broadcast_uncertainties( + monitor / delta_w, prototype=detector, mode=uncertainty_broadcast_mode + ) + + if detector.bins is None: + return detector / norm.rebin({dim: detector.coords[dim]}) + return detector.bins / sc.lookup(norm, dim=dim) + + +def normalize_by_monitor_integrated( + detector: sc.DataArray, + *, + monitor: sc.DataArray, + uncertainty_broadcast_mode: UncertaintyBroadcastMode, +) -> sc.DataArray: + """Normalize detector data by an integrated monitor. + + This normalization accounts only for the integrated neutron flux, + meaning measurement duration and source strength. + It does *not* account for the (wavelength) profile of the incident beam. + For that, see :func:`normalize_by_monitor_histogram`. + + Let :math:`d_i` be a detector event or the counts in a detector bin. + The normalized detector is + + .. math:: + + d_i^\\text{Norm} = \\frac{d_i}{\\sum_j\\, m_j} + + where :math:`m_j` is the monitor counts in bin :math:`j`. + Note that this is not a true integral but only a sum over monitor events. + + The result depends on the range of the monitor but not its + binning within that range. + + Parameters + ---------- + detector: + Input detector data. + monitor: + A histogrammed monitor. + Must be one-dimensional and have a dimension coordinate, typically "wavelength". + uncertainty_broadcast_mode: + Choose how uncertainties of the monitor are broadcast to the sample data. + + Returns + ------- + : + `detector` normalized by a monitor. + If the monitor has masks or contains non-finite values, the output has a mask + called '_monitor_mask' constructed from the monitor masks and non-finite values. + + See also + -------- + normalize_by_monitor_histogram: + Normalize by a monitor histogram. + """ + _check_monitor_range_contains_detector(monitor=monitor, detector=detector) + detector = _mask_detector_for_norm(detector=detector, monitor=monitor) + norm = monitor.data.sum() + norm = broadcast_uncertainties( + norm, prototype=detector, mode=uncertainty_broadcast_mode + ) + return detector / norm + + +def _check_monitor_range_contains_detector( + *, monitor: sc.DataArray, detector: sc.DataArray +) -> None: + dim = monitor.dim + if not monitor.coords.is_edges(dim): + raise sc.CoordError( + f"Monitor coordinate '{dim}' must be bin-edges to integrate the monitor." + ) + + # Prefer a bin coord over an event coord because this makes the behavior for binned + # and histogrammed data consistent. If we used an event coord, we might allow a + # monitor range that is less than the detector bins which is fine for the events, + # but would be wrong if the detector was subsequently histogrammed. + if (det_coord := detector.coords.get(dim)) is not None: + lo = det_coord[dim, :-1].nanmin() + hi = det_coord[dim, 1:].nanmax() + elif (det_coord := detector.bins.coords.get(dim)) is not None: + lo = det_coord.nanmin() + hi = det_coord.nanmax() + else: + raise sc.CoordError( + f"Missing '{dim}' coordinate in detector for monitor normalization." + ) + + if monitor.coords[dim].min() > lo or monitor.coords[dim].max() < hi: + raise ValueError( + f"Cannot normalize by monitor: The {dim} range of the monitor " + f"({monitor.coords[dim].min():c} to {monitor.coords[dim].max():c}) " + f"is smaller than the range of the detector ({lo:c} to {hi:c})." + ) + + +def _mask_detector_for_norm( + *, detector: sc.DataArray, monitor: sc.DataArray +) -> sc.DataArray: + """Mask the detector where the monitor is masked. + + For performance, this applies the monitor mask to the detector bins. + This can lead to masking more events than strictly necessary if we + used an event mask. + """ + if (monitor_mask := _monitor_mask(monitor)) is None: + return detector + + # Use rebin to reshape the mask to the detector: + dim = monitor.dim + mask = sc.DataArray(monitor_mask, coords={dim: monitor.coords[dim]}).rebin( + {dim: detector.coords[dim]} + ).data != sc.scalar(0, unit=None) + return detector.assign_masks({"_monitor_mask": mask}) + + +def _monitor_mask(monitor: sc.DataArray) -> sc.Variable | None: + """Mask nonfinite monitor values and combine all masks.""" + masks = list(monitor.masks.values()) + + finite = sc.isfinite(monitor.data) + if not finite.all(): + masks.append(~finite) + + if not masks: + return None + return functools.reduce(sc.logical_or, masks) diff --git a/tests/normalization_test.py b/tests/normalization_test.py new file mode 100644 index 00000000..a4d3d2e4 --- /dev/null +++ b/tests/normalization_test.py @@ -0,0 +1,639 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) + +import numpy as np +import pytest +import scipp as sc +import scipp.testing + +from ess.reduce.normalization import ( + normalize_by_monitor_histogram, + normalize_by_monitor_integrated, +) +from ess.reduce.uncertainty import UncertaintyBroadcastMode + + +class TestNormalizeByMonitorHistogram: + def test_aligned_bins_w_event_coord(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + + sc.testing.assert_identical(normalized, expected) + + def test_aligned_bins_wo_event_coord(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + + sc.testing.assert_identical(normalized, expected) + + def test_aligned_bins_hist(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + + sc.testing.assert_identical(normalized, expected) + + def test_monitor_envelops_detector_bin(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 2.5], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[-1, 1.5, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 5, 7.5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 2.5], unit='Å')) + + sc.testing.assert_identical(normalized, expected) + + def test_monitor_envelops_detector_bin_hist( + self, + ) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 2.5], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[-1, 1.5, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + # These values are different from the case with binned data in + # test_normalize_by_monitor_histogram_monitor_envelops_detector_bin + # because the monitor gets rebinned to match the detector bins. + expected = sc.DataArray( + sc.array(dims=['w'], values=[4, 15 / 2], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 2.5], unit='Å')}, + ) + + sc.testing.assert_identical(normalized, expected) + + def test_detector_envelops_monitor_bin(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 1.5, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0, 2, 2.5], unit='Å')}, + ) + with pytest.raises(ValueError, match="smaller than the range of the detector"): + normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + def test_detector_envelops_monitor_bin_hist( + self, + ) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 1.5, 3], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0, 2, 2.5], unit='Å')}, + ) + with pytest.raises(ValueError, match="smaller than the range of the detector"): + normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + def test_extra_bins_in_monitor(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[-1.0, 0, 2, 3, 4], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + + sc.testing.assert_identical(normalized, expected) + + def test_extra_bins_in_monitor_hist(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[-1.0, 0, 2, 3, 4], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + + sc.testing.assert_identical(normalized, expected) + + def test_extra_bins_in_detector(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[-10, 0, 10, 30, 40], unit='counts'), + coords={'w': sc.arange('w', -1.0, 4.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[-1.0, 0, 2, 3, 4], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0, 2, 3], unit='Å')}, + ) + with pytest.raises(ValueError, match="smaller than the range of the detector"): + normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + def test_finer_bins_in_detector(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')) + + sc.testing.assert_identical(normalized, expected) + + def test_finer_bins_in_detector_hist(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 5], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')}, + ) + + sc.testing.assert_identical(normalized, expected) + + def test_finer_bins_in_monitor(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 8.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[0.0, 5 / 4, 5], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + + sc.testing.assert_allclose(normalized, expected) + + def test_finer_bins_in_monitor_hist(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 8.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 1, 2, 3], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = sc.DataArray( + sc.array(dims=['w'], values=[20 / 13, 5], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + ) + + sc.testing.assert_allclose(normalized, expected) + + def test_monitor_mask_aligned_bins(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')}, + masks={'m': sc.array(dims=['w'], values=[False, True, False])}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 0, 90 / 7], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks( + _monitor_mask=sc.array(dims=['w'], values=[False, True, False]) + ) + ) + + sc.testing.assert_allclose(normalized, expected) + + def test_monitor_mask_multiple(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')}, + masks={ + 'm1': sc.array(dims=['w'], values=[False, True, False]), + 'm2': sc.array(dims=['w'], values=[False, True, True]), + }, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 0, 0], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks( + _monitor_mask=sc.array(dims=['w'], values=[False, True, True]) + ) + ) + + sc.testing.assert_identical(normalized, expected) + + def test_monitor_and_detector_mask_aligned_bins(self) -> None: + detector = ( + sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks(d=sc.array(dims=['w'], values=[True, False, False])) + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')}, + masks={'m': sc.array(dims=['w'], values=[False, True, False])}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array(dims=['w'], values=[0.0, 4, 0, 90 / 7], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks( + d=sc.array(dims=['w'], values=[True, False, False]), + _monitor_mask=sc.array(dims=['w'], values=[False, True, False]), + ) + ) + + sc.testing.assert_allclose(normalized, expected) + + def test_monitor_mask_unaligned_bins(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[-10, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0, 8.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3.5, 4, 7], unit='Å')}, + masks={'m': sc.array(dims=['w'], values=[False, True, False, False])}, + ) + + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array(dims=['w'], values=[-4, 0, 0, 45 / 4], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks( + _monitor_mask=sc.array(dims=['w'], values=[True, True, False]) + ) + ) + + sc.testing.assert_identical(normalized, expected) + + def test_monitor_mask_at_edge(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 30], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')}, + masks={'m': sc.array(dims=['w'], values=[False, True])}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array(dims=['w'], values=[0, 4, 0], unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + .assign_masks(_monitor_mask=sc.array(dims=['w'], values=[False, True])) + ) + + sc.testing.assert_identical(normalized, expected) + + @pytest.mark.parametrize("nonfinite_value", [np.nan, np.inf]) + def test_nonfinite_in_monitor_gets_masked( + self, + nonfinite_value: float, + ) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[0, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[nonfinite_value, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')}, + ) + normalized = normalize_by_monitor_histogram( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + expected = ( + sc.DataArray( + sc.array( + dims=['w'], + values=[1 / nonfinite_value, 1 / nonfinite_value, 10 / 3, 90 / 7], + unit='counts', + ), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ) + .bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + .assign_masks( + _monitor_mask=sc.array(dims=['w'], values=[True, False, False]) + ) + ) + + sc.testing.assert_allclose(normalized, expected) + + def test_independent_of_monitor_binning_bin(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[3, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + + monitor1 = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 2, 4, 8], unit='Å')}, + ) + monitor2 = monitor1.rebin( + w=sc.array(dims=['w'], values=[1.0, 2, 3, 4, 7], unit='Å') + ) + + normalized1 = normalize_by_monitor_histogram( + detector, + monitor=monitor1, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + normalized2 = normalize_by_monitor_histogram( + detector, + monitor=monitor2, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + sc.testing.assert_identical(normalized1, normalized2) + + def test_independent_of_monitor_binning_hist(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[10, 20, 30], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')}, + ) + + monitor1 = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 2, 4, 8], unit='Å')}, + ) + monitor2 = monitor1.rebin( + w=sc.array(dims=['w'], values=[1.0, 2, 3, 4, 7], unit='Å') + ) + + normalized1 = normalize_by_monitor_histogram( + detector, + monitor=monitor1, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + normalized2 = normalize_by_monitor_histogram( + detector, + monitor=monitor2, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + sc.testing.assert_identical(normalized1, normalized2) + + +class TestNormalizeByMonitorIntegrated: + def test_expected_results_bin(self) -> None: + detector = sc.DataArray( + sc.arange('w', 1, 4, unit='counts'), + coords={'w': sc.arange('w', 3.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5.0, 6.0, 10], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 0.5, 2, 3, 4], unit='Å')}, + ) + normalized = normalize_by_monitor_integrated( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + expected = detector / monitor.sum() + sc.testing.assert_identical(normalized, expected) + + def test_expected_results_hist(self) -> None: + detector = sc.DataArray( + sc.arange('w', 1, 4, unit='counts'), + coords={'w': sc.arange('w', 4.0, unit='Å')}, + ) + monitor = sc.DataArray( + sc.array(dims=['w'], values=[4.0, 5.0, 6.0, 10], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[0.0, 0.5, 2, 3, 4], unit='Å')}, + ) + normalized = normalize_by_monitor_integrated( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + expected = detector / monitor.sum() + sc.testing.assert_identical(normalized, expected) + + def test_raises_if_monitor_range_too_narrow( + self, + ) -> None: + detector = sc.DataArray( + sc.arange('wavelength', 3, unit='counts'), + coords={'wavelength': sc.arange('wavelength', 3.0, unit='Å')}, + ).bin(wavelength=sc.array(dims=['wavelength'], values=[0.0, 2, 3], unit='Å')) + monitor = sc.DataArray( + sc.array(dims=['wavelength'], values=[4.0, 10.0], unit='counts'), + coords={ + 'wavelength': sc.array( + dims=['wavelength'], values=[1.0, 3, 4], unit='Å' + ) + }, + ) + with pytest.raises(ValueError, match="smaller than the range of the detector"): + normalize_by_monitor_integrated( + detector, + monitor=monitor, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + def test_independent_of_monitor_binning(self) -> None: + detector = sc.DataArray( + sc.array(dims=['w'], values=[3, 10, 20, 30], unit='counts'), + coords={'w': sc.arange('w', 1.0, 5.0, unit='Å')}, + ).bin(w=sc.array(dims=['w'], values=[1.0, 3, 4, 7], unit='Å')) + + monitor1 = sc.DataArray( + sc.array(dims=['w'], values=[5.0, 6.0, 7.0], unit='counts'), + coords={'w': sc.array(dims=['w'], values=[1.0, 2, 4, 7], unit='Å')}, + ) + monitor2 = monitor1.rebin( + w=sc.array(dims=['w'], values=[1.0, 2, 3, 5, 7], unit='Å') + ) + + normalized1 = normalize_by_monitor_integrated( + detector, + monitor=monitor1, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + normalized2 = normalize_by_monitor_integrated( + detector, + monitor=monitor2, + uncertainty_broadcast_mode=UncertaintyBroadcastMode.fail, + ) + + sc.testing.assert_identical(normalized1, normalized2)