Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/nitypes/time/_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def convert_datetime(requested_type: type[_TDateTime], value: _AnyDateTime, /) -

@singledispatch
def _convert_to_dt_datetime(value: object, /) -> dt.datetime:
raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}")
raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value!r}")


@_convert_to_dt_datetime.register
Expand All @@ -57,7 +57,7 @@ def _(value: ht.datetime, /) -> dt.datetime:

@singledispatch
def _convert_to_ht_datetime(value: object, /) -> ht.datetime:
raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}")
raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value!r}")


@_convert_to_ht_datetime.register
Expand Down Expand Up @@ -98,7 +98,7 @@ def convert_timedelta(requested_type: type[_TTimeDelta], value: _AnyTimeDelta, /

@singledispatch
def _convert_to_dt_timedelta(value: object, /) -> dt.timedelta:
raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}")
raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value!r}")


@_convert_to_dt_timedelta.register
Expand All @@ -113,7 +113,7 @@ def _(value: ht.timedelta, /) -> dt.timedelta:

@singledispatch
def _convert_to_ht_timedelta(value: object, /) -> ht.timedelta:
raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}")
raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value!r}")


@_convert_to_ht_timedelta.register
Expand Down
9 changes: 9 additions & 0 deletions src/nitypes/waveform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
ExtendedPropertyDictionary,
ExtendedPropertyValue,
)
from nitypes.waveform._scaling._base import ScaleMode
from nitypes.waveform._scaling._linear import LinearScaleMode
from nitypes.waveform._scaling._none import NoneScaleMode
from nitypes.waveform._timing._base import BaseTiming, SampleIntervalMode
from nitypes.waveform._timing._precision import PrecisionTiming
from nitypes.waveform._timing._standard import Timing
Expand All @@ -14,8 +17,11 @@
"BaseTiming",
"ExtendedPropertyDictionary",
"ExtendedPropertyValue",
"LinearScaleMode",
"NoneScaleMode",
"PrecisionTiming",
"SampleIntervalMode",
"ScaleMode",
"Timing",
]

Expand All @@ -24,6 +30,9 @@
BaseTiming.__module__ = __name__
ExtendedPropertyDictionary.__module__ = __name__
# ExtendedPropertyValue is a TypeAlias
LinearScaleMode.__module__ = __name__
NoneScaleMode.__module__ = __name__
PrecisionTiming.__module__ = __name__
SampleIntervalMode.__module__ = __name__
ScaleMode.__module__ = __name__
Timing.__module__ = __name__
112 changes: 103 additions & 9 deletions src/nitypes/waveform/_analog_waveform.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
UNIT_DESCRIPTION,
ExtendedPropertyDictionary,
)
from nitypes.waveform._scaling._base import ScaleMode
from nitypes.waveform._timing._conversion import convert_timing
from nitypes.waveform._timing._precision import PrecisionTiming
from nitypes.waveform._timing._standard import Timing
Expand Down Expand Up @@ -221,6 +222,7 @@ def from_array_2d(
"_extended_properties",
"_timing",
"_precision_timing",
"_scale_mode",
"__weakref__",
]

Expand All @@ -230,6 +232,7 @@ def from_array_2d(
_extended_properties: ExtendedPropertyDictionary
_timing: Timing | None
_precision_timing: PrecisionTiming | None
_scale_mode: ScaleMode

# If neither dtype nor _data is specified, the type parameter defaults to np.float64.
@overload
Expand Down Expand Up @@ -319,8 +322,8 @@ def _init_with_new_array(
start_index: SupportsIndex | None = None,
capacity: SupportsIndex | None = None,
) -> None:
start_index = arg_to_uint("start index", start_index)
sample_count = arg_to_uint("sample count", sample_count)
start_index = arg_to_uint("start index", start_index, 0)
sample_count = arg_to_uint("sample count", sample_count, 0)
capacity = arg_to_uint("capacity", capacity, sample_count)

if dtype is None:
Expand All @@ -347,6 +350,7 @@ def _init_with_new_array(
self._extended_properties = ExtendedPropertyDictionary()
self._timing = Timing.empty
self._precision_timing = None
self._scale_mode = ScaleMode.none

def _init_with_provided_array(
self,
Expand Down Expand Up @@ -378,7 +382,7 @@ def _init_with_provided_array(
f"Array length: {len(data)}"
)

start_index = arg_to_uint("start index", start_index)
start_index = arg_to_uint("start index", start_index, 0)
if start_index > capacity:
raise ValueError(
"The start index must be less than or equal to the input array length.\n\n"
Expand All @@ -401,17 +405,96 @@ def _init_with_provided_array(
self._extended_properties = ExtendedPropertyDictionary()
self._timing = Timing.empty
self._precision_timing = None
self._scale_mode = ScaleMode.none

@property
def raw_data(self) -> npt.NDArray[_ScalarType_co]:
"""The raw analog waveform data."""
return self._data[self._start_index : self._start_index + self._sample_count]

def get_raw_data(
self, start_index: SupportsIndex | None = 0, sample_count: SupportsIndex | None = None
) -> npt.NDArray[_ScalarType_co]:
"""Get a subset of the raw analog waveform data.

Args:
start_index: The sample index at which the data begins.
sample_count: The number of samples to return.

Returns:
A subset of the raw analog waveform data.
"""
start_index = arg_to_uint("sample index", start_index, 0)
if start_index > self.sample_count:
raise ValueError(
"The start index must be less than or equal to the number of samples in the waveform.\n\n"
f"Start index: {start_index}\n"
f"Number of samples: {self.sample_count}"
)

sample_count = arg_to_uint("sample count", sample_count, self.sample_count - start_index)
if start_index + sample_count > self.sample_count:
raise ValueError(
"The sum of the start index and sample count must be less than or equal to the number of samples in the waveform.\n\n"
f"Start index: {start_index}\n"
f"Sample count: {sample_count}\n"
f"Number of samples: {self.sample_count}"
)

return self.raw_data[start_index : start_index + sample_count]

@property
def scaled_data(self) -> npt.NDArray[np.float64]:
"""The scaled analog waveform data."""
# TODO: implement scaling
return self.raw_data.astype(np.float64)
return self._scale_mode.get_scaled_data(self)

# If dtype is not specified, _ScaledDataType defaults to np.float64.
@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
dtype: None = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[np.float64]: ...

@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
dtype: type[_ScalarType] | np.dtype[_ScalarType] = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[_ScalarType]: ...

@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
dtype: npt.DTypeLike = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[Any]: ...

def get_scaled_data(
self,
dtype: npt.DTypeLike = None,
*,
start_index: SupportsIndex | None = 0,
sample_count: SupportsIndex | None = None,
) -> npt.NDArray[Any]:
"""Get a subset of the scaled analog waveform data.

Args:
start_index: The sample index at which the data begins.
sample_count: The number of samples to return.

Returns:
A subset of the scaled analog waveform data.
"""
return self._scale_mode.get_scaled_data(
self, dtype, start_index=start_index, sample_count=sample_count
)

@property
def sample_count(self) -> int:
Expand Down Expand Up @@ -464,7 +547,7 @@ def channel_name(self) -> str:
@channel_name.setter
def channel_name(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError("The channel name must be a str.\n\n" f"Channel name: {value!r}")
raise TypeError("The channel name must be a str.\n\n" f"Provided value: {value!r}")
self._extended_properties[CHANNEL_NAME] = value

@property
Expand All @@ -477,9 +560,7 @@ def unit_description(self) -> str:
@unit_description.setter
def unit_description(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError(
"The unit description must be a str.\n\n" f"Unit description: {value!r}"
)
raise TypeError("The unit description must be a str.\n\n" f"Provided value: {value!r}")
self._extended_properties[UNIT_DESCRIPTION] = value

@property
Expand Down Expand Up @@ -540,3 +621,16 @@ def precision_timing(self, value: PrecisionTiming) -> None:
raise TypeError("The precision timing information must be a PrecisionTiming object.")
self._precision_timing = value
self._timing = None

@property
def scale_mode(self) -> ScaleMode:
"""The scale mode of the analog waveform."""
return self._scale_mode

@scale_mode.setter
def scale_mode(self, value: ScaleMode) -> None:
if not isinstance(value, ScaleMode):
raise TypeError(
"The scale mode must be a ScaleMode object.\n\n" f"Provided value: {value!r}"
)
self._scale_mode = value
8 changes: 8 additions & 0 deletions src/nitypes/waveform/_scaling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Waveform scaling data types for NI Python APIs."""

from nitypes.waveform._scaling._base import ScaleMode
from nitypes.waveform._scaling._none import NoneScaleMode

# Defined here to avoid a circular dependency
ScaleMode.none = NoneScaleMode()
"""A scale mode that does not scale data."""
105 changes: 105 additions & 0 deletions src/nitypes/waveform/_scaling/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar, SupportsIndex, TypeVar, overload

import numpy as np
import numpy.typing as npt

from nitypes.waveform._utils import validate_dtype

if TYPE_CHECKING:
# circular imports
from nitypes.waveform._analog_waveform import AnalogWaveform
from nitypes.waveform._scaling._none import NoneScaleMode

_TRaw = TypeVar("_TRaw", bound=np.generic)

# It might make sense to constrain this to np.float32 or np.float64 once
# https://github.com/numpy/numpy/issues/28805 is fixed, but bound=np.generic currently has more
# predictable results.
_TScaled = TypeVar("_TScaled", bound=np.generic)


_SCALED_DTYPES = (
# Floating point
np.single,
np.double,
)


class ScaleMode(ABC):
"""An object that specifies how the waveform is scaled."""

__slots__ = ()

none: ClassVar[NoneScaleMode]

# If dtype is not specified, _ScaledDataType defaults to np.float64.
@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
waveform: AnalogWaveform[_TRaw],
dtype: None = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[np.float64]: ...

@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
waveform: AnalogWaveform[_TRaw],
dtype: type[_TScaled] | np.dtype[_TScaled] = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[_TScaled]: ...

@overload
def get_scaled_data( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa)
self,
waveform: AnalogWaveform[Any],
dtype: npt.DTypeLike = ...,
*,
start_index: SupportsIndex | None = ...,
sample_count: SupportsIndex | None = ...,
) -> npt.NDArray[Any]: ...

def get_scaled_data(
self,
waveform: AnalogWaveform[Any],
dtype: npt.DTypeLike = None,
*,
start_index: SupportsIndex | None = 0,
sample_count: SupportsIndex | None = None,
) -> npt.NDArray[_TScaled]:
"""Get scaled analog waveform data using the specified sample index and count.
Args:
waveform: The waveform to scale.
dtype: The NumPy data type for the analog waveform data. If not specified, the data
type defaults to np.float64.
start_index: The start index.
sample_count: The number of samples to scale.
Returns:
The scaled analog waveform data.
"""
from nitypes.waveform._analog_waveform import AnalogWaveform

if not isinstance(waveform, AnalogWaveform):
raise TypeError(
"The waveform must be an AnalogWaveform object.\n\n" f"Type: {type(waveform)}"
)

if dtype is None:
dtype = np.float64
validate_dtype(dtype, _SCALED_DTYPES)

raw_data = waveform.get_raw_data(start_index, sample_count)
return self._transform_data(raw_data.astype(dtype))

@abstractmethod
def _transform_data(self, data: npt.NDArray[_TScaled]) -> npt.NDArray[_TScaled]:
raise NotImplementedError
Loading