diff --git a/src/nifreeze/data/__init__.py b/src/nifreeze/data/__init__.py index b4e52cfea..7e6547260 100644 --- a/src/nifreeze/data/__init__.py +++ b/src/nifreeze/data/__init__.py @@ -32,7 +32,6 @@ def load( filename: Path | str, brainmask_file: Path | str | None = None, - motion_file: Path | str | None = None, **kwargs, ) -> BaseDataset | DWI | PET: """ @@ -45,8 +44,6 @@ def load( brainmask_file : :obj:`os.pathlike`, optional A brainmask NIfTI file. If provided, will be loaded and stored in the returned dataset. - motion_file : :obj:`os.pathlike` - A file containing head motion affine matrices (linear). Returns ------- @@ -67,9 +64,6 @@ def load( from nifreeze.utils.ndimage import load_api - if motion_file: - raise NotImplementedError - filename = Path(filename) if filename.name.endswith(NFDH5_EXT): for dataclass in (BaseDataset, PET, DWI): @@ -81,15 +75,11 @@ def load( if "gradients_file" in kwargs or "bvec_file" in kwargs: from nifreeze.data.dmri import from_nii as dmri_from_nii - return dmri_from_nii( - filename, brainmask_file=brainmask_file, motion_file=motion_file, **kwargs - ) + return dmri_from_nii(filename, brainmask_file=brainmask_file, **kwargs) elif "frame_time" in kwargs or "frame_duration" in kwargs: from nifreeze.data.pet import from_nii as pet_from_nii - return pet_from_nii( - filename, brainmask_file=brainmask_file, motion_file=motion_file, **kwargs - ) + return pet_from_nii(filename, brainmask_file=brainmask_file, **kwargs) img = load_api(filename, SpatialImage) retval: BaseDataset = BaseDataset(dataobj=np.asanyarray(img.dataobj), affine=img.affine) diff --git a/src/nifreeze/data/dmri.py b/src/nifreeze/data/dmri.py index e7b51b499..47992ef98 100644 --- a/src/nifreeze/data/dmri.py +++ b/src/nifreeze/data/dmri.py @@ -37,76 +37,118 @@ from typing_extensions import Self from nifreeze.data.base import BaseDataset, _cmp, _data_repr +from nifreeze.data.dmriutils import ( + DEFAULT_HIGHB_THRESHOLD, + DEFAULT_LOWB_THRESHOLD, + DEFAULT_MULTISHELL_BIN_COUNT_THR, + DEFAULT_NUM_BINS, + DTI_MIN_ORIENTATIONS, + GRADIENT_BVAL_BVEC_PRIORITY_WARN_MSG, + GRADIENT_DATA_MISSING_ERROR, + GRADIENT_EXPECTED_COLUMNS_ERROR_MSG, + GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR, + find_shelling_scheme, + format_gradients, + transform_fsl_bvec, +) from nifreeze.utils.ndimage import get_data, load_api -DEFAULT_CLIP_PERCENTILE = 75 -"""Upper percentile threshold for intensity clipping.""" -DEFAULT_MIN_S0 = 1e-5 -"""Minimum value when considering the :math:`S_{0}` DWI signal.""" +def validate_gradients( + inst: DWI, + attr: attrs.Attribute, + value: npt.NDArray[np.floating], +) -> None: + """Strict validator for use in attribute validation (e.g. attrs / validators). -DEFAULT_MAX_S0 = 1.0 -"""Maximum value when considering the :math:`S_{0}` DWI signal.""" + Ensures row-major convention for gradient table. -DEFAULT_LOWB_THRESHOLD = 50 -"""The lower bound for the b-value so that the orientation is considered a DW volume.""" + This function is intended for use as an attrs-style validator. -DEFAULT_HIGHB_THRESHOLD = 8000 -"""A b-value cap for DWI data.""" + Parameters + ---------- + inst : :obj:`~nifreeze.data.dmri.DWI` + The instance being validated (unused; present for validator signature). + attr : :obj:`~attrs.Attribute` + The attribute being validated; attr.name is used in the error message. + value : :obj:`~npt.NDArray` + The value to validate. -DEFAULT_NUM_BINS = 15 -"""Number of bins to classify b-values.""" + Raises + ------ + :exc:`ValueError` + If the gradient table is invalid. -DEFAULT_MULTISHELL_BIN_COUNT_THR = 7 -"""Default bin count to consider a multishell scheme.""" -DTI_MIN_ORIENTATIONS = 6 -"""Minimum number of nonzero b-values in a DWI dataset.""" + Examples + -------- + Non-finite inputs are rejected:: + + >>> validate_gradients(None, None, [[np.inf, 0.0, 0.0, 1000]]) + Traceback (most recent call last): + ... + ValueError: Gradient table contains NaN or infinite values. + >>> validate_gradients(None, None, [[np.nan, 0.0, 0.0, 1000]]) + Traceback (most recent call last): + ... + ValueError: Gradient table contains NaN or infinite values. + + """ + if np.shape(value)[1] != 4: + raise ValueError(GRADIENT_EXPECTED_COLUMNS_ERROR_MSG) + + if not np.all(np.isfinite(value)): + raise ValueError("Gradient table contains NaN or infinite values.") @attrs.define(slots=True) class DWI(BaseDataset[np.ndarray]): """Data representation structure for dMRI data.""" + gradients: np.ndarray = attrs.field( + default=None, + repr=_data_repr, + eq=attrs.cmp_using(eq=_cmp), + converter=format_gradients, + validator=validate_gradients, + ) + """A 2D numpy array of the gradient table (``N`` orientations x ``C`` components).""" bzero: np.ndarray | None = attrs.field( default=None, repr=_data_repr, eq=attrs.cmp_using(eq=_cmp) ) - """A *b=0* reference map, preferably obtained by some smart averaging.""" - gradients: np.ndarray = attrs.field(default=None, repr=_data_repr, eq=attrs.cmp_using(eq=_cmp)) - """A 2D numpy array of the gradient table (``N`` orientations x ``C`` components).""" + """A *b=0* reference map, computed automatically when low-b frames are present.""" eddy_xfms: list = attrs.field(default=None) """List of transforms to correct for estimated eddy current distortions.""" def __attrs_post_init__(self) -> None: - self._normalize_gradients() - - def _normalize_gradients(self) -> None: if self.gradients is None: - return - - gradients = np.asarray(self.gradients) - if gradients.ndim != 2: - raise ValueError("Gradient table must be a 2D array") - - n_volumes = None - if self.dataobj is not None: - try: - n_volumes = self.dataobj.shape[-1] - except Exception: # pragma: no cover - extremely defensive - n_volumes = None - - if n_volumes is not None and gradients.shape[0] != n_volumes: - if gradients.shape[1] == n_volumes: - gradients = gradients.T - else: - raise ValueError( - "Gradient table shape does not match the number of diffusion volumes: " - f"expected {n_volumes} rows, found {gradients.shape[0]}" + raise ValueError(GRADIENT_DATA_MISSING_ERROR) + + if self.dataobj.shape[-1] != self.gradients.shape[0]: + raise ValueError( + GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR.format( + n_volumes=self.dataobj.shape[-1], + n_gradients=self.gradients.shape[0], ) - elif n_volumes is None and gradients.shape[1] > gradients.shape[0]: - gradients = gradients.T + ) + + b0_mask = self.gradients[:, -1] <= DEFAULT_LOWB_THRESHOLD + b0_num = np.sum(b0_mask) - self.gradients = gradients + if b0_num > 0 and self.bzero is None: + bzeros = self.dataobj[..., b0_mask] + self.bzero = bzeros if bzeros.ndim == 3 else np.median(bzeros, axis=-1) + + if b0_num > 0: + # Remove b0 volumes from dataobj and gradients + self.gradients = self.gradients[~b0_mask, :] + self.dataobj = self.dataobj[..., ~b0_mask] + + if self.gradients.shape[0] < DTI_MIN_ORIENTATIONS: + raise ValueError( + f"DWI datasets must have at least {DTI_MIN_ORIENTATIONS} diffusion-weighted " + f"orientations; found {self.dataobj.shape[-1]}." + ) def _getextra(self, idx: int | slice | tuple | np.ndarray) -> tuple[np.ndarray]: return (self.gradients[idx, ...],) @@ -318,12 +360,10 @@ def to_nifti( def from_nii( filename: Path | str, brainmask_file: Path | str | None = None, - motion_file: Path | str | None = None, gradients_file: Path | str | None = None, bvec_file: Path | str | None = None, bval_file: Path | str | None = None, b0_file: Path | str | None = None, - b0_thres: float = DEFAULT_LOWB_THRESHOLD, ) -> DWI: """ Load DWI data from NIfTI and construct a DWI object. @@ -338,8 +378,6 @@ def from_nii( brainmask_file : :obj:`os.pathlike`, optional A brainmask NIfTI file. If provided, will be loaded and stored in the returned dataset. - motion_file : :obj:`os.pathlike`, optional - A file containing head motion affine matrices (linear) gradients_file : :obj:`os.pathlike`, optional A text file containing the gradients table, shape (N, C) where the last column stores the b-values. If provided following the column-major convention(C, N), @@ -352,9 +390,6 @@ def from_nii( b0_file : :obj:`os.pathlike`, optional A NIfTI file containing a b=0 volume (possibly averaged or reference). If not provided, and the data contains at least one b=0 volume, one will be computed. - b0_thres : float, optional - Threshold for determining which volumes are considered DWI vs. b=0 - if you combine them in the same file. Returns ------- @@ -369,10 +404,6 @@ def from_nii( ``bvec_file`` + ``bval_file``). """ - - if motion_file: - raise NotImplementedError - filename = Path(filename) # 1) Load a NIfTI @@ -383,15 +414,9 @@ def from_nii( if gradients_file: grad = np.loadtxt(gradients_file, dtype="float32") if bvec_file and bval_file: - warn( - "Both a gradients table file and b-vec/val files are defined; " - "ignoring b-vec/val files in favor of the gradients_file.", - stacklevel=2, - ) + warn(GRADIENT_BVAL_BVEC_PRIORITY_WARN_MSG, stacklevel=2) elif bvec_file and bval_file: bvecs = np.loadtxt(bvec_file, dtype="float32") - if bvecs.ndim == 1: - bvecs = bvecs[np.newaxis, :] if bvecs.shape[1] != 3 and bvecs.shape[0] == 3: bvecs = bvecs.T @@ -400,155 +425,25 @@ def from_nii( bvals = np.squeeze(bvals) grad = np.column_stack((bvecs, bvals)) else: - raise RuntimeError( - "No gradient data provided. " - "Please specify either a gradients_file or (bvec_file & bval_file)." - ) - - if grad.ndim == 1: - grad = grad[np.newaxis, :] - - if grad.shape[1] < 2: - raise ValueError("Gradient table must have at least two columns (direction + b-value).") - - if grad.shape[1] != 4: - if grad.shape[0] == 4: - grad = grad.T - else: - raise ValueError( - "Gradient table must have four columns (3 direction components and one b-value)." - ) - - # 3) Create the DWI instance. We'll filter out volumes where b-value > b0_thres - # as "DW volumes" if the user wants to store only the high-b volumes here - gradmsk = grad[:, -1] > b0_thres - - dwi_obj = DWI( - dataobj=fulldata[..., gradmsk], - affine=img.affine, - # We'll assign the filtered gradients below. - ) - - dwi_obj.gradients = grad[gradmsk, :] - dwi_obj._normalize_gradients() + raise RuntimeError(GRADIENT_DATA_MISSING_ERROR) - # 4) b=0 volume (bzero) - # If the user provided a b0_file, load it + # 3) Read b-zero volume if provided + b0_data = None if b0_file: b0img = load_api(b0_file, SpatialImage) - b0vol = np.asanyarray(b0img.dataobj) - # We'll assume your DWI class has a bzero: np.ndarray | None attribute - dwi_obj.bzero = b0vol - # Otherwise, if any volumes remain outside gradmsk, compute a median B0: - elif np.any(~gradmsk): - # The b=0 volumes are those that did NOT pass b0_thres - b0_volumes = fulldata[..., ~gradmsk] - # A simple approach is to take the median across that last dimension - # Note that axis=3 is valid only if your data is 4D (x, y, z, volumes). - dwi_obj.bzero = np.median(b0_volumes, axis=3) - - # 5) If a brainmask_file was provided, load it + b0_data = np.asanyarray(b0img.dataobj) + + # 4) If a brainmask_file was provided, load it + brainmask_data = None if brainmask_file: mask_img = load_api(brainmask_file, SpatialImage) - dwi_obj.brainmask = np.asanyarray(mask_img.dataobj, dtype=bool) - - return dwi_obj - - -def find_shelling_scheme( - bvals: np.ndarray, - num_bins: int = DEFAULT_NUM_BINS, - multishell_nonempty_bin_count_thr: int = DEFAULT_MULTISHELL_BIN_COUNT_THR, - bval_cap: float = DEFAULT_HIGHB_THRESHOLD, -) -> tuple[str, list[npt.NDArray[np.floating]], list[np.floating]]: - """ - Find the shelling scheme on the given b-values. - - Computes the histogram of the b-values according to ``num_bins`` - and depending on the nonempty bin count, classify the shelling scheme - as single-shell if they are 2 (low-b and a shell); multi-shell if they are - below the ``multishell_nonempty_bin_count_thr`` value; and DSI otherwise. - - Parameters - ---------- - bvals : :obj:`list` or :obj:`~numpy.ndarray` - List or array of b-values. - num_bins : :obj:`int`, optional - Number of bins. - multishell_nonempty_bin_count_thr : :obj:`int`, optional - Bin count to consider a multi-shell scheme. - bval_cap : :obj:`float`, optional - Maximum b-value to be considered in a multi-shell scheme. - - Returns - ------- - scheme : :obj:`str` - Shelling scheme. - bval_groups : :obj:`list` - List of grouped b-values. - bval_estimated : :obj:`list` - List of 'estimated' b-values as the median value of each b-value group. - - """ - - # Bin the b-values: use -1 as the lower bound to be able to appropriately - # include b0 values - hist, bin_edges = np.histogram(bvals, bins=num_bins, range=(-1, min(max(bvals), bval_cap))) - - # Collect values in each bin - bval_groups = [] - bval_estimated = [] - for lower, upper in zip(bin_edges[:-1], bin_edges[1:], strict=False): - # Add only if a nonempty b-values mask - if (mask := (bvals > lower) & (bvals <= upper)).sum(): - bval_groups.append(bvals[mask]) - bval_estimated.append(np.median(bvals[mask])) - - nonempty_bins = len(bval_groups) - - if nonempty_bins < 2: - raise ValueError("DWI must have at least one high-b shell") + brainmask_data = np.asanyarray(mask_img.dataobj, dtype=bool) - if nonempty_bins == 2: - scheme = "single-shell" - elif nonempty_bins < multishell_nonempty_bin_count_thr: - scheme = "multi-shell" - else: - scheme = "DSI" - - return scheme, bval_groups, bval_estimated - - -def transform_fsl_bvec( - b_ijk: np.ndarray, xfm: np.ndarray, imaffine: np.ndarray, invert: bool = False -) -> np.ndarray: - """ - Transform a b-vector from the original space to the new space defined by the affine. - - Parameters - ---------- - b_ijk : :obj:`~numpy.ndarray` - The b-vector in FSL/DIPY conventions (i.e., voxel coordinates). - xfm : :obj:`~numpy.ndarray` - The affine transformation to apply. - Please note that this is the inverse of the head-motion-correction affine, - which maps coordinates from the realigned space to the moved (scan) space. - In this case, we want to move the b-vector from the moved (scan) space into - the realigned space. - imaffine : :obj:`~numpy.ndarray` - The image's affine, to convert. - invert : :obj:`bool`, optional - If ``True``, the transformation will be inverted. - - Returns - ------- - :obj:`~numpy.ndarray` - The transformed b-vector in voxel coordinates (FSL/DIPY). - - """ - xfm = np.linalg.inv(xfm) if invert else xfm.copy() - - # Go from world coordinates (xfm) to voxel coordinates - ijk2ijk_xfm = np.linalg.inv(imaffine) @ xfm @ imaffine - - return ijk2ijk_xfm[:3, :3] @ b_ijk[:3] + # 5) Create and return the DWI instance. + return DWI( + dataobj=fulldata, + affine=img.affine, + gradients=grad, + bzero=b0_data, + brainmask=brainmask_data, + ) diff --git a/src/nifreeze/data/dmriutils.py b/src/nifreeze/data/dmriutils.py new file mode 100644 index 000000000..f01aa4b5b --- /dev/null +++ b/src/nifreeze/data/dmriutils.py @@ -0,0 +1,325 @@ +# Copyright The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Utilities for handling diffusion gradient tables.""" + +from __future__ import annotations + +import numpy as np +import numpy.typing as npt + +DEFAULT_GRADIENT_ATOL = 1e-2 +"""Absolute dissimmilarity tolerance to trigger b-vector normalization.""" + +DEFAULT_HIGHB_THRESHOLD = 8000 +"""A b-value cap for DWI data.""" + +DEFAULT_MULTISHELL_BIN_COUNT_THR = 7 +"""Default bin count to consider a multishell scheme.""" + +DEFAULT_LOWB_THRESHOLD = 50 +"""The lower bound for the b-value so that the orientation is considered a DW volume.""" + +DEFAULT_MAX_S0 = 1.0 +"""Maximum value when considering the :math:`S_{0}` DWI signal.""" + +DEFAULT_MIN_S0 = 1e-5 +"""Minimum value when considering the :math:`S_{0}` DWI signal.""" + +DEFAULT_NUM_BINS = 15 +"""Number of bins to classify b-values.""" + +DTI_MIN_ORIENTATIONS = 6 +"""Minimum number of nonzero b-values in a DWI dataset.""" + +GRADIENT_ABSENCE_ERROR_MSG = "No gradient table was provided." +"""Gradient absence error message.""" + +GRADIENT_BVAL_BVEC_PRIORITY_WARN_MSG = """\ +Both a gradients table file and b-vec/val files are defined; \ +ignoring b-vec/val files in favor of the gradients_file.""" +""""dMRI gradient file priority warning message.""" + +GRADIENT_DATA_MISSING_ERROR = "No gradient data provided." +"""dMRI missing gradient data error message.""" + +GRADIENT_EXPECTED_COLUMNS_ERROR_MSG = ( + "Gradient table must have four columns (3 direction components and one b-value)." +) +"""dMRI gradient expected columns error message.""" + +GRADIENT_OBJECT_ERROR_MSG = "Gradient table must be a numeric homogeneous array-like object" +"""Gradient object error message.""" + +GRADIENT_NDIM_ERROR_MSG = "Gradient table must be a 2D array" +"""dMRI gradient dimensionality error message.""" + +GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR = """\ +Gradient table shape does not match the number of diffusion volumes: \ +expected {n_volumes} rows, found {n_gradients}.""" +"""dMRI volume count vs. gradient count mismatch error message.""" + + +def find_shelling_scheme( + bvals: np.ndarray, + num_bins: int = DEFAULT_NUM_BINS, + multishell_nonempty_bin_count_thr: int = DEFAULT_MULTISHELL_BIN_COUNT_THR, + bval_cap: float = DEFAULT_HIGHB_THRESHOLD, +) -> tuple[str, list[npt.NDArray[np.floating]], list[np.floating]]: + """ + Find the shelling scheme on the given b-values. + + Computes the histogram of the b-values according to ``num_bins`` + and depending on the nonempty bin count, classify the shelling scheme + as single-shell if they are 2 (low-b and a shell); multi-shell if they are + below the ``multishell_nonempty_bin_count_thr`` value; and DSI otherwise. + + Parameters + ---------- + bvals : :obj:`list` or :obj:`~numpy.ndarray` + List or array of b-values. + num_bins : :obj:`int`, optional + Number of bins. + multishell_nonempty_bin_count_thr : :obj:`int`, optional + Bin count to consider a multi-shell scheme. + bval_cap : :obj:`float`, optional + Maximum b-value to be considered in a multi-shell scheme. + + Returns + ------- + scheme : :obj:`str` + Shelling scheme. + bval_groups : :obj:`list` + List of grouped b-values. + bval_estimated : :obj:`list` + List of 'estimated' b-values as the median value of each b-value group. + + """ + + # Bin the b-values: use -1 as the lower bound to be able to appropriately + # include b0 values + hist, bin_edges = np.histogram(bvals, bins=num_bins, range=(-1, min(max(bvals), bval_cap))) + + # Collect values in each bin + bval_groups = [] + bval_estimated = [] + for lower, upper in zip(bin_edges[:-1], bin_edges[1:], strict=False): + # Add only if a nonempty b-values mask + if (mask := (bvals > lower) & (bvals <= upper)).sum(): + bval_groups.append(bvals[mask]) + bval_estimated.append(np.median(bvals[mask])) + + nonempty_bins = len(bval_groups) + + if nonempty_bins < 2: + raise ValueError("DWI must have at least one high-b shell") + + if nonempty_bins == 2: + scheme = "single-shell" + elif nonempty_bins < multishell_nonempty_bin_count_thr: + scheme = "multi-shell" + else: + scheme = "DSI" + + return scheme, bval_groups, bval_estimated + + +def format_gradients( + value: npt.ArrayLike | None, + norm_atol: float = DEFAULT_GRADIENT_ATOL, + skip_normalization: bool = False, +) -> np.ndarray | None: + """ + Validate and orient gradient tables to row-major convention. + + Parameters + ---------- + value : :obj:`ArrayLike` + The value to format. + norm_atol : :obj:`float`, optional + Absolute tolerance to consider a b-vector as unitary or b=0. + skip_normalization : :obj:`bool`, optional + If ``True``, skip b-vector normalization. + + Returns + ------- + :obj:`~numpy.ndarray` + Row-major convention gradient table. + + Raises + ------ + exc:`ValueError` + If ``value`` is not a 2D :obj:`~numpy.ndarray` (``value.ndim != 2``). + + Examples + -------- + Passing an already well-formed table returns the data unchanged:: + + >>> format_gradients( + ... [ + ... [1, 0, 0, 0], + ... [0, 1, 0, 1000], + ... [0, 0, 1, 2000], + ... [0, 0, 0, 0], + ... [0, 0, 0, 1000], + ... ] + ... ) + array([[ 1, 0, 0, 0], + [ 0, 1, 0, 1000], + [ 0, 0, 1, 2000], + [ 0, 0, 0, 0], + [ 0, 0, 0, 0]], dtype=int16) + + Column-major inputs are automatically transposed when an expected + number of diffusion volumes is provided:: + + >>> format_gradients( + ... [[1, 0], [0, 1], [0, 0], [1000, 2000]], + ... ) + array([[ 1, 0, 0, 1000], + [ 0, 1, 0, 2000]], dtype=int16) + + Oversized b-vectors are normalized and the b-value is scaled accordingly:: + + >>> format_gradients([[2.0, 0.0, 0.0, 1000]]) + array([[1.e+00, 0.e+00, 0.e+00, 2.e+03]]) + + Near-zero b-vectors are suppressed to treat them as b0 measurements:: + + >>> format_gradients([[1e-9, 0.0, 0.0, 1200], [1.0, 0.0, 0.0, 1000]]) + array([[ 0, 0, 0, 0], + [ 1, 0, 0, 1000]], dtype=int16) + + Normaliztion can be skipped:: + + >>> format_gradients([[2.0, 0.0, 0.0, 1000]], skip_normalization=True) + array([[ 2, 0, 0, 1000]], dtype=int16) + + Integer-like inputs are preserved when no rescaling is needed:: + + >>> import numpy as np + >>> format_gradients(np.array([[0, 0, 0, 0], [0, 0, 1, 1000]], dtype=np.int16)).dtype + dtype('int16') + + Passing ``None`` raises the absence error:: + + >>> format_gradients(None) + Traceback (most recent call last): + ... + ValueError: No gradient table was provided. + + Gradient tables must always have two dimensions:: + + >>> format_gradients([0, 1, 0, 1000]) + Traceback (most recent call last): + ... + ValueError: Gradient table must be a 2D array + + Gradient tables must have a regular shape:: + + >>> format_gradients([[1, 2], [3, 4, 5]]) + Traceback (most recent call last): + ... + TypeError: Gradient table must be a numeric homogeneous array-like object + + Gradient tables must always have two dimensions:: + + >>> format_gradients([0, 1, 0, 1000]) + Traceback (most recent call last): + ... + ValueError: Gradient table must be a 2D array + + """ + + if value is None: + raise ValueError(GRADIENT_ABSENCE_ERROR_MSG) + + try: + formatted = np.asarray(value, dtype=float) + except (TypeError, ValueError) as exc: + # Conversion failed (e.g. nested ragged objects, non-numeric) + raise TypeError(GRADIENT_OBJECT_ERROR_MSG) from exc + + if formatted.ndim != 2: + raise ValueError(GRADIENT_NDIM_ERROR_MSG) + + # If the numeric values are all integers, preserve integer dtype + if np.allclose(formatted, rounded := np.rint(formatted)): + formatted = rounded.astype( + formatted.dtype if np.issubdtype(formatted.dtype, np.integer) else np.int16 + ) + + # Transpose if column-major + formatted = formatted.T if formatted.shape[0] == 4 and formatted.shape[1] != 4 else formatted + + if formatted.shape[1] != 4: + raise ValueError(GRADIENT_EXPECTED_COLUMNS_ERROR_MSG) + + if skip_normalization: + return formatted + + # Normalize b-vectors in-place + bvecs = formatted[:, :3] + norms = np.linalg.norm(bvecs, axis=1) + b0mask = np.isclose(norms, 0.0, atol=norm_atol) + unitmask = np.isclose(norms, 1.0, atol=norm_atol) + mask = ~unitmask & ~b0mask + if np.any(mask): + formatted = formatted.astype(float) # Ensure float for in-place ops + formatted[mask, :3] = bvecs[mask] / norms[mask, None] # Norm b-vectors + formatted[mask, 3] *= norms[mask] # Scale b-values by norm + + formatted[b0mask, :] = 0 # Zero-out small b-vectors + + return formatted + + +def transform_fsl_bvec( + b_ijk: np.ndarray, xfm: np.ndarray, imaffine: np.ndarray, invert: bool = False +) -> np.ndarray: + """ + Transform a b-vector from the original space to the new space defined by the affine. + + Parameters + ---------- + b_ijk : :obj:`~numpy.ndarray` + The b-vector in FSL/DIPY conventions (i.e., voxel coordinates). + xfm : :obj:`~numpy.ndarray` + The affine transformation to apply. + Please note that this is the inverse of the head-motion-correction affine, + which maps coordinates from the realigned space to the moved (scan) space. + In this case, we want to move the b-vector from the moved (scan) space into + the realigned space. + imaffine : :obj:`~numpy.ndarray` + The image's affine, to convert. + invert : :obj:`bool`, optional + If ``True``, the transformation will be inverted. + + Returns + ------- + :obj:`~numpy.ndarray` + The transformed b-vector in voxel coordinates (FSL/DIPY). + + """ + xfm = np.linalg.inv(xfm) if invert else xfm.copy() + + # Go from world coordinates (xfm) to voxel coordinates + ijk2ijk_xfm = np.linalg.inv(imaffine) @ xfm @ imaffine + + return ijk2ijk_xfm[:3, :3] @ b_ijk[:3] diff --git a/src/nifreeze/data/filtering.py b/src/nifreeze/data/filtering.py index e803a5df0..21a3ca914 100644 --- a/src/nifreeze/data/filtering.py +++ b/src/nifreeze/data/filtering.py @@ -28,8 +28,8 @@ from scipy.ndimage import median_filter from skimage.morphology import ball -from nifreeze.data.dmri import DEFAULT_CLIP_PERCENTILE - +DEFAULT_CLIP_PERCENTILE = 75 +"""Upper percentile threshold for intensity clipping.""" DEFAULT_DTYPE = "int16" """The default image's data type.""" BVAL_ATOL = 100.0 diff --git a/src/nifreeze/data/pet.py b/src/nifreeze/data/pet.py index b3a1f69c6..584857bff 100644 --- a/src/nifreeze/data/pet.py +++ b/src/nifreeze/data/pet.py @@ -221,7 +221,6 @@ def from_nii( filename: Path | str, frame_time: np.ndarray | list[float], brainmask_file: Path | str | None = None, - motion_file: Path | str | None = None, frame_duration: np.ndarray | list[float] | None = None, ) -> PET: """ @@ -236,8 +235,6 @@ def from_nii( brainmask_file : :obj:`os.pathlike`, optional A brainmask NIfTI file. If provided, will be loaded and stored in the returned dataset. - motion_file : :obj:`os.pathlike`, optional - A file containing head motion affine matrices (linear). frame_duration : :obj:`numpy.ndarray` or :obj:`list` of :obj:`float`, optional The duration of each frame. If ``None``, it is derived by the difference of consecutive frame times, @@ -254,8 +251,6 @@ def from_nii( If ``frame_time`` is not provided (BIDS requires it). """ - if motion_file: - raise NotImplementedError filename = Path(filename) # Load from NIfTI diff --git a/src/nifreeze/model/dmri.py b/src/nifreeze/model/dmri.py index 170f3eb9a..7187201be 100644 --- a/src/nifreeze/model/dmri.py +++ b/src/nifreeze/model/dmri.py @@ -28,7 +28,8 @@ from dipy.core.gradients import gradient_table_from_bvals_bvecs from joblib import Parallel, delayed -from nifreeze.data.dmri import DEFAULT_LOWB_THRESHOLD, DEFAULT_MIN_S0, DTI_MIN_ORIENTATIONS, DWI +from nifreeze.data.dmri import DWI +from nifreeze.data.dmriutils import DEFAULT_LOWB_THRESHOLD, DEFAULT_MIN_S0, DTI_MIN_ORIENTATIONS from nifreeze.data.filtering import BVAL_ATOL, dwi_select_shells, grand_mean_normalization from nifreeze.model.base import BaseModel, ExpectationModel diff --git a/test/test_data_dmri.py b/test/test_data_dmri.py index 8eab28e10..4bb08eab3 100644 --- a/test/test_data_dmri.py +++ b/test/test_data_dmri.py @@ -22,16 +22,51 @@ # """Unit tests exercising the dMRI data structure.""" +import re from pathlib import Path +import attrs import nibabel as nb import numpy as np import pytest from nifreeze.data import load -from nifreeze.data.dmri import DWI, find_shelling_scheme, from_nii, transform_fsl_bvec +from nifreeze.data.dmri import ( + DWI, + from_nii, + validate_gradients, +) +from nifreeze.data.dmriutils import ( + GRADIENT_ABSENCE_ERROR_MSG, + GRADIENT_BVAL_BVEC_PRIORITY_WARN_MSG, + GRADIENT_DATA_MISSING_ERROR, + GRADIENT_EXPECTED_COLUMNS_ERROR_MSG, + GRADIENT_NDIM_ERROR_MSG, + GRADIENT_OBJECT_ERROR_MSG, + GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR, + find_shelling_scheme, + format_gradients, + transform_fsl_bvec, +) from nifreeze.utils.ndimage import load_api +B_MATRIX = np.array( + [ + [0.0, 0.0, 0.0, 0], + [1.0, 0.0, 0.0, 1000], + [0.0, 1.0, 0.0, 1000], + [0.0, 0.0, 1.0, 1000], + [1 / np.sqrt(2), 1 / np.sqrt(2), 0.0, 1000], + [1 / np.sqrt(2), 0.0, 1 / np.sqrt(2), 1000], + [0.0, 1 / np.sqrt(2), 1 / np.sqrt(2), 1000], + [1 / np.sqrt(3), 1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [-1 / np.sqrt(3), 1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [1 / np.sqrt(3), -1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [1 / np.sqrt(3), 1 / np.sqrt(3), -1 / np.sqrt(3), 2000], + ], + dtype=np.float32, +) + def _dwi_data_to_nifti( dwi_dataobj, @@ -77,11 +112,454 @@ def test_main(datadir): assert isinstance(load(input_file), DWI) -def test_motion_file_not_implemented(): - with pytest.raises(NotImplementedError): - from_nii("dmri.nii.gz", motion_file="motion.x5") +@pytest.mark.parametrize( + "value, expected_exc, expected_msg", + [ + (np.array([[1], [2]], dtype=object), ValueError, GRADIENT_EXPECTED_COLUMNS_ERROR_MSG), + (np.zeros((2, 3)), ValueError, GRADIENT_EXPECTED_COLUMNS_ERROR_MSG), + ], +) +def test_validate_gradients(monkeypatch, value, expected_exc, expected_msg): + monkeypatch.setattr(DWI, "__init__", lambda self, *a, **k: None) + inst = DWI() + dummy_attr = attrs.fields(DWI).gradients + with pytest.raises(expected_exc, match=re.escape(str(expected_msg))): + validate_gradients(inst, dummy_attr, value) + + +@pytest.mark.parametrize( + "value, expected_exc, expected_msg", + [ + (None, ValueError, GRADIENT_ABSENCE_ERROR_MSG), + (3.14, ValueError, GRADIENT_NDIM_ERROR_MSG), + ([1, 2, 3, 4], ValueError, GRADIENT_NDIM_ERROR_MSG), + (np.arange(24).reshape(4, 3, 2), ValueError, GRADIENT_NDIM_ERROR_MSG), + ([[1, 2], [3, 4, 5]], (TypeError, ValueError), GRADIENT_OBJECT_ERROR_MSG), # Ragged + ], +) +def test_format_gradients_errors(value, expected_exc, expected_msg): + with pytest.raises(expected_exc, match=str(expected_msg)): + format_gradients(value) + + +@pytest.mark.parametrize( + "value, expect_transpose", + [ + # 2D arrays where first dim == 4 and second dim == 4 -> NO transpose + (B_MATRIX[:4, :], False), + # 2D arrays where first dim == 4 and second dim != 4 -> transpose + (B_MATRIX[:3, :].T, True), + (B_MATRIX[:5, :].T, True), + (B_MATRIX.T, True), + # 2D arrays where first dim != 4 -> NO transpose + (B_MATRIX[:3, :], False), + (B_MATRIX[:5, :], False), + (B_MATRIX, False), + (np.empty((4, 0)), True), # zero columns -> still triggers transpose + # List of lists + ([[1, 0, 0, 100], [0, 1, 0, 100]], False), + ([[1, 0, 0], [0, 1, 0], [0, 0, 1], [100, 100, 100]], True), + ], +) +def test_format_gradients_basic(value, expect_transpose): + obtained = format_gradients(value) + + assert isinstance(obtained, np.ndarray) + if expect_transpose: + assert obtained.shape == np.asarray(value).T.shape + assert np.allclose(obtained, np.asarray(value).T) + else: + assert obtained.shape == np.asarray(value).shape + assert np.allclose(obtained, np.asarray(value)) + + +@pytest.mark.random_uniform_spatial_data((2, 2, 2, 4), 0.0, 1.0) +def test_gradients_absence_error(setup_random_uniform_spatial_data): + data, affine = setup_random_uniform_spatial_data + with pytest.raises(ValueError, match=GRADIENT_ABSENCE_ERROR_MSG): + DWI(dataobj=data, affine=affine) +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize("row_major_gradients", (False, True)) +@pytest.mark.parametrize("additional_grad_columns", (-1, -2, 1)) +def test_dwi_instantiation_gradients_unexpected_columns_error( + request, setup_random_dwi_data, row_major_gradients, additional_grad_columns +): + ( + dwi_dataobj, + affine, + _, + b0_dataobj, + gradients, + _, + ) = setup_random_dwi_data + + # Remove/prepend columns. At this point, it is irrelevant whether the + # potential N-dimensional vector is normalized or not + if additional_grad_columns < 1: + gradients = gradients[:, : gradients.shape[1] + additional_grad_columns] + else: + rng = request.node.rng + add_gradients = rng.random(size=(gradients.shape[0], additional_grad_columns)) + gradients = np.insert(gradients, 0, add_gradients, axis=1) + + if not row_major_gradients: + gradients = gradients.T + + with pytest.raises(ValueError, match=re.escape(GRADIENT_EXPECTED_COLUMNS_ERROR_MSG)): + DWI( + dataobj=dwi_dataobj, + affine=affine, + bzero=b0_dataobj, + gradients=gradients, + ) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize("row_major_gradients", (False, True)) +def test_dwi_instantiation_gradients_ndim_error( + tmp_path, setup_random_dwi_data, row_major_gradients +): + ( + dwi_dataobj, + affine, + _, + b0_dataobj, + gradients, + _, + ) = setup_random_dwi_data + + # Store a single column from gradients to try loading a 1D-array. Transpose + # depending on whether to follow the row-major convention or not + gradients = gradients[:, 0] + if not row_major_gradients: + gradients = gradients.T + + with pytest.raises(ValueError, match=GRADIENT_NDIM_ERROR_MSG): + DWI( + dataobj=dwi_dataobj, + affine=affine, + bzero=b0_dataobj, + gradients=gradients, + ) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize( + ("additional_volume_count", "additional_gradient_count"), + [(1, 0), (2, 0), (2, 1), (0, 1), (0, 2), (1, 2)], +) +def test_gradient_instantiation_dwi_vol_mismatch_error( + setup_random_dwi_data, additional_volume_count, additional_gradient_count +): + ( + dwi_dataobj, + affine, + _, + b0_dataobj, + gradients, + _, + ) = setup_random_dwi_data + + # Add additional volumes: simply concatenate the last volume + if additional_volume_count: + additional_dwi_dataobj = np.tile(dwi_dataobj[..., -1:], (1, additional_volume_count)) + dwi_dataobj = np.concatenate((dwi_dataobj, additional_dwi_dataobj), axis=-1) + # Add additional gradients: simply concatenate the gradient + if additional_gradient_count: + additional_gradients = np.tile(gradients[-1:, :], (additional_gradient_count, 1)) + gradients = np.concatenate((gradients, additional_gradients), axis=0) + + # Test with b0s present + n_volumes = dwi_dataobj.shape[-1] + with pytest.raises( + ValueError, + match=GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR.format( + n_volumes=n_volumes, n_gradients=gradients.shape[0] + ), + ): + DWI( + dataobj=dwi_dataobj, + affine=affine, + bzero=b0_dataobj, + gradients=gradients, + ) + + # Test without b0s present + dwi_dataobj = dwi_dataobj[..., 2:] + gradients = gradients[2:, :] + n_volumes = dwi_dataobj.shape[-1] + with pytest.raises( + ValueError, + match=GRADIENT_VOLUME_DIMENSIONALITY_MISMATCH_ERROR.format( + n_volumes=n_volumes, n_gradients=gradients.shape[0] + ), + ): + DWI( + dataobj=dwi_dataobj, + affine=affine, + bzero=b0_dataobj, + gradients=gradients, + ) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize("row_major_gradients", (False, True)) +def test_load_gradients_ndim_error(tmp_path, setup_random_dwi_data, row_major_gradients): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + gradients, + b0_thres, + ) = setup_random_dwi_data + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + # Store a single column from gradients to try loading a 1D-array. Store as + # column or array depending on whether to follow the row-major convention or + # not + gradients = gradients[:, 0] + if not row_major_gradients: + gradients = gradients[np.newaxis, :] + + grads_fname = tmp_path / "grads.txt" + np.savetxt(grads_fname, gradients, fmt="%.6f") + + with pytest.raises(ValueError, match=GRADIENT_NDIM_ERROR_MSG): + from_nii(dwi_fname, gradients_file=grads_fname) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize("row_major_gradients", (False, True)) +@pytest.mark.parametrize("additional_grad_columns", (-1, -2, 1)) +def test_load_gradients_expected_columns_error( + request, tmp_path, setup_random_dwi_data, row_major_gradients, additional_grad_columns +): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + gradients, + b0_thres, + ) = setup_random_dwi_data + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + # Remove/prepend columns. At this point, it is irrelevant whether the + # potential N-dimensional vector is normalized or not + if additional_grad_columns < 1: + gradients = gradients[:, : gradients.shape[1] + additional_grad_columns] + else: + rng = request.node.rng + add_gradients = rng.random(size=(gradients.shape[0], additional_grad_columns)) + gradients = np.insert(gradients, 0, add_gradients, axis=1) + + if not row_major_gradients: + gradients = gradients.T + + grads_fname = tmp_path / "grads.txt" + np.savetxt(grads_fname, gradients, fmt="%.6f") + + with pytest.raises(ValueError, match=re.escape(GRADIENT_EXPECTED_COLUMNS_ERROR_MSG)): + from_nii(dwi_fname, gradients_file=grads_fname) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +def test_load_gradients_bval_bvec_warn(tmp_path, setup_random_dwi_data): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + gradients, + _, + ) = setup_random_dwi_data + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + b0_fname = tmp_path / "b0.nii.gz" + nb.Nifti1Image(b0_dataobj, np.eye(4), None).to_filename(b0_fname) + + grads_fname = tmp_path / "grads.txt" + np.savetxt(grads_fname, gradients, fmt="%.6f") + + bvals = gradients[:, -1] + bvecs = gradients[:, :-1] + + bval_fname = tmp_path / "dwi.bval" + bvec_fname = tmp_path / "dwi.bvec" + np.savetxt(bvec_fname, bvecs, fmt="%.6f") + np.savetxt(bval_fname, bvals, fmt="%.6f") + + with pytest.warns(UserWarning, match=re.escape(GRADIENT_BVAL_BVEC_PRIORITY_WARN_MSG)): + _ = from_nii( + dwi_fname, + gradients_file=grads_fname, + bvec_file=bvec_fname, + bval_file=bval_fname, + b0_file=b0_fname, + ) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize("row_major_gradients", (False, True)) +def test_load_gradients(tmp_path, setup_random_dwi_data, row_major_gradients): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + gradients, + b0_thres, + ) = setup_random_dwi_data + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + if not row_major_gradients: + gradients = gradients.T + + grads_fname = tmp_path / "grads.txt" + np.savetxt(grads_fname, gradients, fmt="%.6f") + + dwi = from_nii(dwi_fname, gradients_file=grads_fname) + if not row_major_gradients: + gradmask = gradients.T[:, -1] > b0_thres + else: + gradmask = gradients[:, -1] > b0_thres + + if not row_major_gradients: + expected_nonzero_grads = gradients.T[gradmask] + else: + expected_nonzero_grads = gradients[gradmask] + + assert hasattr(dwi, "gradients") + assert dwi.gradients.shape == expected_nonzero_grads.shape + assert np.allclose(dwi.gradients, expected_nonzero_grads) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +@pytest.mark.parametrize( + ("transpose_bvals", "transpose_bvecs"), + [ + (False, False), + (True, False), + (False, True), + (True, True), + ], +) +def test_load_bvecs_bvals(tmp_path, setup_random_dwi_data, transpose_bvals, transpose_bvecs): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + gradients, + b0_thres, + ) = setup_random_dwi_data + + bvals = gradients[:, -1] + bvecs = gradients[:, :-1] + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + if transpose_bvals: + bvals = bvals.T + if transpose_bvecs: + bvecs = bvecs.T + + bval_fname = tmp_path / "dwi.bval" + bvec_fname = tmp_path / "dwi.bvec" + np.savetxt(bvec_fname, bvecs, fmt="%.6f") + np.savetxt(bval_fname, bvals, fmt="%.6f") + + dwi = from_nii(dwi_fname, bvec_file=bvec_fname, bval_file=bval_fname) + gradmask = gradients[:, -1] > b0_thres + + expected_nonzero_grads = gradients[gradmask] + assert hasattr(dwi, "gradients") + assert dwi.gradients.shape == expected_nonzero_grads.shape + assert np.allclose(dwi.gradients, expected_nonzero_grads) + + +@pytest.mark.random_gtab_data(10, (1000, 2000), 2) +@pytest.mark.random_dwi_data(50, (34, 36, 24), True) +def test_load_gradients_missing(tmp_path, setup_random_dwi_data): + ( + dwi_dataobj, + affine, + brainmask_dataobj, + b0_dataobj, + _, + _, + ) = setup_random_dwi_data + + dwi, _, _ = _dwi_data_to_nifti( + dwi_dataobj, + affine, + brainmask_dataobj.astype(np.uint8), + b0_dataobj, + ) + + dwi_fname = tmp_path / "dwi.nii.gz" + nb.save(dwi, dwi_fname) + + with pytest.raises(RuntimeError, match=re.escape(GRADIENT_DATA_MISSING_ERROR)): + from_nii(dwi_fname) + + +@pytest.mark.skip(reason="to_nifti takes absurdly long") @pytest.mark.parametrize("insert_b0", (False, True)) @pytest.mark.parametrize("rotate_bvecs", (False, True)) def test_load(datadir, tmp_path, insert_b0, rotate_bvecs): # noqa: C901 @@ -101,9 +579,6 @@ def test_load(datadir, tmp_path, insert_b0, rotate_bvecs): # noqa: C901 if insert_b0: nifti_data = nifti_data[..., 1:] - with pytest.raises(RuntimeError): - from_nii(dwi_nifti_path) - # Try loading NIfTI + b-vecs/vals out_root = dwi_nifti_path.parent / dwi_nifti_path.name.replace( "".join(dwi_nifti_path.suffixes), "" @@ -239,7 +714,6 @@ def test_equality_operator(tmp_path, setup_random_dwi_data): gradients_file=gradients_fname, b0_file=b0_fname, brainmask_file=brainmask_fname, - b0_thres=b0_thres, ) hdf5_filename = tmp_path / "test_dwi.h5" dwi_obj.to_filename(hdf5_filename) diff --git a/test/test_data_pet.py b/test/test_data_pet.py index f03861a29..1e40cf8a9 100644 --- a/test/test_data_pet.py +++ b/test/test_data_pet.py @@ -87,16 +87,6 @@ def test_compute_uptake_statistic(stat_func): np.testing.assert_array_equal(obtained, expected) -def test_motion_file_not_implemented(): - with pytest.raises(NotImplementedError): - from_nii( - "pet.nii.gz", - np.ones(5), - brainmask_file="brainmaks.nii.gz", - motion_file="motion.x5", - ) - - @pytest.mark.parametrize( ("brainmask_file", "frame_time", "frame_duration"), [ diff --git a/test/test_model.py b/test/test_model.py index 44cb882ac..8e990118e 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -31,12 +31,12 @@ from nifreeze import model from nifreeze.data.base import BaseDataset -from nifreeze.data.dmri import ( +from nifreeze.data.dmri import DWI +from nifreeze.data.dmriutils import ( DEFAULT_LOWB_THRESHOLD, DEFAULT_MAX_S0, DEFAULT_MIN_S0, DTI_MIN_ORIENTATIONS, - DWI, ) from nifreeze.model._dipy import GaussianProcessModel from nifreeze.model.base import ( @@ -46,6 +46,23 @@ ) from nifreeze.testing import simulations as _sim +B_MATRIX = np.array( + [ + [0.0, 0.0, 0.0, 0], + [1.0, 0.0, 0.0, 500], + [0.0, 1.0, 0.0, 1000], + [0.0, 0.0, 1.0, 1000], + [1 / np.sqrt(2), 1 / np.sqrt(2), 0.0, 1000], + [1 / np.sqrt(2), 0.0, 1 / np.sqrt(2), 1000], + [0.0, 1 / np.sqrt(2), 1 / np.sqrt(2), 1000], + [1 / np.sqrt(3), 1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [-1 / np.sqrt(3), 1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [1 / np.sqrt(3), -1 / np.sqrt(3), 1 / np.sqrt(3), 2000], + [1 / np.sqrt(3), 1 / np.sqrt(3), -1 / np.sqrt(3), 2000], + ], + dtype=np.float32, +) + # Dummy classes to simulate model factory essential features class DummyDMRIModel: @@ -109,11 +126,19 @@ def test_trivial_model(request, use_mask): a_max=DEFAULT_MAX_S0, ) + n_vols = 10 + + bvecs = rng.normal(size=(n_vols, 3)) + bvecs /= np.linalg.norm(bvecs, axis=1, keepdims=True) + bvals = np.full((bvecs.shape[0], 1), 1000.0) + gradients = np.hstack((bvecs, bvals)) + data = DWI( - dataobj=rng.normal(size=(*_S0.shape, 10)), + dataobj=rng.normal(size=(*_S0.shape, n_vols)), affine=np.eye(4), bzero=_clipped_S0, brainmask=mask, + gradients=gradients, ) with context: @@ -178,22 +203,8 @@ def __getitem__(self, index): def test_average_model(): """Check the implementation of the average DW model.""" - gtab = np.array( - [ - [0, 0, 0, 0], - [-0.31, 0.933, 0.785, 25], - [0.25, 0.565, 0.21, 500], - [-0.861, -0.464, 0.564, 1000], - [0.307, -0.766, 0.677, 1000], - [0.736, 0.013, 0.774, 1000], - [-0.31, 0.933, 0.785, 1000], - [0.25, 0.565, 0.21, 2000], - [-0.861, -0.464, 0.564, 2000], - [0.307, -0.766, 0.677, 2000], - ] - ) - - size = (100, 100, 100, gtab.shape[0]) + gtab = B_MATRIX.copy() + size = (10, 10, 10, gtab.shape[0]) data = np.ones(size, dtype=float) mask = np.ones(size[:3], dtype=bool) @@ -205,21 +216,24 @@ def test_average_model(): avgmodel_median = model.AverageDWIModel(dataset) # Verify that average cannot be calculated in shells with one single value + # The two first gradients are considered low-b orientations because of the + # default threshold and are pulled out (so now index is 0 for b=500). with pytest.raises(RuntimeError): - avgmodel_mean.fit_predict(2) + avgmodel_mean.fit_predict(0) assert np.allclose(avgmodel_mean.fit_predict(3), 1000) assert np.allclose(avgmodel_median.fit_predict(3), 1000) - grads = list(gtab[:, -1]) - del grads[1] - assert np.allclose(avgmodel_mean_full.fit_predict(1), np.mean(grads)) + grads = list(gtab[1:, -1]) # Exclude b0 + del grads[3] + assert np.allclose(avgmodel_mean_full.fit_predict(3), np.mean(grads)) avgmodel_mean_2000 = model.AverageDWIModel(dataset, stat="mean", atol_low=1100) avgmodel_median_2000 = model.AverageDWIModel(dataset, atol_low=1100) - assert np.allclose(avgmodel_mean_2000.fit_predict(9), gtab[3:-1, -1].mean()) - assert np.allclose(avgmodel_median_2000.fit_predict(9), 1000) + last = gtab.shape[0] - 2 # Last but one index (b=2000) + assert np.allclose(avgmodel_mean_2000.fit_predict(last), gtab[2:-1, -1].mean()) + assert np.allclose(avgmodel_median_2000.fit_predict(last), 1000) @pytest.mark.parametrize(