diff --git a/pyproject.toml b/pyproject.toml
index cbb5cf278..d06d4817b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,13 +21,14 @@ license = "Apache-2.0"
 requires-python = ">=3.10"
 dependencies = [
     "attrs",
-    "dipy>=1.5.0",
+    "dipy>=1.10.0",
     "joblib",
     "nipype>= 1.5.1,<2.0",
     "nitransforms>=22.0.0,<24",
     "nireports",
     "numpy>=1.21.3",
     "nest-asyncio>=1.5.1",
+    "ray",
     "scikit-image>=0.15.0",
     "scikit_learn>=1.3.0",
     "scipy>=1.8.0",
diff --git a/src/nifreeze/cli/parser.py b/src/nifreeze/cli/parser.py
index 16ad2459c..93d6f9848 100644
--- a/src/nifreeze/cli/parser.py
+++ b/src/nifreeze/cli/parser.py
@@ -91,6 +91,8 @@ def build_parser() -> ArgumentParser:
     )
     parser.add_argument(
         "--nthreads",
+        "--omp-nthreads",
+        "--ncpus",
         action="store",
         type=int,
         default=None,
diff --git a/src/nifreeze/data/base.py b/src/nifreeze/data/base.py
index 4acb1e252..5c9be4f78 100644
--- a/src/nifreeze/data/base.py
+++ b/src/nifreeze/data/base.py
@@ -97,6 +97,16 @@ def __len__(self) -> int:
         return self.dataobj.shape[-1]
 
+    @property
+    def shape3d(self):
+        """Get the shape of the 3D volume."""
+        return self.dataobj.shape[:3]
+
+    @property
+    def size3d(self):
+        """Get the number of voxels in the 3D volume."""
+        return np.prod(self.dataobj.shape[:3])
+
     def _getextra(self, idx: int | slice | tuple | np.ndarray) -> tuple[Unpack[Ts]]:
         return ()  # type: ignore[return-value]
 
diff --git a/src/nifreeze/estimator.py b/src/nifreeze/estimator.py
index 46d3f96f0..e0ec85e07 100644
--- a/src/nifreeze/estimator.py
+++ b/src/nifreeze/estimator.py
@@ -27,6 +27,7 @@
 from os import cpu_count
 from pathlib import Path
 from tempfile import TemporaryDirectory
+from timeit import default_timer as timer
 from typing import TypeVar
 
 from tqdm import tqdm
@@ -42,7 +43,9 @@
 
 DatasetT = TypeVar("DatasetT", bound=BaseDataset)
 
+DEFAULT_CHUNK_SIZE: int = int(1e6)
 FIT_MSG = "Fit&predict"
+PRE_MSG = "Predicted"
 REG_MSG = "Realign"
 
@@ -109,6 +112,10 @@ def run(self, dataset: DatasetT, **kwargs) -> Self:
             dataset = result  # type: ignore[assignment]
 
         n_jobs = kwargs.pop("n_jobs", None) or min(cpu_count() or 1, 8)
+        n_threads = kwargs.pop("omp_nthreads", None) or ((cpu_count() or 2) - 1)
+
+        num_voxels = dataset.brainmask.sum() if dataset.brainmask is not None else dataset.size3d
+        chunk_size = DEFAULT_CHUNK_SIZE * (n_threads or 1)
 
         # Prepare iterator
         iterfunc = getattr(iterators, f"{self._strategy}_iterator")
@@ -116,6 +123,9 @@
 
         # Initialize model
         if isinstance(self._model, str):
+            if self._model.endswith("dti"):
+                self._model_kwargs["step"] = chunk_size
+
             # Factory creates the appropriate model and pipes arguments
             model = ModelFactory.init(
                 model=self._model,
@@ -125,10 +135,25 @@
             )
         else:
             model = self._model
 
+        fit_pred_kwargs = {
+            "n_jobs": n_jobs,
+            "omp_nthreads": n_threads,
+        }
+
+        if model.__class__.__name__ == "DTIModel":
+            fit_pred_kwargs["step"] = chunk_size
+
+        print(f"Dataset size: {num_voxels}x{len(dataset)}.")
+        print(f"Parallel execution: {fit_pred_kwargs}.")
+        print(f"Model: {model}.")
+
         if self._single_fit:
-            model.fit_predict(None, n_jobs=n_jobs)
+            print("Fitting 'single' model started ...")
+            start = timer()
+            model.fit_predict(None, **fit_pred_kwargs)
+            print(f"Fitting 'single' model finished, elapsed {timer() - start}s.")
 
-        kwargs["num_threads"] = kwargs.pop("omp_nthreads", None) or kwargs.pop("num_threads", None)
+        kwargs["num_threads"] = n_threads
 
         kwargs = self._align_kwargs | kwargs
 
         dataset_length = len(dataset)
@@ -151,15 +176,16 @@ def run(self, dataset: DatasetT, **kwargs) -> Self:
                 pbar.set_description_str(f"{FIT_MSG: <16} vol. <{i}>")
 
                 # fit the model
-                test_set = dataset[i]
                 predicted = model.fit_predict(  # type: ignore[union-attr]
                     i,
-                    n_jobs=n_jobs,
+                    **fit_pred_kwargs,
                 )
 
+                pbar.set_description_str(f"{PRE_MSG: <16} vol. <{i}>")
+
                 # prepare data for running ANTs
                 predicted_path, volume_path, init_path = _prepare_registration_data(
-                    test_set[0],
+                    dataset[i][0],  # Access the target volume
                     predicted,
                     dataset.affine,
                     i,
diff --git a/src/nifreeze/model/_dipy.py b/src/nifreeze/model/_dipy.py
index adc1b15b6..2afa2f080 100644
--- a/src/nifreeze/model/_dipy.py
+++ b/src/nifreeze/model/_dipy.py
@@ -29,6 +29,7 @@
 import numpy as np
 from dipy.core.gradients import GradientTable
 from dipy.reconst.base import ReconstModel
+from dipy.reconst.multi_voxel import multi_voxel_fit
 from sklearn.gaussian_process import GaussianProcessRegressor
 
 from nifreeze.model.gpr import (
@@ -38,6 +39,11 @@
 )
 
 
+@multi_voxel_fit
+def multi_fit(obj, data, *, mask=None, **kwargs):
+    return obj.fit(data, *obj.args, mask=mask)
+
+
 def gp_prediction(
     model: GaussianProcessRegressor,
     gtab: GradientTable | np.ndarray,
diff --git a/src/nifreeze/model/dmri.py b/src/nifreeze/model/dmri.py
index ca8a78fa6..823ed596a 100644
--- a/src/nifreeze/model/dmri.py
+++ b/src/nifreeze/model/dmri.py
@@ -25,7 +25,6 @@
 import numpy as np
 from dipy.core.gradients import gradient_table_from_bvals_bvecs
-from joblib import Parallel, delayed
 
 from nifreeze.data.dmri import (
     DEFAULT_CLIP_PERCENTILE,
@@ -38,16 +37,6 @@
 B_MIN = 50
 
 
-def _exec_fit(model, data, chunk=None):
-    retval = model.fit(data)
-    return retval, chunk
-
-
-def _exec_predict(model, chunk=None, **kwargs):
-    """Propagate model parameters and call predict."""
-    return np.squeeze(model.predict(**kwargs)), chunk
-
-
 class BaseDWIModel(BaseModel):
     """Interface and default methods for DWI models."""
 
@@ -57,7 +46,7 @@ class BaseDWIModel(BaseModel):
         "_S0": "The S0 (b=0 reference signal) that will be fed into DIPY models",
         "_model_class": "Defining a model class, DIPY models are instantiated automagically",
         "_modelargs": "Arguments acceptable by the underlying DIPY-like model.",
-        "_models": "List with one or more (if parallel execution) model instances",
+        "_model_fit": "Fitted model",
     }
 
     def __init__(self, dataset: DWI, max_b: float | int | None = None, **kwargs):
@@ -104,11 +93,9 @@ def __init__(self, dataset: DWI, max_b: float | int | None = None, **kwargs):
 
         super().__init__(dataset, **kwargs)
 
-    def _fit(self, index: int | None = None, n_jobs=None, **kwargs):
+    def _fit(self, index: int | None = None, n_jobs=None, omp_nthreads=None, **kwargs):
         """Fit the model chunk-by-chunk asynchronously"""
 
-        n_jobs = n_jobs or 1
-
         if self._locked_fit is not None:
             return n_jobs
 
@@ -136,25 +123,21 @@ def _fit(self, index: int | None = None, n_jobs=None, **kwargs):
             class_name,
         )(gtab, **kwargs)
 
-        # One single CPU - linear execution (full model)
-        if n_jobs == 1:
-            _modelfit, _ = _exec_fit(model, data)
-            self._models = [_modelfit]
-            return 1
+        fitargs = {"engine": "ray", "n_jobs": n_jobs} if n_jobs > 1 else {}
 
-        # Split data into chunks of group of slices
-        data_chunks = np.array_split(data, n_jobs)
+        if "step" in kwargs:
+            fitargs["step"] = kwargs["step"]
 
-        self._models = [None] * n_jobs
+        try:
+            self._model_fit = model.fit(data, **fitargs)
+        except TypeError:
+            from nifreeze.model._dipy import multi_fit
 
-        # Parallelize process with joblib
-        with Parallel(n_jobs=n_jobs) as executor:
-            results = executor(
-                delayed(_exec_fit)(model, dchunk, i) for i, dchunk in enumerate(data_chunks)
-            )
-            for submodel, rindex in results:
-                self._models[rindex] = submodel
-
+            self._model_fit = multi_fit(
+                model,
+                data,
+                **fitargs,
+            )
         return n_jobs
 
@@ -168,44 +151,35 @@ def fit_predict(self, index: int | None = None, **kwargs):
 
         """
 
-        n_models = self._fit(
+        omp_nthreads = kwargs.pop("omp_nthreads", None)
+        n_jobs = kwargs.pop("n_jobs", None)
+
+        brainmask = self._dataset.brainmask
+        self._fit(
             index,
-            n_jobs=kwargs.pop("n_jobs"),
+            n_jobs=n_jobs,
+            omp_nthreads=omp_nthreads,
             **kwargs,
         )
 
         if index is None:
+            self._locked_fit = True
             return None
 
+        # Prepare gradient(s) for simulation
         gradient = self._dataset.gradients[:, index]
-
         if "dipy" in getattr(self, "_model_class", ""):
             gradient = gradient_table_from_bvals_bvecs(
                 gradient[np.newaxis, -1], gradient[np.newaxis, :-1]
             )
-
-        if n_models == 1:
-            predicted, _ = _exec_predict(
-                self._models[0], **(kwargs | {"gtab": gradient, "S0": self._S0})
+
+        # Prediction
+        predicted = np.squeeze(
+            self._model_fit.predict(
+                gtab=gradient,
+                S0=self._S0,
+                **kwargs,
             )
-        else:
-            predicted = [None] * n_models
-            S0 = np.array_split(self._S0, n_models)
-
-            # Parallelize process with joblib
-            with Parallel(n_jobs=n_models) as executor:
-                results = executor(
-                    delayed(_exec_predict)(
-                        model,
-                        chunk=i,
-                        **(kwargs | {"gtab": gradient, "S0": S0[i]}),
-                    )
-                    for i, model in enumerate(self._models)
-                )
-            for subprediction, index in results:
-                predicted[index] = subprediction
-
-            predicted = np.hstack(predicted)
+        )
 
         retval = np.zeros_like(self._data_mask, dtype=self._dataset.dataobj.dtype)
         retval[self._data_mask, ...] = predicted
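For context on the numbers involved: the thread and chunk bookkeeping that `Estimator.run` now performs reduces to a few lines. A standalone sketch (the values depend on the host's CPU count; `DEFAULT_CHUNK_SIZE` is the constant introduced in `estimator.py` above):

```python
from os import cpu_count

DEFAULT_CHUNK_SIZE = int(1e6)  # voxels per worker chunk, as defined in estimator.py

n_jobs = min(cpu_count() or 1, 8)            # parallel fit jobs, capped at 8
n_threads = (cpu_count() or 2) - 1           # worker threads, leaving one core free
chunk_size = DEFAULT_CHUNK_SIZE * n_threads  # e.g. 7_000_000 voxels on an 8-core host

print(n_jobs, n_threads, chunk_size)
```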
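The fitting strategy in `BaseDWIModel._fit` is "try the parallel signature, fall back on `TypeError`": models whose `fit()` accepts DIPY's parallel-execution keywords take them directly, while others raise `TypeError` and get routed to the fallback. A minimal sketch of that probe, assuming a DIPY-style `model` and a 4D `data` array (the patch falls back to the `multi_fit` wrapper; this sketch simplifies the fallback to a plain serial `fit()` for illustration):

```python
def fit_with_parallel_probe(model, data, n_jobs=4, step=int(1e6)):
    """Try DIPY's parallel fit keywords; fall back to a serial fit on TypeError."""
    fitargs = {"engine": "ray", "n_jobs": n_jobs, "step": step} if n_jobs > 1 else {}
    try:
        # Parallel-aware models accept the extra keywords directly.
        return model.fit(data, **fitargs)
    except TypeError:
        # Models with a plain fit(data) signature reject them; retry serially.
        return model.fit(data)
```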
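The `multi_fit` fallback relies on DIPY's `multi_voxel_fit` decorator, which lifts a single-voxel `fit` into a whole-volume one. A rough pure-NumPy approximation of that lifting, for intuition only (`naive_multi_fit` is a hypothetical helper; DIPY's real decorator additionally wraps the results in a `MultiVoxelFit` container with voxelwise `predict`):

```python
import numpy as np

def naive_multi_fit(model, data, mask=None):
    """Fit `model` voxel by voxel over the in-mask portion of a 4D array."""
    vol_shape = data.shape[:-1]
    mask = np.ones(vol_shape, dtype=bool) if mask is None else mask.astype(bool)
    fits = np.empty(vol_shape, dtype=object)
    for idx in zip(*np.nonzero(mask)):
        fits[idx] = model.fit(data[idx])  # one signal profile at a time
    return fits
```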