diff --git a/nitransforms/tests/test_io_itk.py b/nitransforms/tests/test_io_itk.py index 32423733..54f2e29f 100644 --- a/nitransforms/tests/test_io_itk.py +++ b/nitransforms/tests/test_io_itk.py @@ -4,6 +4,7 @@ import os import shutil +from subprocess import check_call import pytest @@ -14,11 +15,19 @@ from scipy.io import loadmat from h5py import File as H5File -from nitransforms.io.base import TransformIOError, TransformFileError -from nitransforms.io import itk +from nitransforms.io.base import TransformIOError, TransformFileError +from nitransforms.io.itk import ( + ITKLinearTransform, + ITKLinearTransformArray, + ITKDisplacementsField, + ITKCompositeH5, +) +from nitransforms import nonlinear as nitnl from nitransforms.conftest import _datadir, _testdir +from nitransforms.tests.utils import get_points +RNG_SEED = 202508140819 LPS = np.diag([-1, -1, 1, 1]) ITK_MAT = LPS.dot(np.ones((4, 4)).dot(LPS)) @@ -29,14 +38,14 @@ def test_ITKLinearTransform(tmpdir, testdata_path): matlabfile = testdata_path / "ds-005_sub-01_from-T1_to-OASIS_affine.mat" mat = loadmat(str(matlabfile)) with open(str(matlabfile), "rb") as f: - itkxfm = itk.ITKLinearTransform.from_fileobj(f) + itkxfm = ITKLinearTransform.from_fileobj(f) assert np.allclose( itkxfm["parameters"][:3, :3].flatten(), mat["AffineTransform_float_3_3"][:-3].flatten(), ) assert np.allclose(itkxfm["offset"], mat["fixed"].reshape((3,))) - itkxfm = itk.ITKLinearTransform.from_filename(matlabfile) + itkxfm = ITKLinearTransform.from_filename(matlabfile) assert np.allclose( itkxfm["parameters"][:3, :3].flatten(), mat["AffineTransform_float_3_3"][:-3].flatten(), @@ -46,17 +55,17 @@ def test_ITKLinearTransform(tmpdir, testdata_path): # Test to_filename(textfiles) itkxfm.to_filename("textfile.tfm") with open("textfile.tfm") as f: - itkxfm2 = itk.ITKLinearTransform.from_fileobj(f) + itkxfm2 = ITKLinearTransform.from_fileobj(f) assert np.allclose(itkxfm["parameters"], itkxfm2["parameters"]) # Test to_filename(matlab) itkxfm.to_filename("copy.mat") with open("copy.mat", "rb") as f: - itkxfm3 = itk.ITKLinearTransform.from_fileobj(f) + itkxfm3 = ITKLinearTransform.from_fileobj(f) assert np.all(itkxfm["parameters"] == itkxfm3["parameters"]) rasmat = from_matvec(euler2mat(x=0.9, y=0.001, z=0.001), [4.0, 2.0, -1.0]) - itkxfm = itk.ITKLinearTransform.from_ras(rasmat) + itkxfm = ITKLinearTransform.from_ras(rasmat) assert np.allclose(itkxfm["parameters"], ITK_MAT * rasmat) assert np.allclose(itkxfm.to_ras(), rasmat) @@ -67,9 +76,9 @@ def test_ITKLinearTransformArray(tmpdir, data_path): with open(str(data_path / "itktflist.tfm")) as f: text = f.read() f.seek(0) - itklist = itk.ITKLinearTransformArray.from_fileobj(f) + itklist = ITKLinearTransformArray.from_fileobj(f) - itklistb = itk.ITKLinearTransformArray.from_filename(data_path / "itktflist.tfm") + itklistb = ITKLinearTransformArray.from_filename(data_path / "itktflist.tfm") assert itklist["nxforms"] == itklistb["nxforms"] assert all( [ @@ -85,14 +94,14 @@ def test_ITKLinearTransformArray(tmpdir, data_path): assert itklist["nxforms"] == 9 assert text == itklist.to_string() with pytest.raises(TransformFileError): - itk.ITKLinearTransformArray.from_string("\n".join(text.splitlines()[1:])) + ITKLinearTransformArray.from_string("\n".join(text.splitlines()[1:])) itklist.to_filename("copy.tfm") with open("copy.tfm") as f: copytext = f.read() assert text == copytext - itklist = itk.ITKLinearTransformArray( + itklist = ITKLinearTransformArray( xforms=[np.random.normal(size=(4, 4)) for _ in range(4)] ) @@ -108,7 +117,7 @@ def test_ITKLinearTransformArray(tmpdir, data_path): f.write(xfm.to_string()) with open("extracted.tfm") as f: - xfm2 = itk.ITKLinearTransform.from_fileobj(f) + xfm2 = ITKLinearTransform.from_fileobj(f) assert np.allclose( xfm.structarr["parameters"][:3, ...], xfm2.structarr["parameters"][:3, ...] ) @@ -118,18 +127,18 @@ def test_ITKLinearTransformArray(tmpdir, data_path): itklist.to_filename("matlablist.mat") with pytest.raises(TransformFileError): - xfm2 = itk.ITKLinearTransformArray.from_binary({}) + xfm2 = ITKLinearTransformArray.from_binary({}) with open("filename.mat", "ab") as f: with pytest.raises(TransformFileError): - xfm2 = itk.ITKLinearTransformArray.from_fileobj(f) + xfm2 = ITKLinearTransformArray.from_fileobj(f) @pytest.mark.parametrize("size", [(20, 20, 20), (20, 20, 20, 3)]) def test_itk_disp_load(size): """Checks field sizes.""" with pytest.raises(TransformFileError): - itk.ITKDisplacementsField.from_image( + ITKDisplacementsField.from_image( nb.Nifti1Image(np.zeros(size), np.eye(4), None) ) @@ -137,7 +146,7 @@ def test_itk_disp_load(size): def test_itk_disp_load_intent(): """Checks whether the NIfTI intent is fixed.""" with pytest.warns(UserWarning): - field = itk.ITKDisplacementsField.from_image( + field = ITKDisplacementsField.from_image( nb.Nifti1Image(np.zeros((20, 20, 20, 1, 3)), np.eye(4), None) ) @@ -161,7 +170,7 @@ def test_itk_h5(tmpdir, only_linear, h5_path, nxforms): assert ( len( list( - itk.ITKCompositeH5.from_filename( + ITKCompositeH5.from_filename( h5_path, only_linear=only_linear, ) @@ -174,7 +183,7 @@ def test_itk_h5(tmpdir, only_linear, h5_path, nxforms): with pytest.raises(TransformFileError): list( - itk.ITKCompositeH5.from_filename( + ITKCompositeH5.from_filename( h5_path.absolute().name.replace(".h5", ".x5"), only_linear=only_linear, ) @@ -190,30 +199,28 @@ def test_itk_h5(tmpdir, only_linear, h5_path, nxforms): xfm["TransformType"][0] = b"InventTransform" with pytest.raises(TransformIOError): - itk.ITKCompositeH5.from_filename("test.h5") + ITKCompositeH5.from_filename("test.h5") def test_itk_linear_h5(tmpdir, data_path, testdata_path): """Check different lower-level loading options.""" # File loadable with transform array - h5xfm = itk.ITKLinearTransformArray.from_filename( - data_path / "affine-antsComposite.h5" - ) + h5xfm = ITKLinearTransformArray.from_filename(data_path / "affine-antsComposite.h5") assert len(h5xfm.xforms) == 1 with open(data_path / "affine-antsComposite.h5", "rb") as f: - h5xfm = itk.ITKLinearTransformArray.from_fileobj(f) + h5xfm = ITKLinearTransformArray.from_fileobj(f) assert len(h5xfm.xforms) == 1 # File loadable with single affine object - itk.ITKLinearTransform.from_filename(data_path / "affine-antsComposite.h5") + ITKLinearTransform.from_filename(data_path / "affine-antsComposite.h5") with open(data_path / "affine-antsComposite.h5", "rb") as f: - itk.ITKLinearTransform.from_fileobj(f) + ITKLinearTransform.from_fileobj(f) # Exercise only_linear - itk.ITKCompositeH5.from_filename( + ITKCompositeH5.from_filename( testdata_path / "ds-005_sub-01_from-T1w_to-MNI152NLin2009cAsym_mode-image_xfm.h5", only_linear=True, @@ -231,16 +238,16 @@ def test_itk_linear_h5(tmpdir, data_path, testdata_path): xfm["TransformFixedParameters"] = np.zeros(3, dtype=float) # File loadable with transform array - h5xfm = itk.ITKLinearTransformArray.from_filename("test.h5") + h5xfm = ITKLinearTransformArray.from_filename("test.h5") assert len(h5xfm.xforms) == 2 # File loadable with generalistic object (NOTE we directly access the list) - h5xfm = itk.ITKCompositeH5.from_filename("test.h5") + h5xfm = ITKCompositeH5.from_filename("test.h5") assert len(h5xfm) == 2 # Error raised if the we try to use the single affine loader with pytest.raises(TransformIOError): - itk.ITKLinearTransform.from_filename("test.h5") + ITKLinearTransform.from_filename("test.h5") shutil.copy(data_path / "affine-antsComposite.h5", "test.h5") os.chmod("test.h5", 0o666) @@ -251,12 +258,12 @@ def test_itk_linear_h5(tmpdir, data_path, testdata_path): del h5group["1"] # File loadable with generalistic object - h5xfm = itk.ITKCompositeH5.from_filename("test.h5") + h5xfm = ITKCompositeH5.from_filename("test.h5") assert len(h5xfm) == 0 # Error raised if the we try to use the single affine loader with pytest.raises(TransformIOError): - itk.ITKLinearTransform.from_filename("test.h5") + ITKLinearTransform.from_filename("test.h5") # Added tests for h5 orientation bug @@ -285,7 +292,7 @@ def test_itk_h5_field_order(tmp_path): g1["TransformFixedParameters"] = fixed g1["TransformParameters"] = params - img = itk.ITKCompositeH5.from_filename(fname)[0] + img = ITKCompositeH5.from_filename(fname)[0] expected = np.moveaxis(field, 0, -1) expected[..., (0, 1)] *= -1 assert np.allclose(img.get_fdata(), expected) @@ -314,9 +321,9 @@ def _load_composite_testdata(data_path): def test_itk_h5_displacement_mismatch(testdata_path): """Composite displacements should match the standalone field""" h5file, warpfile = _load_composite_testdata(testdata_path) - xforms = itk.ITKCompositeH5.from_filename(h5file) + xforms = ITKCompositeH5.from_filename(h5file) field_h5 = xforms[1] - field_img = itk.ITKDisplacementsField.from_filename(warpfile) + field_img = ITKDisplacementsField.from_filename(warpfile) np.testing.assert_array_equal( np.asanyarray(field_h5.dataobj), np.asanyarray(field_img.dataobj) @@ -368,7 +375,181 @@ def test_itk_h5_field_order_fortran(tmp_path): g1["TransformFixedParameters"] = fixed g1["TransformParameters"] = params - img = itk.ITKCompositeH5.from_filename(fname)[0] + img = ITKCompositeH5.from_filename(fname)[0] expected = np.moveaxis(field, 0, -1) expected[..., (0, 1)] *= -1 assert np.allclose(img.get_fdata(), expected) + + +# Tests against ANTs' ``antsApplyTransformsToPoints`` +@pytest.mark.parametrize("ongrid", [True, False]) +def test_densefield_map_vs_ants(testdata_path, tmp_path, ongrid): + """Map points with DenseFieldTransform and compare to ANTs.""" + + rng = np.random.default_rng(RNG_SEED) + warpfile = ( + testdata_path + / "regressions" + / ("01_ants_t1_to_mniComposite_DisplacementFieldTransform.nii.gz") + ) + if not warpfile.exists(): + pytest.skip("Composite transform test data not available") + + nii = ITKDisplacementsField.from_filename(warpfile) + + # Get sampling indices + coords_xyz, points_ijk, grid_xyz, shape, ref_affine, reference, subsample = ( + get_points(nii, ongrid, npoints=5, rng=rng) + ) + coords_map = grid_xyz.reshape(*shape, 3) + + csvin = tmp_path / "fixed_coords.csv" + csvout = tmp_path / "moving_coords.csv" + + # antsApplyTransformsToPoints wants LPS coordinates, see last post at + # http://sourceforge.net/p/advants/discussion/840261/thread/2a1e9307/ + lps_xyz = coords_xyz.copy() * (-1, -1, 1) + np.savetxt(csvin, lps_xyz, delimiter=",", header="x,y,z", comments="") + + cmd = f"antsApplyTransformsToPoints -d 3 -i {csvin} -o {csvout} -t {warpfile}" + exe = cmd.split()[0] + if not shutil.which(exe): + pytest.skip(f"Command {exe} not found on host") + check_call(cmd, shell=True) + + ants_res = np.genfromtxt(csvout, delimiter=",", names=True) + + # antsApplyTransformsToPoints writes LPS coordinates, see last post at + # http://sourceforge.net/p/advants/discussion/840261/thread/2a1e9307/ + ants_pts = np.vstack([ants_res[n] for n in ("x", "y", "z")]).T * (-1, -1, 1) + + xfm = nitnl.DenseFieldTransform(nii, reference=reference) + mapped = xfm.map(coords_xyz) + + if ongrid: + ants_mapped_xyz = ants_pts.reshape(*shape, 3) + nit_mapped_xyz = mapped.reshape(*shape, 3) + + nb.load(warpfile).to_filename(tmp_path / "original_ants_deltas.nii.gz") + + nb.Nifti1Image(coords_map, ref_affine, None).to_filename( + tmp_path / "baseline_positions.nii.gz" + ) + + nii.to_filename(tmp_path / "original_interpreted_deltas.nii.gz") + + nb.Nifti1Image(nit_mapped_xyz, ref_affine, None).to_filename( + tmp_path / "nit_deformation_xyz.nii.gz" + ) + nb.Nifti1Image(ants_mapped_xyz - coords_map, ref_affine, None).to_filename( + tmp_path / "ants_deltas_xyz.nii.gz" + ) + nb.Nifti1Image(nit_mapped_xyz - coords_map, ref_affine, None).to_filename( + tmp_path / "nit_deltas_xyz.nii.gz" + ) + + # When transforming off-grid points, rounding errors are large + atol = 0 if ongrid else 1e-1 + rtol = 1e-4 if ongrid else 1e-3 + np.testing.assert_allclose(mapped, ants_pts, atol=atol, rtol=rtol) + + +@pytest.mark.parametrize("image_orientation", ["RAS", "LAS", "LPS", "oblique"]) +@pytest.mark.parametrize("ongrid", [True, False]) +def test_constant_field_vs_ants(tmp_path, get_testdata, image_orientation, ongrid): + """Create a constant displacement field and compare mappings.""" + + rng = np.random.default_rng(RNG_SEED) + + nii = get_testdata[image_orientation] + + # Get sampling indices + coords_xyz, points_ijk, grid_xyz, shape, ref_affine, reference, subsample = ( + get_points(nii, ongrid, npoints=5, rng=rng) + ) + + tol = ( + {"atol": 0, "rtol": 1e-4} + if image_orientation != "oblique" + else {"atol": 1e-4, "rtol": 1e-2} + ) + coords_map = grid_xyz.reshape(*shape, 3) + + deltas = np.hstack( + ( + np.zeros(np.prod(shape)), + np.linspace(-80, 80, num=np.prod(shape)), + np.linspace(-50, 50, num=np.prod(shape)), + ) + ).reshape(shape + (3,)) + gold_mapped_xyz = coords_map + deltas + + fieldnii = nb.Nifti1Image(deltas, ref_affine, None) + warpfile = tmp_path / "itk_transform.nii.gz" + + # Ensure direct (xfm) and ITK roundtrip (itk_xfm) are equivalent + xfm = nitnl.DenseFieldTransform(fieldnii) + xfm.to_filename(warpfile, fmt="itk") + itk_xfm = nitnl.DenseFieldTransform(ITKDisplacementsField.from_filename(warpfile)) + + np.testing.assert_allclose(xfm.reference.affine, itk_xfm.reference.affine) + np.testing.assert_allclose(ref_affine, itk_xfm.reference.affine) + np.testing.assert_allclose(xfm.reference.shape, itk_xfm.reference.shape) + np.testing.assert_allclose(xfm._field, itk_xfm._field, **tol) + if image_orientation != "oblique": + assert xfm == itk_xfm + + # Ensure deltas and mapped grid are equivalent + orig_grid_mapped_xyz = xfm.map(grid_xyz).reshape(*shape, -1) + np.testing.assert_allclose(gold_mapped_xyz, orig_grid_mapped_xyz) + + # Test ANTs mapping + grid_mapped_xyz = itk_xfm.map(grid_xyz).reshape(*shape, -1) + + # Check apparent healthiness of mapping + np.testing.assert_allclose(gold_mapped_xyz, grid_mapped_xyz, **tol) + np.testing.assert_allclose(orig_grid_mapped_xyz, grid_mapped_xyz, **tol) + + csvout = tmp_path / "mapped_xyz.csv" + csvin = tmp_path / "coords_xyz.csv" + # antsApplyTransformsToPoints wants LPS coordinates, see last post at + # http://sourceforge.net/p/advants/discussion/840261/thread/2a1e9307/ + lps_xyz = coords_xyz.copy() * (-1, -1, 1) + np.savetxt(csvin, lps_xyz, delimiter=",", header="x,y,z", comments="") + + cmd = f"antsApplyTransformsToPoints -d 3 -i {csvin} -o {csvout} -t {warpfile}" + exe = cmd.split()[0] + if not shutil.which(exe): + pytest.skip(f"Command {exe} not found on host") + check_call(cmd, shell=True) + + ants_res = np.genfromtxt(csvout, delimiter=",", names=True) + # antsApplyTransformsToPoints writes LPS coordinates, see last post at + # http://sourceforge.net/p/advants/discussion/840261/thread/2a1e9307/ + ants_pts = np.vstack([ants_res[n] for n in ("x", "y", "z")]).T * (-1, -1, 1) + + nb.Nifti1Image(grid_mapped_xyz, ref_affine, None).to_filename( + tmp_path / "grid_mapped.nii.gz" + ) + nb.Nifti1Image(coords_map, ref_affine, None).to_filename( + tmp_path / "baseline_field.nii.gz" + ) + nb.Nifti1Image(gold_mapped_xyz, ref_affine, None).to_filename( + tmp_path / "gold_mapped_xyz.nii.gz" + ) + + if ongrid: + ants_pts = ants_pts.reshape(*shape, 3) + + nb.Nifti1Image(ants_pts, ref_affine, None).to_filename( + tmp_path / "ants_mapped_xyz.nii.gz" + ) + np.testing.assert_allclose(gold_mapped_xyz, ants_pts, rtol=1e-2, atol=1e-3) + np.testing.assert_allclose(deltas, ants_pts - coords_map, rtol=1e-2, atol=1e-3) + else: + # TODO Change test to norms and investigate extreme cases + # We're likely hitting OBB points (see gh-188) + # https://github.com/nipy/nitransforms/pull/188 + np.testing.assert_allclose( + xfm.map(coords_xyz) - coords_xyz, ants_pts - coords_xyz, rtol=1, atol=1 + )