diff --git a/niworkflows/interfaces/images.py b/niworkflows/interfaces/images.py index 99bbad4ac37..58f976badd5 100644 --- a/niworkflows/interfaces/images.py +++ b/niworkflows/interfaces/images.py @@ -25,6 +25,50 @@ LOGGER = logging.getLogger("nipype.interface") +class _RegridToZoomsInputSpec(BaseInterfaceInputSpec): + in_file = File( + exists=True, mandatory=True, desc="a file whose resolution is to change" + ) + zooms = traits.Tuple( + traits.Float, + traits.Float, + traits.Float, + mandatory=True, + desc="the new resolution", + ) + order = traits.Int(3, usedefault=True, desc="order of interpolator") + clip = traits.Bool( + True, + usedefault=True, + desc="clip the data array within the original image's range", + ) + + +class _RegridToZoomsOutputSpec(TraitedSpec): + out_file = File(exists=True, dec="the regridded file") + + +class RegridToZooms(SimpleInterface): + """Change the resolution of an image (regrid).""" + + input_spec = _RegridToZoomsInputSpec + output_spec = _RegridToZoomsOutputSpec + + def _run_interface(self, runtime): + from ..utils.images import resample_by_spacing + + self._results["out_file"] = fname_presuffix( + self.inputs.in_file, suffix="_regrid", newpath=runtime.cwd + ) + resample_by_spacing( + self.inputs.in_file, + self.inputs.zooms, + order=self.inputs.order, + clip=self.inputs.clip, + ).to_filename(self._results["out_file"]) + return runtime + + class _IntraModalMergeInputSpec(BaseInterfaceInputSpec): in_files = InputMultiPath(File(exists=True), mandatory=True, desc="input files") in_mask = File(exists=True, desc="input mask for grand mean scaling") diff --git a/niworkflows/utils/images.py b/niworkflows/utils/images.py index 17ae95ab398..b67747dcd08 100644 --- a/niworkflows/utils/images.py +++ b/niworkflows/utils/images.py @@ -125,3 +125,85 @@ def dseg_label(in_seg, label, newpath=None): new.set_data_dtype(np.uint8) new.to_filename(out_file) return out_file + + +def resample_by_spacing(in_file, zooms, order=3, clip=True): + """Regrid the input image to match the new zooms.""" + from pathlib import Path + import numpy as np + import nibabel as nb + from scipy.ndimage import map_coordinates + + if isinstance(in_file, (str, Path)): + in_file = nb.load(in_file) + + # Prepare output x-forms + sform, scode = in_file.get_sform(coded=True) + qform, qcode = in_file.get_qform(coded=True) + + hdr = in_file.header.copy() + dtype = hdr.get_data_dtype() + data = np.asanyarray(in_file.dataobj) + zooms = np.array(zooms) + + # Calculate the factors to normalize voxel size to the specific zooms + pre_zooms = np.array(in_file.header.get_zooms()[:3]) + + # Calculate an affine aligned with cardinal axes, for simplicity + card = nb.affines.from_matvec(np.diag(pre_zooms)) + extent = card[:3, :3].dot(np.array(in_file.shape[:3])) + card[:3, 3] = -0.5 * extent + + # Cover the FoV with the new grid + new_size = np.ceil(extent / zooms).astype(int) + offset = (extent - np.diag(zooms).dot(new_size)) * 0.5 + new_card = nb.affines.from_matvec(np.diag(zooms), card[:3, 3] + offset) + + # Calculate the new indexes + new_grid = np.array( + np.meshgrid( + np.arange(new_size[0]), + np.arange(new_size[1]), + np.arange(new_size[2]), + indexing="ij", + ) + ).reshape((3, -1)) + + # Calculate the locations of the new samples, w.r.t. the original grid + ijk = np.linalg.inv(card).dot( + new_card.dot(np.vstack((new_grid, np.ones((1, new_grid.shape[1]))))) + ) + + # Resample data in the new grid + resampled = map_coordinates( + data, + ijk[:3, :], + output=dtype, + order=order, + mode="constant", + cval=0, + prefilter=True, + ).reshape(new_size) + if clip: + resampled = np.clip(resampled, a_min=data.min(), a_max=data.max()) + + # Set new zooms + hdr.set_zooms(zooms) + + # Get the original image's affine + affine = in_file.affine.copy() + # Determine rotations w.r.t. cardinal axis and eccentricity + rot = affine.dot(np.linalg.inv(card)) + # Apply to the new cardinal, so that the resampling is consistent + new_affine = rot.dot(new_card) + + if qcode != 0: + hdr.set_qform(new_affine.dot(np.linalg.inv(affine).dot(qform)), code=int(qcode)) + if scode != 0: + hdr.set_sform(new_affine.dot(np.linalg.inv(affine).dot(sform)), code=int(scode)) + if (scode, qcode) == (0, 0): + hdr.set_qform(new_affine, code=1) + hdr.set_sform(new_affine, code=1) + + # Create a new x-form affine, aligned with cardinal axes, 1mm3 and centered. + return nb.Nifti1Image(resampled, new_affine, hdr) diff --git a/niworkflows/utils/tests/test_images.py b/niworkflows/utils/tests/test_images.py index 166d2270cc3..35fcd9c8461 100644 --- a/niworkflows/utils/tests/test_images.py +++ b/niworkflows/utils/tests/test_images.py @@ -2,7 +2,12 @@ import numpy as np import pytest -from ..images import update_header_fields, overwrite_header, dseg_label +from ..images import ( + update_header_fields, + overwrite_header, + dseg_label, + resample_by_spacing, +) def random_image(): @@ -82,3 +87,28 @@ def test_dseg_label(tmp_path): new_im = nb.load(dseg_label(fname, label=2, newpath=tmp_path)) assert np.all((data == 2).astype("int16") == np.int16(new_im.dataobj)) + + +def test_resample_by_spacing(): + """Check the output zooms and data.""" + img_shape = (193, 229, 193) # Size from MNI152NLin2009cAsym, res=1 + img_affine = nb.affines.from_matvec(np.eye(3), (-96, -132, -78)) + + data = np.random.normal(size=img_shape).astype(float) + new_affine = nb.affines.from_matvec(2.0 * np.eye(3), (-96.5, -132.5, -78.5)) + + nii = nb.Nifti1Image(data, img_affine, None) + nii.set_qform(img_affine, code=4) + resampled = resample_by_spacing(nii, (2.0, 2.0, 2.0), order=1, clip=False) + assert resampled.header.get_zooms()[:3] == (2.0, 2.0, 2.0) + assert np.all(resampled.affine == new_affine) + + # Create a rotation matrix + rot = nb.affines.from_matvec(nb.eulerangles.euler2mat(0.01, 0.02, 0.004)) + # Check this works with oblique images + nii = nb.Nifti1Image(data, rot.dot(img_affine), None) + nii.set_qform(img_affine, code=4) + nii.set_sform(rot.dot(img_affine), code=4) + resampled = resample_by_spacing(nii, (2.0, 2.0, 2.0), order=1, clip=False) + assert resampled.header.get_zooms()[:3] == (2.0, 2.0, 2.0) + assert np.allclose(resampled.affine, rot.dot(new_affine))