diff --git a/nibabel/volumeutils.py b/nibabel/volumeutils.py
index 225062b2c..d61a41e67 100644
--- a/nibabel/volumeutils.py
+++ b/nibabel/volumeutils.py
@@ -10,21 +10,35 @@
 from __future__ import annotations
 
 import gzip
+import io
 import sys
+import typing as ty
 import warnings
-from collections import OrderedDict
+from bz2 import BZ2File
 from functools import reduce
-from operator import mul
+from operator import getitem, mul
 from os.path import exists, splitext
 
 import numpy as np
 
 from .casting import OK_FLOATS, shared_range
 from .externals.oset import OrderedSet
-from .openers import BZ2File, IndexedGzipFile
+from .openers import IndexedGzipFile
 from .optpkg import optional_package
 
-pyzstd, HAVE_ZSTD, _ = optional_package('pyzstd')
+if ty.TYPE_CHECKING:  # pragma: no cover
+    import numpy.typing as npt
+    import pyzstd
+
+    HAVE_ZSTD = True
+
+    Scalar = np.number | float
+
+    K = ty.TypeVar('K')
+    V = ty.TypeVar('V')
+    DT = ty.TypeVar('DT', bound=np.generic)
+else:
+    pyzstd, HAVE_ZSTD, _ = optional_package('pyzstd')
 
 sys_is_le = sys.byteorder == 'little'
 native_code = sys_is_le and '<' or '>'
@@ -42,7 +56,7 @@
 default_compresslevel = 1
 
 #: file-like classes known to hold compressed data
-COMPRESSED_FILE_LIKES: tuple[type, ...] = (gzip.GzipFile, BZ2File, IndexedGzipFile)
+COMPRESSED_FILE_LIKES: tuple[type[io.IOBase], ...] = (gzip.GzipFile, BZ2File, IndexedGzipFile)
 
 # Enable .zst support if pyzstd installed.
 if HAVE_ZSTD:
@@ -83,7 +97,14 @@ class Recoder:
     2
     """
 
-    def __init__(self, codes, fields=('code',), map_maker=OrderedDict):
+    fields: tuple[str, ...]
+
+    def __init__(
+        self,
+        codes: ty.Sequence[ty.Sequence[ty.Hashable]],
+        fields: ty.Sequence[str] = ('code',),
+        map_maker: type[ty.Mapping[ty.Hashable, ty.Hashable]] = dict,
+    ):
         """Create recoder object
 
         ``codes`` give a sequence of code, alias sequences
@@ -121,7 +142,14 @@ def __init__(self, codes, fields=('code',), map_maker=OrderedDict):
         self.field1 = self.__dict__[fields[0]]
         self.add_codes(codes)
 
-    def add_codes(self, code_syn_seqs):
+    def __getattr__(self, key: str) -> ty.Mapping[ty.Hashable, ty.Hashable]:
+        # By setting this, we let static analyzers know that dynamic attributes will
+        # be dict-like (Mapping).
+        # However, __getattr__ is called if looking up the field in __dict__ fails,
+        # so we only get here if the attribute is really missing.
+        raise AttributeError(f'{self.__class__.__name__!r} object has no attribute {key!r}')
+
+    def add_codes(self, code_syn_seqs: ty.Sequence[ty.Sequence[ty.Hashable]]) -> None:
         """Add codes to object
 
         Parameters
@@ -155,7 +183,7 @@ def add_codes(self, code_syn_seqs):
             for field_ind, field_name in enumerate(self.fields):
                 self.__dict__[field_name][alias] = code_syns[field_ind]
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: ty.Hashable) -> ty.Hashable:
         """Return value from field1 dictionary (first column of values)
 
         Returns same value as ``obj.field1[key]`` and, with the
@@ -168,13 +196,9 @@ def __getitem__(self, key):
         """
         return self.field1[key]
 
-    def __contains__(self, key):
+    def __contains__(self, key: ty.Hashable) -> bool:
         """True if field1 in recoder contains `key`"""
-        try:
-            self.field1[key]
-        except KeyError:
-            return False
-        return True
+        return key in self.field1
 
     def keys(self):
         """Return all available code and alias values
@@ -190,7 +214,7 @@ def keys(self):
         """
         return self.field1.keys()
 
-    def value_set(self, name=None):
+    def value_set(self, name: str | None = None) -> OrderedSet:
         """Return OrderedSet of possible returned values for column
 
         By default, the column is the first column.
@@ -224,7 +248,7 @@ def value_set(self, name=None):
 endian_codes = Recoder(_endian_codes)
 
 
-class DtypeMapper:
+class DtypeMapper(ty.Dict[ty.Hashable, ty.Hashable]):
     """Specialized mapper for numpy dtypes
 
     We pass this mapper into the Recoder class to deal with numpy dtype
@@ -242,26 +266,20 @@ class DtypeMapper:
     and return any matching values for the matching key.
     """
 
-    def __init__(self):
-        self._dict = {}
-        self._dtype_keys = []
-
-    def keys(self):
-        return self._dict.keys()
+    def __init__(self) -> None:
+        super().__init__()
+        self._dtype_keys: list[np.dtype] = []
 
-    def values(self):
-        return self._dict.values()
-
-    def __setitem__(self, key, value):
+    def __setitem__(self, key: ty.Hashable, value: ty.Hashable) -> None:
         """Set item into mapping, checking for dtype keys
 
         Cache dtype keys for comparison test in __getitem__
         """
-        self._dict[key] = value
-        if hasattr(key, 'subdtype'):
+        super().__setitem__(key, value)
+        if isinstance(key, np.dtype):
             self._dtype_keys.append(key)
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: ty.Hashable) -> ty.Hashable:
         """Get item from mapping, checking for dtype keys
 
         First do simple hash lookup, then check for a dtype key that has failed
@@ -269,17 +287,20 @@ class DtypeMapper:
         to `key`.
         """
         try:
-            return self._dict[key]
+            return super().__getitem__(key)
         except KeyError:
             pass
-        if hasattr(key, 'subdtype'):
+        if isinstance(key, np.dtype):
             for dt in self._dtype_keys:
                 if key == dt:
-                    return self._dict[dt]
+                    return super().__getitem__(dt)
         raise KeyError(key)
 
 
-def pretty_mapping(mapping, getterfunc=None):
+def pretty_mapping(
+    mapping: ty.Mapping[K, V],
+    getterfunc: ty.Callable[[ty.Mapping[K, V], K], V] | None = None,
+) -> str:
     """Make pretty string from mapping
 
     Adjusts text column to print values on basis of longest key.
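(A quick sketch of how the `Recoder` behavior annotated in the hunks above plays out at runtime; illustrative only, not part of the patch, and the code table below is invented.)

from nibabel.volumeutils import Recoder

# Invented code table: each row is (code, label alias)
rec = Recoder(((1, 'one'), (2, 'two')), fields=('code', 'label'))
assert rec[1] == 1 and rec['two'] == 2  # __getitem__ reads field1, the code column
assert 'one' in rec  # __contains__ now defers directly to field1
assert rec.label[2] == 'two'  # dynamic fields resolve to dict-like Mappings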
@@ -328,9 +349,8 @@ def pretty_mapping(mapping, getterfunc=None):
     longer_field : method string
     """
     if getterfunc is None:
-        getterfunc = lambda obj, key: obj[key]
-    lens = [len(str(name)) for name in mapping]
-    mxlen = np.max(lens)
+        getterfunc = getitem
+    mxlen = max(len(str(name)) for name in mapping)
     fmt = '%%-%ds : %%s' % mxlen
     out = []
     for name in mapping:
@@ -339,7 +359,7 @@ def pretty_mapping(mapping, getterfunc=None):
     return '\n'.join(out)
 
 
-def make_dt_codes(codes_seqs):
+def make_dt_codes(codes_seqs: ty.Sequence[ty.Sequence]) -> Recoder:
     """Create full dt codes Recoder instance from datatype codes
 
     Include created numpy dtype (from numpy type) and opposite endian
@@ -379,12 +399,19 @@ def make_dt_codes(codes_seqs):
     return Recoder(dt_codes, fields + ['dtype', 'sw_dtype'], DtypeMapper)
 
 
-def _is_compressed_fobj(fobj):
+def _is_compressed_fobj(fobj: io.IOBase) -> bool:
     """Return True if fobj represents a compressed data file-like object"""
     return isinstance(fobj, COMPRESSED_FILE_LIKES)
 
 
-def array_from_file(shape, in_dtype, infile, offset=0, order='F', mmap=True):
+def array_from_file(
+    shape: tuple[int, ...],
+    in_dtype: np.dtype[DT],
+    infile: io.IOBase,
+    offset: int = 0,
+    order: ty.Literal['C', 'F'] = 'F',
+    mmap: bool | ty.Literal['c', 'r', 'r+'] = True,
+) -> npt.NDArray[DT]:
     """Get array from file with specified shape, dtype and file offset
 
     Parameters
@@ -429,24 +456,23 @@ def array_from_file(shape, in_dtype, infile, offset=0, order='F', mmap=True):
     """
     if mmap not in (True, False, 'c', 'r', 'r+'):
         raise ValueError("mmap value should be one of True, False, 'c', " "'r', 'r+'")
-    if mmap is True:
-        mmap = 'c'
     in_dtype = np.dtype(in_dtype)
     # Get file-like object from Opener instance
     infile = getattr(infile, 'fobj', infile)
     if mmap and not _is_compressed_fobj(infile):
+        mode = 'c' if mmap is True else mmap
         try:  # Try memmapping file on disk
-            return np.memmap(infile, in_dtype, mode=mmap, shape=shape, order=order, offset=offset)
+            return np.memmap(infile, in_dtype, mode=mode, shape=shape, order=order, offset=offset)
         # The error raised by memmap, for different file types, has
         # changed in different incarnations of the numpy routine
         except (AttributeError, TypeError, ValueError):
             pass
     if len(shape) == 0:
-        return np.array([])
+        return np.array([], in_dtype)
     # Use reduce and mul to work around numpy integer overflow
     n_bytes = reduce(mul, shape) * in_dtype.itemsize
     if n_bytes == 0:
-        return np.array([])
+        return np.array([], in_dtype)
     # Read data from file
     infile.seek(offset)
     if hasattr(infile, 'readinto'):
@@ -462,7 +488,7 @@ def array_from_file(shape, in_dtype, infile, offset=0, order='F', mmap=True):
             f'Expected {n_bytes} bytes, got {n_read} bytes from '
             f"{getattr(infile, 'name', 'object')}\n - could the file be damaged?"
         )
-    arr = np.ndarray(shape, in_dtype, buffer=data_bytes, order=order)
+    arr: np.ndarray = np.ndarray(shape, in_dtype, buffer=data_bytes, order=order)
     if needs_copy:
         return arr.copy()
     arr.flags.writeable = True
@@ -470,17 +496,17 @@ def array_from_file(shape, in_dtype, infile, offset=0, order='F', mmap=True):
 
 
 def array_to_file(
-    data,
-    fileobj,
-    out_dtype=None,
-    offset=0,
-    intercept=0.0,
-    divslope=1.0,
-    mn=None,
-    mx=None,
-    order='F',
-    nan2zero=True,
-):
+    data: npt.ArrayLike,
+    fileobj: io.IOBase,
+    out_dtype: np.dtype | None = None,
+    offset: int = 0,
+    intercept: Scalar = 0.0,
+    divslope: Scalar | None = 1.0,
+    mn: Scalar | None = None,
+    mx: Scalar | None = None,
+    order: ty.Literal['C', 'F'] = 'F',
+    nan2zero: bool = True,
+) -> None:
     """Helper function for writing arrays to file objects
 
     Writes arrays as scaled by `intercept` and `divslope`, and clipped
@@ -562,8 +588,7 @@ def array_to_file(
     True
     """
     # Shield special case
-    div_none = divslope is None
-    if not np.all(np.isfinite((intercept, 1.0 if div_none else divslope))):
+    if not np.isfinite(np.array((intercept, 1.0 if divslope is None else divslope))).all():
         raise ValueError('divslope and intercept must be finite')
     if divslope == 0:
         raise ValueError('divslope cannot be zero')
@@ -575,7 +600,7 @@ def array_to_file(
     out_dtype = np.dtype(out_dtype)
     if offset is not None:
         seek_tell(fileobj, offset)
-    if div_none or (mn, mx) == (0, 0) or ((mn is not None and mx is not None) and mx < mn):
+    if divslope is None or (mn, mx) == (0, 0) or ((mn is not None and mx is not None) and mx < mn):
         write_zeros(fileobj, data.size * out_dtype.itemsize)
         return
     if order not in 'FC':
@@ -707,17 +732,17 @@ def array_to_file(
 
 
 def _write_data(
-    data,
-    fileobj,
-    out_dtype,
-    order,
-    in_cast=None,
-    pre_clips=None,
-    inter=0.0,
-    slope=1.0,
-    post_clips=None,
-    nan_fill=None,
-):
+    data: np.ndarray,
+    fileobj: io.IOBase,
+    out_dtype: np.dtype,
+    order: ty.Literal['C', 'F'],
+    in_cast: np.dtype | None = None,
+    pre_clips: tuple[Scalar | None, Scalar | None] | None = None,
+    inter: Scalar | np.ndarray = 0.0,
+    slope: Scalar | np.ndarray = 1.0,
+    post_clips: tuple[Scalar | None, Scalar | None] | None = None,
+    nan_fill: Scalar | None = None,
+) -> None:
     """Write array `data` to `fileobj` as `out_dtype` type, layout `order`
 
     Does not modify `data` in-place.
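(A minimal round trip through the two I/O helpers annotated above; illustrative only, not part of the patch. The in-memory BytesIO buffer and array values are arbitrary.)

import io

import numpy as np

from nibabel.volumeutils import array_from_file, array_to_file

data = np.arange(6, dtype=np.int16).reshape(2, 3)
buf = io.BytesIO()
# Written in Fortran order by default; no scaling since dtypes match
array_to_file(data, buf, np.dtype(np.int16), offset=0)
# mmap=False forces a plain read, which an in-memory buffer requires anyway
back = array_from_file((2, 3), np.dtype(np.int16), buf, mmap=False)
assert np.array_equal(back, data)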
@@ -774,7 +799,9 @@ def _write_data(
         fileobj.write(dslice.tobytes())
 
 
-def _dt_min_max(dtype_like, mn=None, mx=None):
+def _dt_min_max(
+    dtype_like: npt.DTypeLike, mn: Scalar | None = None, mx: Scalar | None = None
+) -> tuple[Scalar, Scalar]:
     dt = np.dtype(dtype_like)
     if dt.kind in 'fc':
         dt_mn, dt_mx = (-np.inf, np.inf)
@@ -786,20 +813,25 @@ def _dt_min_max(dtype_like, mn=None, mx=None):
     return dt_mn if mn is None else mn, dt_mx if mx is None else mx
 
 
-_CSIZE2FLOAT = {8: np.float32, 16: np.float64, 24: np.longdouble, 32: np.longdouble}
+_CSIZE2FLOAT: dict[int, type[np.floating]] = {
+    8: np.float32,
+    16: np.float64,
+    24: np.longdouble,
+    32: np.longdouble,
+}
 
 
-def _matching_float(np_type):
+def _matching_float(np_type: npt.DTypeLike) -> type[np.floating]:
     """Return floating point type matching `np_type`"""
     dtype = np.dtype(np_type)
     if dtype.kind not in 'cf':
         raise ValueError('Expecting float or complex type as input')
-    if dtype.kind in 'f':
+    if issubclass(dtype.type, np.floating):
         return dtype.type
     return _CSIZE2FLOAT[dtype.itemsize]
 
 
-def write_zeros(fileobj, count, block_size=8194):
+def write_zeros(fileobj: io.IOBase, count: int, block_size: int = 8194) -> None:
     """Write `count` zero bytes to `fileobj`
 
     Parameters
@@ -819,7 +851,7 @@ def write_zeros(fileobj, count, block_size=8194):
     fileobj.write(b'\x00' * rem)
 
 
-def seek_tell(fileobj, offset, write0=False):
+def seek_tell(fileobj: io.IOBase, offset: int, write0: bool = False) -> None:
     """Seek in `fileobj` or check we're in the right place already
 
     Parameters
@@ -849,7 +881,11 @@ def seek_tell(fileobj, offset, write0=False):
     assert fileobj.tell() == offset
 
 
-def apply_read_scaling(arr, slope=None, inter=None):
+def apply_read_scaling(
+    arr: np.ndarray,
+    slope: Scalar | None = None,
+    inter: Scalar | None = None,
+) -> np.ndarray:
     """Apply scaling in `slope` and `inter` to array `arr`
 
     This is for loading the array from a file (as opposed to the reverse
@@ -888,23 +924,28 @@ def apply_read_scaling(arr, slope=None, inter=None):
         return arr
     shape = arr.shape
     # Force float / float upcasting by promoting to arrays
-    arr, slope, inter = (np.atleast_1d(v) for v in (arr, slope, inter))
+    slope1d, inter1d = (np.atleast_1d(v) for v in (slope, inter))
+    arr = np.atleast_1d(arr)
     if arr.dtype.kind in 'iu':
         # int to float; get enough precision to avoid infs
         # Find floating point type for which scaling does not overflow,
         # starting at given type
-        default = slope.dtype.type if slope.dtype.kind == 'f' else np.float64
-        ftype = int_scinter_ftype(arr.dtype, slope, inter, default)
-        slope = slope.astype(ftype)
-        inter = inter.astype(ftype)
-    if slope != 1.0:
-        arr = arr * slope
-    if inter != 0.0:
-        arr = arr + inter
+        default = slope1d.dtype.type if slope1d.dtype.kind == 'f' else np.float64
+        ftype = int_scinter_ftype(arr.dtype, slope1d, inter1d, default)
+        slope1d = slope1d.astype(ftype)
+        inter1d = inter1d.astype(ftype)
+    if slope1d != 1.0:
+        arr = arr * slope1d
+    if inter1d != 0.0:
+        arr = arr + inter1d
     return arr.reshape(shape)
 
 
-def working_type(in_type, slope=1.0, inter=0.0):
+def working_type(
+    in_type: npt.DTypeLike,
+    slope: npt.ArrayLike = 1.0,
+    inter: npt.ArrayLike = 0.0,
+) -> type[np.number]:
     """Return array type from applying `slope`, `inter` to array of `in_type`
 
     Numpy type that results from an array of type `in_type` being combined with
@@ -935,19 +976,22 @@
     `in_type`.
""" val = np.array([1], dtype=in_type) - slope = np.array(slope) - inter = np.array(inter) # Don't use real values to avoid overflows. Promote to 1D to avoid scalar # casting rules. Don't use ones_like, zeros_like because of a bug in numpy # <= 1.5.1 in converting complex192 / complex256 scalars. if inter != 0: - val = val + np.array([0], dtype=inter.dtype) + val = val + np.array([0], dtype=np.array(inter).dtype) if slope != 1: - val = val / np.array([1], dtype=slope.dtype) + val = val / np.array([1], dtype=np.array(slope).dtype) return val.dtype.type -def int_scinter_ftype(ifmt, slope=1.0, inter=0.0, default=np.float32): +def int_scinter_ftype( + ifmt: type[np.integer], + slope: npt.ArrayLike = 1.0, + inter: npt.ArrayLike = 0.0, + default: type[np.floating] = np.float32, +) -> type[np.floating]: """float type containing int type `ifmt` * `slope` + `inter` Return float type that can represent the max and the min of the `ifmt` type @@ -999,7 +1043,12 @@ def int_scinter_ftype(ifmt, slope=1.0, inter=0.0, default=np.float32): raise ValueError('Overflow using highest floating point type') -def best_write_scale_ftype(arr, slope=1.0, inter=0.0, default=np.float32): +def best_write_scale_ftype( + arr: np.ndarray, + slope: npt.ArrayLike = 1.0, + inter: npt.ArrayLike = 0.0, + default: type[np.number] = np.float32, +) -> type[np.floating]: """Smallest float type to contain range of ``arr`` after scaling Scaling that will be applied to ``arr`` is ``(arr - inter) / slope``. @@ -1063,7 +1112,11 @@ def best_write_scale_ftype(arr, slope=1.0, inter=0.0, default=np.float32): return OK_FLOATS[-1] -def better_float_of(first, second, default=np.float32): +def better_float_of( + first: npt.DTypeLike, + second: npt.DTypeLike, + default: type[np.floating] = np.float32, +) -> type[np.floating]: """Return more capable float type of `first` and `second` Return `default` if neither of `first` or `second` is a float @@ -1097,19 +1150,22 @@ def better_float_of(first, second, default=np.float32): first = np.dtype(first) second = np.dtype(second) default = np.dtype(default).type - kinds = (first.kind, second.kind) - if 'f' not in kinds: - return default - if kinds == ('f', 'f'): - if first.itemsize >= second.itemsize: - return first.type - return second.type - if first.kind == 'f': + if issubclass(first.type, np.floating): + if issubclass(second.type, np.floating) and first.itemsize < second.itemsize: + return second.type return first.type - return second.type + if issubclass(second.type, np.floating): + return second.type + return default -def _ftype4scaled_finite(tst_arr, slope, inter, direction='read', default=np.float32): +def _ftype4scaled_finite( + tst_arr: np.ndarray, + slope: npt.ArrayLike, + inter: npt.ArrayLike, + direction: ty.Literal['read', 'write'] = 'read', + default: type[np.floating] = np.float32, +) -> type[np.floating]: """Smallest float type for scaling of `tst_arr` that does not overflow""" assert direction in ('read', 'write') if default not in OK_FLOATS and default is np.longdouble: @@ -1120,7 +1176,6 @@ def _ftype4scaled_finite(tst_arr, slope, inter, direction='read', default=np.flo tst_arr = np.atleast_1d(tst_arr) slope = np.atleast_1d(slope) inter = np.atleast_1d(inter) - overflow_filter = ('error', '.*overflow.*', RuntimeWarning) for ftype in OK_FLOATS[def_ind:]: tst_trans = tst_arr.copy() slope = slope.astype(ftype) @@ -1128,7 +1183,7 @@ def _ftype4scaled_finite(tst_arr, slope, inter, direction='read', default=np.flo try: with warnings.catch_warnings(): # Error on overflows to short circuit 
the logic - warnings.filterwarnings(*overflow_filter) + warnings.filterwarnings('error', '.*overflow.*', RuntimeWarning) if direction == 'read': # as in reading of image from disk if slope != 1.0: tst_trans = tst_trans * slope @@ -1147,7 +1202,22 @@ def _ftype4scaled_finite(tst_arr, slope, inter, direction='read', default=np.flo raise ValueError('Overflow using highest floating point type') -def finite_range(arr, check_nan=False): +@ty.overload +def finite_range( + arr: npt.ArrayLike, check_nan: ty.Literal[False] = False +) -> tuple[Scalar, Scalar]: + ... # pragma: no cover + + +@ty.overload +def finite_range(arr: npt.ArrayLike, check_nan: ty.Literal[True]) -> tuple[Scalar, Scalar, bool]: + ... # pragma: no cover + + +def finite_range( + arr: npt.ArrayLike, + check_nan: bool = False, +) -> tuple[Scalar, Scalar, bool] | tuple[Scalar, Scalar]: """Get range (min, max) or range and flag (min, max, has_nan) from `arr` Parameters @@ -1195,7 +1265,9 @@ def finite_range(arr, check_nan=False): """ arr = np.asarray(arr) if arr.size == 0: - return (np.inf, -np.inf) + (False,) * check_nan + if check_nan: + return (np.inf, -np.inf, False) + return (np.inf, -np.inf) # Resort array to slowest->fastest memory change indices stride_order = np.argsort(arr.strides)[::-1] sarr = arr.transpose(stride_order) @@ -1243,7 +1315,11 @@ def finite_range(arr, check_nan=False): return np.nanmin(mins), np.nanmax(maxes) -def shape_zoom_affine(shape, zooms, x_flip=True): +def shape_zoom_affine( + shape: ty.Sequence[int] | np.ndarray, + zooms: ty.Sequence[float] | np.ndarray, + x_flip: bool = True, +) -> np.ndarray: """Get affine implied by given shape and zooms We get the translations from the center of the image (implied by @@ -1305,7 +1381,7 @@ def shape_zoom_affine(shape, zooms, x_flip=True): return aff -def rec2dict(rec): +def rec2dict(rec: np.ndarray) -> dict[str, np.generic | np.ndarray]: """Convert recarray to dictionary Also converts scalar values to scalars @@ -1338,7 +1414,7 @@ def rec2dict(rec): return dct -def fname_ext_ul_case(fname): +def fname_ext_ul_case(fname: str) -> str: """`fname` with ext changed to upper / lower case if file exists Check for existence of `fname`. If it does exist, return unmodified. If diff --git a/pyproject.toml b/pyproject.toml index e002f6d05..83556a6b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,11 +74,12 @@ test = [ ] typing = [ "mypy", + "importlib_resources", + "pydicom", "pytest", + "pyzstd", "types-setuptools", "types-Pillow", - "pydicom", - "importlib_resources", ] zstd = ["pyzstd >= 0.14.3"]
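(Closing sketch of the `finite_range` overloads and read scaling annotated above; again illustrative only, not part of the patch.)

import numpy as np

from nibabel.volumeutils import apply_read_scaling, finite_range

arr = np.array([[1.0, -2.0], [np.nan, 3.0]])
assert finite_range(arr) == (-2.0, 3.0)  # two-tuple overload (check_nan=False)
assert finite_range(arr, check_nan=True) == (-2.0, 3.0, True)  # three-tuple overload

# Integer input is upcast to a float type wide enough for the scaling
scaled = apply_read_scaling(np.array([1, 2], dtype=np.int16), slope=2.0, inter=0.5)
assert scaled.dtype.kind == 'f'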