Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions tests/component/test_stream_writers_do.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ def di_port0_loopback_task(
return task


@pytest.fixture
def di_port0_loopback_task_32dio(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device
) -> nidaqmx.Task:
task = generate_task()
task.di_channels.add_di_chan(
real_x_series_device_32dio.di_ports[0].name,
line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
)
_start_di_task(task)
return task


@pytest.fixture
def di_port1_loopback_task(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device
Expand All @@ -173,6 +186,19 @@ def di_port1_loopback_task(
return task


@pytest.fixture
def di_port1_loopback_task_32dio(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device
) -> nidaqmx.Task:
task = generate_task()
task.di_channels.add_di_chan(
real_x_series_device_32dio.di_ports[1].name,
line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
)
_start_di_task(task)
return task


@pytest.fixture
def di_port2_loopback_task(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device
Expand All @@ -186,6 +212,19 @@ def di_port2_loopback_task(
return task


@pytest.fixture
def di_port2_loopback_task_32dio(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device
) -> nidaqmx.Task:
task = generate_task()
task.di_channels.add_di_chan(
real_x_series_device_32dio.di_ports[2].name,
line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
)
_start_di_task(task)
return task


@pytest.fixture
def di_multi_channel_port_loopback_task(
generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device
Expand Down Expand Up @@ -447,14 +486,14 @@ def test___digital_multi_channel_writer___write_one_sample_multi_line___updates_


def test___digital_multi_channel_writer___write_one_sample_multi_line_jagged___updates_output(
di_port0_loopback_task: nidaqmx.Task,
di_port1_loopback_task: nidaqmx.Task,
di_port2_loopback_task: nidaqmx.Task,
di_port0_loopback_task_32dio: nidaqmx.Task,
di_port1_loopback_task_32dio: nidaqmx.Task,
di_port2_loopback_task_32dio: nidaqmx.Task,
generate_task: Callable[[], nidaqmx.Task],
real_x_series_device: nidaqmx.system.Device,
real_x_series_device_32dio: nidaqmx.system.Device,
) -> None:
task = generate_task()
for port in real_x_series_device.do_ports:
for port in real_x_series_device_32dio.do_ports:
task.do_channels.add_do_chan(
port.name,
line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
Expand All @@ -469,9 +508,9 @@ def test___digital_multi_channel_writer___write_one_sample_multi_line_jagged___u
data_to_write = _int_to_bool_array(num_channels * 32, datum).reshape((num_channels, 32))
writer.write_one_sample_multi_line(data_to_write)

assert di_port0_loopback_task.read() == datum & 0xFFFFFFFF
assert di_port1_loopback_task.read() == (datum >> 32) & 0xFF
assert di_port2_loopback_task.read() == (datum >> 64) & 0xFF
assert di_port0_loopback_task_32dio.read() == datum & 0xFFFFFFFF
assert di_port1_loopback_task_32dio.read() == (datum >> 32) & 0xFF
assert di_port2_loopback_task_32dio.read() == (datum >> 64) & 0xFF


def test___digital_multi_channel_writer___write_one_sample_multi_line_with_wrong_dtype___raises_error_with_correct_dtype(
Expand Down
11 changes: 10 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def _x_series_device(
device_type: DeviceType,
system: nidaqmx.system.System,
sampling_type: SamplingType = SamplingType.ANY,
min_num_lines: int = 8,
) -> nidaqmx.system.Device:
for device in system.devices:
device_type_match = (
Expand All @@ -127,7 +128,7 @@ def _x_series_device(
and device.product_category == ProductCategory.X_SERIES_DAQ
and len(device.ao_physical_chans) >= 2
and len(device.ai_physical_chans) >= 4
and len(device.do_lines) >= 8
and len(device.do_lines) >= min_num_lines
and len(device.di_lines) == len(device.do_lines)
and len(device.ci_physical_chans) >= 4
):
Expand Down Expand Up @@ -180,6 +181,14 @@ def real_x_series_device(system: nidaqmx.system.System) -> nidaqmx.system.Device
return _x_series_device(DeviceType.REAL, system)


@pytest.fixture(scope="function")
def real_x_series_device_32dio(system: nidaqmx.system.System) -> nidaqmx.system.Device:
"""Gets real 32 DIO X Series device information."""
return _x_series_device(
DeviceType.REAL, system, sampling_type=SamplingType.ANY, min_num_lines=32
)


@pytest.fixture(scope="function")
def real_x_series_multiplexed_device(system: nidaqmx.system.System) -> nidaqmx.system.Device:
"""Gets device information for a real X Series device with multiplexed sampling."""
Expand Down