|
| 1 | +r"""Interactive test and visualization of vector flow derivatives.""" |
| 2 | + |
| 3 | +# %% |
| 4 | +# Imports |
| 5 | +from typing import Dict, Optional, Sequence |
| 6 | + |
| 7 | +import matplotlib.pyplot as plt |
| 8 | + |
| 9 | +import torch |
| 10 | +from torch import Tensor |
| 11 | +from torch.random import Generator |
| 12 | + |
| 13 | +from deepali.core import Axes, Grid |
| 14 | +import deepali.core.bspline as B |
| 15 | +import deepali.core.functional as U |
| 16 | + |
| 17 | + |
| 18 | +# %% |
| 19 | +# Auxiliary functions |
| 20 | +def change_axes(flow: Tensor, grid: Grid, axes: Axes, to_axes: Axes) -> Tensor: |
| 21 | + if axes != to_axes: |
| 22 | + flow = U.move_dim(flow, 1, -1) |
| 23 | + flow = grid.transform_vectors(flow, axes=axes, to_axes=to_axes) |
| 24 | + flow = U.move_dim(flow, -1, 1) |
| 25 | + return flow |
| 26 | + |
| 27 | + |
| 28 | +def flow_derivatives( |
| 29 | + flow: Tensor, grid: Grid, axes: Axes, to_axes: Optional[Axes] = None, **kwargs |
| 30 | +) -> Dict[str, Tensor]: |
| 31 | + if to_axes is None: |
| 32 | + to_axes = axes |
| 33 | + flow = change_axes(flow, grid, axes, to_axes) |
| 34 | + axes = to_axes |
| 35 | + if "spacing" not in kwargs: |
| 36 | + if axes == Axes.CUBE: |
| 37 | + spacing = tuple(2 / n for n in grid.size()) |
| 38 | + elif axes == Axes.CUBE_CORNERS: |
| 39 | + spacing = tuple(2 / (n - 1) for n in grid.size()) |
| 40 | + elif axes == Axes.GRID: |
| 41 | + spacing = 1 |
| 42 | + elif axes == Axes.WORLD: |
| 43 | + spacing = grid.spacing() |
| 44 | + else: |
| 45 | + spacing = None |
| 46 | + kwargs["spacing"] = spacing |
| 47 | + return U.flow_derivatives(flow, **kwargs) |
| 48 | + |
| 49 | + |
| 50 | +def random_svf( |
| 51 | + size: Sequence[int], |
| 52 | + stride: int = 1, |
| 53 | + generator: Optional[Generator] = None, |
| 54 | +) -> Tensor: |
| 55 | + cp_grid_size = B.cubic_bspline_control_point_grid_size(size, stride=stride) |
| 56 | + cp_grid_size = tuple(reversed(cp_grid_size)) |
| 57 | + data = torch.randn((1, 3) + cp_grid_size, generator=generator) |
| 58 | + data = U.fill_border(data, margin=3, value=0, inplace=True) |
| 59 | + return B.evaluate_cubic_bspline(data, size=size, stride=stride) |
| 60 | + |
| 61 | + |
| 62 | +def visualize_flow( |
| 63 | + ax: plt.Axes, |
| 64 | + flow: Tensor, |
| 65 | + grid: Optional[Grid] = None, |
| 66 | + axes: Optional[Axes] = None, |
| 67 | + label: Optional[str] = None, |
| 68 | +) -> None: |
| 69 | + if grid is None: |
| 70 | + grid = Grid(shape=flow.shape[2:]) |
| 71 | + if axes is None: |
| 72 | + axes = grid.axes() |
| 73 | + flow = change_axes(flow, grid, axes, grid.axes()) |
| 74 | + x = grid.coords(channels_last=False, dtype=flow.dtype, device=flow.device) |
| 75 | + x = U.move_dim(x.unsqueeze_(0).add_(flow), 1, -1) |
| 76 | + target_grid = U.grid_image(shape=flow.shape[2:], inverted=True, stride=(5, 5)) |
| 77 | + warped_grid = U.warp_image(target_grid, x, align_corners=grid.align_corners()) |
| 78 | + ax.imshow(warped_grid[0, 0, flow.shape[2] // 2], cmap="gray") |
| 79 | + if label: |
| 80 | + ax.set_title(label, fontsize=24) |
| 81 | + |
| 82 | + |
| 83 | +# %% |
| 84 | +# Random velocity fields |
| 85 | +generator = torch.Generator().manual_seed(42) |
| 86 | +grid = Grid(size=(128, 128, 64), spacing=(0.5, 0.5, 1.0)) |
| 87 | +flow = random_svf(grid.size(), stride=8, generator=generator).mul_(0.1) |
| 88 | + |
| 89 | +fig, axes = plt.subplots(1, 1, figsize=(4, 4)) |
| 90 | + |
| 91 | +ax = axes |
| 92 | +ax.set_title("v", fontsize=24, pad=20) |
| 93 | +visualize_flow(ax, flow, grid=grid, axes=grid.axes()) |
| 94 | + |
| 95 | + |
| 96 | +# %% |
| 97 | +# Visualise first order derivatives for different modes |
| 98 | +configs = [ |
| 99 | + dict(mode="forward_central_backward"), |
| 100 | + dict(mode="bspline"), |
| 101 | + dict(mode="gaussian", sigma=0.7355), |
| 102 | +] |
| 103 | + |
| 104 | +fig, axes = plt.subplots(len(configs), 4, figsize=(16, 4 * len(configs))) |
| 105 | + |
| 106 | +for i, config in enumerate(configs): |
| 107 | + derivs = flow_derivatives( |
| 108 | + flow, |
| 109 | + grid=grid, |
| 110 | + axes=grid.axes(), |
| 111 | + to_axes=Axes.GRID, |
| 112 | + which=["du/dx", "du/dy", "dv/dx", "dv/dy"], |
| 113 | + **config, |
| 114 | + ) |
| 115 | + for ax, (key, deriv) in zip(axes[i], derivs.items()): |
| 116 | + if i == 0: |
| 117 | + ax.set_title(key, fontsize=24, pad=20) |
| 118 | + ax.imshow(deriv[0, 0, deriv.shape[2] // 2], vmin=-1, vmax=1) |
| 119 | + |
| 120 | + |
| 121 | +# %% |
| 122 | +# Compare magnitudes of first order derivatives for different modes |
| 123 | +flow_axes = [Axes.GRID, Axes.WORLD, Axes.CUBE_CORNERS] |
| 124 | + |
| 125 | +sigma = 0.7355 |
| 126 | + |
| 127 | +configs = [ |
| 128 | + dict(mode="bspline"), |
| 129 | + dict(mode="gaussian", sigma=sigma), |
| 130 | + dict(mode="forward_central_backward", sigma=sigma), |
| 131 | + dict(mode="forward_central_backward"), |
| 132 | +] |
| 133 | + |
| 134 | +for to_axes in flow_axes: |
| 135 | + for config in configs: |
| 136 | + print(f"axes={to_axes}, " + ", ".join(f"{k}={v!r}" for k, v in config.items())) |
| 137 | + derivs = flow_derivatives( |
| 138 | + flow, |
| 139 | + grid=grid, |
| 140 | + axes=grid.axes(), |
| 141 | + to_axes=to_axes, |
| 142 | + which=["du/dx", "du/dy", "dv/dx", "dv/dy"], |
| 143 | + **config, |
| 144 | + ) |
| 145 | + for key, deriv in derivs.items(): |
| 146 | + print(f"- max(abs({key})): {deriv.abs().max().item():.5f}") |
| 147 | + print() |
| 148 | + print("\n") |
| 149 | + |
| 150 | + |
| 151 | +# %% |
| 152 | +# Visualise second order derivatives for different modes |
| 153 | +configs = [ |
| 154 | + dict(mode="forward_central_backward"), |
| 155 | + dict(mode="bspline"), |
| 156 | + dict(mode="gaussian", sigma=0.7355), |
| 157 | +] |
| 158 | + |
| 159 | +fig, axes = plt.subplots(len(configs), 4, figsize=(16, 4 * len(configs))) |
| 160 | + |
| 161 | +for i, config in enumerate(configs): |
| 162 | + derivs = flow_derivatives( |
| 163 | + flow, |
| 164 | + grid=grid, |
| 165 | + axes=grid.axes(), |
| 166 | + to_axes=Axes.GRID, |
| 167 | + which=["du/dxx", "du/dxy", "dv/dxy", "dv/dyy"], |
| 168 | + **config, |
| 169 | + ) |
| 170 | + for ax, (key, deriv) in zip(axes[i], derivs.items()): |
| 171 | + if i == 0: |
| 172 | + ax.set_title(key, fontsize=24, pad=20) |
| 173 | + ax.imshow(deriv[0, 0, deriv.shape[2] // 2], vmin=-0.4, vmax=0.4) |
| 174 | + |
| 175 | + |
| 176 | +# %% |
| 177 | +# Compare magnitudes of second order derivatives for different modes |
| 178 | +flow_axes = [Axes.GRID, Axes.WORLD, Axes.CUBE_CORNERS] |
| 179 | + |
| 180 | +sigma = 0.7355 |
| 181 | + |
| 182 | +configs = [ |
| 183 | + dict(mode="bspline"), |
| 184 | + dict(mode="gaussian", sigma=sigma), |
| 185 | + dict(mode="forward_central_backward", sigma=sigma), |
| 186 | + dict(mode="forward_central_backward"), |
| 187 | +] |
| 188 | + |
| 189 | +for to_axes in flow_axes: |
| 190 | + for config in configs: |
| 191 | + print(f"axes={to_axes}, " + ", ".join(f"{k}={v!r}" for k, v in config.items())) |
| 192 | + derivs = flow_derivatives( |
| 193 | + flow, |
| 194 | + grid=grid, |
| 195 | + axes=grid.axes(), |
| 196 | + to_axes=to_axes, |
| 197 | + which=["du/dxx", "du/dxy", "dv/dxy", "dv/dyy"], |
| 198 | + **config, |
| 199 | + ) |
| 200 | + for key, deriv in derivs.items(): |
| 201 | + print(f"- max(abs({key})): {deriv.abs().max().item():.5f}") |
| 202 | + print() |
| 203 | + print("\n") |
| 204 | + |
| 205 | +# %% |
0 commit comments