diff --git a/src/nitypes/_arguments.py b/src/nitypes/_arguments.py new file mode 100644 index 00000000..2491154d --- /dev/null +++ b/src/nitypes/_arguments.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +import operator +from typing import SupportsFloat, SupportsIndex + +import numpy as np +import numpy.typing as npt + + +def arg_to_float( + arg_description: str, value: SupportsFloat | None, default_value: float | None = None +) -> float: + """Convert an argument to a float. + + >>> arg_to_float("xyz", 1.234) + 1.234 + >>> arg_to_float("xyz", 1234) + 1234.0 + >>> arg_to_float("xyz", np.float64(1.234)) + np.float64(1.234) + >>> arg_to_float("xyz", np.float32(1.234)) # doctest: +ELLIPSIS + 1.233999... + >>> arg_to_float("xyz", 1.234, 5.0) + 1.234 + >>> arg_to_float("xyz", None, 5.0) + 5.0 + >>> arg_to_float("xyz", None) + Traceback (most recent call last): + ... + TypeError: The xyz must be a floating point number. + + Provided value: None + >>> arg_to_float("xyz", "1.234") + Traceback (most recent call last): + ... + TypeError: The xyz must be a floating point number. + + Provided value: '1.234' + """ + if value is None: + if default_value is None: + raise TypeError( + f"The {arg_description} must be a floating point number.\n\n" + f"Provided value: {value!r}" + ) + value = default_value + + if not isinstance(value, float): + try: + # Use value.__float__() because float(value) also accepts strings. + return value.__float__() + except Exception: + raise TypeError( + f"The {arg_description} must be a floating point number.\n\n" + f"Provided value: {value!r}" + ) from None + + return value + + +def arg_to_int( + arg_description: str, value: SupportsIndex | None, default_value: int | None = None +) -> int: + """Convert an argument to a signed integer. + + >>> arg_to_int("xyz", 1234) + 1234 + >>> arg_to_int("xyz", 1234, -1) + 1234 + >>> arg_to_int("xyz", None, -1) + -1 + >>> arg_to_int("xyz", None) + Traceback (most recent call last): + ... + TypeError: The xyz must be an integer. + + Provided value: None + >>> arg_to_int("xyz", 1.234) + Traceback (most recent call last): + ... + TypeError: The xyz must be an integer. + + Provided value: 1.234 + >>> arg_to_int("xyz", "1234") + Traceback (most recent call last): + ... + TypeError: The xyz must be an integer. + + Provided value: '1234' + """ + if value is None: + if default_value is None: + raise TypeError( + f"The {arg_description} must be an integer.\n\n" f"Provided value: {value!r}" + ) + value = default_value + + if not isinstance(value, int): + try: + return operator.index(value) + except Exception: + raise TypeError( + f"The {arg_description} must be an integer.\n\n" f"Provided value: {value!r}" + ) from None + + return value + + +def arg_to_uint( + arg_description: str, value: SupportsIndex | None, default_value: int | None = None +) -> int: + """Convert an argument to an unsigned integer. + + >>> arg_to_uint("xyz", 1234) + 1234 + >>> arg_to_uint("xyz", 1234, 5000) + 1234 + >>> arg_to_uint("xyz", None, 5000) + 5000 + >>> arg_to_uint("xyz", -1234) + Traceback (most recent call last): + ... + ValueError: The xyz must be a non-negative integer. + + Provided value: -1234 + >>> arg_to_uint("xyz", "1234") + Traceback (most recent call last): + ... + TypeError: The xyz must be an integer. 
+ + Provided value: '1234' + """ + value = arg_to_int(arg_description, value, default_value) + if value < 0: + raise ValueError( + f"The {arg_description} must be a non-negative integer.\n\n" + f"Provided value: {value!r}" + ) + return value + + +def validate_dtype(dtype: npt.DTypeLike, supported_dtypes: tuple[npt.DTypeLike, ...]) -> None: + """Validate a dtype-like object against a tuple of supported dtype-like objects. + + >>> validate_dtype(np.float64, (np.float64, np.intc, np.long,)) + >>> validate_dtype("float64", (np.float64, np.intc, np.long,)) + >>> validate_dtype(np.float64, (np.byte, np.short, np.intc, np.int_, np.long, np.longlong)) + Traceback (most recent call last): + ... + TypeError: The requested data type is not supported. + + Data type: float64 + Supported data types: int8, int16, int32, int64 + """ + if not isinstance(dtype, (type, np.dtype)): + dtype = np.dtype(dtype) + if not np.isdtype(dtype, supported_dtypes): + # Remove duplicate names because distinct types (e.g. int vs. long) may have the same name + # ("int32"). + supported_dtype_names = {np.dtype(d).name: None for d in supported_dtypes}.keys() + raise TypeError( + "The requested data type is not supported.\n\n" + f"Data type: {np.dtype(dtype)}\n" + f"Supported data types: {', '.join(supported_dtype_names)}" + ) diff --git a/src/nitypes/_exceptions.py b/src/nitypes/_exceptions.py new file mode 100644 index 00000000..fe011591 --- /dev/null +++ b/src/nitypes/_exceptions.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import sys + + +def add_note(exception: Exception, note: str) -> None: + """Add a note to an exception. + + >>> try: + ... raise ValueError("Oh no") + ... except Exception as e: + ... add_note(e, "p.s. This is bad") + ... raise + Traceback (most recent call last): + ... + ValueError: Oh no + p.s. 
This is bad + """ + if sys.version_info >= (3, 11): + exception.add_note(note) + else: + message = exception.args[0] + "\n" + note + exception.args = (message,) + exception.args[1:] diff --git a/src/nitypes/time/_conversion.py b/src/nitypes/time/_conversion.py index ad54c2cf..019ea22d 100644 --- a/src/nitypes/time/_conversion.py +++ b/src/nitypes/time/_conversion.py @@ -32,7 +32,7 @@ def convert_datetime(requested_type: type[_TDateTime], value: _AnyDateTime, /) - @singledispatch def _convert_to_dt_datetime(value: object, /) -> dt.datetime: - raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value!r}") @_convert_to_dt_datetime.register @@ -57,7 +57,7 @@ def _(value: ht.datetime, /) -> dt.datetime: @singledispatch def _convert_to_ht_datetime(value: object, /) -> ht.datetime: - raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value!r}") @_convert_to_ht_datetime.register @@ -98,7 +98,7 @@ def convert_timedelta(requested_type: type[_TTimeDelta], value: _AnyTimeDelta, / @singledispatch def _convert_to_dt_timedelta(value: object, /) -> dt.timedelta: - raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value!r}") @_convert_to_dt_timedelta.register @@ -113,7 +113,7 @@ def _(value: ht.timedelta, /) -> dt.timedelta: @singledispatch def _convert_to_ht_timedelta(value: object, /) -> ht.timedelta: - raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value!r}") @_convert_to_ht_timedelta.register diff --git a/src/nitypes/waveform/__init__.py b/src/nitypes/waveform/__init__.py index 64f49953..2b681653 100644 --- a/src/nitypes/waveform/__init__.py +++ b/src/nitypes/waveform/__init__.py @@ -5,6 +5,12 @@ ExtendedPropertyDictionary, ExtendedPropertyValue, ) +from nitypes.waveform._scaling import ( + NO_SCALING, + LinearScaleMode, + NoneScaleMode, + ScaleMode, +) from nitypes.waveform._timing._base import BaseTiming, SampleIntervalMode from nitypes.waveform._timing._precision import PrecisionTiming from nitypes.waveform._timing._standard import Timing @@ -14,8 +20,12 @@ "BaseTiming", "ExtendedPropertyDictionary", "ExtendedPropertyValue", + "LinearScaleMode", + "NO_SCALING", + "NoneScaleMode", "PrecisionTiming", "SampleIntervalMode", + "ScaleMode", "Timing", ] @@ -24,6 +34,9 @@ BaseTiming.__module__ = __name__ ExtendedPropertyDictionary.__module__ = __name__ # ExtendedPropertyValue is a TypeAlias +LinearScaleMode.__module__ = __name__ +NoneScaleMode.__module__ = __name__ PrecisionTiming.__module__ = __name__ SampleIntervalMode.__module__ = __name__ +ScaleMode.__module__ = __name__ Timing.__module__ = __name__ diff --git a/src/nitypes/waveform/_analog_waveform.py b/src/nitypes/waveform/_analog_waveform.py index 3fc785ce..b7c6f912 100644 --- a/src/nitypes/waveform/_analog_waveform.py +++ b/src/nitypes/waveform/_analog_waveform.py @@ -7,15 +7,16 @@ import numpy as np import numpy.typing as npt +from nitypes._arguments import arg_to_uint, validate_dtype from nitypes.waveform._extended_properties import ( CHANNEL_NAME, UNIT_DESCRIPTION, ExtendedPropertyDictionary, ) +from nitypes.waveform._scaling import NO_SCALING, ScaleMode from nitypes.waveform._timing._conversion import convert_timing from 
nitypes.waveform._timing._precision import PrecisionTiming from nitypes.waveform._timing._standard import Timing -from nitypes.waveform._utils import arg_to_uint, validate_dtype if sys.version_info < (3, 10): import array as std_array @@ -45,6 +46,13 @@ np.ulonglong, ) +_SCALED_DTYPES = ( + # Floating point + np.single, + np.double, +) + + # Note about NumPy type hints: # - At time of writing (April 2025), shape typing is still under development, so we do not # distinguish between 1D and 2D arrays in type hints. @@ -221,6 +229,7 @@ def from_array_2d( "_extended_properties", "_timing", "_precision_timing", + "_scale_mode", "__weakref__", ] @@ -230,6 +239,7 @@ def from_array_2d( _extended_properties: ExtendedPropertyDictionary _timing: Timing | None _precision_timing: PrecisionTiming | None + _scale_mode: ScaleMode # If neither dtype nor _data is specified, the type parameter defaults to np.float64. @overload @@ -319,8 +329,8 @@ def _init_with_new_array( start_index: SupportsIndex | None = None, capacity: SupportsIndex | None = None, ) -> None: - start_index = arg_to_uint("start index", start_index) - sample_count = arg_to_uint("sample count", sample_count) + start_index = arg_to_uint("start index", start_index, 0) + sample_count = arg_to_uint("sample count", sample_count, 0) capacity = arg_to_uint("capacity", capacity, sample_count) if dtype is None: @@ -347,6 +357,7 @@ def _init_with_new_array( self._extended_properties = ExtendedPropertyDictionary() self._timing = Timing.empty self._precision_timing = None + self._scale_mode = NO_SCALING def _init_with_provided_array( self, @@ -378,7 +389,7 @@ def _init_with_provided_array( f"Array length: {len(data)}" ) - start_index = arg_to_uint("start index", start_index) + start_index = arg_to_uint("start index", start_index, 0) if start_index > capacity: raise ValueError( "The start index must be less than or equal to the input array length.\n\n" @@ -401,17 +412,105 @@ def _init_with_provided_array( self._extended_properties = ExtendedPropertyDictionary() self._timing = Timing.empty self._precision_timing = None + self._scale_mode = NO_SCALING @property def raw_data(self) -> npt.NDArray[_ScalarType_co]: """The raw analog waveform data.""" return self._data[self._start_index : self._start_index + self._sample_count] + def get_raw_data( + self, start_index: SupportsIndex | None = 0, sample_count: SupportsIndex | None = None + ) -> npt.NDArray[_ScalarType_co]: + """Get a subset of the raw analog waveform data. + + Args: + start_index: The sample index at which the data begins. + sample_count: The number of samples to return. + + Returns: + A subset of the raw analog waveform data. 
+ """ + start_index = arg_to_uint("sample index", start_index, 0) + if start_index > self.sample_count: + raise ValueError( + "The start index must be less than or equal to the number of samples in the waveform.\n\n" + f"Start index: {start_index}\n" + f"Number of samples: {self.sample_count}" + ) + + sample_count = arg_to_uint("sample count", sample_count, self.sample_count - start_index) + if start_index + sample_count > self.sample_count: + raise ValueError( + "The sum of the start index and sample count must be less than or equal to the number of samples in the waveform.\n\n" + f"Start index: {start_index}\n" + f"Sample count: {sample_count}\n" + f"Number of samples: {self.sample_count}" + ) + + return self.raw_data[start_index : start_index + sample_count] + @property def scaled_data(self) -> npt.NDArray[np.float64]: - """The scaled analog waveform data.""" - # TODO: implement scaling - return self.raw_data.astype(np.float64) + """The scaled analog waveform data. + + This property converts all of the waveform samples to float64 and scales them. To scale a + subset of the waveform or convert to float32, use the get_scaled_data() method instead. + """ + return self.get_scaled_data() + + # If dtype is not specified, _ScaledDataType defaults to np.float64. + @overload + def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self, + dtype: None = ..., + *, + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> npt.NDArray[np.float64]: ... + + @overload + def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self, + dtype: type[_ScalarType] | np.dtype[_ScalarType] = ..., + *, + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> npt.NDArray[_ScalarType]: ... + + @overload + def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self, + dtype: npt.DTypeLike = ..., + *, + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> npt.NDArray[Any]: ... + + def get_scaled_data( + self, + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + ) -> npt.NDArray[Any]: + """Get a subset of the scaled analog waveform data with the specified dtype. + + Args: + dtype: The NumPy data type to use for scaled data. + start_index: The sample index at which to start scaling. + sample_count: The number of samples to scale. + + Returns: + A subset of the scaled analog waveform data. 
+ """ + if dtype is None: + dtype = np.float64 + validate_dtype(dtype, _SCALED_DTYPES) + + raw_data = self.get_raw_data(start_index, sample_count) + converted_data = raw_data.astype(dtype) + return self._scale_mode._transform_data(converted_data) @property def sample_count(self) -> int: @@ -464,7 +563,7 @@ def channel_name(self) -> str: @channel_name.setter def channel_name(self, value: str) -> None: if not isinstance(value, str): - raise TypeError("The channel name must be a str.\n\n" f"Channel name: {value!r}") + raise TypeError("The channel name must be a str.\n\n" f"Provided value: {value!r}") self._extended_properties[CHANNEL_NAME] = value @property @@ -477,9 +576,7 @@ def unit_description(self) -> str: @unit_description.setter def unit_description(self, value: str) -> None: if not isinstance(value, str): - raise TypeError( - "The unit description must be a str.\n\n" f"Unit description: {value!r}" - ) + raise TypeError("The unit description must be a str.\n\n" f"Provided value: {value!r}") self._extended_properties[UNIT_DESCRIPTION] = value @property @@ -540,3 +637,16 @@ def precision_timing(self, value: PrecisionTiming) -> None: raise TypeError("The precision timing information must be a PrecisionTiming object.") self._precision_timing = value self._timing = None + + @property + def scale_mode(self) -> ScaleMode: + """The scale mode of the analog waveform.""" + return self._scale_mode + + @scale_mode.setter + def scale_mode(self, value: ScaleMode) -> None: + if not isinstance(value, ScaleMode): + raise TypeError( + "The scale mode must be a ScaleMode object.\n\n" f"Provided value: {value!r}" + ) + self._scale_mode = value diff --git a/src/nitypes/waveform/_scaling/__init__.py b/src/nitypes/waveform/_scaling/__init__.py new file mode 100644 index 00000000..764a5ce3 --- /dev/null +++ b/src/nitypes/waveform/_scaling/__init__.py @@ -0,0 +1,10 @@ +"""Waveform scaling data types for NI Python APIs.""" + +from nitypes.waveform._scaling._base import ScaleMode +from nitypes.waveform._scaling._linear import LinearScaleMode +from nitypes.waveform._scaling._none import NoneScaleMode + +__all__ = ["LinearScaleMode", "NO_SCALING", "NoneScaleMode", "ScaleMode"] + +NO_SCALING = NoneScaleMode() +"""A scale mode that does not scale data.""" diff --git a/src/nitypes/waveform/_scaling/_base.py b/src/nitypes/waveform/_scaling/_base.py new file mode 100644 index 00000000..cdc1699b --- /dev/null +++ b/src/nitypes/waveform/_scaling/_base.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TypeVar + +import numpy as np +import numpy.typing as npt + + +_ScalarType = TypeVar("_ScalarType", bound=np.generic) + + +class ScaleMode(ABC): + """An object that specifies how the waveform is scaled.""" + + __slots__ = () + + @abstractmethod + def _transform_data(self, data: npt.NDArray[_ScalarType]) -> npt.NDArray[_ScalarType]: + raise NotImplementedError diff --git a/src/nitypes/waveform/_scaling/_linear.py b/src/nitypes/waveform/_scaling/_linear.py new file mode 100644 index 00000000..a1c32646 --- /dev/null +++ b/src/nitypes/waveform/_scaling/_linear.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from typing import SupportsFloat + +import numpy.typing as npt + +from nitypes._arguments import arg_to_float +from nitypes.waveform._scaling._base import ScaleMode, _ScalarType + + +class LinearScaleMode(ScaleMode): + """A scale mode that scales data linearly.""" + + __slots__ = ["_gain", "_offset", "__weakref__"] + + _gain: float + _offset: float + 
+ def __init__(self, gain: SupportsFloat, offset: SupportsFloat) -> None: + """Construct a scale mode object that scales data linearly. + + Args: + gain: The gain of the linear scale. + offset: The offset of the linear scale. + + Returns: + A scale mode that scales data linearly. + """ + self._gain = arg_to_float("gain", gain) + self._offset = arg_to_float("offset", offset) + + @property + def gain(self) -> float: + """The gain of the linear scale.""" + return self._gain + + @property + def offset(self) -> float: + """The offset of the linear scale.""" + return self._offset + + def _transform_data(self, data: npt.NDArray[_ScalarType]) -> npt.NDArray[_ScalarType]: + # https://github.com/numpy/numpy/issues/28805 - TYP: mypy infers that adding/multiplying a + # npt.NDArray[np.float32] with a float promotes dtype to Any or np.float64 + return data * self._gain + self._offset # type: ignore[operator,no-any-return] + + def __repr__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, + ) -> str: + return f"{self.__class__.__module__}.{self.__class__.__name__}({self.gain}, {self.offset})" diff --git a/src/nitypes/waveform/_scaling/_none.py b/src/nitypes/waveform/_scaling/_none.py new file mode 100644 index 00000000..daeaf004 --- /dev/null +++ b/src/nitypes/waveform/_scaling/_none.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import numpy.typing as npt + +from nitypes.waveform._scaling._base import ScaleMode, _ScalarType + + +class NoneScaleMode(ScaleMode): + """A scale mode that does not scale data.""" + + __slots__ = () + + def _transform_data(self, data: npt.NDArray[_ScalarType]) -> npt.NDArray[_ScalarType]: + return data + + def __repr__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, + ) -> str: + return f"{self.__class__.__module__}.{self.__class__.__name__}()" diff --git a/src/nitypes/waveform/_timing/_base.py b/src/nitypes/waveform/_timing/_base.py index 1632e779..9a12d011 100644 --- a/src/nitypes/waveform/_timing/_base.py +++ b/src/nitypes/waveform/_timing/_base.py @@ -7,7 +7,7 @@ from enum import Enum from typing import Generic, SupportsIndex, TypeVar -from nitypes.waveform._utils import add_note +from nitypes._exceptions import add_note class SampleIntervalMode(Enum): @@ -31,7 +31,7 @@ class SampleIntervalMode(Enum): def _validate_unsupported_arg(arg_description: str, value: object) -> None: if value is not None: raise ValueError( - f"The {arg_description} argument is not supported.\n\n" f"Provided value: {value}" + f"The {arg_description} argument is not supported.\n\n" f"Provided value: {value!r}" ) @@ -119,11 +119,12 @@ def _validate_init_args_none( timedelta_type = self.__class__._get_timedelta_type() if not isinstance(timestamp, (datetime_type, type(None))): raise TypeError( - "The timestamp must be a datetime or None.\n\n" f"Provided value: {timestamp}" + "The timestamp must be a datetime or None.\n\n" f"Provided value: {timestamp!r}" ) if not isinstance(time_offset, (timedelta_type, type(None))): raise TypeError( - f"The time offset must be a timedelta or None.\n\n" f"Provided value: {time_offset}" + f"The time offset must be a timedelta or None.\n\n" + f"Provided value: {time_offset!r}" ) _validate_unsupported_arg("sample interval", sample_interval) _validate_unsupported_arg("timestamps", timestamps) @@ -139,15 +140,17 @@ def _validate_init_args_regular( timedelta_type = self.__class__._get_timedelta_type() if not isinstance(timestamp, (datetime_type, type(None))): raise TypeError( - "The timestamp must be a 
datetime or None.\n\n" f"Provided value: {timestamp}" + "The timestamp must be a datetime or None.\n\n" f"Provided value: {timestamp!r}" ) if not isinstance(time_offset, (timedelta_type, type(None))): raise TypeError( - f"The time offset must be a timedelta or None.\n\n" f"Provided value: {time_offset}" + f"The time offset must be a timedelta or None.\n\n" + f"Provided value: {time_offset!r}" ) if not isinstance(sample_interval, timedelta_type): raise TypeError( - "The sample interval must be a timedelta.\n\n" f"Provided value: {sample_interval}" + "The sample interval must be a timedelta.\n\n" + f"Provided value: {sample_interval!r}" ) _validate_unsupported_arg("timestamps", timestamps) @@ -167,7 +170,7 @@ def _validate_init_args_irregular( ): raise TypeError( "The timestamps must be a sequence of datetime objects.\n\n" - f"Provided value: {timestamps}" + f"Provided value: {timestamps!r}" ) _VALIDATE_INIT_ARGS_FOR_MODE = { @@ -240,11 +243,15 @@ def get_timestamps( elif self._sample_interval_mode == SampleIntervalMode.IRREGULAR: assert self._timestamps is not None if count > len(self._timestamps): - raise ValueError("The count must be less or equal to the number of timestamps.") + raise ValueError( + "The count must be less than or equal to the number of timestamps." + ) return self._timestamps[start_index : start_index + count] else: raise RuntimeError( - "The waveform timing does not have valid timestamp information. To obtain timestamps, the waveform must be irregular or must be initialized with a valid time stamp and sample interval." + "The waveform timing does not have valid timestamp information. " + "To obtain timestamps, the waveform must be irregular or must be initialized " + "with a valid time stamp and sample interval." ) def _generate_regular_timestamps( diff --git a/src/nitypes/waveform/_timing/_conversion.py b/src/nitypes/waveform/_timing/_conversion.py index 22ea0c96..0eac8c44 100644 --- a/src/nitypes/waveform/_timing/_conversion.py +++ b/src/nitypes/waveform/_timing/_conversion.py @@ -34,7 +34,7 @@ def convert_timing(requested_type: type[_TTiming], value: _AnyTiming, /) -> _TTi @singledispatch def _convert_to_standard_timing(value: object, /) -> Timing: - raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value!r}") @_convert_to_standard_timing.register @@ -67,7 +67,7 @@ def _(value: PrecisionTiming, /) -> Timing: @singledispatch def _convert_to_precision_timing(value: object, /) -> PrecisionTiming: - raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value}") + raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value!r}") @_convert_to_precision_timing.register diff --git a/src/nitypes/waveform/_utils.py b/src/nitypes/waveform/_utils.py deleted file mode 100644 index a37a663a..00000000 --- a/src/nitypes/waveform/_utils.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -import operator -import sys -from typing import SupportsIndex - -import numpy as np -import numpy.typing as npt - - -def add_note(exception: Exception, note: str) -> None: - """Add a note to an exception. - - >>> try: - ... raise ValueError("Oh no") - ... except Exception as e: - ... add_note(e, "p.s. This is bad") - ... raise - Traceback (most recent call last): - ... - ValueError: Oh no - p.s. 
This is bad - """ - if sys.version_info >= (3, 11): - exception.add_note(note) - else: - message = exception.args[0] + "\n" + note - exception.args = (message,) + exception.args[1:] - - -def arg_to_int(arg_description: str, value: SupportsIndex | None, default_value: int = 0) -> int: - """Convert an argument to a signed integer.""" - if value is None: - return default_value - return operator.index(value) - - -def arg_to_uint(arg_description: str, value: SupportsIndex | None, default_value: int = 0) -> int: - """Convert an argument to an unsigned integer.""" - value = arg_to_int(arg_description, value, default_value) - if value < 0: - raise ValueError( - f"The {arg_description} must be a non-negative integer.\n\n" f"Provided value: {value}" - ) - return value - - -def validate_dtype(dtype: npt.DTypeLike, supported_dtypes: tuple[npt.DTypeLike, ...]) -> None: - """Validate a dtype-like object against a tuple of supported dtype-like objects. - - >>> validate_dtype(np.float64, (np.float64, np.intc, np.long,)) - >>> validate_dtype("float64", (np.float64, np.intc, np.long,)) - >>> validate_dtype(np.float64, (np.byte, np.short, np.intc, np.int_, np.long, np.longlong)) - Traceback (most recent call last): - ... - TypeError: The requested data type is not supported. - - Data type: float64 - Supported data types: int8, int16, int32, int64 - """ - if not isinstance(dtype, (type, np.dtype)): - dtype = np.dtype(dtype) - if not np.isdtype(dtype, supported_dtypes): - # Remove duplicate names because distinct types (e.g. int vs. long) may have the same name - # ("int32"). - supported_dtype_names = {np.dtype(d).name: None for d in supported_dtypes}.keys() - raise TypeError( - "The requested data type is not supported.\n\n" - f"Data type: {np.dtype(dtype)}\n" - f"Supported data types: {', '.join(supported_dtype_names)}" - ) diff --git a/tests/unit/waveform/_scaling/__init__.py b/tests/unit/waveform/_scaling/__init__.py new file mode 100644 index 00000000..9d0d66b6 --- /dev/null +++ b/tests/unit/waveform/_scaling/__init__.py @@ -0,0 +1 @@ +"""Unit tests for the nitypes.waveform._scaling package.""" diff --git a/tests/unit/waveform/_scaling/test_linear.py b/tests/unit/waveform/_scaling/test_linear.py new file mode 100644 index 00000000..7de6f0ef --- /dev/null +++ b/tests/unit/waveform/_scaling/test_linear.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import sys +from typing import SupportsFloat + +import numpy as np +import numpy.typing as npt +import pytest + +from nitypes.waveform import LinearScaleMode + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +@pytest.mark.parametrize( + "gain, offset", + [ + (1.0, 0.0), + (1.2345, 0.006789), + (3, 4), + (np.float64(1.2345), np.float64(0.006789)), + # np.float32 is not a subclass of float, but it supports __float__. 
+ (np.float32(1.2345), np.float32(0.006789)), + ], +) +def test___gain_and_offset___construct___constructs_with_gain_and_offset( + gain: SupportsFloat, offset: SupportsFloat +) -> None: + scale_mode = LinearScaleMode(gain, offset) + + assert scale_mode.gain == gain + assert scale_mode.offset == offset + + +@pytest.mark.parametrize( + "gain, offset, expected_message", + [ + (None, 0.0, "The gain must be a floating point number."), + ("1.0", 0.0, "The gain must be a floating point number."), + (1.0, "0.0", "The offset must be a floating point number."), + ], +) +def test__invalid_gain_or_offset___construct___raises_type_error( + gain: object, offset: object, expected_message: str +) -> None: + with pytest.raises(TypeError) as exc: + _ = LinearScaleMode(gain, offset) # type: ignore[arg-type] + + assert exc.value.args[0].startswith(expected_message) + + +def test___empty_ndarray___transform_data___returns_empty_scaled_data() -> None: + waveform = np.zeros(0, np.float64) + scale_mode = LinearScaleMode(3.0, 4.0) + + scaled_data = scale_mode._transform_data(waveform) + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [] + + +def test___float32_ndarray___transform_data___returns_float32_scaled_data() -> None: + raw_data = np.array([0, 1, 2, 3], np.float32) + scale_mode = LinearScaleMode(3.0, 4.0) + + scaled_data = scale_mode._transform_data(raw_data) + + assert_type(scaled_data, npt.NDArray[np.float32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float32 + assert list(scaled_data) == [4.0, 7.0, 10.0, 13.0] + + +def test___float64_ndarray___transform_data___returns_float64_scaled_data() -> None: + raw_data = np.array([0, 1, 2, 3], np.float64) + scale_mode = LinearScaleMode(3.0, 4.0) + + scaled_data = scale_mode._transform_data(raw_data) + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [4.0, 7.0, 10.0, 13.0] + + +def test___scale_mode___repr___looks_ok() -> None: + scale_mode = LinearScaleMode(1.2345, 0.006789) + + assert repr(scale_mode) == "nitypes.waveform.LinearScaleMode(1.2345, 0.006789)" diff --git a/tests/unit/waveform/_scaling/test_none.py b/tests/unit/waveform/_scaling/test_none.py new file mode 100644 index 00000000..1c11e3d5 --- /dev/null +++ b/tests/unit/waveform/_scaling/test_none.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import sys + +import numpy as np +import numpy.typing as npt + +from nitypes.waveform import NO_SCALING, NoneScaleMode + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +def test___no_scaling___type_is_none_scale_mode() -> None: + assert_type(NO_SCALING, NoneScaleMode) + assert isinstance(NO_SCALING, NoneScaleMode) + + +def test___empty_ndarray___transform_data___returns_empty_scaled_data() -> None: + waveform = np.zeros(0, np.float64) + + scaled_data = NO_SCALING._transform_data(waveform) + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [] + + +def test___float32_ndarray___transform_data___returns_float32_scaled_data() -> None: + raw_data = np.array([0, 1, 2, 3], np.float32) + + scaled_data = NO_SCALING._transform_data(raw_data) + + assert_type(scaled_data, npt.NDArray[np.float32]) + assert 
isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float32 + assert list(scaled_data) == [0.0, 1.0, 2.0, 3.0] + + +def test___float64_ndarray___transform_data___returns_float64_scaled_data() -> None: + raw_data = np.array([0, 1, 2, 3], np.float64) + + scaled_data = NO_SCALING._transform_data(raw_data) + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [0.0, 1.0, 2.0, 3.0] + + +def test___scale_mode___repr___looks_ok() -> None: + assert repr(NO_SCALING) == "nitypes.waveform.NoneScaleMode()" diff --git a/tests/unit/waveform/test_analog_waveform.py b/tests/unit/waveform/test_analog_waveform.py index eca00e75..1b5e1952 100644 --- a/tests/unit/waveform/test_analog_waveform.py +++ b/tests/unit/waveform/test_analog_waveform.py @@ -9,9 +9,18 @@ import hightime as ht import numpy as np +import numpy.typing as npt import pytest -from nitypes.waveform import AnalogWaveform, PrecisionTiming, Timing +from nitypes.waveform import ( + NO_SCALING, + AnalogWaveform, + LinearScaleMode, + NoneScaleMode, + PrecisionTiming, + ScaleMode, + Timing, +) if sys.version_info >= (3, 11): from typing import assert_type @@ -573,6 +582,224 @@ def test___invalid_array_subset___from_array_2d___raises_value_error( assert exc.value.args[0].startswith(expected_message) +############################################################################### +# raw_data +############################################################################### +def test___int32_waveform___raw_data___returns_int32_data() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + + raw_data = waveform.raw_data + + assert_type(raw_data, npt.NDArray[np.int32]) + assert isinstance(raw_data, np.ndarray) and raw_data.dtype == np.int32 + assert list(raw_data) == [0, 1, 2, 3] + + +def test___int32_waveform_with_linear_scale___raw_data___returns_int32_data() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + raw_data = waveform.raw_data + + assert_type(raw_data, npt.NDArray[np.int32]) + assert isinstance(raw_data, np.ndarray) and raw_data.dtype == np.int32 + assert list(raw_data) == [0, 1, 2, 3] + + +############################################################################### +# get_raw_data +############################################################################### +def test___int32_waveform___get_raw_data___returns_raw_data() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + + scaled_data = waveform.get_raw_data() + + assert_type(scaled_data, npt.NDArray[np.int32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.int32 + assert list(scaled_data) == [0, 1, 2, 3] + + +def test___int32_waveform_with_linear_scale___get_raw_data___returns_raw_data() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.get_raw_data() + + assert_type(scaled_data, npt.NDArray[np.int32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.int32 + assert list(scaled_data) == [0, 1, 2, 3] + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_raw_data", + [ + (None, None, [0, 1, 2, 3]), + (0, None, [0, 1, 2, 3]), + (1, None, [1, 2, 3]), + (3, None, [3]), + (4, None, []), + (None, None, [0, 1, 2, 3]), + (None, 1, [0]), + (None, 3, [0, 1, 2]), + (None, 4, [0, 1, 2, 3]), + 
(1, 2, [1, 2]), + (4, 0, []), + ], +) +def test___array_subset___get_raw_data___returns_array_subset( + start_index: int, sample_count: int, expected_raw_data: list[int] +) -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.get_raw_data(start_index=start_index, sample_count=sample_count) + + assert_type(scaled_data, npt.NDArray[np.int32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.int32 + assert list(scaled_data) == expected_raw_data + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + ( + 5, + None, + "The start index must be less than or equal to the number of samples in the waveform.", + ), + ( + 0, + 5, + "The sum of the start index and sample count must be less than or equal to the number of samples in the waveform.", + ), + ( + 4, + 1, + "The sum of the start index and sample count must be less than or equal to the number of samples in the waveform.", + ), + ], +) +def test___invalid_array_subset___get_raw_data___returns_array_subset( + start_index: int, sample_count: int, expected_message: str +) -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + with pytest.raises((TypeError, ValueError)) as exc: + _ = waveform.get_raw_data(start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# scaled_data +############################################################################### +def test___int32_waveform___scaled_data___converts_to_float64() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + + scaled_data = waveform.scaled_data + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [0, 1, 2, 3] + + +def test___int32_waveform_with_linear_scale___scaled_data___applies_linear_scale() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.scaled_data + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [0.5, 2.5, 4.5, 6.5] + + +############################################################################### +# get_scaled_data +############################################################################### +def test___int32_waveform___get_scaled_data___converts_to_float64() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + + scaled_data = waveform.get_scaled_data() + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [0, 1, 2, 3] + + +def test___int32_waveform_with_linear_scale___get_scaled_data___applies_linear_scale() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.get_scaled_data() + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [0.5, 2.5, 4.5, 6.5] + + +def test___float32_dtype___get_scaled_data___converts_to_float32() -> 
None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.get_scaled_data(np.float32) + + assert_type(scaled_data, npt.NDArray[np.float32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float32 + assert list(scaled_data) == [0.5, 2.5, 4.5, 6.5] + + +@pytest.mark.parametrize( + "waveform_dtype", + [ + np.float32, + np.float64, + np.int8, + np.int16, + np.int32, + np.int64, + np.uint8, + np.uint16, + np.uint32, + np.uint64, + ], +) +@pytest.mark.parametrize("scaled_dtype", [np.float32, np.float64]) +def test___varying_dtype___get_scaled_data___converts_to_requested_dtype( + waveform_dtype: npt.DTypeLike, scaled_dtype: npt.DTypeLike +) -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], waveform_dtype) + waveform.scale_mode = LinearScaleMode(3.0, 4.0) + + scaled_data = waveform.get_scaled_data(scaled_dtype) + + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == scaled_dtype + assert list(scaled_data) == [4.0, 7.0, 10.0, 13.0] + + +def test___unsupported_dtype___get_scaled_data___raises_type_error() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(3.0, 4.0) + + with pytest.raises(TypeError) as exc: + _ = waveform.get_scaled_data(np.int32) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + assert "Supported data types: float32, float64" in exc.value.args[0] + + +def test___array_subset___get_scaled_data___returns_array_subset() -> None: + waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32) + waveform.scale_mode = LinearScaleMode(2.0, 0.5) + + scaled_data = waveform.get_scaled_data(start_index=1, sample_count=2) + + assert_type(scaled_data, npt.NDArray[np.float64]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.float64 + assert list(scaled_data) == [2.5, 4.5] + + ############################################################################### # capacity ############################################################################### @@ -731,3 +958,14 @@ def test___waveform_with_precision_timing___get_timing___converts_timing() -> No assert timing == Timing.create_with_regular_interval( dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) ) + + +############################################################################### +# scale_mode +############################################################################### +def test___waveform___scale_mode___defaults_to_no_scaling() -> None: + waveform = AnalogWaveform() + + assert_type(waveform.scale_mode, ScaleMode) + assert isinstance(waveform.scale_mode, NoneScaleMode) + assert waveform.scale_mode is NO_SCALING
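
A minimal usage sketch of the scaling API this diff introduces, for reviewers. It only uses names added or already exercised in this PR (`AnalogWaveform.from_array_1d`, `scale_mode`, `NO_SCALING`, `LinearScaleMode`, `get_raw_data`, `get_scaled_data`); the sample values are illustrative, not taken from the test suite.

```python
import numpy as np

from nitypes.waveform import NO_SCALING, AnalogWaveform, LinearScaleMode

# Raw int32 samples; scale_mode defaults to NO_SCALING.
waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32)
assert waveform.scale_mode is NO_SCALING

# scaled_data converts to float64 and applies the scale mode (a no-op here).
print(waveform.scaled_data)  # [0. 1. 2. 3.]

# Attach a linear scale: scaled = raw * gain + offset.
waveform.scale_mode = LinearScaleMode(2.0, 0.5)
print(waveform.scaled_data)                                     # [0.5 2.5 4.5 6.5]
print(waveform.get_scaled_data(np.float32))                     # same values, as float32
print(waveform.get_scaled_data(start_index=1, sample_count=2))  # [2.5 4.5]

# The raw data is never affected by the scale mode.
print(waveform.get_raw_data(start_index=1, sample_count=2))     # [1 2]
```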
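
Also for reviewers, a short sketch of the new validation behavior (error text per the messages in this diff; values are illustrative):

```python
import numpy as np

from nitypes.waveform import AnalogWaveform

waveform = AnalogWaveform.from_array_1d([0, 1, 2, 3], np.int32)

# get_scaled_data only accepts the floating point dtypes listed in _SCALED_DTYPES.
try:
    waveform.get_scaled_data(np.int32)
except TypeError as e:
    print(e)  # The requested data type is not supported. ... Supported data types: float32, float64

# Subset arguments are validated against the number of samples in the waveform.
try:
    waveform.get_raw_data(start_index=5)
except ValueError as e:
    print(e)  # The start index must be less than or equal to the number of samples in the waveform. ...
```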
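
One behavioral change worth flagging (my reading of the diff, not stated in it): the removed `nitypes.waveform._utils.arg_to_int`/`arg_to_uint` silently treated `None` as `0`, while the new `nitypes._arguments` versions raise unless a default is supplied, which is why `_init_with_new_array` and `_init_with_provided_array` now pass `0` explicitly. A tiny sketch:

```python
from nitypes._arguments import arg_to_uint

print(arg_to_uint("start index", None, 0))  # 0 -- explicit default, as the updated call sites do

try:
    arg_to_uint("start index", None)  # no default: raises instead of returning 0
except TypeError as e:
    print(e)  # The start index must be an integer. ... Provided value: None
```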