Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dmriprep/config/emc_coarse_Affine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 100],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_coarse_Rigid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [100, 100],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_precise_Affine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 1000],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_precise_Rigid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 1000],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
147 changes: 147 additions & 0 deletions dmriprep/interfaces/registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Register tools interfaces."""
import numpy as np
import nibabel as nb
import dmriprep
from nipype import logging
from pathlib import Path
from nipype.utils.filemanip import fname_presuffix
from nipype.interfaces.base import (
traits,
TraitedSpec,
BaseInterfaceInputSpec,
InputMultiObject,
SimpleInterface,
File,
)


LOGGER = logging.getLogger("nipype.interface")


class _ApplyAffineInputSpec(BaseInterfaceInputSpec):
moving_image = File(
exists=True, mandatory=True, desc="image to apply transformation from"
)
fixed_image = File(
exists=True, mandatory=True, desc="image to apply transformation to"
)
transform_affine = InputMultiObject(
File(exists=True), mandatory=True, desc="transformation affine"
)
invert_transform = traits.Bool(False, usedefault=True)


class _ApplyAffineOutputSpec(TraitedSpec):
warped_image = File(exists=True, desc="Outputs warped image")


class ApplyAffine(SimpleInterface):
"""
Interface to apply an affine transformation to an image.
"""

input_spec = _ApplyAffineInputSpec
output_spec = _ApplyAffineOutputSpec

def _run_interface(self, runtime):
from dmriprep.utils.registration import apply_affine

warped_image_nifti = apply_affine(
nb.load(self.inputs.moving_image),
nb.load(self.inputs.fixed_image),
np.load(self.inputs.transform_affine[0]),
self.inputs.invert_transform,
)
cwd = Path(runtime.cwd).absolute()
warped_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_warped.nii.gz",
newpath=str(cwd),
)

warped_image_nifti.to_filename(warped_file)

self._results["warped_image"] = warped_file
return runtime


class _RegisterInputSpec(BaseInterfaceInputSpec):
moving_image = File(
exists=True, mandatory=True, desc="image to apply transformation from"
)
fixed_image = File(
exists=True, mandatory=True, desc="image to apply transformation to"
)
nbins = traits.Int(default_value=32, usedefault=True)
sampling_prop = traits.Float(default_value=1, usedefault=True)
metric = traits.Str(default_value="MI", usedefault=True)
level_iters = traits.List(
trait=traits.Any(), value=[10000, 1000, 100], usedefault=True
)
sigmas = traits.List(trait=traits.Any(), value=[5.0, 2.5, 0.0], usedefault=True)
factors = traits.List(trait=traits.Any(), value=[4, 2, 1], usedefault=True)
params0 = traits.ArrayOrNone(value=None, usedefault=True)
pipeline = traits.List(
trait=traits.Any(),
value=["c_of_mass", "translation", "rigid", "affine"],
usedefault=True,
)


class _RegisterOutputSpec(TraitedSpec):
forward_transforms = traits.List(
File(exists=True), desc="List of output transforms for forward registration"
)
warped_image = File(exists=True, desc="Outputs warped image")


class Register(SimpleInterface):
"""
Interface to perform affine registration.
"""

input_spec = _RegisterInputSpec
output_spec = _RegisterOutputSpec

def _run_interface(self, runtime):
from dmriprep.utils.registration import affine_registration

reg_types = ["c_of_mass", "translation", "rigid", "affine"]
pipeline = [
getattr(dmriprep.utils.register, i)
for i in self.inputs.pipeline
if i in reg_types
]

warped_image_nifti, forward_transform_mat = affine_registration(
nb.load(self.inputs.moving_image),
nb.load(self.inputs.fixed_image),
self.inputs.nbins,
self.inputs.sampling_prop,
self.inputs.metric,
pipeline,
self.inputs.level_iters,
self.inputs.sigmas,
self.inputs.factors,
self.inputs.params0,
)
cwd = Path(runtime.cwd).absolute()
warped_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_warped.nii.gz",
newpath=str(cwd),
)
forward_transform_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_forward_transform.npy",
newpath=str(cwd),
)
warped_image_nifti.to_filename(warped_file)

np.save(forward_transform_file, forward_transform_mat)
self._results["warped_image"] = warped_file
self._results["forward_transforms"] = [forward_transform_file]
return runtime
170 changes: 170 additions & 0 deletions dmriprep/utils/registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""
Linear affine registration tools for motion correction.
"""
import numpy as np
import nibabel as nb
from dipy.align.metrics import CCMetric, EMMetric, SSDMetric
from dipy.align.imaffine import (
transform_centers_of_mass,
AffineMap,
MutualInformationMetric,
AffineRegistration,
)
from dipy.align.transforms import (
TranslationTransform3D,
RigidTransform3D,
AffineTransform3D,
)
from nipype.utils.filemanip import fname_presuffix

syn_metric_dict = {"CC": CCMetric, "EM": EMMetric, "SSD": SSDMetric}

__all__ = [
"c_of_mass",
"translation",
"rigid",
"affine",
"affine_registration",
]


def apply_affine(moving, static, transform_affine, invert=False):
"""Apply an affine to transform an image from one space to another.

Parameters
----------
moving : array
The image to be resampled

static : array

Returns
-------
warped_img : the moving array warped into the static array's space.

"""
affine_map = AffineMap(
transform_affine, static.shape, static.affine, moving.shape, moving.affine
)
if invert is True:
warped_arr = affine_map.transform_inverse(np.asarray(moving.dataobj))
else:
warped_arr = affine_map.transform(np.asarray(moving.dataobj))

return nb.Nifti1Image(warped_arr, static.affine)


def average_affines(transforms):
affine_list = [np.load(aff) for aff in transforms]
average_affine_file = fname_presuffix(
transforms[0], use_ext=False, suffix="_average.npy"
)
np.save(average_affine_file, np.mean(affine_list, axis=0))
return average_affine_file


# Affine registration pipeline:
affine_metric_dict = {"MI": MutualInformationMetric, "CC": CCMetric}


def c_of_mass(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = transform_centers_of_mass(static, static_affine, moving, moving_affine)
transformed = transform.transform(moving)
return transformed, transform.affine


def translation(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = TranslationTransform3D()
translation = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)

return translation.transform(moving), translation.affine


def rigid(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = RigidTransform3D()
rigid = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)
return rigid.transform(moving), rigid.affine


def affine(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = AffineTransform3D()
affine = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)

return affine.transform(moving), affine.affine


def affine_registration(
moving,
static,
nbins,
sampling_prop,
metric,
pipeline,
level_iters,
sigmas,
factors,
params0,
):

"""
Find the affine transformation between two 3D images.

Parameters
----------

"""
# Define the Affine registration object we'll use with the chosen metric:
use_metric = affine_metric_dict[metric](nbins, sampling_prop)
affreg = AffineRegistration(
metric=use_metric, level_iters=level_iters, sigmas=sigmas, factors=factors
)

if not params0:
starting_affine = np.eye(4)
else:
starting_affine = params0

# Go through the selected transformation:
for func in pipeline:
transformed, starting_affine = func(
np.asarray(moving.dataobj),
np.asarray(static.dataobj),
static.affine,
moving.affine,
affreg,
starting_affine,
params0,
)
return nb.Nifti1Image(np.array(transformed), static.affine), starting_affine
Loading