Skip to content

Commit ddb23e1

Browse files
mikeprosserniMike Prosser
andauthored
Add nidaqmx.stream_readers.AnalogSingleChannelReader.read_waveform() (#798)
* first draft - based on the PoC with just enough changes to run the example without crashing * fix waveform.timing * clean up pyproject.toml * clean up formatting * more cleanup * fix extended_properties * generate stream_readers.py properly * generate _library_interpreter.py properly * generate types.py properly * generate constants.py properly * test___analog_single_channel_reader___read_waveform___returns_valid_waveform * BaseInterpreter.internal_read_analog_waveform_ex() * cleanup * cleanup * assert channel_index == 0 * updated changelog * add documentation * add documentation (generated) * move set_wfm_attr_callback to LibraryInterpreter * refactor * revert types.py * whitespace * initialize arrays to zero instead of inf or min * update pyproject.toml * Add nitypes to the intersphinx mapping * refactor to avoid unecessary copies and allow reading into an existing waveform * move all the waveform stuff into the interpreter * "for reading samples into." * _INT64_WFM_SEC_PER_TICK and _T0_EPOCH should be module-level constants * fix mypy issue on linux? * make _internal_read_analog_waveform_ex internal to LIbraryInterpreter * better handling for waveform buffer size issues * use hightime * update tests * update tests * f"{sim_6363_device.name}/ai0" --------- Co-authored-by: Mike Prosser <[email protected]>
1 parent d7ab611 commit ddb23e1

File tree

14 files changed

+871
-21
lines changed

14 files changed

+871
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ All notable changes to this project will be documented in this file.
3232
* ...
3333

3434
* ### Major Changes
35-
* ...
35+
* (IN PROGRESS) Added support for reading and writing Waveform data.
3636

3737
* ### Known Issues
3838
* ...

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
intersphinx_mapping = {
4949
"grpc": ("https://grpc.github.io/grpc/python/", None),
50+
"nitypes": ("https://nitypes.readthedocs.io/en/latest/", None),
5051
"numpy": ("https://numpy.org/doc/stable/", None),
5152
"protobuf": ("https://googleapis.dev/python/protobuf/latest/", None),
5253
"python": ("https://docs.python.org/3", None),
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""Example of analog input voltage waveform acquisition.
2+
3+
This example demonstrates how to acquire a finite amount
4+
of data using the DAQ device's internal clock.
5+
"""
6+
7+
import nidaqmx
8+
from nidaqmx.constants import AcquisitionType, READ_ALL_AVAILABLE
9+
from nidaqmx.stream_readers import AnalogSingleChannelReader
10+
11+
with nidaqmx.Task() as task:
12+
task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
13+
task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=50)
14+
15+
reader = AnalogSingleChannelReader(task.in_stream)
16+
waveform = reader.read_waveform(READ_ALL_AVAILABLE)
17+
print(f"Acquired data: {waveform.scaled_data}")
18+
print(f"Channel name: {waveform.channel_name}")
19+
print(f"Unit description: {waveform.unit_description}")
20+
print(f"t0: {waveform.timing.start_time}")
21+
print(f"dt: {waveform.timing.sample_interval}")

generated/nidaqmx/_base_interpreter.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Do not edit this file; it was automatically generated.
22
import abc
3-
from typing import Optional
3+
import numpy
4+
from nitypes.waveform import AnalogWaveform
45

56

67
class BaseEventHandler(abc.ABC):
@@ -1845,4 +1846,14 @@ def write_to_teds_from_file(
18451846

18461847
@abc.abstractmethod
18471848
def hash_task_handle(self, task_handle):
1849+
raise NotImplementedError
1850+
1851+
@abc.abstractmethod
1852+
def read_analog_waveform(
1853+
self,
1854+
task_handle: object,
1855+
number_of_samples_per_channel: int,
1856+
timeout: float,
1857+
waveform: AnalogWaveform[numpy.float64]
1858+
) -> None:
18481859
raise NotImplementedError

generated/nidaqmx/_grpc_interpreter.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import threading
66
import typing
77
import warnings
8+
from nitypes.waveform import AnalogWaveform
89
from typing import Callable, Generic, TypeVar
910

1011
import google.protobuf.message
@@ -3600,6 +3601,14 @@ def set_runtime_environment(
36003601
def internal_get_last_created_chan(self):
36013602
raise NotImplementedError
36023603

3604+
def read_analog_waveform(
3605+
self,
3606+
task_handle: object,
3607+
number_of_samples_per_channel: int,
3608+
timeout: float,
3609+
waveform: AnalogWaveform[numpy.float64]
3610+
) -> None:
3611+
raise NotImplementedError
36033612

36043613
def _assign_numpy_array(numpy_array, grpc_array):
36053614
"""

generated/nidaqmx/_library_interpreter.py

Lines changed: 184 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,57 @@
11
# Do not edit this file; it was automatically generated.
22

3+
from __future__ import annotations
34
import ctypes
45
import logging
6+
import numpy
57
import platform
68
import warnings
7-
from typing import Optional
8-
9-
import numpy
10-
from typing import List
9+
import sys
10+
from enum import Enum
11+
from datetime import timezone
12+
from hightime import datetime as ht_datetime
13+
from hightime import timedelta as ht_timedelta
14+
from typing import Callable, List, Optional, Sequence, Tuple, TYPE_CHECKING
1115

1216
from nidaqmx._base_interpreter import BaseEventHandler, BaseInterpreter
13-
from nidaqmx._lib import lib_importer, ctypes_byte_str, c_bool32, wrapped_ndpointer
17+
from nidaqmx._lib import lib_importer, ctypes_byte_str, c_bool32, wrapped_ndpointer, TaskHandle
18+
from nidaqmx.constants import FillMode
1419
from nidaqmx.error_codes import DAQmxErrors, DAQmxWarnings
1520
from nidaqmx.errors import DaqError, DaqFunctionNotSupportedError, DaqReadError, DaqWarning, DaqWriteError
1621
from nidaqmx._lib_time import AbsoluteTime
22+
from nitypes.waveform.typing import ExtendedPropertyValue
23+
from nitypes.waveform import AnalogWaveform, SampleIntervalMode, Timing, ExtendedPropertyDictionary
1724

25+
if TYPE_CHECKING:
26+
if sys.version_info >= (3, 10):
27+
from typing import TypeAlias
28+
else:
29+
from typing_extensions import TypeAlias
1830

1931
_logger = logging.getLogger(__name__)
2032
_was_runtime_environment_set = None
2133

34+
_INT64_WFM_SEC_PER_TICK = 100e-9
35+
_T0_EPOCH = ht_datetime(1, 1, 1, tzinfo=timezone.utc)
36+
37+
# typedef int32 (CVICALLBACK *DAQmxSetWfmAttrCallbackPtr)(uInt32 channelIndex, const char attributeName[], int32 attributeType, const void* value, uInt32 valueSizeInBytes, void *callbackData); # noqa: W505 - doc line too long
38+
CSetWfmAttrCallbackPtr = ctypes.CFUNCTYPE(
39+
ctypes.c_int32, # return value (error code)
40+
ctypes.c_uint32, # channel_index
41+
ctypes.c_char_p, # attribute_name
42+
ctypes.c_int32, # attribute_type
43+
ctypes.c_void_p, # value
44+
ctypes.c_uint32, # value_size_in_bytes
45+
ctypes.c_void_p, # callback_data
46+
)
47+
48+
class WfmAttrType(Enum):
49+
BOOL32 = 1
50+
FLOAT64 = 2
51+
INT32 = 3
52+
STRING = 4
53+
54+
SetWfmAttrCallback: TypeAlias = Callable[[int, str, WfmAttrType, ExtendedPropertyValue, object], int]
2255

2356
class LibraryEventHandler(BaseEventHandler):
2457
"""Manage the lifetime of a ctypes callback method pointer.
@@ -6338,6 +6371,152 @@ def get_extended_error_info(self):
63386371
return 'Failed to retrieve error description.'
63396372
return error_buffer.value.decode(lib_importer.encoding)
63406373

6374+
def read_analog_waveform(
6375+
self,
6376+
task_handle: object,
6377+
number_of_samples_per_channel: int,
6378+
timeout: float,
6379+
waveform: AnalogWaveform[numpy.float64]
6380+
) -> None:
6381+
"""Read an analog waveform with timing and attributes."""
6382+
error_code, samples_read, timestamps, sample_intervals = self._internal_read_analog_waveform_ex(
6383+
task_handle,
6384+
1, # single channel
6385+
number_of_samples_per_channel,
6386+
timeout,
6387+
FillMode.GROUP_BY_CHANNEL.value,
6388+
waveform.raw_data,
6389+
[waveform.extended_properties]
6390+
)
6391+
6392+
waveform.timing = Timing(
6393+
sample_interval_mode=SampleIntervalMode.REGULAR,
6394+
timestamp=timestamps[0],
6395+
sample_interval=sample_intervals[0],
6396+
)
6397+
6398+
# TODO: AB#3228924 - if the read was short, set waveform.sample_count before throwing the exception
6399+
self.check_for_error(error_code, samps_per_chan_read=samples_read)
6400+
6401+
def _internal_read_analog_waveform_ex(
6402+
self,
6403+
task_handle: object,
6404+
channel_count: int,
6405+
number_of_samples_per_channel: int,
6406+
timeout: float,
6407+
fill_mode: int,
6408+
read_array: numpy.typing.NDArray[numpy.float64],
6409+
properties: Sequence[ExtendedPropertyDictionary]
6410+
) -> Tuple[
6411+
int, # error code
6412+
int, # The number of samples per channel that were read
6413+
Sequence[ht_datetime], # The timestamps for each sample, indexed by channel
6414+
Sequence[ht_timedelta], # The sample intervals, indexed by channel
6415+
]:
6416+
assert isinstance(task_handle, TaskHandle)
6417+
samps_per_chan_read = ctypes.c_int()
6418+
6419+
cfunc = lib_importer.windll.DAQmxInternalReadAnalogWaveformEx
6420+
if cfunc.argtypes is None:
6421+
with cfunc.arglock:
6422+
if cfunc.argtypes is None:
6423+
cfunc.argtypes = [
6424+
TaskHandle,
6425+
ctypes.c_int,
6426+
ctypes.c_double,
6427+
ctypes.c_int,
6428+
wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
6429+
wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
6430+
ctypes.c_uint,
6431+
CSetWfmAttrCallbackPtr,
6432+
ctypes.c_void_p,
6433+
wrapped_ndpointer(dtype=numpy.float64, flags=("C", "W")),
6434+
ctypes.c_uint,
6435+
ctypes.POINTER(ctypes.c_int),
6436+
ctypes.POINTER(c_bool32),
6437+
]
6438+
6439+
t0_array = numpy.zeros(channel_count, dtype=numpy.int64)
6440+
dt_array = numpy.zeros(channel_count, dtype=numpy.int64)
6441+
6442+
def set_wfm_attr_callback(
6443+
channel_index: int,
6444+
attribute_name: str,
6445+
attribute_type: WfmAttrType,
6446+
value: ExtendedPropertyValue,
6447+
callback_data: object,
6448+
) -> int:
6449+
properties[channel_index][attribute_name] = value
6450+
return 0
6451+
6452+
error_code = cfunc(
6453+
task_handle,
6454+
number_of_samples_per_channel,
6455+
timeout,
6456+
fill_mode,
6457+
t0_array,
6458+
dt_array,
6459+
0 if t0_array is None else t0_array.size,
6460+
self._get_wfm_attr_callback_ptr(set_wfm_attr_callback),
6461+
None,
6462+
read_array,
6463+
read_array.size,
6464+
ctypes.byref(samps_per_chan_read),
6465+
None,
6466+
)
6467+
6468+
timestamps = [_T0_EPOCH + ht_timedelta(seconds=t0 * _INT64_WFM_SEC_PER_TICK) for t0 in t0_array]
6469+
sample_intervals = [ht_timedelta(seconds=dt * _INT64_WFM_SEC_PER_TICK) for dt in dt_array]
6470+
6471+
return error_code, samps_per_chan_read.value, timestamps, sample_intervals
6472+
6473+
def _get_wfm_attr_value(
6474+
self, attribute_type: int, value: ctypes.c_void_p, value_size_in_bytes: int
6475+
) -> ExtendedPropertyValue:
6476+
if attribute_type == WfmAttrType.BOOL32.value:
6477+
assert value_size_in_bytes == 4
6478+
return ctypes.cast(value, ctypes.POINTER(ctypes.c_int32))[0] != 0
6479+
elif attribute_type == WfmAttrType.FLOAT64.value:
6480+
assert value_size_in_bytes == 8
6481+
return float(ctypes.cast(value, ctypes.POINTER(ctypes.c_double))[0])
6482+
elif attribute_type == WfmAttrType.INT32.value:
6483+
assert value_size_in_bytes == 4
6484+
return int(ctypes.cast(value, ctypes.POINTER(ctypes.c_int32))[0])
6485+
elif attribute_type == WfmAttrType.STRING.value:
6486+
value_c_bytes = ctypes.cast(value, ctypes.POINTER(ctypes.c_byte))
6487+
assert value_c_bytes[value_size_in_bytes - 1] == 0
6488+
return bytes(value_c_bytes[0 : value_size_in_bytes - 1]).decode(lib_importer.encoding)
6489+
else:
6490+
raise ValueError(f"Unsupported attribute type {attribute_type}")
6491+
6492+
def _get_wfm_attr_callback_ptr(
6493+
self, set_wfm_attr_callback: Optional[SetWfmAttrCallback]
6494+
) -> ctypes._FuncPointer:
6495+
if set_wfm_attr_callback is None:
6496+
return CSetWfmAttrCallbackPtr()
6497+
6498+
def _invoke_callback(
6499+
channel_index: int,
6500+
attribute_name: bytes,
6501+
attribute_type: int,
6502+
value: ctypes.c_void_p,
6503+
value_size_in_bytes: int,
6504+
callback_data: object,
6505+
) -> int:
6506+
try:
6507+
return set_wfm_attr_callback(
6508+
channel_index,
6509+
attribute_name.decode(lib_importer.encoding),
6510+
WfmAttrType(attribute_type),
6511+
self._get_wfm_attr_value(attribute_type, value, value_size_in_bytes),
6512+
callback_data,
6513+
)
6514+
except Exception:
6515+
_logger.exception("Unhandled exception in set_wfm_attr_callback")
6516+
return -1
6517+
6518+
return CSetWfmAttrCallbackPtr(_invoke_callback)
6519+
63416520
def read_id_pin_memory(self, device_name, id_pin_name):
63426521
data_length_read = ctypes.c_uint()
63436522
format_code = ctypes.c_uint()

0 commit comments

Comments
 (0)