Skip to content

Commit 4c1f0e4

Browse files
Refactor tests for stream writers into smaller files (#826)
* Split out stream writer tests into separate files * remove test_stream_writers.py * move utils out of conftest * poetry run nps fix * poke test_analog_multi_channel_reader.py to try to fix the CI * poking again to see what the CI does * undo poking * fix merge * import nidaqmx.system
1 parent 43fad37 commit 4c1f0e4

21 files changed

+1428
-1434
lines changed

tests/component/_analog_utils.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Shared utilities for analog component tests."""
2+
3+
from __future__ import annotations
4+
5+
import pytest
6+
7+
8+
# Simulated DAQ voltage data is a noisy sinewave within the range of the minimum and maximum values
9+
# of the virtual channel. We can leverage this behavior to validate we get the correct data from
10+
# the Python bindings.
11+
def _get_voltage_offset_for_chan(chan_index: int) -> float:
12+
return float(chan_index + 1)
13+
14+
15+
def _get_voltage_setpoint_for_chan(chan_index: int) -> float:
16+
return float(chan_index + 1)
17+
18+
19+
def _get_current_setpoint_for_chan(chan_index: int) -> float:
20+
return float(chan_index + 1)
21+
22+
23+
def _get_expected_voltage_for_chan(chan_index: int) -> float:
24+
return float(chan_index + 1)
25+
26+
27+
def _volts_to_codes(volts: float, max_code: int = 32767, max_voltage: float = 10.0) -> int:
28+
return int(volts * max_code / max_voltage)
29+
30+
31+
def _pwr_volts_to_codes(volts: float, codes_per_volt: int = 4096) -> int:
32+
return int(volts * codes_per_volt)
33+
34+
35+
def _pwr_current_to_codes(current: float, codes_per_amp: int = 8192) -> int:
36+
return int(current * codes_per_amp)
37+
38+
39+
def _get_voltage_code_setpoint_for_chan(chan_index: int) -> int:
40+
return _pwr_volts_to_codes(_get_voltage_setpoint_for_chan(chan_index))
41+
42+
43+
def _get_current_code_setpoint_for_chan(chan_index: int) -> int:
44+
return _pwr_current_to_codes(_get_current_setpoint_for_chan(chan_index))
45+
46+
47+
# Note: Since we only use positive voltages, this works fine for both signed and unsigned reads.
48+
def _get_voltage_code_offset_for_chan(chan_index: int) -> int:
49+
voltage_limits = _get_voltage_offset_for_chan(chan_index)
50+
return _volts_to_codes(voltage_limits)
51+
52+
53+
def _assert_equal_2d(data: list[list[float]], expected: list[list[float]], abs: float) -> None:
54+
assert len(data) == len(expected)
55+
for i in range(len(data)):
56+
assert data[i] == pytest.approx(expected[i], abs=abs)
57+
58+
59+
# NOTE: We use simulated signals for AI validation, so we can be fairly strict here.
60+
AI_VOLTAGE_EPSILON = 1e-3
61+
62+
# NOTE: We must use real signals for AO validation, but we aren't validating hardware accuracy here.
63+
# This should be wide enough tolerance to allow for uncalibrated boards while still ensuring we are
64+
# correctly configuring hardware.
65+
AO_VOLTAGE_EPSILON = 1e-2
66+
67+
# NOTE: You can't scale from volts to codes correctly without knowing the internal calibration
68+
# constants. The internal reference has a healthy amount of overrange to ensure we can calibrate to
69+
# device specifications. I've used 10.1 volts above to approximate that, but 100mv of accuracy is
70+
# also fine since the expected output of each channel value will be 1 volt apart.
71+
RAW_VOLTAGE_EPSILON = 1e-1
72+
73+
VOLTAGE_CODE_EPSILON = round(_volts_to_codes(AI_VOLTAGE_EPSILON))
74+
POWER_EPSILON = 1e-3
75+
POWER_BINARY_EPSILON = 1

tests/component/_digital_utils.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""Shared utilities for digital component tests."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Callable, TypeVar
6+
7+
import numpy
8+
from nitypes.waveform import DigitalWaveform
9+
10+
import nidaqmx
11+
12+
_D = TypeVar("_D", bound=numpy.generic)
13+
14+
15+
def _start_di_task(task: nidaqmx.Task) -> None:
16+
# Don't reserve the lines, so we can read what DO is writing.
17+
task.di_channels.all.di_tristate = False
18+
task.start()
19+
20+
21+
def _start_do_task(task: nidaqmx.Task, is_port: bool = False, num_chans: int = 1) -> None:
22+
# We'll be doing on-demand, so start the task and drive all lines low
23+
task.start()
24+
if is_port:
25+
if num_chans == 8:
26+
task.write(0)
27+
else:
28+
task.write([0] * num_chans)
29+
else:
30+
if num_chans == 1:
31+
task.write(False)
32+
else:
33+
task.write([False] * num_chans)
34+
35+
36+
def _get_num_di_lines_in_task(task: nidaqmx.Task) -> int:
37+
return sum([chan.di_num_lines for chan in task.channels])
38+
39+
40+
def _get_num_do_lines_in_task(task: nidaqmx.Task) -> int:
41+
return sum([chan.do_num_lines for chan in task.channels])
42+
43+
44+
def _get_digital_data_for_sample(num_lines: int, sample_number: int) -> int:
45+
result = 0
46+
# Simulated digital signals "count" from 0 in binary within each group of 8 lines.
47+
for _ in range((num_lines + 7) // 8):
48+
result = (result << 8) | sample_number
49+
50+
line_mask = (2**num_lines) - 1
51+
return result & line_mask
52+
53+
54+
def _get_expected_data_for_line(num_samples: int, line_number: int) -> list[int]:
55+
data = []
56+
# Simulated digital signals "count" from 0 in binary within each group of 8 lines.
57+
# Each line represents a bit in the binary representation of the sample number.
58+
# - line 0 represents bit 0 (LSB) - alternates every sample: 0,1,0,1,0,1,0,1...
59+
# - line 1 represents bit 1 - alternates every 2 samples: 0,0,1,1,0,0,1,1...
60+
# - line 2 represents bit 2 - alternates every 4 samples: 0,0,0,0,1,1,1,1...
61+
line_number %= 8
62+
for sample_num in range(num_samples):
63+
bit_value = (sample_num >> line_number) & 1
64+
data.append(bit_value)
65+
return data
66+
67+
68+
def _get_digital_data(num_lines: int, num_samples: int) -> list[int]:
69+
return [
70+
_get_digital_data_for_sample(num_lines, sample_number)
71+
for sample_number in range(num_samples)
72+
]
73+
74+
75+
def _get_expected_digital_port_data_port_major(
76+
task: nidaqmx.Task, num_samples: int
77+
) -> list[list[int]]:
78+
return [_get_digital_data(chan.di_num_lines, num_samples) for chan in task.channels]
79+
80+
81+
def _get_expected_digital_port_data_sample_major(
82+
task: nidaqmx.Task, num_samples: int
83+
) -> list[list[int]]:
84+
result = _get_expected_digital_port_data_port_major(task, num_samples)
85+
return numpy.transpose(result).tolist()
86+
87+
88+
def _get_digital_port_data_for_sample(task: nidaqmx.Task, sample_number: int) -> list[int]:
89+
return [
90+
_get_digital_data_for_sample(chan.do_num_lines, sample_number) for chan in task.channels
91+
]
92+
93+
94+
def _get_digital_port_data_port_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]:
95+
return [_get_digital_data(chan.do_num_lines, num_samples) for chan in task.channels]
96+
97+
98+
def _get_digital_port_data_sample_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]:
99+
result = _get_digital_port_data_port_major(task, num_samples)
100+
return numpy.transpose(result).tolist()
101+
102+
103+
def _bool_array_to_int(bool_array: numpy.typing.NDArray[numpy.bool_]) -> int:
104+
result = 0
105+
# Simulated data is little-endian
106+
for bit in bool_array[::-1]:
107+
result = (result << 1) | int(bit)
108+
return result
109+
110+
111+
def _int_to_bool_array(num_lines: int, input: int) -> numpy.typing.NDArray[numpy.bool_]:
112+
result = numpy.full(num_lines, True, dtype=numpy.bool_)
113+
for bit in range(num_lines):
114+
result[bit] = (input & (1 << bit)) != 0
115+
return result
116+
117+
118+
def _get_waveform_data(waveform: DigitalWaveform) -> list[int]:
119+
assert isinstance(waveform, DigitalWaveform)
120+
return [_bool_array_to_int(sample) for sample in waveform.data]
121+
122+
123+
def _read_and_copy(
124+
read_func: Callable[[numpy.typing.NDArray[_D]], None], array: numpy.typing.NDArray[_D]
125+
) -> numpy.typing.NDArray[_D]:
126+
read_func(array)
127+
return array.copy()

tests/component/_utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
"""Shared utilities for component tests."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import timezone
6+
7+
from hightime import datetime as ht_datetime
8+
9+
10+
def _is_timestamp_close_to_now(timestamp: ht_datetime, tolerance_seconds: float = 1.0) -> bool:
11+
current_time = ht_datetime.now(timezone.utc)
12+
time_diff = abs((timestamp - current_time).total_seconds())
13+
return time_diff <= tolerance_seconds

0 commit comments

Comments
 (0)