diff --git a/src/modacor/dataclasses/process_step_describer.py b/src/modacor/dataclasses/process_step_describer.py
index d8e9aa4..91e23b2 100644
--- a/src/modacor/dataclasses/process_step_describer.py
+++ b/src/modacor/dataclasses/process_step_describer.py
@@ -50,7 +50,7 @@ class ProcessStepDescriber:
     required_data_keys: list[str] = field(factory=list)  # list of data keys required by the process
     required_arguments: list[str] = field(factory=list)  # list of argument key-val combos required by the process
     calling_arguments: dict[str, Any] = field(factory=dict, validator=validate_required_keys)
-    works_on: dict[str, list] = field(
+    modifies: dict[str, list] = field(
         factory=dict, validator=v.instance_of(dict)
     )  # which aspects of BaseData are modified by this step
     step_keywords: list[str] = field(
diff --git a/src/modacor/modules/__init__.py b/src/modacor/modules/__init__.py
index f66d1b4..4b71c9e 100644
--- a/src/modacor/modules/__init__.py
+++ b/src/modacor/modules/__init__.py
@@ -18,13 +18,19 @@
 from modacor.modules.base_modules.reduce_dimensionality import ReduceDimensionality
 from modacor.modules.base_modules.subtract import Subtract
 from modacor.modules.base_modules.subtract_databundles import SubtractDatabundles
+from modacor.modules.technique_modules.index_pixels import IndexPixels
+from modacor.modules.technique_modules.indexed_averager import IndexedAverager
+from modacor.modules.technique_modules.solid_angle_correction import SolidAngleCorrection
 from modacor.modules.technique_modules.xs_geometry import XSGeometry
 
 __all__ = [
     "Divide",
+    "IndexPixels",
+    "IndexedAverager",
     "Multiply",
     "PoissonUncertainties",
     "ReduceDimensionality",
+    "SolidAngleCorrection",
     "SubtractDatabundles",
     "Subtract",
     "XSGeometry",
diff --git a/src/modacor/modules/base_modules/append_processing_data.py b/src/modacor/modules/base_modules/append_processing_data.py
new file mode 100644
index 0000000..05fa8be
--- /dev/null
+++ b/src/modacor/modules/base_modules/append_processing_data.py
@@ -0,0 +1,311 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# /usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+__coding__ = "utf-8"
+__authors__ = ["Brian R. Pauw"]  # add names to the list as appropriate
+__copyright__ = "Copyright 2025, The MoDaCor team"
+__date__ = "30/10/2025"
+__status__ = "Development"  # "Development", "Production"
+# end of header and standard imports
+
+__all__ = ["AppendProcessingData"]
+__version__ = "20251030.3"
+
+from pathlib import Path
+from typing import Any
+
+from modacor import ureg
+from modacor.dataclasses.basedata import BaseData
+from modacor.dataclasses.databundle import DataBundle
+from modacor.dataclasses.helpers import basedata_from_sources
+from modacor.dataclasses.messagehandler import MessageHandler
+from modacor.dataclasses.process_step import ProcessStep
+from modacor.dataclasses.process_step_describer import ProcessStepDescriber
+from modacor.io.io_sources import IoSources
+
+# Module-level handler; facilities can swap MessageHandler implementation as needed
+logger = MessageHandler(name=__name__)
+
+
+class AppendProcessingData(ProcessStep):
+    """
+    Load signal data from ``self.io_sources`` into a processing :class:`DataBundle`
+    in ``self.processing_data``.
+
+    This step creates or updates a single :class:`DataBundle` from existing
+    :class:`IoSources` entries in ``self.io_sources``. It:
+
+    1. Loads the signal array from ``signal_location`` (string reference).
+    2. Loads units either from:
+       - ``units_location`` via :meth:`IoSources.get_static_metadata`, or
+       - ``units_override`` as a direct units string, or
+       - defaults to dimensionless if neither is provided.
+    3. Optionally loads uncertainty arrays from ``uncertainties_sources``.
+    4. Wraps everything in a :class:`BaseData` instance.
+    5. Sets ``BaseData.rank_of_data`` based on the configured ``rank_of_data``:
+       - If it is an ``int``, it is used directly.
+       - If it is a ``str``, it is interpreted as an IoSources metadata reference
+         (``'<source_reference>::<data_path>'``) and read via
+         :meth:`IoSources.get_static_metadata`, then converted to ``int``.
+       - Validation and bounds checking are handled by :func:`validate_rank_of_data`
+         inside :class:`BaseData`.
+    6. Stores the resulting :class:`BaseData` under the configured
+       ``databundle_output_key`` (default: ``"signal"``) in a
+       :class:`DataBundle` at ``self.processing_data[processing_key]``. If that
+       DataBundle already exists, it is updated: existing entries are preserved
+       and the ``databundle_output_key`` entry is overwritten or added.
+
+    The resulting mapping ``{processing_key: DataBundle}`` is returned.
+    """
+
+    documentation = ProcessStepDescriber(
+        calling_name="Append Processing Data",
+        calling_id="AppendProcessingData",
+        calling_module_path=Path(__file__),
+        calling_version=__version__,
+        required_data_keys=[],  # this step creates/updates a DataBundle
+        modifies={},  # nothing modified in place; output lands at processing_data[processing_key][databundle_output_key]
+        required_arguments=[
+            "processing_key",  # Processing data key to create/update. Must be a string.
+            "signal_location",  # Data identifier to read from, '<source_reference>::<data_path>'. Must be a string.
+            "rank_of_data",  # Rank of the created BaseData array. int or str (io_source location).
+        ],
+        calling_arguments={
+            "processing_key": "",  # must be set by the user
+            "signal_location": "",  # must be set by the user
+            "rank_of_data": 2,
+            # key under which the BaseData will be stored in the DataBundle
+            "databundle_output_key": "signal",
+            # optional location for units definition, in the form
+            # '<source_reference>::<data_path>' or with attribute suffix
+            # '<source_reference>::<data_path>@units'. If None, units will be
+            # set to ureg.dimensionless unless units_override is given.
+            "units_location": None,
+            # optional direct units string to override any loaded units
+            "units_override": None,
+            # optional sources for uncertainties data, in the form:
+            # {'<uncertainty_name>': '<source_reference>::<data_path>'}
+            "uncertainties_sources": {},
+        },
+        step_keywords=["append", "processing", "data", "signal"],
+        step_doc="Append signal data from IoSources into a processing DataBundle.",
+        step_reference="",
+        step_note=(
+            "This step reads from existing IoSources and creates or updates a named DataBundle "
+            "with a BaseData entry (default 'signal') for use in the processing pipeline."
+        ),
+    )
+
+    # -------------------------------------------------------------------------
+    # Internal helpers
+    # -------------------------------------------------------------------------
+    def _resolve_rank_of_data(self, rank_cfg: Any, io_sources: IoSources) -> int:
+        """
+        Resolve the configured rank_of_data to an integer.
+
+        Parameters
+        ----------
+        rank_cfg :
+            Either an integer directly, or a string reference of the form
+            '<source_reference>::<data_path>' pointing to metadata that contains
+            the rank as an integer-compatible value.
+
+        io_sources :
+            The IoSources object used to resolve metadata references.
+
+        Returns
+        -------
+        int
+            The resolved rank_of_data. Actual bounds checking is performed by
+            BaseData's internal validation.
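+
+        Examples
+        --------
+        A behaviour sketch; the metadata reference below is hypothetical and
+        only illustrates the '<source_reference>::<data_path>' form::
+
+            self._resolve_rank_of_data(2, io_sources)
+            # -> 2, used directly
+            self._resolve_rank_of_data("mydata::/entry/rank", io_sources)
+            # -> int(io_sources.get_static_metadata("mydata::/entry/rank"))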
+ """ + # Direct int → use as-is (with int() for safety) + if isinstance(rank_cfg, int): + return int(rank_cfg) + + # If it *looks* like an io_source reference, treat it as such + if isinstance(rank_cfg, str): + logger.debug( + f"AppendProcessingData: resolving rank_of_data from IoSources metadata reference '{rank_cfg}'." + ) + meta_value = io_sources.get_static_metadata(rank_cfg) + try: + return int(meta_value) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Could not convert rank_of_data metadata from '{rank_cfg}' to int (value={meta_value!r})." + ) from exc + + # Fallback: try to cast whatever it is to int + try: + return int(rank_cfg) + except (TypeError, ValueError) as exc: + raise TypeError( + "rank_of_data must be an int or an IoSources metadata reference string, " + f"got {rank_cfg!r} ({type(rank_cfg).__name__})." + ) from exc + + def _load_and_validate_configuration(self) -> dict[str, Any]: + """ + Load and validate configuration values from ``self.configuration``. + + Returns + ------- + dict[str, Any] + A dictionary containing the resolved configuration: + - processing_key (str) + - signal_location (str) + - rank_of_data (int) + - databundle_output_key (str) + - units_location (str | None) + - units_override (str | None) + - uncertainties_sources (dict[str, str]) + """ + cfg = self.configuration + + processing_key = cfg.get("processing_key") + if not isinstance(processing_key, str) or not processing_key: + raise ValueError("AppendProcessingData requires 'processing_key' to be a non-empty string.") + + signal_location = cfg.get("signal_location") + if not isinstance(signal_location, str) or not signal_location: + raise ValueError("AppendProcessingData requires 'signal_location' to be a non-empty string.") + + if "rank_of_data" not in cfg: + raise ValueError("AppendProcessingData requires 'rank_of_data' in the configuration.") + rank_cfg = cfg["rank_of_data"] + resolved_rank = self._resolve_rank_of_data(rank_cfg, self.io_sources) + + databundle_output_key = cfg.get("databundle_output_key", "signal") + if not isinstance(databundle_output_key, str) or not databundle_output_key: + raise ValueError("AppendProcessingData requires 'databundle_output_key' to be a non-empty string.") + + units_location = cfg.get("units_location") + if units_location is not None and not isinstance(units_location, str): + raise TypeError("'units_location' must be a string '::' or None.") + + units_override = cfg.get("units_override") + if units_override is not None and not isinstance(units_override, str): + raise TypeError("'units_override' must be a units string if provided.") + + uncertainties_sources: dict[str, str] = cfg.get("uncertainties_sources", {}) or {} + if not isinstance(uncertainties_sources, dict): + raise TypeError( + f"'uncertainties_sources' must be a dict[str, str], got {type(uncertainties_sources).__name__}." + ) + + return { + "processing_key": processing_key, + "signal_location": signal_location, + "rank_of_data": resolved_rank, + "databundle_output_key": databundle_output_key, + "units_location": units_location, + "units_override": units_override, + "uncertainties_sources": uncertainties_sources, + } + + # ------------------------------------------------------------------------- + # Public API used by the pipeline + # ------------------------------------------------------------------------- + def calculate(self) -> dict[str, DataBundle]: + """ + Create or update a DataBundle from ``self.io_sources`` and return it. 
+
+        Configuration fields:
+
+        - ``processing_key`` (str):
+          Name under which the DataBundle will be stored in
+          ``self.processing_data``.
+
+        - ``signal_location`` (str):
+          Data reference in the form ``'<source_reference>::<data_path>'``.
+
+        - ``rank_of_data`` (int or str):
+          Desired rank for the created :class:`BaseData` object.
+          If a string, it is treated as an IoSources metadata reference and
+          resolved via :meth:`IoSources.get_static_metadata`. Validation and
+          bounds checking are handled by :class:`BaseData`.
+
+        - ``databundle_output_key`` (str, default: ``"signal"``):
+          Key under which the new :class:`BaseData` will be stored inside the
+          :class:`DataBundle`. If the DataBundle already contains an entry
+          under this key, it will be overwritten.
+
+        - ``units_location`` (str | None):
+          Data reference pointing to a static metadata entry that defines the
+          units. If provided, the value from
+          :meth:`IoSources.get_static_metadata` is passed to :func:`ureg.Unit`.
+          If omitted and ``units_override`` is None, units default to
+          dimensionless.
+
+        - ``units_override`` (str | None):
+          Direct units string (e.g. ``"counts"`` or ``"1/m"``) that overrides
+          any value loaded via ``units_location``.
+
+        - ``uncertainties_sources`` (dict[str, str]):
+          Mapping from uncertainty name (e.g. ``"poisson"``) to data reference
+          (``'<source_reference>::<data_path>'``).
+        """
+        cfg = self._load_and_validate_configuration()
+
+        processing_key: str = cfg["processing_key"]
+        signal_location: str = cfg["signal_location"]
+        rank_of_data: int = cfg["rank_of_data"]
+        databundle_output_key: str = cfg["databundle_output_key"]
+        units_location = cfg["units_location"]
+        units_override = cfg["units_override"]
+        uncertainties_sources: dict[str, str] = cfg["uncertainties_sources"]
+
+        io_sources: IoSources = self.io_sources
+
+        logger.info(
+            (
+                f"AppendProcessingData: creating/updating DataBundle '{processing_key}' "
+                f"from signal_location='{signal_location}' into key '{databundle_output_key}'."
+ ), + ) + + # Load BaseData via helper: signal + units + uncertainties + bd: BaseData = basedata_from_sources( + io_sources=io_sources, + signal_source=signal_location, + units_source=units_location, + uncertainty_sources=uncertainties_sources, + ) + + # Override units if requested + if units_override is not None: + logger.debug( + f"AppendProcessingData: overriding units for '{processing_key}' to '{units_override}'.", + ) + bd.units = ureg.Unit(units_override) + + # Set rank_of_data; BaseData's own validation handles bounds + bd.rank_of_data = rank_of_data + + # Create or update the DataBundle in processing_data + existing_bundle = self.processing_data.get(processing_key) + if existing_bundle is None: + databundle = DataBundle() + else: + databundle = existing_bundle + + # Update/insert the BaseData at the requested key + databundle[databundle_output_key] = bd + + # If no default_plot is set yet, use this key as a sensible default + if getattr(databundle, "default_plot", None) is None: + databundle.default_plot = databundle_output_key + + # Store back into processing_data and build output + self.processing_data[processing_key] = databundle + output: dict[str, DataBundle] = {processing_key: databundle} + + logger.info( + f"AppendProcessingData: DataBundle '{processing_key}' now contains datasets {list(databundle.keys())}.", + ) + + return output diff --git a/src/modacor/modules/base_modules/append_source.py b/src/modacor/modules/base_modules/append_source.py new file mode 100644 index 0000000..f8a6ebd --- /dev/null +++ b/src/modacor/modules/base_modules/append_source.py @@ -0,0 +1,158 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +__coding__ = "utf-8" +__authors__ = ["Brian R. Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "30/10/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports + +__all__ = ["AppendSource"] +__version__ = "20251030.1" + +from importlib import import_module +from pathlib import Path +from typing import Any, Callable + +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.messagehandler import MessageHandler +from modacor.dataclasses.process_step import ProcessStep +from modacor.dataclasses.process_step_describer import ProcessStepDescriber +from modacor.io.io_sources import IoSources + +# Module-level handler; facilities can swap MessageHandler implementation as needed +logger = MessageHandler(name=__name__) + + +class AppendSource(ProcessStep): + """ + Appends an ioSource to self.io_sources. + + This step is intended for pipeline-graph / provenance operations: it augments + the set of available I/O sources but does not touch the actual data bundles. + """ + + documentation = ProcessStepDescriber( + calling_name="Append Source", + calling_id="AppendSource", + calling_module_path=Path(__file__), + calling_version=__version__, + required_data_keys=[], + modifies={}, # sources only; no data modified + required_arguments=[ + "source_identifier", # The identifier to use for the appended ioSource in the data sources. Can be a string or list of strings for multiple sources + "source_location", # The ioSource path or other location identifier of the object to append to the IoSources. 
+            # Can be a string or list of strings for multiple sources
+            "loader_module",  # The fully qualified import path to the module to load the source_location into an ioSource object, e.g. 'modacor.io.yaml.yaml_loader.YAMLLoader' or 'modacor.io.hdf.hdf_loader.HDFLoader'. Choose only one.
+        ],
+        calling_arguments={
+            "source_identifier": "",
+            "source_location": "",
+            "loader_module": "",
+        },
+        step_keywords=["append", "source"],
+        step_doc="Append an ioSource to the available data sources",
+        step_reference="",
+        step_note="This adds an ioSource to the data sources of the databundle.",
+    )
+
+    # -------------------------------------------------------------------------
+    # Public API used by the pipeline
+    # -------------------------------------------------------------------------
+    def calculate(self) -> dict[str, DataBundle]:
+        """
+        Append one or more sources to ``self.io_sources``.
+
+        Notes
+        -----
+        - No ``DataBundle`` objects are modified or created.
+        - The pipeline can treat an empty output dict as "no-op on data",
+          while the side-effect on ``self.io_sources`` persists.
+        """
+        output: dict[str, DataBundle] = {}
+
+        source_ids: str | list[str] = self.configuration["source_identifier"]
+        source_locations: str | list[str] = self.configuration["source_location"]
+        loader_module: str = self.configuration["loader_module"]
+
+        # Normalise to lists
+        if isinstance(source_ids, str):
+            source_ids = [source_ids]
+        if isinstance(source_locations, str):
+            source_locations = [source_locations]
+
+        if len(source_ids) != len(source_locations):
+            raise ValueError(
+                "If multiple source_identifiers and source_locations are provided, their counts must match."
+            )
+
+        for source_id, source_location in zip(source_ids, source_locations):
+            # Only append if not already present
+            if source_id not in self.io_sources.defined_sources:
+                self._append_loader_by_name(
+                    loader_name=loader_module,
+                    source_location=source_location,
+                    source_identifier=source_id,
+                )
+        # No data modified – only side-effect is on self.io_sources
+        return output
+
+    # -------------------------------------------------------------------------
+    # Internal helpers
+    # -------------------------------------------------------------------------
+    def _append_loader_by_name(
+        self,
+        loader_name: str,
+        source_location: str,
+        source_identifier: str,
+    ) -> None:
+        """
+        Resolve the requested loader and append the resulting ioSource to
+        ``self.io_sources``.
+
+        Parameters
+        ----------
+        loader_name:
+            A fully qualified import path
+            (e.g. ``"modacor.io.hdf.hdf_loader.HDFLoader"``).
+        source_location:
+            Path / URI / identifier understood by the loader.
+        source_identifier:
+            Key under which the resulting ioSource will be stored in
+            ``self.io_sources``.
+        """
+        loader_callable = self._resolve_loader_callable(loader_name)
+
+        # Ensure io_sources exists or initialize it
+        if not hasattr(self, "io_sources") or self.io_sources is None:
+            # ProcessStep normally sets this up, but be defensive.
+            self.io_sources = IoSources()
+            logger.info("Initialized self.io_sources in AppendSource step.")
+
+        self.io_sources.register_source(
+            loader_callable(source_reference=source_identifier, resource_location=source_location)
+        )
+
+    def _resolve_loader_callable(self, loader_name: str) -> Callable[..., Any]:
+        """
+        Resolve the configured loader into a callable.
+
+        Strategy
+        --------
+        Treat ``loader_name`` as a fully qualified import path like
+        ``package.module.ClassOrFunc``: import the module part and return
+        the named attribute.
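+
+        Examples
+        --------
+        A sketch, assuming the HDF loader import path quoted in the step
+        documentation above exists::
+
+            loader = self._resolve_loader_callable("modacor.io.hdf.hdf_loader.HDFLoader")
+            # 'loader' is then the HDFLoader class; _append_loader_by_name
+            # calls it with source_reference= and resource_location= kwargs.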
+ """ + + module_path, attr_name = loader_name.rsplit(".", 1) + module = import_module(module_path) + try: + loader_obj = getattr(module, attr_name) + except AttributeError as exc: + raise ImportError( + f"Could not find '{attr_name}' in module '{module_path}' for loader_module='{loader_name}'." + ) from exc + + return loader_obj diff --git a/src/modacor/modules/base_modules/azimuthal_integration.py b/src/modacor/modules/base_modules/azimuthal_integration.py index d1c5921..1a5b497 100644 --- a/src/modacor/modules/base_modules/azimuthal_integration.py +++ b/src/modacor/modules/base_modules/azimuthal_integration.py @@ -30,7 +30,7 @@ class AzimuthalIntegration(ProcessStep): calling_module_path=Path(__file__), calling_version="0.0.1", required_data_keys=["signal"], - works_on={"data": ["signal", "variances", "normalization"]}, + modifies={"data": ["signal", "variances", "normalization"]}, step_keywords=["azimuthal integration"], step_doc="Add azimuthal integration data with variance propagated", step_reference="DOI 10.1107/S1600576724011038", diff --git a/src/modacor/modules/base_modules/divide.py b/src/modacor/modules/base_modules/divide.py index 422c453..6d0b50f 100644 --- a/src/modacor/modules/base_modules/divide.py +++ b/src/modacor/modules/base_modules/divide.py @@ -35,7 +35,8 @@ class Divide(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"signal": ["signal", "uncertainties", "units"]}, + modifies={"signal": ["signal", "uncertainties", "units"]}, + required_arguments={}, calling_arguments={ "divisor_source": None, # IoSources key for signal "divisor_units_source": None, # IoSources key for units diff --git a/src/modacor/modules/base_modules/multiply.py b/src/modacor/modules/base_modules/multiply.py index a142811..8181c09 100644 --- a/src/modacor/modules/base_modules/multiply.py +++ b/src/modacor/modules/base_modules/multiply.py @@ -33,7 +33,7 @@ class Multiply(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"signal": ["signal", "uncertainties", "units"]}, + modifies={"signal": ["signal", "uncertainties", "units"]}, calling_arguments={ "multiplier_source": None, # IoSources key for signal "multiplier_units_source": None, # IoSources key for units diff --git a/src/modacor/modules/base_modules/poisson_uncertainties.py b/src/modacor/modules/base_modules/poisson_uncertainties.py index 3540914..d309075 100644 --- a/src/modacor/modules/base_modules/poisson_uncertainties.py +++ b/src/modacor/modules/base_modules/poisson_uncertainties.py @@ -36,7 +36,7 @@ class PoissonUncertainties(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"variances": ["Poisson"]}, + modifies={"variances": ["Poisson"]}, step_keywords=["uncertainties", "Poisson"], step_doc="Add Poisson uncertainties to the data", step_reference="DOI 10.1088/0953-8984/25/38/383201", diff --git a/src/modacor/modules/base_modules/reduce_dimensionality.py b/src/modacor/modules/base_modules/reduce_dimensionality.py index 0dc18f4..4e30573 100644 --- a/src/modacor/modules/base_modules/reduce_dimensionality.py +++ b/src/modacor/modules/base_modules/reduce_dimensionality.py @@ -50,7 +50,7 @@ class ReduceDimensionality(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"signal": ["signal", "uncertainties", "units", "weights"]}, + modifies={"signal": ["signal", 
"uncertainties", "units", "weights"]}, calling_arguments={ # Axis or axes to reduce. Can be int, list/tuple of ints, or None (reduce all). "axes": None, diff --git a/src/modacor/modules/base_modules/subtract.py b/src/modacor/modules/base_modules/subtract.py index a6f5924..c52f2a3 100644 --- a/src/modacor/modules/base_modules/subtract.py +++ b/src/modacor/modules/base_modules/subtract.py @@ -33,7 +33,7 @@ class Subtract(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"signal": ["signal", "uncertainties", "units"]}, + modifies={"signal": ["signal", "uncertainties", "units"]}, calling_arguments={ "subtrahend_source": None, # IoSources key for signal "subtrahend_units_source": None, # IoSources key for units diff --git a/src/modacor/modules/base_modules/subtract_databundles.py b/src/modacor/modules/base_modules/subtract_databundles.py index ab6ea0b..6c3fb7d 100644 --- a/src/modacor/modules/base_modules/subtract_databundles.py +++ b/src/modacor/modules/base_modules/subtract_databundles.py @@ -32,7 +32,7 @@ class SubtractDatabundles(ProcessStep): calling_module_path=Path(__file__), calling_version=__version__, required_data_keys=["signal"], - works_on={"signal": ["signal", "uncertainties", "units"]}, + modifies={"signal": ["signal", "uncertainties", "units"]}, calling_arguments={}, # no arguments needed step_keywords=["subtract", "background", "databundle"], step_doc="Subtract a DataBundle element using another DataBundle", diff --git a/src/modacor/modules/technique_modules/index_pixels.py b/src/modacor/modules/technique_modules/index_pixels.py new file mode 100644 index 0000000..4b0efcc --- /dev/null +++ b/src/modacor/modules/technique_modules/index_pixels.py @@ -0,0 +1,493 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +from typing import Dict, List, Tuple + +__coding__ = "utf-8" +__authors__ = ["Brian R. Pauw"] +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "29/11/2025" +__status__ = "Development" # "Development", "Production" + +__version__ = "20251130.1" +__all__ = ["IndexPixels"] + +from pathlib import Path + +import numpy as np + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.messagehandler import MessageHandler +from modacor.dataclasses.process_step import ProcessStep +from modacor.dataclasses.process_step_describer import ProcessStepDescriber + +logger = MessageHandler(name=__name__) + + +class IndexPixels(ProcessStep): + """ + Compute pixel bin indices for a single dataset, for subsequent 1D averaging. + + Depending on `averaging_direction`, this step can prepare indices for: + - azimuthal averaging (bin along Q, normal for 1D X-ray scattering curves). + - radial averaging (bin along Psi, usually for getting orientation information); + + This step: + - Interprets Q limits in user-specified units (q_limits_unit). + - Interprets Psi limits in user-specified units (psi_limits_unit). + - Builds bin edges internally (Q or Psi depending on averaging_direction). + - For each pixel, decides which bin it belongs to, or -1 if it does + not participate in any bin (out of range / outside ROI / non-finite). 
+
+    Inputs (from the databundle selected via with_processing_keys)
+    --------------------------------------------------------------
+    - "signal": BaseData (together with its rank_of_data used for data shape)
+    - "Q": BaseData (modulus of scattering vector)
+    - "Psi": BaseData (azimuthal angle)
+
+    This step does *not* apply the Mask. Mask is left to downstream modules
+    (e.g., the averaging step), so that it can vary per frame for dynamic masking.
+
+    Configuration
+    -------------
+    with_processing_keys : str | list[str] | None
+        Databundle key(s) to work on. The pixel index map is computed from
+        the first key and attached to all specified keys.
+        If None and there is exactly one databundle, that one is used.
+
+    averaging_direction : {"radial", "azimuthal"}, default "azimuthal"
+        - "azimuthal": bins along Q, using q_min/q_max and bin_type;
+        - "radial": bins along Psi (linear bins), using psi_min/psi_max.
+          In this case q_min/q_max define a radial ROI mask (optional).
+
+    q_min, q_max : float, optional
+        Q limits expressed in units given by q_limits_unit.
+        If omitted:
+        - For "azimuthal" + "log" binning: q_min = smallest positive finite Q;
+        - Otherwise: q_min = min(Q), q_max = max(Q).
+        q_min may be negative if not using "log" binning. Useful for e.g. USAXS scans.
+
+    q_limits_unit : str or pint.Unit, optional
+        Units in which q_min/q_max are defined, e.g. "1/nm".
+        Defaults to the Q.units of the dataset.
+
+    n_bins : int, default 100
+        Number of bins along the averaging direction (Q or Psi).
+
+    bin_type : {"log", "linear"}, default "log"
+        - For averaging_direction="azimuthal":
+          "log" uses geometric spacing (np.geomspace);
+          "linear" uses np.linspace.
+        - For averaging_direction="radial":
+          Must be "linear" (logarithmic psi is not implemented).
+
+    psi_min, psi_max : float, optional
+        Azimuth limits expressed in psi_limits_unit.
+        For averaging_direction="azimuthal":
+        - These define an azimuthal mask (ROI).
+        - Defaults to a full circle:
+          * 0 .. 360 if psi_limits_unit is degree;
+          * 0 .. 2π if psi_limits_unit is radian.
+        For averaging_direction="radial":
+        - These also define the binning range along Psi.
+
+    psi_limits_unit : str or pint.Unit, optional
+        Units in which psi_min/psi_max are defined (typically "degree" or "radian").
+        Defaults to the Psi.units of the dataset.
+
+    Outputs (returned from calculate())
+    -----------------------------------
+    One DataBundle per key in with_processing_keys, each containing:
+
+    - "pixel_index": BaseData
+        signal : ndarray with same shape as the last rank_of_data ndims
+            of the chosen "signal" BaseData.
+            Each entry is an integer bin index (stored as float in
+            BaseData; will be cast back to int when used).
+            -1 means "this pixel does not participate in any bin".
+        units : dimensionless
+        uncertainties : empty dict
+        axes : copied from the *last* rank_of_data axes of the original signal
+        rank_of_data : same as the original signal BaseData
+    """
+
+    documentation = ProcessStepDescriber(
+        calling_name="Index Pixels",
+        calling_id="IndexPixels",
+        calling_module_path=Path(__file__),
+        calling_version=__version__,
+        required_data_keys=["signal", "Q", "Psi"],
+        required_arguments=[
+            "with_processing_keys",
+            "averaging_direction",
+        ],
+        calling_arguments={
+            "with_processing_keys": None,
+            "averaging_direction": "azimuthal",
+            "q_min": None,
+            "q_max": None,
+            "q_limits_unit": None,
+            "n_bins": 100,
+            "bin_type": "log",
+            "psi_min": None,
+            "psi_max": None,
+            "psi_limits_unit": None,
+        },
+        modifies={},  # nothing is modified in place; this step only adds "pixel_index"
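+        # Illustrative calling_arguments for a typical 1D reduction
+        # (values are hypothetical):
+        #     {"with_processing_keys": "sample",
+        #      "averaging_direction": "azimuthal",
+        #      "q_min": 0.1, "q_max": 5.0, "q_limits_unit": "1/nm",
+        #      "n_bins": 100, "bin_type": "log"}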
+ step_keywords=[ + "radial", + "azimuthal", + "pixel indexing", + "binning", + "scattering", + ], + step_doc="Compute per-pixel bin indices (radial or azimuthal) for later 1D averaging.", + step_reference="DOI 10.1088/0953-8984/25/38/383201", + step_note=( + "IndexPixels computes bin indices purely from geometry (Q, Psi) and " + "user-defined limits; Mask is not used here so it can be applied " + "per frame in downstream steps." + ), + ) + + def __attrs_post_init__(self) -> None: + super().__attrs_post_init__() + # Prepared state lives in self._prepared_data. + + # ------------------------------------------------------------------ + # internal helper: normalise with_processing_keys + # ------------------------------------------------------------------ + def _normalised_keys(self) -> Tuple[str, List[str]]: + """ + Return (primary_key, keys_to_update). + + primary_key: the key used to compute the pixel index map. + keys_to_update: all keys that should receive the map. + """ + if self.processing_data is None: + raise RuntimeError("IndexPixels: processing_data is None in _normalised_keys.") + + cfg_value = self.configuration.get("with_processing_keys", None) + + # None → use only key if exactly one available + if cfg_value is None: + if len(self.processing_data) == 0: + raise ValueError("IndexPixels: with_processing_keys is None and processing_data is empty.") + if len(self.processing_data) == 1: + only_key = next(iter(self.processing_data.keys())) + logger.info( + f"IndexPixels: with_processing_keys not set; using the only key {only_key}.", # noqa: E702 + ) + return only_key, [only_key] + raise ValueError( + "IndexPixels: with_processing_keys is None but multiple " + "databundles are present. Please specify a key or list of keys." + ) + + # Single string + if isinstance(cfg_value, str): + return cfg_value, [cfg_value] + + # Iterable of strings + try: + keys = list(cfg_value) + except TypeError as exc: # not iterable + raise ValueError( + "IndexPixels: with_processing_keys must be a string, an iterable of strings, or None." + ) from exc + + if not keys: + raise ValueError("IndexPixels: with_processing_keys is an empty iterable.") + + for k in keys: + if not isinstance(k, str): + raise ValueError("IndexPixels: all entries in with_processing_keys must be strings, got %r." % (k,)) + + primary_key = keys[0] + if len(keys) > 1: + logger.warning( + ( + "IndexPixels: multiple with_processing_keys given; " + "pixel index map will be computed from the first (%r) and " + "attached to all %r." + ), + primary_key, + keys, + ) + return primary_key, keys + + # ------------------------------------------------------------------ + # internal helper: geometry / shape validation + # ------------------------------------------------------------------ + def _validate_and_get_geometry( + self, + databundle: DataBundle, + ) -> Tuple[BaseData, BaseData, BaseData, int, Tuple[int, ...], List[BaseData | None]]: + """ + Validate signal/Q/Psi for azimuthal geometry and return: + + signal_bd, q_bd, psi_bd, RoD, spatial_shape, spatial_axes + """ + signal_bd: BaseData = databundle["signal"] + q_bd: BaseData = databundle["Q"] + psi_bd: BaseData = databundle["Psi"] + + RoD: int = int(signal_bd.rank_of_data) + if RoD not in (1, 2): + raise ValueError(f"IndexPixels: rank_of_data must be 1 or 2 for azimuthal geometry, got {RoD}.") + + spatial_shape: Tuple[int, ...] 
+        spatial_shape = signal_bd.shape[-RoD:] if RoD > 0 else ()
+
+        if q_bd.shape != spatial_shape:
+            raise ValueError(f"IndexPixels: Q shape {q_bd.shape} does not match spatial shape {spatial_shape}.")
+        if psi_bd.shape != spatial_shape:
+            raise ValueError(f"IndexPixels: Psi shape {psi_bd.shape} does not match spatial shape {spatial_shape}.")
+
+        if signal_bd.axes:
+            spatial_axes: List[BaseData | None] = list(signal_bd.axes[-RoD:])
+        else:
+            spatial_axes = []
+
+        return signal_bd, q_bd, psi_bd, RoD, spatial_shape, spatial_axes
+
+    # ------------------------------------------------------------------
+    # prepare_execution: all geometry + array work happens here
+    # ------------------------------------------------------------------
+    def prepare_execution(self) -> None:  # noqa: C901  # complexity issue / separation of concerns TODO: fix this later.
+        """
+        Prepare the pixel index map for the selected databundle.
+
+        All heavy computations and array manipulations are done here.
+        calculate() only wraps the prepared BaseData into DataBundles.
+        """
+        if self._prepared_data.get("pixel_index_bd") is not None:
+            return
+
+        if self.processing_data is None:
+            raise RuntimeError("IndexPixels: processing_data is None in prepare_execution.")
+
+        primary_key, keys_to_update = self._normalised_keys()
+        self._prepared_data["keys_to_update"] = keys_to_update
+
+        if primary_key not in self.processing_data:
+            raise KeyError(f"IndexPixels: key {primary_key!r} not found in processing_data.")
+
+        databundle: DataBundle = self.processing_data[primary_key]
+        (
+            signal_bd,
+            q_bd,
+            psi_bd,
+            RoD,
+            spatial_shape,
+            spatial_axes,
+        ) = self._validate_and_get_geometry(databundle)
+
+        # Direction of averaging: "radial" or "azimuthal"
+        direction = str(self.configuration.get("averaging_direction", "azimuthal")).lower()
+        if direction not in ("radial", "azimuthal"):
+            raise ValueError(f"IndexPixels: averaging_direction must be 'radial' or 'azimuthal', got {direction!r}.")
+
+        # ------------------------------------------------------------------
+        # 1. Resolve Q limits (mask and, for azimuthal, also the binning range)
+        # ------------------------------------------------------------------
+        q_min_cfg = self.configuration.get("q_min", None)
+        q_max_cfg = self.configuration.get("q_max", None)
+        n_bins = int(self.configuration.get("n_bins", 100))
+        bin_type = str(self.configuration.get("bin_type", "log")).lower()
+
+        if n_bins <= 0:
+            raise ValueError(f"IndexPixels: n_bins must be positive, got {n_bins}.")
+
+        q_limits_unit_cfg = self.configuration.get("q_limits_unit", None)
+        if q_limits_unit_cfg is None:
+            q_limits_unit = q_bd.units
+        else:
+            q_limits_unit = ureg.Unit(q_limits_unit_cfg)
+
+        q_full = np.asarray(q_bd.signal, dtype=float)
+        try:
+            q_flat = q_full.ravel()
+        except Exception as exc:  # noqa: BLE001
+            raise ValueError("IndexPixels: could not flatten Q array.") from exc
+
+        finite_q = q_flat[np.isfinite(q_flat)]
+        if finite_q.size == 0:
+            raise ValueError("IndexPixels: Q array has no finite values.")
+
+        data_q_min = float(np.nanmin(finite_q))
+        data_q_max = float(np.nanmax(finite_q))
+
+        if direction == "azimuthal":
+            # q_min/q_max define both mask and bin range
+            if q_min_cfg is not None:
+                q_min_val = (float(q_min_cfg) * q_limits_unit).to(q_bd.units).magnitude
+            else:
+                if bin_type == "log":
+                    positive = finite_q[finite_q > 0.0]
+                    if positive.size == 0:
+                        raise ValueError("IndexPixels: cannot determine positive q_min for log binning.")
+                    q_min_val = float(np.nanmin(positive))
+                else:
+                    q_min_val = data_q_min
+
+            if q_max_cfg is not None:
+                q_max_val = (float(q_max_cfg) * q_limits_unit).to(q_bd.units).magnitude
+            else:
+                q_max_val = data_q_max
+        else:
+            # radial: q_min/q_max are optional ROI only; ignore bin_type here
+            if q_min_cfg is not None:
+                q_min_val = (float(q_min_cfg) * q_limits_unit).to(q_bd.units).magnitude
+            else:
+                q_min_val = data_q_min
+
+            if q_max_cfg is not None:
+                q_max_val = (float(q_max_cfg) * q_limits_unit).to(q_bd.units).magnitude
+            else:
+                q_max_val = data_q_max
+
+        if q_max_val <= q_min_val or not np.isfinite(q_min_val) or not np.isfinite(q_max_val):
+            raise ValueError(f"IndexPixels: invalid Q range q_min={q_min_val}, q_max={q_max_val}.")
+
+        # ------------------------------------------------------------------
+        # 2. Resolve Psi limits (mask and, for radial, also the binning range)
+        # ------------------------------------------------------------------
+        psi_limits_unit_cfg = self.configuration.get("psi_limits_unit", None)
+        if psi_limits_unit_cfg is None:
+            psi_limits_unit = psi_bd.units
+        else:
+            psi_limits_unit = ureg.Unit(psi_limits_unit_cfg)
+
+        psi_min_cfg = self.configuration.get("psi_min", None)
+        psi_max_cfg = self.configuration.get("psi_max", None)
+
+        if psi_min_cfg is None:
+            psi_min_cfg = 0.0
+
+        if psi_max_cfg is None:
+            # Choose a default full-circle depending on psi_limits_unit
+            if psi_limits_unit == ureg.degree:
+                psi_max_cfg = 360.0
+            elif psi_limits_unit == ureg.radian:
+                psi_max_cfg = 2.0 * np.pi
+            else:
+                raise ValueError(
+                    "IndexPixels: psi_limits_unit is neither degree nor radian "
+                    "and no psi_max is specified; cannot infer a full-circle default."
+                )
+
+        psi_min_val = (float(psi_min_cfg) * psi_limits_unit).to(psi_bd.units).magnitude
+        psi_max_val = (float(psi_max_cfg) * psi_limits_unit).to(psi_bd.units).magnitude
+
+        psi_full = np.asarray(psi_bd.signal, dtype=float)
+        try:
+            psi_flat = psi_full.ravel()
+        except Exception as exc:  # noqa: BLE001
+            raise ValueError("IndexPixels: could not flatten Psi array.") from exc
+
+        # ------------------------------------------------------------------
+        # 3. Build masks
+        # ------------------------------------------------------------------
+        finite_mask = np.isfinite(q_flat) & np.isfinite(psi_flat)
+
+        # Radial mask from Q limits
+        q_range_mask = (q_flat >= q_min_val) & (q_flat <= q_max_val)
+
+        # Azimuthal mask from Psi limits
+        if np.isclose(psi_min_val, psi_max_val):
+            # Full circle
+            psi_mask = np.ones_like(psi_flat, dtype=bool)
+        elif psi_min_val < psi_max_val:
+            psi_mask = (psi_flat >= psi_min_val) & (psi_flat <= psi_max_val)
+        else:
+            # Wrap-around (e.g. 350° .. 10° converted to Psi.units)
+            psi_mask = (psi_flat >= psi_min_val) | (psi_flat <= psi_max_val)
+
+        valid_geom = q_range_mask & psi_mask & finite_mask
+
+        # ------------------------------------------------------------------
+        # 4. Build bin edges and assign indices
+        # ------------------------------------------------------------------
+        if direction == "azimuthal":
+            coord_flat = q_flat
+            if bin_type == "log":
+                if q_min_val <= 0.0:
+                    raise ValueError("IndexPixels: q_min must be > 0 for log binning.")
+                bin_edges = np.geomspace(q_min_val, q_max_val, num=n_bins + 1, dtype=float)
+            elif bin_type == "linear":
+                bin_edges = np.linspace(q_min_val, q_max_val, num=n_bins + 1, dtype=float)
+            else:
+                raise ValueError(
+                    f"IndexPixels: unknown bin_type {bin_type!r} for azimuthal averaging. Expected 'log' or 'linear'."
+                )
+        else:
+            # direction == "radial": bin along Psi, require linear spacing
+            coord_flat = psi_flat
+            if bin_type != "linear":
+                raise ValueError("IndexPixels: for averaging_direction='radial', only bin_type='linear' is supported.")
+            bin_edges = np.linspace(psi_min_val, psi_max_val, num=n_bins + 1, dtype=float)
+
+        bin_idx = np.searchsorted(bin_edges, coord_flat, side="right") - 1
+        out_of_range = (bin_idx < 0) | (bin_idx >= n_bins)
+        valid_idx = valid_geom & ~out_of_range
+
+        # Pixels that are not valid for any reason get index -1
+        bin_idx[~valid_idx] = -1
+
+        # Reshape to the spatial shape
+        bin_idx_reshaped = bin_idx.reshape(spatial_shape)
+
+        pixel_index_bd = BaseData(
+            signal=bin_idx_reshaped,
+            units=ureg.dimensionless,
+            uncertainties={},
+            weights=np.array(1.0),
+            axes=spatial_axes,
+            rank_of_data=signal_bd.rank_of_data,
+        )
+
+        self._prepared_data["pixel_index_bd"] = pixel_index_bd
+
+    # ------------------------------------------------------------------
+    # calculate: only wraps the prepared BaseData into DataBundles
+    # ------------------------------------------------------------------
+    def calculate(self) -> Dict[str, DataBundle]:
+        """
+        Add the pixel index as BaseData to the databundles specified in
+        'with_processing_keys'. If multiple keys are given, the same pixel
+        index map (computed from the first) is added to all.
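+
+        A sketch of the result layout (the databundle key is illustrative)::
+
+            out = self.calculate()
+            idx = out["sample"]["pixel_index"].signal
+            # idx has the detector's spatial shape; entries are bin numbers,
+            # with -1 marking pixels that do not participate in any bin.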
+ """ + output: Dict[str, DataBundle] = {} + + if self.processing_data is None: + logger.warning("IndexPixels: processing_data is None in calculate; nothing to do.") + return output + + if self._prepared_data.get("pixel_index_bd") is None: + self.prepare_execution() + + pixel_index_bd: BaseData = self._prepared_data["pixel_index_bd"] + _primary, keys_to_update = self._normalised_keys() + + logger.info(f"IndexPixels: adding pixel indices to keys={keys_to_update}") + + for key in keys_to_update: + databundle = self.processing_data.get(key) + if databundle is None: + logger.warning( + "IndexPixels: processing_data has no entry for key=%r; skipping.", + key, + ) + continue + + # Use a copy so each databundle has its own BaseData instance + databundle["pixel_index"] = pixel_index_bd.copy(with_axes=True) + output[key] = databundle + + logger.info(f"IndexPixels: pixel indices attached for {len(output)} keys.") + return output diff --git a/src/modacor/modules/technique_modules/indexed_averager.py b/src/modacor/modules/technique_modules/indexed_averager.py new file mode 100644 index 0000000..470375d --- /dev/null +++ b/src/modacor/modules/technique_modules/indexed_averager.py @@ -0,0 +1,612 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +from typing import Dict, List, Tuple + +__coding__ = "utf-8" +__authors__ = ["Brian R. Pauw"] +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "30/11/2025" +__status__ = "Development" # "Development", "Production" + +__version__ = "20251130.1" +__all__ = ["IndexedAverager"] + +from pathlib import Path + +import numpy as np + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.messagehandler import MessageHandler +from modacor.dataclasses.process_step import ProcessStep +from modacor.dataclasses.process_step_describer import ProcessStepDescriber + +logger = MessageHandler(name=__name__) + + +class IndexedAverager(ProcessStep): + """ + Perform averaging of signal using precomputed per-pixel bin indices. + + This module expects that a previous step (e.g. IndexPixels) has produced a + "pixel_index" BaseData map containing, for each pixel, the bin index it + belongs to (or -1 for "not participating"). + + It then: + - Combines the per-pixel signal into bin-averaged values using optional + weights. + - Computes a weighted mean Q per bin. + - Computes a weighted circular mean Psi per bin. + - Propagates per-pixel uncertainties on signal and Q to the bin-mean. + - Estimates a bin-level SEM ("SEM" key) from the scatter of the signal for Q, Psi and signal. + + Inputs (from the databundle selected via with_processing_keys) + -------------------------------------------------------------- + - "signal": BaseData + - "Q": BaseData + - "Psi": BaseData + - "pixel_index": BaseData + same spatial rank and shape as (at least) the signal data. + - "Mask": BaseData (optional) + boolean mask, True meaning "masked" (pixel ignored). + + Configuration + ------------- + with_processing_keys : list[str] + Databundle key(s) to work on. If None and there is exactly one + databundle, that one is used. + + averaging_direction : {"radial", "azimuthal"}, default "azimuthal" + Only used to decide which axis is attached to the output signal: + - "azimuthal": signal.axes[0] will reference Q_1d. + - "radial": signal.axes[0] will reference Psi_1d. 
+ The underlying binning is entirely determined by "pixel_index". + + use_signal_weights : bool, default True + If True, multiply per-pixel weights into the total weight. + + use_signal_uncertainty_weights : bool, default False + If True, use 1 / sigma^2 (for the specified key) as an additional + factor in the weights. + + uncertainty_weight_key : str | None, default None + Uncertainty key in signal.uncertainties to use when + use_signal_uncertainty_weights is True. Must be provided and present + in signal.uncertainties in that case. + + Outputs (returned from calculate()) + ----------------------------------- + For each key in with_processing_keys, the corresponding databundle will + be updated with 1D BaseData: + + - "signal": BaseData + Bin-averaged signal as 1D array (length n_bins). + Units: same as input signal.units. + uncertainties: + * For each original signal uncertainty key 'k', a propagated sigma + for the bin mean under that key. + * An additional key "SEM" with a bin-level standard error on the + mean derived from the weighted scatter of the signal values. + + - "Q": BaseData + Weighted mean Q per bin (length n_bins). + units: same as input Q.units. + uncertainties: + * For each original Q uncertainty key 'k', propagated sigma on the + bin mean for that key. + + - "Psi": BaseData + Weighted circular mean of Psi per bin (length n_bins). + units: same as input Psi.units. + uncertainties: + * For each original Psi uncertainty key 'k', propagated sigma on the + bin mean for that key (using linear propagation on angles). + + The original 2D/1D "pixel_index" and optional "Mask" remain present in + the databundle, enabling further inspection or reuse. + """ + + documentation = ProcessStepDescriber( + calling_name="Indexed Averager", + calling_id="IndexedAverager", + calling_module_path=Path(__file__), + calling_version=__version__, + required_data_keys=["signal", "Q", "Psi", "pixel_index"], + required_arguments=[ + "with_processing_keys", + "output_processing_key", + "averaging_direction", + "use_signal_weights", + "use_signal_uncertainty_weights", + "uncertainty_weight_key", + ], + calling_arguments={ + "with_processing_keys": None, + "output_processing_key": None, # currently unused + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + }, + modifies={ + # We overwrite 'signal', 'Q', 'Psi' with their 1D binned versions. + "signal": ["signal", "uncertainties"], + "Q": ["signal", "uncertainties"], + "Psi": ["signal", "uncertainties"], + }, + step_keywords=[ + "radial", + "azimuthal", + "averaging", + "binning", + "scattering", + ], + step_doc="Average signal and geometry using precomputed pixel bin indices.", + step_reference="DOI 10.1088/0953-8984/25/38/383201", + step_note=( + "IndexedAverager expects a 'pixel_index' map from a previous " + "IndexPixels step and performs per-bin weighted means of signal, " + "Q and Psi, including uncertainty propagation." + ), + ) + + def __attrs_post_init__(self) -> None: + super().__attrs_post_init__() + + # ------------------------------------------------------------------ + # Helper: normalise with_processing_keys to a list + # ------------------------------------------------------------------ + def _normalised_keys(self) -> List[str]: + """ + Normalise with_processing_keys into a non-empty list of strings. + + If configuration value is None and exactly one databundle is present + in processing_data, that key is returned as the single entry. 
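+
+        Behaviour sketch (keys are illustrative)::
+
+            "sample"                       -> ["sample"]
+            ["sample", "background"]       -> ["sample", "background"]
+            None, one databundle "only"    -> ["only"]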
+ """ + if self.processing_data is None: + raise RuntimeError("IndexedAverager: processing_data is None in _normalised_keys.") + + cfg_value = self.configuration.get("with_processing_keys", None) + + # None → use only key if exactly one available + if cfg_value is None: + if len(self.processing_data) == 0: + raise ValueError("IndexedAverager: with_processing_keys is None and processing_data is empty.") + if len(self.processing_data) == 1: + only_key = next(iter(self.processing_data.keys())) + logger.info( + "IndexedAverager: with_processing_keys not set; using the only key %r.", + only_key, + ) + return [only_key] + raise ValueError( + "IndexedAverager: with_processing_keys is None but multiple " + "databundles are present. Please specify a key or list of keys." + ) + + # Single string + if isinstance(cfg_value, str): + return [cfg_value] + + # Iterable of strings + try: + keys = list(cfg_value) + except TypeError as exc: # not iterable + raise ValueError( + "IndexedAverager: with_processing_keys must be a string, an iterable of strings, or None." + ) from exc + + if not keys: + raise ValueError("IndexedAverager: with_processing_keys is an empty iterable.") + + for k in keys: + if not isinstance(k, str): + raise ValueError(f"IndexedAverager: all entries in with_processing_keys must be strings, got {k!r}.") + return keys + + # ------------------------------------------------------------------ + # Helper: validate geometry, signal and pixel_index for a databundle + # ------------------------------------------------------------------ + def _validate_inputs( + self, + databundle: DataBundle, + ) -> Tuple[BaseData, BaseData, BaseData, BaseData, BaseData | None, Tuple[int, ...]]: + """ + Validate presence and shapes of signal, Q, Psi, pixel_index + (and optional Mask) for a given databundle. + + Returns: + signal_bd, q_bd, psi_bd, pix_bd, mask_bd_or_None, spatial_shape + """ + try: + signal_bd: BaseData = databundle["signal"] + q_bd: BaseData = databundle["Q"] + psi_bd: BaseData = databundle["Psi"] + pix_bd: BaseData = databundle["pixel_index"] + except KeyError as exc: + raise KeyError( + "IndexedAverager: databundle missing required keys 'signal', 'Q', 'Psi', or 'pixel_index'." + ) from exc + + spatial_shape: Tuple[int, ...] = tuple(signal_bd.shape) + if q_bd.shape != spatial_shape: + raise ValueError(f"IndexedAverager: Q shape {q_bd.shape} does not match signal shape {spatial_shape}.") + if psi_bd.shape != spatial_shape: + raise ValueError(f"IndexedAverager: Psi shape {psi_bd.shape} does not match signal shape {spatial_shape}.") + if pix_bd.shape != spatial_shape: + raise ValueError( + f"IndexedAverager: pixel_index shape {pix_bd.shape} does not match signal shape {spatial_shape}." + ) + + mask_bd: BaseData | None = None + # Optional mask: we accept 'Mask' or 'mask' + if "Mask" in databundle: + mask_bd = databundle["Mask"] + elif "mask" in databundle: + mask_bd = databundle["mask"] + + if mask_bd is not None and mask_bd.shape != spatial_shape: + raise ValueError( + f"IndexedAverager: Mask shape {mask_bd.shape} does not match signal shape {spatial_shape}." 
+ ) + + return signal_bd, q_bd, psi_bd, pix_bd, mask_bd, spatial_shape + + # ------------------------------------------------------------------ + # Helper: core binning/averaging logic + # ------------------------------------------------------------------ + @staticmethod + def _compute_bin_averages( # noqa: C901 -- complexity # TODO: reduce complexity after testing with index_pixels + signal_bd: BaseData, + q_bd: BaseData, + psi_bd: BaseData, + pix_bd: BaseData, + mask_bd: BaseData | None, + use_signal_weights: bool, + use_signal_uncertainty_weights: bool, + uncertainty_weight_key: str | None, + ) -> Tuple[BaseData, BaseData, BaseData]: + """ + Core binning logic: produce 1D BaseData for signal, Q, Psi. + + All inputs are assumed to have identical shapes. + """ + + # Flatten arrays + sig_full = np.asarray(signal_bd.signal, dtype=float).ravel() + q_full = np.asarray(q_bd.signal, dtype=float).ravel() + psi_full = np.asarray(psi_bd.signal, dtype=float).ravel() + + pix_flat = np.asarray(pix_bd.signal, dtype=float).ravel().astype(int) + + if sig_full.size == 0: + raise ValueError("IndexedAverager: signal array is empty.") + + if not np.all(np.isfinite(pix_flat) | (pix_flat == -1)): + logger.warning("IndexedAverager: pixel_index contains non-finite entries; treating them as -1.") + pix_flat[~np.isfinite(pix_flat)] = -1 + + # Base validity: a pixel participates if index >= 0 + valid = pix_flat >= 0 + + # Apply optional mask: True means "masked" → exclude + if mask_bd is not None: + mask_flat = np.asarray(mask_bd.signal, dtype=bool).ravel() + if mask_flat.shape != pix_flat.shape: + raise ValueError("IndexedAverager: Mask shape does not match pixel_index.") + valid &= ~mask_flat + + # Exclude non-finite signal / Q / Psi + valid &= np.isfinite(sig_full) & np.isfinite(q_full) & np.isfinite(psi_full) + + if not np.any(valid): + raise ValueError("IndexedAverager: no valid pixels to average.") + + bin_idx = pix_flat[valid] + sig_valid = sig_full[valid] + q_valid = q_full[valid] + psi_valid = psi_full[valid] + + n_bins = int(bin_idx.max()) + 1 + if n_bins <= 0: + raise ValueError("IndexedAverager: inferred n_bins is non-positive.") + + # ------------------------------------------------------------------ + # 1. Combined weights + # ------------------------------------------------------------------ + w = np.ones_like(sig_full, dtype=float) + + if use_signal_weights: + w_bd = np.asarray(signal_bd.weights, dtype=float) + try: + w_bd_full = np.broadcast_to(w_bd, signal_bd.shape).ravel() + except ValueError as exc: + raise ValueError("IndexedAverager: could not broadcast signal.weights to signal shape.") from exc + w *= w_bd_full + + if use_signal_uncertainty_weights: + if uncertainty_weight_key is None: + raise ValueError( + "IndexedAverager: use_signal_uncertainty_weights=True but uncertainty_weight_key is None." + ) + if uncertainty_weight_key not in signal_bd.uncertainties: + raise KeyError( + f"IndexedAverager: uncertainty key {uncertainty_weight_key!r} not found in signal.uncertainties." # noqa: E713 + ) + + sigma_u = np.asarray(signal_bd.uncertainties[uncertainty_weight_key], dtype=float) + try: + sigma_full = np.broadcast_to(sigma_u, signal_bd.shape).ravel() + except ValueError as exc: + raise ValueError( + "IndexedAverager: could not broadcast chosen uncertainty array to signal shape." 
+ ) from exc + + var_full = sigma_full**2 + # Only accept strictly positive finite variances for weighting + valid_sigma = np.isfinite(var_full) & (var_full > 0.0) + if not np.any(valid_sigma & valid): + raise ValueError( + "IndexedAverager: no pixels have positive finite variance under the chosen uncertainty_weight_key." + ) + + # Pixels with non-positive/NaN variance are effectively dropped + valid &= valid_sigma + var_full[~valid_sigma] = np.inf # avoid division by zero + w *= 1.0 / var_full + + # Recompute valid slice after potential tightening due to uncertainty weights + valid_idx = np.where(valid)[0] + bin_idx = pix_flat[valid_idx] + sig_valid = sig_full[valid_idx] + q_valid = q_full[valid_idx] + psi_valid = psi_full[valid_idx] + w_valid = w[valid_idx] + + if not np.any(w_valid > 0.0): + raise ValueError("IndexedAverager: all weights are zero; cannot compute averages.") + + # Clamp negative weights to zero (should not happen, but be robust) + w_valid = np.clip(w_valid, a_min=0.0, a_max=None) + + # ------------------------------------------------------------------ + # 2. Weighted sums for signal and Q + # ------------------------------------------------------------------ + sum_w = np.bincount(bin_idx, weights=w_valid, minlength=n_bins) + sum_wx = np.bincount(bin_idx, weights=w_valid * sig_valid, minlength=n_bins) + sum_wq = np.bincount(bin_idx, weights=w_valid * q_valid, minlength=n_bins) + + with np.errstate(divide="ignore", invalid="ignore"): + mean_signal = np.full(n_bins, np.nan, dtype=float) + mean_q = np.full(n_bins, np.nan, dtype=float) + + positive = sum_w > 0.0 + mean_signal[positive] = sum_wx[positive] / sum_w[positive] + mean_q[positive] = sum_wq[positive] / sum_w[positive] + + # ------------------------------------------------------------------ + # 3. Weighted circular mean for Psi + # ------------------------------------------------------------------ + psi_unit = psi_bd.units + + # Convert Psi to radians for trigonometric operations + cf_to_rad = ureg.radian.m_from(psi_unit) + psi_rad_valid = psi_valid * cf_to_rad + + cos_psi = np.cos(psi_rad_valid) + sin_psi = np.sin(psi_rad_valid) + + sum_wcos = np.bincount(bin_idx, weights=w_valid * cos_psi, minlength=n_bins) + sum_wsin = np.bincount(bin_idx, weights=w_valid * sin_psi, minlength=n_bins) + + with np.errstate(divide="ignore", invalid="ignore"): + mean_cos = np.full(n_bins, np.nan, dtype=float) + mean_sin = np.full(n_bins, np.nan, dtype=float) + mean_psi_rad = np.full(n_bins, np.nan, dtype=float) + + positive = sum_w > 0.0 + mean_cos[positive] = sum_wcos[positive] / sum_w[positive] + mean_sin[positive] = sum_wsin[positive] / sum_w[positive] + + mean_psi_rad[positive] = np.arctan2(mean_sin[positive], mean_cos[positive]) + + # Convert back to original Psi units + cf_from_rad = psi_unit.m_from(ureg.radian) + mean_psi = mean_psi_rad * cf_from_rad + + # ------------------------------------------------------------------ + # 4. 
+        # ------------------------------------------------------------------
+        # 4. Propagate uncertainties on signal, Q, Psi
+        # ------------------------------------------------------------------
+        sig_unc_binned: Dict[str, np.ndarray] = {}
+        q_unc_binned: Dict[str, np.ndarray] = {}
+        psi_unc_binned: Dict[str, np.ndarray] = {}
+
+        # Helper for propagation: sigma_mean = sqrt(sum(w^2 * sigma^2)) / sum_w
+        def _propagate_uncertainties(unc_dict: Dict[str, np.ndarray], ref_bd: BaseData) -> Dict[str, np.ndarray]:
+            result: Dict[str, np.ndarray] = {}
+            for key, arr in unc_dict.items():
+                arr_full = np.asarray(arr, dtype=float)
+                try:
+                    arr_full = np.broadcast_to(arr_full, ref_bd.shape).ravel()
+                except ValueError as exc:
+                    raise ValueError(
+                        f"IndexedAverager: could not broadcast uncertainty[{key!r}] to reference shape."
+                    ) from exc
+
+                arr_valid = arr_full[valid_idx]
+                var_valid = arr_valid**2
+
+                sum_w2_var = np.bincount(bin_idx, weights=(w_valid**2) * var_valid, minlength=n_bins)
+
+                sigma = np.full(n_bins, np.nan, dtype=float)
+                with np.errstate(divide="ignore", invalid="ignore"):
+                    positive = sum_w > 0.0
+                    sigma[positive] = np.sqrt(sum_w2_var[positive]) / sum_w[positive]
+
+                result[key] = sigma
+            return result
+
+        if signal_bd.uncertainties:
+            sig_unc_binned.update(_propagate_uncertainties(signal_bd.uncertainties, signal_bd))
+        if q_bd.uncertainties:
+            q_unc_binned.update(_propagate_uncertainties(q_bd.uncertainties, q_bd))
+        if psi_bd.uncertainties:
+            psi_unc_binned.update(_propagate_uncertainties(psi_bd.uncertainties, psi_bd))
+
+        # ------------------------------------------------------------------
+        # 5. SEM from scatter of signal ("SEM" key)
+        # ------------------------------------------------------------------
+        # Effective sample size (Kish): N_eff = (sum w)^2 / (sum w^2)
+        sum_w2 = np.bincount(bin_idx, weights=w_valid**2, minlength=n_bins)
+        with np.errstate(divide="ignore", invalid="ignore"):
+            N_eff = np.full(n_bins, np.nan, dtype=float)
+            positive = sum_w2 > 0.0
+            N_eff[positive] = (sum_w[positive] ** 2) / sum_w2[positive]
+
+        # Weighted variance around the bin mean:
+        # dev_i = x_i - mean_signal[bin_idx_i]
+        mean_signal_per_pixel = mean_signal[bin_idx]
+        dev_valid = sig_valid - mean_signal_per_pixel
+
+        sum_w_dev2 = np.bincount(bin_idx, weights=w_valid * (dev_valid**2), minlength=n_bins)
+
+        with np.errstate(divide="ignore", invalid="ignore"):
+            var_spread = np.full(n_bins, np.nan, dtype=float)
+            sem_spread = np.full(n_bins, np.nan, dtype=float)
+
+            valid_bins = (sum_w > 0.0) & np.isfinite(N_eff) & (N_eff > 1.0)
+
+            var_spread[valid_bins] = sum_w_dev2[valid_bins] / sum_w[valid_bins]
+            sem_spread[valid_bins] = np.sqrt(var_spread[valid_bins] / N_eff[valid_bins])
+
+        # Add SEM as a dedicated uncertainty key on the binned signal
+        sig_unc_binned["SEM"] = sem_spread
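+
+        # Comment-only worked example (not executed): four pixels in one bin
+        # with weights [1, 1, 1, 1] give N_eff = 4**2 / 4 = 4, matching the
+        # unweighted count; strongly unequal weights [10, 1, 1, 1] give
+        # N_eff = 13**2 / 103 ≈ 1.64, so the SEM rightly grows when a single
+        # pixel dominates the bin.
+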
+        # ------------------------------------------------------------------
+        # 6. Build output BaseData objects
+        # ------------------------------------------------------------------
+        # 1D signal
+        signal_1d = BaseData(
+            signal=mean_signal,
+            units=signal_bd.units,
+            uncertainties=sig_unc_binned,
+            weights=np.ones_like(mean_signal, dtype=float),
+            axes=[],  # will be filled based on averaging_direction in caller
+            rank_of_data=1,
+        )
+
+        # 1D Q
+        Q_1d = BaseData(
+            signal=mean_q,
+            units=q_bd.units,
+            uncertainties=q_unc_binned,
+            weights=np.ones_like(mean_q, dtype=float),
+            axes=[],
+            rank_of_data=1,
+        )
+
+        # 1D Psi
+        Psi_1d = BaseData(
+            signal=mean_psi,
+            units=psi_bd.units,
+            uncertainties=psi_unc_binned,
+            weights=np.ones_like(mean_psi, dtype=float),
+            axes=[],
+            rank_of_data=1,
+        )
+
+        return signal_1d, Q_1d, Psi_1d
+
+    # ------------------------------------------------------------------
+    # prepare_execution: nothing heavy here for now
+    # ------------------------------------------------------------------
+    def prepare_execution(self) -> None:
+        """
+        For IndexedAverager, there is no heavy geometry to precompute.
+
+        All binning work depends on the per-frame signal, so we perform the
+        averaging inside calculate(). This method only validates that the
+        configuration is at least minimally sensible.
+        """
+        # ensure configuration keys are present; defaults are already set by __attrs_post_init__
+        direction = str(self.configuration.get("averaging_direction", "azimuthal")).lower()
+        if direction not in ("radial", "azimuthal"):
+            raise ValueError(
+                f"IndexedAverager: averaging_direction must be 'radial' or 'azimuthal', got {direction!r}."
+            )
+
+    # ------------------------------------------------------------------
+    # calculate: perform per-key averaging using pixel_index
+    # ------------------------------------------------------------------
+    def calculate(self) -> Dict[str, DataBundle]:
+        """
+        For each databundle in with_processing_keys, perform the binning /
+        averaging using the precomputed pixel_index map and return updated
+        DataBundles containing 1D 'signal', 'Q', and 'Psi' BaseData.
+        """
+        output: Dict[str, DataBundle] = {}
+
+        if self.processing_data is None:
+            logger.warning("IndexedAverager: processing_data is None in calculate; nothing to do.")
+            return output
+
+        keys = self._normalised_keys()
+        use_signal_weights = bool(self.configuration.get("use_signal_weights", True))
+        use_unc_w = bool(self.configuration.get("use_signal_uncertainty_weights", False))
+        uncertainty_weight_key = self.configuration.get("uncertainty_weight_key", None)
+        direction = str(self.configuration.get("averaging_direction", "azimuthal")).lower()
+
+        for key in keys:
+            if key not in self.processing_data:
+                logger.warning(
+                    "IndexedAverager: processing_data has no entry for key=%r; skipping.",
+                    key,
+                )
+                continue
+
+            databundle = self.processing_data[key]
+
+            (
+                signal_bd,
+                q_bd,
+                psi_bd,
+                pix_bd,
+                mask_bd,
+                _spatial_shape,
+            ) = self._validate_inputs(databundle)
+
+            # Compute binned 1D BaseData
+            signal_1d, Q_1d, Psi_1d = self._compute_bin_averages(
+                signal_bd=signal_bd,
+                q_bd=q_bd,
+                psi_bd=psi_bd,
+                pix_bd=pix_bd,
+                mask_bd=mask_bd,
+                use_signal_weights=use_signal_weights,
+                use_signal_uncertainty_weights=use_unc_w,
+                uncertainty_weight_key=uncertainty_weight_key,
+            )
+
+            # Attach axis: Q for azimuthal, Psi for radial (convention)
+            if direction == "azimuthal":
+                signal_1d.axes = [Q_1d]
+            else:  # "radial"
+                signal_1d.axes = [Psi_1d]
+
+            db_out = DataBundle(
+                {
+                    "signal": signal_1d,
+                    "Q": Q_1d,
+                    "Psi": Psi_1d,
+                    # pixel_index, Mask, etc. remain in the original databundle
+                }
+            )
+
+            output[key] = db_out
+
+        return output
diff --git a/src/modacor/modules/technique_modules/solid_angle_correction.py b/src/modacor/modules/technique_modules/solid_angle_correction.py
new file mode 100644
index 0000000..f0e65d8
--- /dev/null
+++ b/src/modacor/modules/technique_modules/solid_angle_correction.py
@@ -0,0 +1,57 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# /usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+__coding__ = "utf-8"
+__authors__ = ["Brian R. Pauw", "Armin Moser"]  # add names to the list as appropriate
+__copyright__ = "Copyright 2025, The MoDaCor team"
+__date__ = "29/10/2025"
+__status__ = "Development"  # "Development", "Production"
+# end of header and standard imports
+
+__all__ = ["SolidAngleCorrection"]
+__version__ = "20251029.1"
+
+from pathlib import Path
+
+# from modacor import ureg
+# from modacor.dataclasses.basedata import BaseData
+from modacor.dataclasses.databundle import DataBundle
+from modacor.dataclasses.process_step import ProcessStep
+from modacor.dataclasses.process_step_describer import ProcessStepDescriber
+
+
+class SolidAngleCorrection(ProcessStep):
+    """
+    Normalize a signal by the solid angle "Omega" calculated using XSGeometry.
+    """
+
+    documentation = ProcessStepDescriber(
+        calling_name="Solid Angle Correction",
+        calling_id="SolidAngleCorrection",
+        calling_module_path=Path(__file__),
+        calling_version=__version__,
+        required_data_keys=["signal", "Omega"],
+        modifies={"signal": ["signal", "uncertainties", "units"]},
+        required_arguments=[],  # none required, defaults all around
+        calling_arguments={},
+        step_keywords=["divide", "normalize", "solid angle"],
+        step_doc="Divide the pixels in a signal by their solid angle coverage",
+        step_reference="DOI 10.1088/0953-8984/25/38/383201",
+        step_note="""This divides the signal by the value previously calculated
+            using the XSGeometry module""",
+    )
+
+    def calculate(self) -> dict[str, DataBundle]:
+        output: dict[str, DataBundle] = {}
+
+        # actual work happens here:
+        for key in self.configuration["with_processing_keys"]:
+            databundle = self.processing_data.get(key)
+            if databundle is None:
+                raise KeyError(f"SolidAngleCorrection: no databundle found for processing key {key!r}.")
+            # divide the data
+            # Rely on BaseData.__truediv__ for units + uncertainty propagation
+            databundle["signal"] /= databundle["Omega"]
+            output[key] = databundle
+        return output
diff --git a/src/modacor/modules/technique_modules/xs_geometry.py b/src/modacor/modules/technique_modules/xs_geometry.py
index ad9c4e3..47f91cb 100644
--- a/src/modacor/modules/technique_modules/xs_geometry.py
+++ b/src/modacor/modules/technique_modules/xs_geometry.py
@@ -85,7 +85,7 @@ class XSGeometry(ProcessStep):
             "wavelength_units_source": None,
             "wavelength_uncertainties_sources": {},
         },
-        works_on={
+        modifies={
            "Q": ["signal", "uncertainties"],
            "Q0": ["signal", "uncertainties"],
            "Q1": ["signal", "uncertainties"],
diff --git a/src/modacor/tests/modules/base_modules/test_append_processing_data.py b/src/modacor/tests/modules/base_modules/test_append_processing_data.py
new file mode 100644
index 0000000..9dd8cb5
--- /dev/null
+++ b/src/modacor/tests/modules/base_modules/test_append_processing_data.py
@@ -0,0 +1,222 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# /usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+__coding__ = "utf-8"
+__authors__ = ["Brian R. 
Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "03/12/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports + +import numpy as np +import pytest + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.processing_data import ProcessingData +from modacor.modules.base_modules.append_processing_data import AppendProcessingData + + +class FakeIoSources: + """ + Minimal IoSources stand-in for testing AppendProcessingData. + + It only implements the methods actually used by the step: + - get_data + - get_static_metadata + """ + + def __init__(self, data=None, metadata=None): + self.data = data or {} + self.metadata = metadata or {} + + def get_data(self, data_reference: str, load_slice=...): + return self.data[data_reference] + + def get_static_metadata(self, data_reference: str): + return self.metadata[data_reference] + + +@pytest.fixture +def signal_array(): + return np.arange(6, dtype=float).reshape(2, 3) + + +@pytest.fixture +def fake_io(signal_array): + # Only the signal is needed for the simplest tests + data = { + "sample::entry/instrument/detector/data": signal_array, + "sample::entry/instrument/detector/sigma": np.ones_like(signal_array), + } + metadata = { + # for rank-of-data-from-metadata test + "sample::config/rank": 1, + # for units_location test (if you decide to use it later) + "sample::entry/instrument/detector/data@units": "dimensionless", + } + return FakeIoSources(data=data, metadata=metadata) + + +def _make_step(fake_io, processing_data=None) -> AppendProcessingData: + """ + Helper to create a minimally configured AppendProcessingData instance. + """ + if processing_data is None: + processing_data = ProcessingData() + + step = AppendProcessingData(io_sources=fake_io) + step.processing_data = processing_data + return step + + +# --------------------------------------------------------------------------- # +# 1. Basic creation: new DataBundle with default output key "signal" +# --------------------------------------------------------------------------- # +def test_append_processing_data_creates_new_bundle(fake_io, signal_array): + step = _make_step(fake_io) + + step.modify_config_by_dict( + { + "processing_key": "sample", + "signal_location": "sample::entry/instrument/detector/data", + "rank_of_data": 2, + # rely on default databundle_output_key = "signal" + "units_location": None, + "units_override": None, + "uncertainties_sources": {}, + } + ) + + output = step.calculate() + + # Only one bundle, keyed by processing_key + assert list(output.keys()) == ["sample"] + bundle = output["sample"] + assert isinstance(bundle, DataBundle) + + # New bundle is also stored in processing_data + assert "sample" in step.processing_data + assert step.processing_data["sample"] is bundle + + # A BaseData is stored under "signal" with the correct data + assert "signal" in bundle + bd = bundle["signal"] + assert isinstance(bd, BaseData) + np.testing.assert_array_equal(bd.signal, signal_array) + + # Default units (no units_location / override) → dimensionless + assert bd.units == ureg.dimensionless + + # rank_of_data comes from configuration + assert bd.rank_of_data == 2 + + # default_plot should be set to the output key for a new bundle + assert bundle.default_plot == "signal" + + +# --------------------------------------------------------------------------- # +# 2. 
Updating an existing DataBundle (reusing processing_key)
+# --------------------------------------------------------------------------- #
+def test_append_processing_data_updates_existing_bundle(fake_io, signal_array):
+    processing_data = ProcessingData()
+
+    # Pre-existing bundle with some content
+    existing_bundle = DataBundle()
+    existing_bd = BaseData(signal=np.ones_like(signal_array), units=ureg.dimensionless)
+    existing_bundle["existing"] = existing_bd
+    existing_bundle.default_plot = "existing"
+    processing_data["sample"] = existing_bundle
+
+    step = _make_step(fake_io, processing_data=processing_data)
+
+    # Configure the step to write into "I" for the same processing_key
+    step.modify_config_by_dict(
+        {
+            "processing_key": "sample",
+            "signal_location": "sample::entry/instrument/detector/data",
+            "rank_of_data": 1,
+            "databundle_output_key": "I",
+            "units_location": None,
+            "units_override": None,
+            "uncertainties_sources": {},
+        }
+    )
+
+    output = step.calculate()
+
+    # Still just one bundle, still the same object
+    assert list(output.keys()) == ["sample"]
+    bundle = output["sample"]
+    assert bundle is existing_bundle
+
+    # Existing entry retained
+    assert "existing" in bundle
+    assert bundle["existing"] is existing_bd
+
+    # New entry added/overwritten at key "I"
+    assert "I" in bundle
+    bd_I = bundle["I"]
+    np.testing.assert_array_equal(bd_I.signal, signal_array)
+    assert bd_I.rank_of_data == 1
+
+    # default_plot should NOT be changed if it was already set
+    assert bundle.default_plot == "existing"
+
+
+# --------------------------------------------------------------------------- #
+# 3. rank_of_data resolved from IoSources metadata (string reference)
+# --------------------------------------------------------------------------- #
+def test_append_processing_data_rank_from_metadata(fake_io, signal_array):
+    step = _make_step(fake_io)
+
+    # rank_of_data is a metadata reference this time
+    step.modify_config_by_dict(
+        {
+            "processing_key": "sample",
+            "signal_location": "sample::entry/instrument/detector/data",
+            "rank_of_data": "sample::config/rank",  # resolves to 1 in fake_io.metadata
+            "units_location": None,
+            "units_override": None,
+            "uncertainties_sources": {},
+        }
+    )
+
+    output = step.calculate()
+    bd = output["sample"]["signal"]
+
+    # Should have picked up rank 1 from metadata
+    assert bd.rank_of_data == 1
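+
+
+# --------------------------------------------------------------------------- #
+# Illustrative extra check (hedged, not in the original suite): units_override
+# wiring. This assumes AppendProcessingData parses the override string with
+# ureg.Unit, as the calling_arguments documentation suggests; adjust if the
+# actual parsing differs.
+# --------------------------------------------------------------------------- #
+def test_append_processing_data_units_override(fake_io, signal_array):
+    step = _make_step(fake_io)
+
+    step.modify_config_by_dict(
+        {
+            "processing_key": "sample",
+            "signal_location": "sample::entry/instrument/detector/data",
+            "rank_of_data": 2,
+            "units_location": None,
+            "units_override": "1/nm",
+            "uncertainties_sources": {},
+        }
+    )
+
+    output = step.calculate()
+    bd = output["sample"]["signal"]
+
+    # Signal is loaded as usual; the override should win over the (absent)
+    # units_location and replace the dimensionless default
+    np.testing.assert_array_equal(bd.signal, signal_array)
+    assert bd.units == ureg.Unit("1/nm")
+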
+# --------------------------------------------------------------------------- #
+# 4. uncertainties_sources wiring
+# --------------------------------------------------------------------------- #
+def test_append_processing_data_adds_uncertainties(fake_io, signal_array):
+    step = _make_step(fake_io)
+
+    step.modify_config_by_dict(
+        {
+            "processing_key": "sample",
+            "signal_location": "sample::entry/instrument/detector/data",
+            "rank_of_data": 2,
+            "units_location": None,
+            "units_override": None,
+            "uncertainties_sources": {
+                "sigma": "sample::entry/instrument/detector/sigma",
+            },
+        }
+    )
+
+    output = step.calculate()
+    bd = output["sample"]["signal"]
+
+    assert "sigma" in bd.uncertainties
+    np.testing.assert_array_equal(
+        bd.uncertainties["sigma"],
+        np.ones_like(signal_array),
+    )
diff --git a/src/modacor/tests/modules/base_modules/test_append_source.py b/src/modacor/tests/modules/base_modules/test_append_source.py
new file mode 100644
index 0000000..ffb73c7
--- /dev/null
+++ b/src/modacor/tests/modules/base_modules/test_append_source.py
@@ -0,0 +1,180 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# /usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+__coding__ = "utf-8"
+__authors__ = ["Brian R. Pauw"]  # add names to the list as appropriate
+__copyright__ = "Copyright 2025, The MoDaCor team"
+__date__ = "30/11/2025"
+__status__ = "Development"  # "Development", "Production"
+# end of header and standard imports
+
+import sys
+import types
+from typing import List, Tuple
+
+import pytest
+
+from modacor.io.io_sources import IoSource, IoSources
+from modacor.modules.base_modules.append_source import AppendSource
+
+
+def _install_dummy_loader_module(monkeypatch) -> Tuple[str, List[tuple]]:
+    """
+    Install a dummy loader module into sys.modules and return:
+
+    - the fully qualified loader path string
+    - a list that will collect call arguments for assertions
+    """
+    calls: List[tuple] = []
+
+    module_name = "modacor.tests.dummy_loader_module"
+    mod = types.ModuleType(module_name)
+
+    class DummySource(IoSource):
+        """
+        Minimal IoSource subclass used only for testing.
+
+        We deliberately override __init__ and *do not* call super().__init__
+        to avoid depending on IoSource's real constructor signature.
+        We just provide the attributes IoSources.register_source is expected
+        to use (source_reference, resource_location).
+        """
+
+        def __init__(self, ref: str, loc: str) -> None:
+            self.source_reference = ref
+            self.resource_location = loc
+
+    def dummy_loader(*, source_reference: str, resource_location: str):
+        """
+        Minimal loader that mimics the real loader signature used by AppendSource.
+        Returns a DummySource instance and records calls for assertions.
+        """
+        calls.append((source_reference, resource_location))
+        return DummySource(source_reference, resource_location)
+
+    # The attribute name here is what AppendSource._resolve_loader_callable
+    # will try to retrieve from the module.
+    mod.DummyLoader = dummy_loader  # type: ignore[attr-defined]
+
+    # Inject into sys.modules so import_module() can find it
+    monkeypatch.setitem(sys.modules, module_name, mod)
+
+    loader_path = f"{module_name}.DummyLoader"
+    return loader_path, calls
+
+
+def _make_append_source_instance() -> AppendSource:
+    """
+    Create an AppendSource instance without going through ProcessStep.__init__.
+
+    We only need 'configuration' and 'io_sources' for these tests, so __new__
+    is sufficient and avoids coupling to ProcessStep's constructor signature.
+ """ + instance = AppendSource.__new__(AppendSource) + return instance + + +def test_append_single_source(monkeypatch): + loader_path, calls = _install_dummy_loader_module(monkeypatch) + + step = _make_append_source_instance() + step.configuration = { + "source_identifier": "sample_source", + "source_location": "/tmp/sample.dat", + "loader_module": loader_path, + } + step.io_sources = IoSources() + + result = step.calculate() + + # No databundles modified + assert result == {} + + # The new source should be present + assert "sample_source" in step.io_sources.defined_sources + + # Loader should be called exactly once with the expected args + assert calls == [("sample_source", "/tmp/sample.dat")] + + +def test_append_multiple_sources(monkeypatch): + loader_path, calls = _install_dummy_loader_module(monkeypatch) + + step = _make_append_source_instance() + step.configuration = { + "source_identifier": ["src1", "src2"], + "source_location": ["/tmp/file1.dat", "/tmp/file2.dat"], + "loader_module": loader_path, + } + step.io_sources = IoSources() + + result = step.calculate() + + # Still no databundles modified + assert result == {} + + # Both sources should be present + assert "src1" in step.io_sources.defined_sources + assert "src2" in step.io_sources.defined_sources + + # Loader should have been called twice in order + assert calls == [ + ("src1", "/tmp/file1.dat"), + ("src2", "/tmp/file2.dat"), + ] + + +def test_mismatched_source_lengths_raises(monkeypatch): + loader_path, _ = _install_dummy_loader_module(monkeypatch) + + step = _make_append_source_instance() + step.configuration = { + "source_identifier": ["src1", "src2"], + "source_location": ["/tmp/only_one.dat"], + "loader_module": loader_path, + } + step.io_sources = IoSources() + + with pytest.raises(ValueError, match="counts must match"): + step.calculate() + + +def test_existing_source_is_not_overwritten(monkeypatch): + loader_path, calls = _install_dummy_loader_module(monkeypatch) + + step = _make_append_source_instance() + step.io_sources = IoSources() + + class PreExistingSource(IoSource): + """ + Pre-registered IoSource subclass for testing overwrite behaviour. + """ + + def __init__(self, ref: str, loc: str) -> None: + self.source_reference = ref + self.resource_location = loc + + # Pre-register a source with identifier "existing" + step.io_sources.register_source(PreExistingSource(ref="existing", loc="/tmp/original.dat")) + + # Configuration attempts to append a source with the same identifier + step.configuration = { + "source_identifier": "existing", + "source_location": "/tmp/new_location.dat", + "loader_module": loader_path, + } + + result = step.calculate() + + # No databundles modified + assert result == {} + + # The identifier should still be present… + assert "existing" in step.io_sources.defined_sources + + # …but the loader should not have been called at all, + # since AppendSource checks defined_sources before appending. + assert calls == [] diff --git a/src/modacor/tests/modules/technique_modules/test_index_pixels.py b/src/modacor/tests/modules/technique_modules/test_index_pixels.py new file mode 100644 index 0000000..0593040 --- /dev/null +++ b/src/modacor/tests/modules/technique_modules/test_index_pixels.py @@ -0,0 +1,426 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +__coding__ = "utf-8" +__authors__ = ["Brian R. 
Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "30/11/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports +__version__ = "20251130.1" + +""" +Tests for the IndexPixels processing step. + +We test: +- Basic azimuthal binning in Q (1D case). +- Basic radial binning in Psi (1D case). +- Unit conversion for q_limits_unit. +- Psi wrap-around masking for azimuthal direction. +- A small integration-style test using prepare_execution() + calculate(). +""" + +import numpy as np + +# import pytest +import pint +from numpy.testing import assert_array_equal + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.processing_data import ProcessingData +from modacor.io.io_sources import IoSources +from modacor.modules.technique_modules.index_pixels import IndexPixels + +# --------------------------------------------------------------------------- +# Small helpers +# --------------------------------------------------------------------------- + + +def make_1d_signal_bundle( + q_values: np.ndarray, + psi_values: np.ndarray, + q_unit: pint.Unit = ureg.Unit("1/nm"), + psi_unit: pint.Unit = ureg.Unit("radian"), +) -> DataBundle: + """ + Build a minimal 1D databundle with signal, Q, Psi. + """ + n = q_values.size + assert psi_values.size == n + + signal_bd = BaseData( + signal=np.ones(n, dtype=float), + units=ureg.dimensionless, + rank_of_data=1, + ) + q_bd = BaseData( + signal=np.asarray(q_values, dtype=float), + units=q_unit, + rank_of_data=1, + ) + psi_bd = BaseData( + signal=np.asarray(psi_values, dtype=float), + units=psi_unit, + rank_of_data=1, + ) + + # simple axis metadata (optional, but useful to check propagation) + axis_bd = BaseData( + signal=np.arange(n, dtype=float), + units=ureg.pixel, + rank_of_data=0, + ) + signal_bd.axes = [axis_bd] + + bundle = DataBundle( + { + "signal": signal_bd, + "Q": q_bd, + "Psi": psi_bd, + } + ) + return bundle + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_indexpixels_azimuthal_basic_binning_1d_linear(): + """ + Azimuthal direction: + - Bins along Q (linear bins). + - Psi ROI is full circle. + - Check that obvious Q values fall into expected bins. 
+ """ + # Q values: two points clearly in each bin, others outside range + q_vals = np.array([0.5, 1.25, 1.75, 2.25, 2.75, 3.5], dtype=float) + # Psi irrelevant here beyond being finite and inside ROI + psi_vals = np.zeros_like(q_vals, dtype=float) + + bundle = make_1d_signal_bundle( + q_values=q_vals, + psi_values=psi_vals, + q_unit=ureg.dimensionless, + psi_unit=ureg.degree, + ) + + processing_data = ProcessingData() + processing_data["test"] = bundle + + step = IndexPixels(io_sources=IoSources()) + step.processing_data = processing_data + step._prepared_data = {} + + step.configuration = { + "with_processing_keys": ["test"], + "output_processing_key": None, + # azimuthal: binning along Q + "averaging_direction": "azimuthal", + "n_bins": 2, + "bin_type": "linear", + "q_min": 1.0, + "q_max": 3.0, + "q_limits_unit": None, # same as Q.units + # full-circle Psi in degrees + "psi_min": 0.0, + "psi_max": 360.0, + "psi_limits_unit": "degree", + } + + step.prepare_execution() + out = step.calculate() + + assert "test" in out + db = out["test"] + assert "pixel_index" in db + + pix = db["pixel_index"] + assert pix.signal.shape == q_vals.shape + assert pix.units == ureg.dimensionless + + # Edges are [1, 2, 3]. With searchsorted(side="right") - 1: + # Q: 0.5 1.25 1.75 2.25 2.75 3.5 + # idx: -1 0 0 1 1 -1 (plus Q-range mask) + expected = np.array([-1, 0, 0, 1, 1, -1], dtype=int) + # pixel_index stored as float; cast to int for comparison + result = pix.signal.astype(int) + assert_array_equal(result, expected) + + +def test_indexpixels_radial_basic_binning_1d_linear_psi(): + """ + Radial direction: + - Bins along Psi (linear bins from psi_min to psi_max). + - Q ROI wide enough to include all pixels. + - Check straightforward bin assignment for four angles. + """ + # Four angles: 0°, 90°, 180°, 270° + psi_deg = np.array([0.0, 90.0, 180.0, 270.0], dtype=float) + psi_rad = np.deg2rad(psi_deg) + q_vals = np.ones_like(psi_deg, dtype=float) # simple constant Q + + bundle = make_1d_signal_bundle( + q_values=q_vals, + psi_values=psi_rad, + q_unit=ureg.dimensionless, + psi_unit=ureg.radian, + ) + + processing_data = ProcessingData() + processing_data["test"] = bundle + + step = IndexPixels(io_sources=IoSources()) + step.processing_data = processing_data + step._prepared_data = {} + + step.configuration = { + "with_processing_keys": ["test"], + "output_processing_key": None, + "averaging_direction": "radial", + "n_bins": 4, + "bin_type": "linear", + # Wide Q ROI, everything included + "q_min": 0.0, + "q_max": 2.0, + "q_limits_unit": None, + # Psi bin range 0..360° in radians + "psi_min": 0.0, + "psi_max": 360.0, + "psi_limits_unit": "degree", + } + + step.prepare_execution() + out = step.calculate() + + db = out["test"] + pix = db["pixel_index"] + + assert pix.signal.shape == psi_deg.shape + assert pix.units == ureg.dimensionless + + # Edges (in radians) correspond to [0, 90, 180, 270, 360] deg. + # Values: 0°, 90°, 180°, 270° → bins 0, 1, 2, 3 + expected = np.array([0, 1, 2, 3], dtype=int) + result = pix.signal.astype(int) + assert_array_equal(result, expected) + + +def test_indexpixels_azimuthal_q_limits_unit_conversion(): + """ + Check that q_min/q_max specified in q_limits_unit are converted + correctly to the Q units. + + Example: + - Q in 1/Å, + - q_limits_unit = 1/nm, + - q_min = 1.5 1/nm → 0.15 1/Å + - q_max = 2.5 1/nm → 0.25 1/Å + Only Q values in [0.15, 0.25] should be included. 
+ """ + Q_unit = ureg.Unit("1/angstrom") + + q_vals = np.array([0.10, 0.20, 0.30], dtype=float) # in 1/Å + psi_vals = np.zeros_like(q_vals, dtype=float) + + bundle = make_1d_signal_bundle( + q_values=q_vals, + psi_values=psi_vals, + q_unit=Q_unit, + psi_unit=ureg.radian, + ) + + processing_data = ProcessingData() + processing_data["test"] = bundle + + step = IndexPixels(io_sources=IoSources()) + step.processing_data = processing_data + step._prepared_data = {} + + step.configuration = { + "with_processing_keys": ["test"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "n_bins": 1, + "bin_type": "linear", + # In 1/nm; will be converted to 1/Å + "q_min": 1.5, + "q_max": 2.5, + "q_limits_unit": "1/nm", + # full circle in Psi + "psi_min": 0.0, + "psi_max": 2.0 * np.pi, + "psi_limits_unit": "radian", + } + + step.prepare_execution() + out = step.calculate() + + db = out["test"] + pix = db["pixel_index"] + + # Only the middle value (0.20 1/Å) lies between 0.15 and 0.25 + # and within the Q-range mask; with 1 bin, its index should be 0. + expected = np.array([-1, 0, -1], dtype=int) + result = pix.signal.astype(int) + assert_array_equal(result, expected) + + +def test_indexpixels_azimuthal_psi_wraparound_mask(): + """ + Azimuthal direction with Psi wrap-around ROI: + psi_min > psi_max, e.g. 300°..60°. + + Pixels with Psi in [300°, 360°) U [0°, 60°] should be included; + others should be masked to -1. + """ + psi_deg = np.array([10.0, 100.0, 200.0, 350.0], dtype=float) + psi_rad = np.deg2rad(psi_deg) + # Q values chosen to be all in-range for binning + q_vals = np.array([1.0, 1.5, 2.0, 2.5], dtype=float) + + bundle = make_1d_signal_bundle( + q_values=q_vals, + psi_values=psi_rad, + q_unit=ureg.dimensionless, + psi_unit=ureg.radian, + ) + + processing_data = ProcessingData() + processing_data["test"] = bundle + + step = IndexPixels(io_sources=IoSources()) + step.processing_data = processing_data + step._prepared_data = {} + + step.configuration = { + "with_processing_keys": ["test"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "n_bins": 4, + "bin_type": "linear", + "q_min": 0.5, + "q_max": 3.0, + "q_limits_unit": None, + # wrap-around: 300° .. 60° + "psi_min": 300.0, + "psi_max": 60.0, + "psi_limits_unit": "degree", + } + + step.prepare_execution() + out = step.calculate() + + db = out["test"] + pix = db["pixel_index"] + + # The first (10°) and last (350°) pixels lie inside the wrap-around ROI. + # The internal binning is along Q; with 4 linear bins between 0.5 and 3.0: + # edges [0.5, 1.125, 1.75, 2.375, 3.0] + # q = [1.0, 1.5, 2.0, 2.5] -> indices [0, 1, 2, 3] if unmasked. + # Applying the Psi ROI: + # 10° -> index 0 + # 100° -> masked -> -1 + # 200° -> masked -> -1 + # 350° -> index 3 + expected = np.array([0, -1, -1, 3], dtype=int) + result = pix.signal.astype(int) + assert_array_equal(result, expected) + + +def test_indexpixels_prepare_and_calculate_integration_2d(): + """ + Integration-style test (2D case): + - Build a 2D signal, Q, Psi. + - Run prepare_execution() and calculate(). + - Check that the pixel_index BaseData is present with expected shape, + axes, and rank_of_data. 
+ """ + n0, n1 = 4, 6 + spatial_shape = (n0, n1) + + # Simple Q: monotonically increasing across the flattened array + q_vals = np.linspace(0.1, 2.0, num=n0 * n1, dtype=float).reshape(spatial_shape) + # Psi: zeros (full-circle ROI) + psi_vals = np.zeros(spatial_shape, dtype=float) + + signal_bd = BaseData( + signal=np.ones(spatial_shape, dtype=float), + units=ureg.dimensionless, + rank_of_data=2, + ) + q_bd = BaseData( + signal=q_vals, + units=ureg.Unit("1/nm"), + rank_of_data=2, + ) + psi_bd = BaseData( + signal=psi_vals, + units=ureg.radian, + rank_of_data=2, + ) + + # Add two spatial axes for completeness + axis_row = BaseData( + signal=np.arange(n0, dtype=float), + units=ureg.pixel, + rank_of_data=0, + ) + axis_col = BaseData( + signal=np.arange(n1, dtype=float), + units=ureg.pixel, + rank_of_data=0, + ) + signal_bd.axes = [axis_row, axis_col] + + bundle = DataBundle( + { + "signal": signal_bd, + "Q": q_bd, + "Psi": psi_bd, + } + ) + + processing_data = ProcessingData() + processing_data["image"] = bundle + + step = IndexPixels(io_sources=IoSources()) + step.processing_data = processing_data + step._prepared_data = {} + + step.configuration = { + "with_processing_keys": ["image"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "n_bins": 8, + "bin_type": "log", + "q_min": None, # auto from data + "q_max": None, + "q_limits_unit": None, + "psi_min": 0.0, + "psi_max": 2.0 * np.pi, + "psi_limits_unit": "radian", + } + + step.prepare_execution() + out = step.calculate() + + assert "image" in out + db = out["image"] + assert "pixel_index" in db + + pix = db["pixel_index"] + assert isinstance(pix, BaseData) + assert pix.signal.shape == spatial_shape + assert pix.rank_of_data == signal_bd.rank_of_data + + # axes should match the last rank_of_data axes of the original signal + assert len(pix.axes) == len(signal_bd.axes[-signal_bd.rank_of_data :]) + # dimensionless units + assert pix.units == ureg.dimensionless diff --git a/src/modacor/tests/modules/technique_modules/test_indexed_averaging.py b/src/modacor/tests/modules/technique_modules/test_indexed_averaging.py new file mode 100644 index 0000000..d8088e3 --- /dev/null +++ b/src/modacor/tests/modules/technique_modules/test_indexed_averaging.py @@ -0,0 +1,513 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +""" +Tests for the IndexedAverager processing step. + +We test: +- Basic 1D averaging with a simple, hand-crafted pixel_index map. +- Correct handling of Mask and pixel_index == -1. +- Uncertainty propagation from per-pixel uncertainties to bin-mean. +- SEM ("SEM" key) behaviour for signal. +- Integration-style test using prepare_execution() + calculate(). +""" + +import numpy as np +import pytest +from numpy.testing import assert_allclose + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.processing_data import ProcessingData +from modacor.io.io_sources import IoSources +from modacor.modules.technique_modules.indexed_averager import IndexedAverager + +# --------------------------------------------------------------------------- +# Small helpers to build simple test databundles +# --------------------------------------------------------------------------- + + +def make_1d_bundle_basic(): + """ + Simple 1D test bundle with 4 pixels and 2 bins. 
+ + pixel_index: + - pixels 0, 1 -> bin 0 + - pixels 2, 3 -> bin 1 + """ + signal = np.array([1.0, 2.0, 3.0, 4.0], dtype=float) + Q = np.array([10.0, 20.0, 30.0, 40.0], dtype=float) + Psi = np.array([0.0, 0.0, np.pi, np.pi], dtype=float) + + signal_bd = BaseData( + signal=signal, + units=ureg.dimensionless, + rank_of_data=1, + ) + Q_bd = BaseData( + signal=Q, + units=ureg.dimensionless, + rank_of_data=1, + ) + Psi_bd = BaseData( + signal=Psi, + units=ureg.radian, + rank_of_data=1, + ) + pixel_index = np.array([0, 0, 1, 1], dtype=float) + pix_bd = BaseData( + signal=pixel_index, + units=ureg.dimensionless, + rank_of_data=1, + ) + + bundle = DataBundle( + { + "signal": signal_bd, + "Q": Q_bd, + "Psi": Psi_bd, + "pixel_index": pix_bd, + } + ) + return bundle + + +def make_1d_bundle_with_mask_and_uncertainties(): + """ + 1D test bundle with: + - Mask on one pixel. + - Per-pixel uncertainties on signal and Q. + - 2 bins as before. + + pixel_index: + - pixel 0 -> bin 0 + - pixel 1 -> bin 0 (masked) + - pixel 2 -> bin 1 + - pixel 3 -> -1 (ignored) + """ + signal = np.array([1.0, 2.0, 3.0, 4.0], dtype=float) + Q = np.array([10.0, 20.0, 30.0, 40.0], dtype=float) + Psi = np.array([0.0, 0.0, np.pi, np.pi], dtype=float) + + # uncertainties: "sigma" for signal, "dq" for Q + sigma_signal = np.array([0.1, 0.1, 0.2, 0.2], dtype=float) + dq = np.array([0.5, 0.5, 1.0, 1.0], dtype=float) + + signal_bd = BaseData( + signal=signal, + units=ureg.dimensionless, + uncertainties={"sigma": sigma_signal}, + rank_of_data=1, + ) + Q_bd = BaseData( + signal=Q, + units=ureg.dimensionless, + uncertainties={"dq": dq}, + rank_of_data=1, + ) + Psi_bd = BaseData( + signal=Psi, + units=ureg.radian, + rank_of_data=1, + ) + + pixel_index = np.array([0, 0, 1, -1], dtype=float) + pix_bd = BaseData( + signal=pixel_index, + units=ureg.dimensionless, + rank_of_data=1, + ) + + # Mask out pixel 1; booleans, True = masked + mask_arr = np.array([False, True, False, False], dtype=bool) + mask_bd = BaseData( + signal=mask_arr, + units=ureg.dimensionless, + rank_of_data=1, + ) + + bundle = DataBundle( + { + "signal": signal_bd, + "Q": Q_bd, + "Psi": Psi_bd, + "pixel_index": pix_bd, + "Mask": mask_bd, + } + ) + return bundle + + +def make_2d_bundle_basic(): + """ + Simple 2D test bundle (2 x 3) with 3 bins. 
+ + Layout (row-major): + + signal = + [[1, 2, 3], + [4, 5, 6]] + + Q = + [[10, 20, 30], + [40, 50, 60]] + + Psi = all zeros (for a trivial circular mean) + + pixel_index = + [[0, 0, 1], + [1, 2, 2]] + + So: + bin 0: pixels (0,0), (0,1) → signal 1,2; Q 10,20 + bin 1: pixels (0,2), (1,0) → signal 3,4; Q 30,40 + bin 2: pixels (1,1), (1,2) → signal 5,6; Q 50,60 + """ + signal = np.array( + [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], + dtype=float, + ) + Q = np.array( + [[10.0, 20.0, 30.0], [40.0, 50.0, 60.0]], + dtype=float, + ) + # Psi all zeros in radians → mean Psi per bin is trivially 0 + Psi = np.zeros_like(signal, dtype=float) + + # Simple per-pixel uncertainties (same everywhere) + sigma_signal = np.full_like(signal, 0.1, dtype=float) + dq = np.full_like(signal, 0.5, dtype=float) + + signal_bd = BaseData( + signal=signal, + units=ureg.dimensionless, + uncertainties={"sigma": sigma_signal}, + rank_of_data=2, + ) + Q_bd = BaseData( + signal=Q, + units=ureg.dimensionless, + uncertainties={"dq": dq}, + rank_of_data=2, + ) + Psi_bd = BaseData( + signal=Psi, + units=ureg.radian, + rank_of_data=2, + ) + + pixel_index = np.array( + [[0, 0, 1], [1, 2, 2]], + dtype=float, + ) + pix_bd = BaseData( + signal=pixel_index, + units=ureg.dimensionless, + rank_of_data=2, + ) + + bundle = DataBundle( + { + "signal": signal_bd, + "Q": Q_bd, + "Psi": Psi_bd, + "pixel_index": pix_bd, + } + ) + return bundle + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_indexedaverager_1d_basic_unweighted_mean(): + """ + For a simple 1D bundle without mask and with equal weights: + - Bin 0 averages pixels 0 and 1. + - Bin 1 averages pixels 2 and 3. + We check signal, Q, Psi means and axis wiring. + """ + step = IndexedAverager(io_sources=IoSources()) + + processing_data = ProcessingData() + processing_data["bundle"] = make_1d_bundle_basic() + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["bundle"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + out = step.calculate() + + assert "bundle" in out + db_out = out["bundle"] + + assert "signal" in db_out + assert "Q" in db_out + assert "Psi" in db_out + + sig_1d = db_out["signal"] + Q_1d = db_out["Q"] + Psi_1d = db_out["Psi"] + + # 2 bins + assert sig_1d.signal.shape == (2,) + assert Q_1d.signal.shape == (2,) + assert Psi_1d.signal.shape == (2,) + + # Simple averages: + # bin 0: (1+2)/2 = 1.5, (10+20)/2 = 15, Psi = 0 + # bin 1: (3+4)/2 = 3.5, (30+40)/2 = 35, Psi = pi + assert_allclose(sig_1d.signal, np.array([1.5, 3.5]), rtol=1e-12, atol=1e-12) + assert_allclose(Q_1d.signal, np.array([15.0, 35.0]), rtol=1e-12, atol=1e-12) + assert_allclose(Psi_1d.signal, np.array([0.0, np.pi]), rtol=1e-12, atol=1e-12) + + # Axis wiring: for averaging_direction="azimuthal", signal axes should reference Q + assert len(sig_1d.axes) == 1 + assert sig_1d.axes[0] is Q_1d + + +def test_indexedaverager_mask_and_negative_index(): + """ + Check that: + - Masked pixels are excluded. + - Pixels with pixel_index == -1 are excluded. + - Means are computed only from remaining pixels. 
+ """ + step = IndexedAverager(io_sources=IoSources()) + + processing_data = ProcessingData() + processing_data["bundle"] = make_1d_bundle_with_mask_and_uncertainties() + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["bundle"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + out = step.calculate() + + db_out = out["bundle"] + sig_1d = db_out["signal"] + Q_1d = db_out["Q"] + Psi_1d = db_out["Psi"] + + # Valid pixels: + # - pixel 0: index 0, not masked -> bin 0 + # - pixel 1: index 0, masked -> ignored + # - pixel 2: index 1, not masked -> bin 1 + # - pixel 3: index -1 -> ignored + # So: + # bin 0: signal=1, Q=10, Psi=0 + # bin 1: signal=3, Q=30, Psi=pi + assert_allclose(sig_1d.signal, np.array([1.0, 3.0]), rtol=1e-12, atol=1e-12) + assert_allclose(Q_1d.signal, np.array([10.0, 30.0]), rtol=1e-12, atol=1e-12) + assert_allclose(Psi_1d.signal, np.array([0.0, np.pi]), rtol=1e-12, atol=1e-12) + + +def test_indexedaverager_uncertainty_propagation_and_sem(): + """ + For the basic 1D setup without mask, with a simple per-pixel uncertainty: + - Check that per-bin propagated uncertainties on signal match the expected + sqrt(sum(w^2 sigma^2)) / sum_w for equal weights. + - Check that SEM ("SEM") is present and behaves as expected. + """ + step = IndexedAverager(io_sources=IoSources()) + + # Start from the basic bundle and add a sigma uncertainty + bundle = make_1d_bundle_basic() + sigma_signal = np.array([0.1, 0.1, 0.2, 0.2], dtype=float) + bundle["signal"].uncertainties = {"sigma": sigma_signal} + + processing_data = ProcessingData() + processing_data["bundle"] = bundle + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["bundle"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + out = step.calculate() + + sig_1d = out["bundle"]["signal"] + assert "sigma" in sig_1d.uncertainties + assert "SEM" in sig_1d.uncertainties + + # Expected propagated sigma on bin means: + # bin 0: pixels 0,1, sigma=0.1 each + # sigma_mean0 = sqrt(0.1^2 + 0.1^2) / 2 = 0.1 / sqrt(2) + # bin 1: pixels 2,3, sigma=0.2 each + # sigma_mean1 = sqrt(0.2^2 + 0.2^2) / 2 = 0.2 / sqrt(2) + expected_sigma = np.array([0.1 / np.sqrt(2.0), 0.2 / np.sqrt(2.0)], dtype=float) + + assert_allclose(sig_1d.uncertainties["sigma"], expected_sigma, rtol=1e-12, atol=1e-12) + + # SEM from scatter: + # For basic bundle signal = [1,2,3,4] and means [1.5, 3.5] + # bin 0: dev = [-0.5, +0.5], sum(dev^2) = 0.5, sum_w=2 -> var_spread=0.25 + # bin 1: same pattern + # Effective N_eff = (sum_w^2 / sum_w2) = 4/2 = 2 + # sem = sqrt(var_spread / N_eff) = sqrt(0.25/2) = sqrt(0.125) + expected_sem = np.full(2, np.sqrt(0.125), dtype=float) + + sem = sig_1d.uncertainties["SEM"] + assert_allclose(sem, expected_sem, rtol=1e-12, atol=1e-12) + + +def test_indexedaverager_raises_on_missing_uncertainty_weight_key(): + """ + If use_signal_uncertainty_weights=True but uncertainty_weight_key is None, + a ValueError should be raised. 
+ """ + step = IndexedAverager(io_sources=IoSources()) + + processing_data = ProcessingData() + processing_data["bundle"] = make_1d_bundle_basic() + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["bundle"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": True, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + with pytest.raises(ValueError): + _ = step.calculate() + + +def test_indexedaverager_prepare_and_calculate_integration_radial_axis(): + """ + Integration-style test: + - Build a small 1D bundle. + - Run prepare_execution() and calculate(). + - Check that for averaging_direction='radial', signal.axes[0] references Psi. + """ + step = IndexedAverager(io_sources=IoSources()) + + processing_data = ProcessingData() + processing_data["bundle"] = make_1d_bundle_basic() + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["bundle"], + "output_processing_key": None, + "averaging_direction": "radial", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + out = step.calculate() + + db_out = out["bundle"] + sig_1d = db_out["signal"] + Psi_1d = db_out["Psi"] + + assert len(sig_1d.axes) == 1 + # For radial averaging, we expect signal.axes[0] to be Psi + assert sig_1d.axes[0] is Psi_1d + + +def test_indexedaverager_2d_basic_unweighted_mean(): + """ + 2D main-use-case test: + + - 2x3 signal, Q, Psi, pixel_index (3 bins). + - No mask, equal weights. + - Check binned 1D means for signal, Q, Psi. + - Check propagated signal uncertainty shape and values. + """ + step = IndexedAverager(io_sources=IoSources()) + + processing_data = ProcessingData() + processing_data["img"] = make_2d_bundle_basic() + + step.processing_data = processing_data + step.configuration.update( + { + "with_processing_keys": ["img"], + "output_processing_key": None, + "averaging_direction": "azimuthal", + "use_signal_weights": True, + "use_signal_uncertainty_weights": False, + "uncertainty_weight_key": None, + } + ) + + step.prepare_execution() + out = step.calculate() + + assert "img" in out + db_out = out["img"] + + sig_1d = db_out["signal"] + Q_1d = db_out["Q"] + Psi_1d = db_out["Psi"] + + # We expect 3 bins + assert sig_1d.signal.shape == (3,) + assert Q_1d.signal.shape == (3,) + assert Psi_1d.signal.shape == (3,) + + # Means (from the docstring of make_2d_bundle_basic): + # bin 0: (1, 2) -> 1.5; (10, 20) -> 15 + # bin 1: (3, 4) -> 3.5; (30, 40) -> 35 + # bin 2: (5, 6) -> 5.5; (50, 60) -> 55 + expected_signal = np.array([1.5, 3.5, 5.5], dtype=float) + expected_Q = np.array([15.0, 35.0, 55.0], dtype=float) + expected_Psi = np.zeros(3, dtype=float) # all Psi were zero + + assert_allclose(sig_1d.signal, expected_signal, rtol=1e-12, atol=1e-12) + assert_allclose(Q_1d.signal, expected_Q, rtol=1e-12, atol=1e-12) + assert_allclose(Psi_1d.signal, expected_Psi, rtol=1e-12, atol=1e-12) + + # For averaging_direction="azimuthal", axis should reference Q + assert len(sig_1d.axes) == 1 + assert sig_1d.axes[0] is Q_1d + + # Uncertainty propagation sanity check: + # signal sigma per pixel = 0.1 everywhere, equal weights. + # Each bin has 2 pixels → sigma_mean = 0.1 / sqrt(2) for all bins. 
+ assert "sigma" in sig_1d.uncertainties + sigma_binned = sig_1d.uncertainties["sigma"] + expected_sigma = np.full(3, 0.1 / np.sqrt(2.0), dtype=float) + + assert_allclose(sigma_binned, expected_sigma, rtol=1e-12, atol=1e-12)