Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions generated/nidaqmx/_base_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import abc
import numpy
from nitypes.waveform import AnalogWaveform
from typing import Sequence


class BaseEventHandler(abc.ABC):
Expand Down Expand Up @@ -1855,5 +1856,15 @@ def read_analog_waveform(
number_of_samples_per_channel: int,
timeout: float,
waveform: AnalogWaveform[numpy.float64]
) -> None:
raise NotImplementedError

@abc.abstractmethod
def read_analog_waveforms(
self,
task_handle: object,
number_of_samples_per_channel: int,
timeout: float,
waveforms: Sequence[AnalogWaveform[numpy.float64]]
) -> None:
raise NotImplementedError
11 changes: 10 additions & 1 deletion generated/nidaqmx/_grpc_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import typing
import warnings
from nitypes.waveform import AnalogWaveform
from typing import Callable, Generic, TypeVar
from typing import Callable, Generic, Sequence, TypeVar

import google.protobuf.message
from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp
Expand Down Expand Up @@ -3610,6 +3610,15 @@ def read_analog_waveform(
) -> None:
raise NotImplementedError

def read_analog_waveforms(
self,
task_handle: object,
number_of_samples_per_channel: int,
timeout: float,
waveforms: Sequence[AnalogWaveform[numpy.float64]]
) -> None:
raise NotImplementedError

def _assign_numpy_array(numpy_array, grpc_array):
"""
Assigns grpc array to numpy array maintaining the original shape.
Expand Down
107 changes: 107 additions & 0 deletions generated/nidaqmx/_library_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6398,6 +6398,32 @@ def read_analog_waveform(
# TODO: AB#3228924 - if the read was short, set waveform.sample_count before throwing the exception
self.check_for_error(error_code, samps_per_chan_read=samples_read)

def read_analog_waveforms(
self,
task_handle: object,
number_of_samples_per_channel: int,
timeout: float,
waveforms: Sequence[AnalogWaveform[numpy.float64]]
) -> None:
"""Read a set of analog waveforms with timing and attributes. All of the waveforms must be the same size."""
error_code, samples_read, timestamps, sample_intervals = self.internal_read_analog_waveform_per_chan(
task_handle,
number_of_samples_per_channel,
timeout,
[waveform.raw_data for waveform in waveforms],
[waveform.extended_properties for waveform in waveforms]
)

for i, waveform in enumerate(waveforms):
waveform.timing = Timing(
sample_interval_mode=SampleIntervalMode.REGULAR,
timestamp=timestamps[i],
sample_interval=sample_intervals[i],
)

# TODO: AB#3228924 - if the read was short, set waveform.sample_count before throwing the exception
self.check_for_error(error_code, samps_per_chan_read=samples_read)

def _internal_read_analog_waveform_ex(
self,
task_handle: object,
Expand Down Expand Up @@ -6470,6 +6496,87 @@ def set_wfm_attr_callback(

return error_code, samps_per_chan_read.value, timestamps, sample_intervals

def internal_read_analog_waveform_per_chan(
self,
task_handle: object,
num_samps_per_chan: int,
timeout: float,
read_arrays: Sequence[numpy.typing.NDArray[numpy.float64]],
properties: Sequence[ExtendedPropertyDictionary]
) -> Tuple[
int, # error code
int, # The number of samples per channel that were read
Sequence[ht_datetime], # The timestamps for each sample, indexed by channel
Sequence[ht_timedelta], # The sample intervals, indexed by channel
]:
assert isinstance(task_handle, TaskHandle)
samps_per_chan_read = ctypes.c_int()

channel_count = len(read_arrays)
assert channel_count > 0
array_size = read_arrays[0].size
assert all(read_array.size == array_size for read_array in read_arrays)

cfunc = lib_importer.windll.DAQmxInternalReadAnalogWaveformPerChan
if cfunc.argtypes is None:
with cfunc.arglock:
if cfunc.argtypes is None:
cfunc.argtypes = [
TaskHandle,
ctypes.c_int,
ctypes.c_double,
wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
ctypes.c_uint,
CSetWfmAttrCallbackPtr,
ctypes.c_void_p,
ctypes.POINTER(ctypes.POINTER(ctypes.c_double)),
ctypes.c_uint,
ctypes.c_uint,
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(c_bool32),
]

t0_array = numpy.zeros(channel_count, dtype=numpy.int64)
dt_array = numpy.zeros(channel_count, dtype=numpy.int64)

read_array_ptrs = (ctypes.POINTER(ctypes.c_double) * channel_count)()
for i, read_array in enumerate(read_arrays):
read_array_ptrs[i] = read_array.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

def set_wfm_attr_callback(
channel_index: int,
attribute_name: str,
attribute_type: WfmAttrType,
value: ExtendedPropertyValue,
callback_data: object,
) -> int:
properties[channel_index][attribute_name] = value
return 0

error_code = cfunc(
task_handle,
num_samps_per_chan,
timeout,
t0_array,
dt_array,
0 if t0_array is None else t0_array.size,
self._get_wfm_attr_callback_ptr(set_wfm_attr_callback),
None,
read_array_ptrs,
channel_count,
array_size,
ctypes.byref(samps_per_chan_read),
None,
)
self.check_for_error(error_code, samps_per_chan_read=samps_per_chan_read.value)

timestamps = [_T0_EPOCH + ht_timedelta(seconds=t0 * _INT64_WFM_SEC_PER_TICK) for t0 in t0_array]
sample_intervals = [ht_timedelta(seconds=dt * _INT64_WFM_SEC_PER_TICK) for dt in dt_array]

return error_code, samps_per_chan_read.value, timestamps, sample_intervals


def _get_wfm_attr_value(
self, attribute_type: int, value: ctypes.c_void_p, value_size_in_bytes: int
) -> ExtendedPropertyValue:
Expand Down
98 changes: 98 additions & 0 deletions generated/nidaqmx/stream_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,104 @@ def read_one_sample(self, data, timeout=10):

self._interpreter.read_analog_f64(self._handle, 1, timeout, FillMode.GROUP_BY_CHANNEL.value, data)

@requires_feature(WAVEFORM_SUPPORT)
def read_waveforms(
self,
number_of_samples_per_channel: int = READ_ALL_AVAILABLE,
timeout: int = 10,
waveforms: list[AnalogWaveform[numpy.float64]] | None = None
) -> list[AnalogWaveform[numpy.float64]]:
"""
Reads one or more floating-point samples from one or more analog
input channels into a list of waveforms.

This read method optionally accepts a preallocated list of waveforms to hold
the samples requested, which can be advantageous for performance and
interoperability with NumPy and SciPy.

Passing in a preallocated list of waveforms is valuable in continuous
acquisition scenarios, where the same waveforms can be used
repeatedly in each call to the method.

Args:
number_of_samples_per_channel (Optional[int]): Specifies the
number of samples to read.

If you set this input to nidaqmx.constants.
READ_ALL_AVAILABLE, NI-DAQmx determines how many samples
to read based on if the task acquires samples
continuously or acquires a finite number of samples.

If the task acquires samples continuously and you set
this input to nidaqmx.constants.READ_ALL_AVAILABLE, this
method reads all the samples currently available in the
buffer.

If the task acquires a finite number of samples and you
set this input to nidaqmx.constants.READ_ALL_AVAILABLE,
the method waits for the task to acquire all requested
samples, then reads those samples. If you set the
"read_all_avail_samp" property to True, the method reads
the samples currently available in the buffer and does
not wait for the task to acquire all requested samples.
timeout (Optional[float]): Specifies the amount of time in
seconds to wait for samples to become available. If the
time elapses, the method returns an error and any
samples read before the timeout elapsed. The default
timeout is 10 seconds. If you set timeout to
nidaqmx.constants.WAIT_INFINITELY, the method waits
indefinitely. If you set timeout to 0, the method tries
once to read the requested samples and returns an error
if it is unable to.
waveforms (Optional[list[AnalogWaveform]]): Specifies an existing
list of AnalogWaveform objects to use for reading samples into.
If provided, the raw_data array of each waveform will be
used to store the samples. The list must contain one waveform
for each channel in the task. If not provided, new
AnalogWaveform objects will be created.
Returns:
list[AnalogWaveform]:

A list of waveform objects containing the acquired samples,
one for each channel in the task.
"""
number_of_channels = self._in_stream.num_chans
number_of_samples_per_channel = (
self._task._calculate_num_samps_per_chan(
number_of_samples_per_channel))


if waveforms is None:
waveforms = [
AnalogWaveform(raw_data=numpy.zeros(number_of_samples_per_channel, dtype=numpy.float64))
for _ in range(number_of_channels)
]
else:
if len(waveforms) != number_of_channels:
raise DaqError(
f'The number of waveforms provided ({len(waveforms)}) does not match '
f'the number of channels in the task ({number_of_channels}). Please provide '
'one waveform for each channel.',
DAQmxErrors.MISMATCHED_INPUT_ARRAY_SIZES, task_name=self._task.name)

for i, waveform in enumerate(waveforms):
if number_of_samples_per_channel > waveform.sample_count:
# TODO: AB#3228924 - if allowed by the caller, increase the sample count of the waveform
raise DaqError(
f'The waveform at index {i} does not have enough space ({waveform.sample_count}) to hold '
f'the requested number of samples ({number_of_samples_per_channel}). Please provide larger '
'waveforms or adjust the number of samples requested.',
DAQmxErrors.READ_BUFFER_TOO_SMALL, task_name=self._task.name)

self._interpreter.read_analog_waveforms(
self._handle,
number_of_samples_per_channel,
timeout,
waveforms,
)

return waveforms


class AnalogUnscaledReader(ChannelReaderBase):
"""
Expand Down
11 changes: 11 additions & 0 deletions src/codegen/templates/_base_interpreter.py.mako
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import abc
import numpy
from nitypes.waveform import AnalogWaveform
from typing import Sequence


class BaseEventHandler(abc.ABC):
Expand Down Expand Up @@ -64,5 +65,15 @@ class BaseInterpreter(abc.ABC):
number_of_samples_per_channel: int,
timeout: float,
waveform: AnalogWaveform[numpy.float64]
) -> None:
raise NotImplementedError

@abc.abstractmethod
def read_analog_waveforms(
self,
task_handle: object,
number_of_samples_per_channel: int,
timeout: float,
waveforms: Sequence[AnalogWaveform[numpy.float64]]
) -> None:
raise NotImplementedError
11 changes: 10 additions & 1 deletion src/codegen/templates/_grpc_interpreter.py.mako
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import threading
import typing
import warnings
from nitypes.waveform import AnalogWaveform
from typing import Callable, Generic, TypeVar
from typing import Callable, Generic, Sequence, TypeVar

import google.protobuf.message
from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp
Expand Down Expand Up @@ -255,6 +255,15 @@ class GrpcStubInterpreter(BaseInterpreter):
) -> None:
raise NotImplementedError

def read_analog_waveforms(
self,
task_handle: object,
number_of_samples_per_channel: int,
timeout: float,
waveforms: Sequence[AnalogWaveform[numpy.float64]]
) -> None:
raise NotImplementedError

def _assign_numpy_array(numpy_array, grpc_array):
"""
Assigns grpc array to numpy array maintaining the original shape.
Expand Down
Loading