diff --git a/tests/component/test_stream_writers_do.py b/tests/component/test_stream_writers_do.py index 37d013c2a..c68740439 100644 --- a/tests/component/test_stream_writers_do.py +++ b/tests/component/test_stream_writers_do.py @@ -160,6 +160,19 @@ def di_port0_loopback_task( return task +@pytest.fixture +def di_port0_loopback_task_32dio( + generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device +) -> nidaqmx.Task: + task = generate_task() + task.di_channels.add_di_chan( + real_x_series_device_32dio.di_ports[0].name, + line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, + ) + _start_di_task(task) + return task + + @pytest.fixture def di_port1_loopback_task( generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device @@ -173,6 +186,19 @@ def di_port1_loopback_task( return task +@pytest.fixture +def di_port1_loopback_task_32dio( + generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device +) -> nidaqmx.Task: + task = generate_task() + task.di_channels.add_di_chan( + real_x_series_device_32dio.di_ports[1].name, + line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, + ) + _start_di_task(task) + return task + + @pytest.fixture def di_port2_loopback_task( generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device @@ -186,6 +212,19 @@ def di_port2_loopback_task( return task +@pytest.fixture +def di_port2_loopback_task_32dio( + generate_task: Callable[[], nidaqmx.Task], real_x_series_device_32dio: nidaqmx.system.Device +) -> nidaqmx.Task: + task = generate_task() + task.di_channels.add_di_chan( + real_x_series_device_32dio.di_ports[2].name, + line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, + ) + _start_di_task(task) + return task + + @pytest.fixture def di_multi_channel_port_loopback_task( generate_task: Callable[[], nidaqmx.Task], real_x_series_device: nidaqmx.system.Device @@ -447,14 +486,14 @@ def test___digital_multi_channel_writer___write_one_sample_multi_line___updates_ def test___digital_multi_channel_writer___write_one_sample_multi_line_jagged___updates_output( - di_port0_loopback_task: nidaqmx.Task, - di_port1_loopback_task: nidaqmx.Task, - di_port2_loopback_task: nidaqmx.Task, + di_port0_loopback_task_32dio: nidaqmx.Task, + di_port1_loopback_task_32dio: nidaqmx.Task, + di_port2_loopback_task_32dio: nidaqmx.Task, generate_task: Callable[[], nidaqmx.Task], - real_x_series_device: nidaqmx.system.Device, + real_x_series_device_32dio: nidaqmx.system.Device, ) -> None: task = generate_task() - for port in real_x_series_device.do_ports: + for port in real_x_series_device_32dio.do_ports: task.do_channels.add_do_chan( port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES, @@ -469,9 +508,9 @@ def test___digital_multi_channel_writer___write_one_sample_multi_line_jagged___u data_to_write = _int_to_bool_array(num_channels * 32, datum).reshape((num_channels, 32)) writer.write_one_sample_multi_line(data_to_write) - assert di_port0_loopback_task.read() == datum & 0xFFFFFFFF - assert di_port1_loopback_task.read() == (datum >> 32) & 0xFF - assert di_port2_loopback_task.read() == (datum >> 64) & 0xFF + assert di_port0_loopback_task_32dio.read() == datum & 0xFFFFFFFF + assert di_port1_loopback_task_32dio.read() == (datum >> 32) & 0xFF + assert di_port2_loopback_task_32dio.read() == (datum >> 64) & 0xFF def test___digital_multi_channel_writer___write_one_sample_multi_line_with_wrong_dtype___raises_error_with_correct_dtype( diff --git a/tests/conftest.py b/tests/conftest.py index d295d9328..8162cd26f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -104,6 +104,7 @@ def _x_series_device( device_type: DeviceType, system: nidaqmx.system.System, sampling_type: SamplingType = SamplingType.ANY, + min_num_lines: int = 8, ) -> nidaqmx.system.Device: for device in system.devices: device_type_match = ( @@ -127,7 +128,7 @@ def _x_series_device( and device.product_category == ProductCategory.X_SERIES_DAQ and len(device.ao_physical_chans) >= 2 and len(device.ai_physical_chans) >= 4 - and len(device.do_lines) >= 8 + and len(device.do_lines) >= min_num_lines and len(device.di_lines) == len(device.do_lines) and len(device.ci_physical_chans) >= 4 ): @@ -180,6 +181,14 @@ def real_x_series_device(system: nidaqmx.system.System) -> nidaqmx.system.Device return _x_series_device(DeviceType.REAL, system) +@pytest.fixture(scope="function") +def real_x_series_device_32dio(system: nidaqmx.system.System) -> nidaqmx.system.Device: + """Gets real 32 DIO X Series device information.""" + return _x_series_device( + DeviceType.REAL, system, sampling_type=SamplingType.ANY, min_num_lines=32 + ) + + @pytest.fixture(scope="function") def real_x_series_multiplexed_device(system: nidaqmx.system.System) -> nidaqmx.system.Device: """Gets device information for a real X Series device with multiplexed sampling."""