diff --git a/docs/trajectory_gradspec.rst b/docs/trajectory_gradspec.rst index 68f6341bd..289797279 100644 --- a/docs/trajectory_gradspec.rst +++ b/docs/trajectory_gradspec.rst @@ -32,7 +32,11 @@ The binary file format is specified as follows: +----------------+-------+---------+---------+------------------------------------------------------------------------+ | timestamp | FLOAT | 1 | n.a. | Time stamp when the binary is created | +----------------+-------+---------+---------+------------------------------------------------------------------------+ -| Empty places | FLOAT | 9 | n.a. | Yet unused : Default initialized with 0 | +| ADC pre-skip | FLOAT | 1 | n.a. | Gradient samples to skip before starting ADC, for pre-phasors | ++----------------+-------+---------+---------+------------------------------------------------------------------------+ +| ADC post-skip | FLOAT | 1 | n.a. | Gradient samples to skip at the end of trajectory by turning off ADC | ++----------------+-------+---------+---------+------------------------------------------------------------------------+ +| Empty places | FLOAT | 7 | n.a. | Yet unused : Default initialized with 0 | +----------------+-------+---------+---------+------------------------------------------------------------------------+ | kStarts | FLOAT | D*Nc | 1/m | K-space location start | +----------------+-------+---------+---------+------------------------------------------------------------------------+ diff --git a/pyproject.toml b/pyproject.toml index 6c491e56a..530d540fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ description = "MRI Non-Cartesian Fourier Operators with multiple computation bac authors = [{name="Pierre-antoine Comby", email="pierre-antoine.comby@crans.org"}] readme = "README.md" -dependencies = ["numpy", "scipy", "matplotlib", "tqdm"] +dependencies = ["numpy", "scipy", "matplotlib", "tqdm", "joblib"] requires-python = ">=3.9" dynamic = ["version"] diff --git a/src/mrinufft/io/nsp.py b/src/mrinufft/io/nsp.py index 08603fefe..7e4d116ce 100644 --- a/src/mrinufft/io/nsp.py +++ b/src/mrinufft/io/nsp.py @@ -8,6 +8,7 @@ from datetime import datetime import numpy as np +from typing import Optional from mrinufft.trajectories.utils import ( DEFAULT_GMAX, @@ -19,6 +20,7 @@ convert_gradients_to_slew_rates, convert_trajectory_to_gradients, ) +from mrinufft.trajectories.tools import get_gradient_amplitudes_to_travel_for_set_time from .siemens import read_siemens_rawdat @@ -29,7 +31,7 @@ def write_gradients( grad_filename: str, img_size: tuple[int, ...], FOV: tuple[float, ...], - in_out: bool = True, + TE_pos: float = 0.5, min_osf: int = 5, gamma: float = Gammas.HYDROGEN, version: float = 4.2, @@ -37,6 +39,8 @@ def write_gradients( timestamp: float | None = None, keep_txt_file: bool = False, final_positions: np.ndarray | None = None, + start_skip_samples: int = 0, + end_skip_samples: int = 0, ): """Create gradient file from gradients and initial positions. @@ -52,8 +56,10 @@ def write_gradients( Image size. FOV : tuple[float, ...] Field of view. - in_out : bool, optional - Whether it is In-Out trajectory?, by default True + TE_pos : float, optional + The ratio of trajectory when TE occurs, with 0 as start of + trajectory and 1 as end. By default 0.5, which is the + center of the trajectory (in-out trajectory). min_osf : int, optional Minimum oversampling factor needed at ADC, by default 5 gamma : float, optional @@ -69,6 +75,12 @@ def write_gradients( binary file, by default False final_positions : np.ndarray, optional Final positions. Shape (num_shots, dimension), by default None + start_skip_samples : int, optional + Number of samples to skip in ADC at start of each shot, by default 0 + This works only for version >= 5.1. + end_skip_samples : int, optional + Number of samples to skip in ADC at end of each shot, by default 0 + This works only for version >= 5.1. """ num_shots = gradients.shape[0] @@ -100,14 +112,12 @@ def write_gradients( file.write(str(num_shots) + "\n") file.write(str(num_samples_per_shot) + "\n") if version >= 4.1: - if not in_out: + if TE_pos == 0: if np.sum(initial_positions) != 0: warnings.warn( "The initial positions are not all zero for center-out trajectory" ) - file.write("0\n") - else: - file.write("0.5\n") + file.write(str(TE_pos) + "\n") # Write the maximum Gradient file.write(str(max_grad) + "\n") # Write recon Pipeline version tag @@ -119,6 +129,10 @@ def write_gradients( timestamp = float(datetime.now().timestamp()) file.write(str(timestamp) + "\n") left_over -= 1 + if version >= 5.1: + file.write(str(start_skip_samples) + "\n") + file.write(str(end_skip_samples) + "\n") + left_over -= 2 file.write(str("0\n" * left_over)) # Write all the k0 values file.write( @@ -165,7 +179,7 @@ def write_gradients( os.remove(grad_filename + ".txt") -def _pop_elements(array, num_elements=1, type="float"): +def _pop_elements(array, num_elements=1, type=np.float32): """Pop elements from an array. Parameters @@ -200,8 +214,11 @@ def write_trajectory( gamma: float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, check_constraints: bool = True, + TE_pos: float = 0.5, gmax: float = DEFAULT_GMAX, smax: float = DEFAULT_SMAX, + pregrad: str | None = None, + postgrad: str | None = None, version: float = 5, **kwargs, ): @@ -226,10 +243,26 @@ def write_trajectory( Gradient raster time in ms, by default 0.01 check_constraints : bool, optional Check scanner constraints, by default True + TE_pos : float, optional + The ratio of trajectory when TE occurs, with 0 as start of + trajectory and 1 as end. By default 0.5, which is the + center of the trajectory (in-out trajectory). gmax : float, optional Maximum gradient magnitude in T/m, by default 0.04 smax : float, optional Maximum slew rate in T/m/ms, by default 0.1 + pregrad : str, optional + Pregrad method, by default `prephase` + `prephase` will add a prephasing gradient to the start of the trajectory. + postgrad : str, optional + Postgrad method, by default 'slowdown_to_edge' + `slowdown_to_edge` will add a gradient to slow down to the edge of the k-space + along x-axis for all the shots i.e. go to (Kmax, 0, 0). + This is useful for sequences needing a spoiler at the end of the trajectory. + However, spoiler is still not added, it is expected that the sequence + handles the spoilers, which can be variable. + `slowdown_to_center` will add a gradient to slow down to the center + of the k-space. version: float, optional Trajectory versioning, by default 5 kwargs : dict, optional @@ -245,7 +278,46 @@ def write_trajectory( gamma=gamma, get_final_positions=True, ) - + Ns_to_skip_at_start = 0 + Ns_to_skip_at_end = 0 + scan_consts = { + "gamma": gamma, + "gmax": gmax, + "smax": smax, + "raster_time": raster_time, + } + if pregrad == "prephase": + if version < 5.1: + raise ValueError( + "pregrad is only supported for version >= 5.1, " + "please set version to 5.1 or higher." + ) + start_gradients = get_gradient_amplitudes_to_travel_for_set_time( + kspace_end_loc=initial_positions, + end_gradients=gradients[:, 0], + **scan_consts, + ) + initial_positions = np.zeros_like(initial_positions) + gradients = np.hstack([start_gradients, gradients]) + Ns_to_skip_at_start = start_gradients.shape[1] + if postgrad: + if version < 5.1: + raise ValueError( + "postgrad is only supported for version >= 5.1, " + "please set version to 5.1 or higher." + ) + edge_locations = np.zeros_like(final_positions) + if postgrad == "slowdown_to_edge": + # Always end at KMax, the spoilers can be handeled by the sequence. + edge_locations[..., 0] = img_size[0] / FOV[0] / 2 + end_gradients = get_gradient_amplitudes_to_travel_for_set_time( + kspace_end_loc=edge_locations, + start_gradients=gradients[:, -1], + kspace_start_loc=final_positions, + **scan_consts, + ) + gradients = np.hstack([gradients, end_gradients]) + Ns_to_skip_at_end = end_gradients.shape[1] # Check constraints if requested if check_constraints: slewrates, _ = convert_gradients_to_slew_rates(gradients, raster_time) @@ -261,6 +333,15 @@ def write_trajectory( f"Maximum gradient amplitude: {maxG:.3f} > {gmax:.3f}" f"Maximum slew rate: {maxS:.3f} > {smax:.3f}" ) + if pregrad != "prephase": + border_slew_rate = gradients[:, 0] / raster_time + if np.any(np.abs(border_slew_rate) > smax): + warnings.warn( + "Slew rate at start of trajectory exceeds maximum slew rate!" + f"Maximum slew rate: {np.max(np.abs(border_slew_rate)):.3f}" + f" > {smax:.3f}. Please use prephase gradient to avoid this " + " issue." + ) # Write gradients in file write_gradients( @@ -270,8 +351,11 @@ def write_trajectory( grad_filename=grad_filename, img_size=img_size, FOV=FOV, + TE_pos=TE_pos, gamma=gamma, version=version, + start_skip_samples=Ns_to_skip_at_start, + end_skip_samples=Ns_to_skip_at_end, **kwargs, ) @@ -279,7 +363,7 @@ def write_trajectory( def read_trajectory( grad_filename: str, dwell_time: float | str = DEFAULT_RASTER_TIME, - num_adc_samples: int = None, + num_adc_samples: int | None = None, gamma: Gammas | float = Gammas.HYDROGEN, raster_time: float = DEFAULT_RASTER_TIME, read_shots: bool = False, @@ -336,25 +420,27 @@ def read_trajectory( if dwell_time == "min_osf": dwell_time = raster_time / min_osf (num_shots, num_samples_per_shot), data = _pop_elements(data, 2, type="int") - if num_adc_samples is None: - if read_shots: - num_adc_samples = num_samples_per_shot + 1 - else: - num_adc_samples = int(num_samples_per_shot * (raster_time / dwell_time)) - if version >= 4.1: + if version > 4: TE, data = _pop_elements(data) grad_max, data = _pop_elements(data) recon_tag, data = _pop_elements(data) recon_tag = np.around(recon_tag, 2) left_over = 10 - if version >= 4.2: + if version > 4.1: timestamp, data = _pop_elements(data) timestamp = datetime.fromtimestamp(float(timestamp)) left_over -= 1 + if version > 5: + packed_skips, data = _pop_elements(data, num_elements=2, type="int") + start_skip_samples, end_skip_samples = packed_skips + left_over -= 2 + else: + start_skip_samples = 0 + end_skip_samples = 0 _, data = _pop_elements(data, left_over) initial_positions, data = _pop_elements(data, dimension * num_shots) initial_positions = np.reshape(initial_positions, (num_shots, dimension)) - if version >= 5: + if version > 4.5: final_positions, data = _pop_elements(data, dimension * num_shots) final_positions = np.reshape(final_positions, (num_shots, dimension)) dwell_time_ns = dwell_time * 1e6 @@ -365,12 +451,28 @@ def read_trajectory( data, dimension * num_samples_per_shot * num_shots, ) + # Convert gradients to T/m gradients = np.reshape( - grad_max * gradients, (num_shots * num_samples_per_shot, dimension) + grad_max * gradients * 1e-3, (num_shots, num_samples_per_shot, dimension) ) - # Convert gradients from mT/m to T/m - gradients = np.reshape(gradients * 1e-3, (-1, num_samples_per_shot, dimension)) - kspace_loc = np.zeros((num_shots, num_adc_samples, dimension)) + # Handle skipped samples + if start_skip_samples > 0: + start_location_updates = ( + np.sum(gradients[:, :start_skip_samples], axis=1) * raster_time * gamma + ) + initial_positions += start_location_updates + gradients = gradients[:, start_skip_samples:, :] + num_samples_per_shot -= start_skip_samples + if end_skip_samples > 0: + gradients = gradients[:, :-end_skip_samples, :] + num_samples_per_shot -= end_skip_samples + if num_adc_samples is None: + if read_shots: + # Acquire one extra sample at the end of each shot in read_shots mode + num_adc_samples = num_samples_per_shot + 1 + else: + num_adc_samples = int(num_samples_per_shot * (raster_time / dwell_time)) + kspace_loc = np.zeros((num_shots, num_adc_samples, dimension), dtype=np.float32) kspace_loc[:, 0, :] = initial_positions adc_times = dwell_time_ns * np.arange(1, num_adc_samples) Q, R = divmod(adc_times, gradient_raster_time_ns) diff --git a/src/mrinufft/trajectories/tools.py b/src/mrinufft/trajectories/tools.py index 87b3f6f73..4b67e1e15 100644 --- a/src/mrinufft/trajectories/tools.py +++ b/src/mrinufft/trajectories/tools.py @@ -1,14 +1,25 @@ """Functions to manipulate/modify trajectories.""" -from typing import Any, Callable, Literal +from typing import Any, Callable, Literal, Optional import numpy as np from numpy.typing import NDArray from scipy.interpolate import CubicSpline, interp1d from scipy.stats import norm +from scipy.optimize import minimize_scalar +from joblib import Parallel, delayed from .maths import Rv, Rx, Ry, Rz -from .utils import KMAX, VDSorder, VDSpdf, initialize_tilt +from .utils import ( + KMAX, + VDSorder, + VDSpdf, + initialize_tilt, + DEFAULT_GMAX, + DEFAULT_RASTER_TIME, + DEFAULT_SMAX, + Gammas, +) ################ # DIRECT TOOLS # @@ -369,6 +380,298 @@ def unepify(trajectory: NDArray, Ns_readouts: int, Ns_transitions: int) -> NDArr return trajectory +def _trapezoidal_area(gs, ge, gi, n_down, n_up, n_pl): + """Calculate the area traversed by the trapezoidal gradient waveform.""" + return 0.5 * (gs + gi) * (n_down + 1) + 0.5 * (ge + gi) * (n_up - 1) + n_pl * gi + + +def _trapezoidal_plateau_length( + gs, ge, gi, n_down, n_up, area_needed, ceil=False, buffer=0 +): + """Calculate the plateau length of the trapezoidal gradient waveform.""" + n_pl = ( + 0.5 + * (2 * area_needed - gs * (n_down + 1) - ge * (n_up - 1) + gi * (n_down + n_up)) + / (gi + np.finfo(gi.dtype).eps) + ) + if ceil: + n_pl = np.ceil(n_pl).astype(int) + return n_pl + buffer + + +def _trapezoidal_ramps(gs, ge, gi, smax, raster_time, ceil=False, buffer=0): + """Calculate the number of time steps for the ramp down and up.""" + n_ramp_down = np.abs(gi - gs) / (smax * raster_time) + n_ramp_up = np.abs(ge - gi) / (smax * raster_time) + if ceil: + n_ramp_down = np.ceil(n_ramp_down).astype(int) + n_ramp_up = np.ceil(n_ramp_up).astype(int) + return n_ramp_down + buffer, n_ramp_up + buffer + + +def _plateau_value(gs, ge, n_down, n_up, n_pl, area_needed): + """Calculate the value of the plateau of a trapezoidal gradient waveform.""" + return (2 * area_needed - (n_down + 1) * gs - (n_up - 1) * ge) / ( + n_down + n_up + 2 * n_pl + ) + + +def get_gradient_times_to_travel( + kspace_end_loc: Optional[NDArray] = None, + kspace_start_loc: Optional[NDArray] = None, + end_gradients: Optional[NDArray] = None, + start_gradients: Optional[NDArray] = None, + gamma: float = Gammas.Hydrogen, + raster_time: float = DEFAULT_RASTER_TIME, + gmax: float = DEFAULT_GMAX, + smax: float = DEFAULT_SMAX, + n_jobs: int = 1, +) -> tuple[NDArray, NDArray, NDArray, NDArray]: + """Get gradient timing values for trapezoidal or triangular waveforms. + + Compute gradient timing values to take k-space trajectories + from position ``ks`` with gradient ``gs`` to position ``ke`` with gradient + ``ge``, while being hardware compliant. + This function calculates the minimal number of time steps required for + the ramp down, ramp up, and plateau phases of the gradient waveform, + ensuring that the area traversed in k-space matches the desired + trajectory while adhering to the maximum gradient amplitude and + slew rate constraints. + + Parameters + ---------- + kspace_end_loc : NDArray + Ending k-space positions, shape (nb_shots, nb_dimension). + kspace_start_loc : NDArray, default None when it is 0 + Starting k-space positions, shape (nb_shots, nb_dimension). + end_gradients : NDArray, default None when it is 0 + Ending gradient values, shape (nb_shots, nb_dimension). + start_gradients : NDArray, default None when it is 0 + Starting gradient values, shape (nb_shots, nb_dimension). + gamma : float, optional + Gyromagnetic ratio in Hz/T. Default is Gammas.Hydrogen. + raster_time : float, optional + Time interval between gradient samples (s). Default is DEFAULT_RASTER_TIME. + gmax : float, optional + Maximum gradient amplitude (T/m). Default is DEFAULT_GMAX. + smax : float, optional + Maximum slew rate ``T/m/s``. Default is DEFAULT_SMAX. + n_jobs : int, optional + Number of parallel jobs to run for optimization, by default 1. + + Returns + ------- + n_ramp_down: The timing values for the ramp down phase. + n_ramp_up: The timing values for the ramp up phase. + n_plateau: The timing values for the plateau phase. + gi: The intermediate gradient values for trapezoidal or triangular waveforms. + + See Also + -------- + get_gradient_amplitudes_to_travel_for_set_time : + To directly get the waveforms required. This is most-likely what + you want to use. + """ + area_needed = (kspace_end_loc - kspace_start_loc) / gamma / raster_time + + def solve_gi_min_plateau(gs, ge, area): + def _residual(gi): + n_down, n_up = _trapezoidal_ramps(gs, ge, gi, smax, raster_time) + n_pl = _trapezoidal_plateau_length(gs, ge, gi, n_down, n_up, area) + if n_pl < 0: + return np.abs(n_pl) * 10000 # Penalize negative plateau + return n_pl * 100 # Penalize large plateau + + res = minimize_scalar( + _residual, + bounds=(-gmax, gmax), + method="bounded", + ) + if not res.success: + raise RuntimeError(f"Minimization failed: {res.message}") + return res.x + + gi = Parallel(n_jobs=n_jobs)( + delayed(solve_gi_min_plateau)( + start_gradients[i, j], + end_gradients[i, j], + area_needed[i, j], + ) + for i in range(start_gradients.shape[0]) + for j in range(start_gradients.shape[1]) + ) + gi = np.reshape(gi, start_gradients.shape) + n_ramp_down, n_ramp_up = _trapezoidal_ramps( + start_gradients, + end_gradients, + gi, + smax, + raster_time, + ceil=True, + buffer=1, + ) + n_plateau = _trapezoidal_plateau_length( + start_gradients, + end_gradients, + gi, + n_ramp_down, + n_ramp_up, + area_needed, + ceil=True, + ) + return n_ramp_down, n_ramp_up, n_plateau, gi + + +def get_gradient_amplitudes_to_travel_for_set_time( + kspace_end_loc: NDArray, + kspace_start_loc: Optional[NDArray] = None, + end_gradients: Optional[NDArray] = None, + start_gradients: Optional[NDArray] = None, + nb_raster_points: Optional[int] = None, + gamma: float = Gammas.Hydrogen, + raster_time: float = DEFAULT_RASTER_TIME, + gmax: float = DEFAULT_GMAX, + smax: float = DEFAULT_SMAX, + n_jobs: int = 1, +) -> NDArray: + """Calculate timings for trapezoidal or triangular gradient waveforms. + + Compute the gradient waveforms required to traverse from a starting k-space + position ``ks`` to an ending k-space position ``ke`` in a fixed number of time + steps ``N``, subject to hardware constraints on maximum gradient amplitude + ``gmax`` and slew rate ``smax``. The function supports both trapezoidal + and triangular gradient shapes, automatically adjusting the waveform to + meet the area constraint imposed by the desired k-space traversal + and the specified timing and hardware limits. + + Parameters + ---------- + kspace_end_loc : NDArray + Ending k-space positions, shape (nb_shots, nb_dimension). + kspace_start_loc : NDArray, default None when it is 0 + Starting k-space positions, shape (nb_shots, nb_dimension). + end_gradients : NDArray, default None when it is 0 + Ending gradient values, shape (nb_shots, nb_dimension). + start_gradients : NDArray, default None when it is 0 + Starting gradient values, shape (nb_shots, nb_dimension). + nb_raster_points : int, default None + Number of time steps (samples) for the gradient waveform. + If None, timing is calculated based on the area needed and hardware limits. + gamma : float, optional + Gyromagnetic ratio in Hz/T. Default is Gammas.Hydrogen. + raster_time : float, optional + Time interval between gradient samples (s). Default is DEFAULT_RASTER_TIME. + gmax : float, optional + Maximum gradient amplitude (T/m). Default is DEFAULT_GMAX. + smax : float, optional + Maximum slew rate (T/m/s). Default is DEFAULT_SMAX. + n_jobs : int, optional + Number of parallel jobs to run for optimization, by default 1. + + Returns + ------- + NDArray + Gradient waveforms, shape (nb_shots, nb_samples_per_shot, nb_dimension), + where each entry contains the gradient value at each time step for each shot + and dimension. + + Notes + ----- + - The function automatically determines whether a trapezoidal or triangular waveform + is needed based on the area constraint and hardware limits. + - The returned gradients are suitable for use in MRI pulse sequence design, + ensuring compliance with specified hardware constraints. + """ + kspace_end_loc = np.atleast_2d(kspace_end_loc) + if kspace_start_loc is None: + kspace_start_loc = np.zeros_like(kspace_end_loc) + if start_gradients is None: + start_gradients = np.zeros_like(kspace_end_loc) + if end_gradients is None: + end_gradients = np.zeros_like(kspace_end_loc) + kspace_start_loc = np.atleast_2d(kspace_start_loc) + start_gradients = np.atleast_2d(start_gradients) + end_gradients = np.atleast_2d(end_gradients) + + assert ( + kspace_start_loc.shape + == kspace_end_loc.shape + == start_gradients.shape + == end_gradients.shape + ), "All input arrays must have shape (nb_shots, nb_dimension)" + if nb_raster_points is None: + # Calculate the number of time steps based on the area needed + n_ramp_down, n_ramp_up, n_plateau, gi = get_gradient_times_to_travel( + kspace_end_loc=kspace_end_loc, + kspace_start_loc=kspace_start_loc, + end_gradients=end_gradients, + start_gradients=start_gradients, + gamma=gamma, + raster_time=raster_time, + gmax=gmax, + smax=smax, + ) + # Extra 2 buffer samples + nb_raster_points = np.max(n_ramp_down + n_ramp_up + n_plateau) + 2 + + area_needed = (kspace_end_loc - kspace_start_loc) / gamma / raster_time + + def solve_gi_fixed_N(gs, ge, area): + def _residual(gi): + n_down, n_up = _trapezoidal_ramps(gs, ge, gi, smax, raster_time, buffer=1) + n_pl = nb_raster_points - n_down - n_up + if n_pl < 0: + return np.abs(n_pl) # Penalize this + area_expr = _trapezoidal_area(gs, ge, gi, n_down, n_up, n_pl) + return np.abs(area - area_expr) + + res = minimize_scalar( + _residual, bounds=(-gmax, gmax), method="bounded", options={"xatol": 1e-10} + ) + if not res.success: + raise RuntimeError(f"Minimization failed: {res.message}") + return res.x + + gi = Parallel(n_jobs=n_jobs)( + delayed(solve_gi_fixed_N)( + start_gradients[i, j], + end_gradients[i, j], + area_needed[i, j], + ) + for i in range(start_gradients.shape[0]) + for j in range(start_gradients.shape[1]) + ) + gi = np.reshape(gi, start_gradients.shape) + n_ramp_down, n_ramp_up = _trapezoidal_ramps( + start_gradients, + end_gradients, + gi, + smax, + raster_time, + ceil=True, + ) + n_plateau = nb_raster_points - n_ramp_down - n_ramp_up + gi = _plateau_value( + start_gradients, end_gradients, n_ramp_down, n_ramp_up, n_plateau, area_needed + ) + nb_shots, nb_dimension = kspace_end_loc.shape + G = np.zeros((nb_shots, nb_raster_points, nb_dimension), dtype=np.float32) + for i in range(nb_shots): + for d in range(nb_dimension): + start = 0 + G[i, : n_ramp_down[i, d], d] = np.linspace( + start_gradients[i, d], gi[i, d], n_ramp_down[i, d], endpoint=False + ) + start += n_ramp_down[i, d] + if n_plateau[i, d] > 0: + G[i, start : start + n_plateau[i, d], d] = gi[i, d] + start += n_plateau[i, d] + G[i, start : start + n_ramp_up[i, d], d] = np.linspace( + gi[i, d], end_gradients[i, d], n_ramp_up[i, d], endpoint=False + ) + return G + + def prewind(trajectory: NDArray, Ns_transitions: int) -> NDArray: """Add pre-winding/positioning to the trajectory. diff --git a/tests/case_trajectories.py b/tests/case_trajectories.py index 4fc04b8cf..24207bd00 100644 --- a/tests/case_trajectories.py +++ b/tests/case_trajectories.py @@ -29,6 +29,11 @@ def case_random3D(self, M=200000, N=64, pdf="uniform", seed=0): # Have assymetric image size to better catch shape mismatch issues return samples, (N, N * 2, N + 10) + def case_in_out_radial2D(self, Nc=10, Ns=500, N=64): + """Create a 2D radial trajectory with in-out sampling.""" + trajectory = initialize_2D_radial(Nc, Ns, in_out=True) + return trajectory, (N, N) + def case_radial2D(self, Nc=10, Ns=500, N=64): """Create a 2D radial trajectory.""" trajectory = initialize_2D_radial(Nc, Ns) diff --git a/tests/test_io.py b/tests/test_io.py index 7d840fbb7..ef85f0eda 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -4,9 +4,17 @@ from mrinufft.io import read_trajectory, write_trajectory from mrinufft.io.utils import add_phase_to_kspace_with_shifts from mrinufft.trajectories.trajectory2D import initialize_2D_radial +from mrinufft.trajectories.utils import ( + Gammas, + DEFAULT_GMAX, + DEFAULT_SMAX, + DEFAULT_RASTER_TIME, +) +from mrinufft.trajectories.tools import get_gradient_amplitudes_to_travel_for_set_time from mrinufft.trajectories.trajectory3D import initialize_3D_cones from pytest_cases import parametrize, parametrize_with_cases from case_trajectories import CasesTrajectories +import pytest class CasesIO: @@ -17,68 +25,166 @@ def case_trajectory_2D(self): trajectory = initialize_2D_radial( Nc=32, Ns=256, tilt="uniform", in_out=False ).astype(np.float32) - return "2D", trajectory, (0.23, 0.23), (256, 256), False, 2, 42.576e3, 1.1 + return "2D", trajectory, (0.23, 0.23), (256, 256), 0.5, 2, 42.576e3, 1.1 def case_trajectory_3D(self): """Test the 3D Trajectory.""" trajectory = initialize_3D_cones( - Nc=32, Ns=256, tilt="uniform", in_out=True + Nc=32, Ns=512, tilt="uniform", in_out=True ).astype(np.float32) return ( "3D", trajectory, (0.23, 0.23, 0.1248), - (256, 256, 128), - True, + (64, 64, 32), + 0, 5, - 10e3, + Gammas.Na, 1.2, ) +@parametrize("gamma,raster_time", [(Gammas.Hydrogen, DEFAULT_RASTER_TIME)]) +@parametrize_with_cases( + "kspace_loc, shape", + cases=[ + CasesTrajectories.case_radial2D, + CasesTrajectories.case_radial3D, + CasesTrajectories.case_in_out_radial2D, + ], +) +@parametrize("gmax", [0.1, DEFAULT_GMAX]) +@parametrize("smax", [0.7, DEFAULT_SMAX]) +def test_trajectory_state_changer_start( + kspace_loc, shape, gamma, raster_time, gmax, smax +): + """Test the trajectory state changer.""" + dimension = len(shape) + resolution = dimension * (0.23 / 256,) + trajectory = kspace_loc / resolution + gradients = np.diff(trajectory, axis=1) / gamma / raster_time + GS = get_gradient_amplitudes_to_travel_for_set_time( + kspace_end_loc=trajectory[:, 0], + end_gradients=gradients[:, 0], + gamma=gamma, + raster_time=raster_time, + gmax=gmax, + smax=smax, + ) + # Hardware constraints check + assert np.all(np.abs(GS) <= gmax) + assert np.all(np.abs(np.diff(GS, axis=1) / raster_time) <= smax) + assert np.all(np.abs(GS[:, -1] - gradients[:, 0]) / raster_time < smax) + if np.all(trajectory[:, 0] == 0): + # If the trajectory starts at the origin, we can check that the first gradient is zero + assert np.all(GS.shape[1] < 10) + assert GS.shape[1] < 200 # Checks to ensure we don't have too many samples + # Check that ending location matches. + np.testing.assert_allclose( + np.sum(GS, axis=1) * gamma * raster_time, + trajectory[:, 0], + atol=1e-2 / min(resolution) / 2, + ) + # Check that gradients match. + np.testing.assert_allclose(GS[:, 0], 0, atol=1e-5) + + +@parametrize("gamma,raster_time", [(Gammas.Hydrogen, DEFAULT_RASTER_TIME)]) @parametrize_with_cases( - "name, trajectory, FOV, img_size, in_out, min_osf, gamma, recon_tag", + "kspace_loc, shape", + cases=[ + CasesTrajectories.case_radial2D, + CasesTrajectories.case_radial3D, + CasesTrajectories.case_in_out_radial2D, + ], +) +@parametrize("gmax", [0.1, DEFAULT_GMAX]) +@parametrize("smax", [0.7, DEFAULT_SMAX]) +def test_trajectory_state_changer_end( + kspace_loc, shape, gamma, raster_time, gmax, smax +): + dimension = len(shape) + resolution = dimension * (0.23 / 256,) + trajectory = kspace_loc / resolution + gradients = np.diff(trajectory, axis=1) / gamma / raster_time + GE = get_gradient_amplitudes_to_travel_for_set_time( + kspace_start_loc=trajectory[:, -1], + kspace_end_loc=np.zeros_like(trajectory[:, -1]), + start_gradients=gradients[:, -1], + gamma=gamma, + raster_time=raster_time, + gmax=gmax, + smax=smax, + ) + # Hardware constraints check + assert np.all(np.abs(GE) <= gmax) + assert np.all(np.abs(np.diff(GE, axis=1) / raster_time) <= smax) + assert np.all(np.abs(GE[:, -1]) / raster_time < smax) + assert GE.shape[1] < 200 # Checks to ensure we don't have too many samples + # Check that ending location matches. + np.testing.assert_allclose( + 0, + trajectory[:, -1] + np.sum(GE, axis=1) * gamma * raster_time, + atol=1e-2 / min(resolution) / 2, + ) + # Check that gradients match. + np.testing.assert_allclose(GE[:, 0], gradients[:, -1], atol=1e-5) + + +@parametrize_with_cases( + "name, trajectory, FOV, img_size, TE_pos, min_osf, gamma, recon_tag", cases=CasesIO, ) -@parametrize("version", [4.2, 5.0]) +@parametrize("version", [4.2, 5.0, 5.1]) +@parametrize("postgrad", [None, "slowdown_to_center", "slowdown_to_edge"]) +@parametrize("pregrad", [None, "prephase"]) def test_write_n_read( name, trajectory, FOV, img_size, - in_out, + TE_pos, min_osf, gamma, recon_tag, tmp_path, version, + postgrad, + pregrad, ): + if version < 5.1 and (postgrad is not None or pregrad is not None): + pytest.skip("postgrad 'slowdown_to_edge' is not supported in version < 5.0") """Test function which writes the trajectory and reads it back.""" + if np.all(trajectory[:, 0] == 0) and pregrad is not None: + pytest.skip("We dont need prephasors for UTE trajectories") + write_trajectory( trajectory=trajectory, FOV=FOV, img_size=img_size, check_constraints=True, grad_filename=str(tmp_path / name), - in_out=in_out, + TE_pos=TE_pos, version=version, min_osf=min_osf, recon_tag=recon_tag, gamma=gamma, + pregrad=pregrad, + postgrad=postgrad, ) read_traj, params = read_trajectory( str((tmp_path / name).with_suffix(".bin")), gamma=gamma, read_shots=True ) - assert params["version"] == version + np.testing.assert_allclose(params["version"], version) assert params["num_shots"] == trajectory.shape[0] assert params["num_samples_per_shot"] == trajectory.shape[1] - 1 - assert params["TE"] == (0.5 if in_out else 0) - assert params["gamma"] == gamma - assert params["recon_tag"] == recon_tag + np.testing.assert_almost_equal(params["TE"], TE_pos) + np.testing.assert_allclose(params["gamma"], gamma) + np.testing.assert_allclose(params["recon_tag"], recon_tag) assert params["min_osf"] == min_osf np.testing.assert_almost_equal(params["FOV"], FOV, decimal=6) np.testing.assert_equal(params["img_size"], img_size) - np.testing.assert_almost_equal(read_traj, trajectory, decimal=5) + np.testing.assert_almost_equal(read_traj, trajectory, decimal=4) @parametrize_with_cases( diff --git a/tests/test_offres_exp_approx.py b/tests/test_offres_exp_approx.py index e89f1932f..ef306f18d 100644 --- a/tests/test_offres_exp_approx.py +++ b/tests/test_offres_exp_approx.py @@ -110,7 +110,6 @@ def test_zmap_coeff(zmap, mask, array_interface): def test_b0_map_upsampling_warns_and_matches_shape(): """Test that MRIFourierCorrected upscales the b0_map and warns if shape mismatch exists.""" - shape_target = (16, 16, 16) b0_shape = (8, 8, 8)