| 1 | +"""Fixtures for benchmark tests.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import pytest |
| 6 | + |
| 7 | +from nidaqmx import Task |
| 8 | +from nidaqmx.constants import ( |
| 9 | + AcquisitionType, |
| 10 | + Edge, |
| 11 | + LineGrouping, |
| 12 | + ReadRelativeTo, |
| 13 | + TaskMode, |
| 14 | + WaveformAttributeMode, |
| 15 | +) |
| 16 | +from nidaqmx.system import Device, System |
| 17 | +from tests.conftest import DeviceType, _device_by_product_type |


_WAVEFORM_BENCHMARK_MODES = [
    WaveformAttributeMode.NONE,
    WaveformAttributeMode.TIMING,
    WaveformAttributeMode.TIMING | WaveformAttributeMode.EXTENDED_PROPERTIES,
]

_WAVEFORM_BENCHMARK_MODE_IDS = ["NONE", "TIMING", "ALL"]
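
# Hypothetical sketch of how a benchmark module might consume these constants; the
# "waveform_attribute_mode" parameter name is only an example, not something defined here:
#
#   @pytest.mark.parametrize(
#       "waveform_attribute_mode", _WAVEFORM_BENCHMARK_MODES, ids=_WAVEFORM_BENCHMARK_MODE_IDS
#   )
#
# The "ALL" ID corresponds to TIMING | EXTENDED_PROPERTIES.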


def _configure_timing(task: Task, num_channels: int, num_samples: int) -> None:
    # Finite sample-clock timing at 25 kS/s, with the buffer sized comfortably larger
    # than a single benchmark read or write.
    task.timing.cfg_samp_clk_timing(
        rate=25000.0,
        active_edge=Edge.RISING,
        sample_mode=AcquisitionType.FINITE,
        samps_per_chan=num_channels * num_samples * 2,
    )


def _start_input_task(task: Task) -> None:
    # Acquire the full finite buffer up front, then point reads at the first sample so
    # each benchmark iteration reads already-acquired data instead of waiting on hardware.
    task.start()
    task.wait_until_done(timeout=10.0)
    task.in_stream.relative_to = ReadRelativeTo.FIRST_SAMPLE


def _commit_output_task(task: Task, num_channels: int, num_samples: int) -> None:
    # Reserve the output buffer and commit the task without starting it, so benchmark
    # writes land in the buffer starting at the first sample on every iteration.
    task.out_stream.output_buf_size = num_channels * num_samples * 2
    task.control(TaskMode.TASK_COMMIT)
    task.out_stream.relative_to = WriteRelativeTo.FIRST_SAMPLE


def pytest_addoption(parser: pytest.Parser) -> None:
    """Add command line options to pytest."""
    parser.addoption("--device", action="store", default=None, help="Device name for benchmarks")
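
# Usage sketch (the device name below is just an example):
#   pytest <path to benchmark tests> --device Dev1
# When --device is omitted, benchmark_device falls back to looking up a PCIe-6363.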


@pytest.fixture
def benchmark_device(system: System, request: pytest.FixtureRequest) -> Device:
    """Get device for benchmarking."""
    device: str | None = request.config.getoption("--device")
    if device is not None:
        return system.devices[device]

    return _device_by_product_type("PCIe-6363", DeviceType.ANY, system)


@pytest.fixture
def ai_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered AI task for benchmarking."""
    # Channel/sample counts come from the requesting test's parametrization
    # (defaulting to 1 when a parameter is absent).
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)

    for chan in range(num_channels):
        task.ai_channels.add_ai_voltage_chan(
            benchmark_device.ai_physical_chans[chan].name,
            min_val=-5.0,
            max_val=5.0,
        )

    _configure_timing(task, num_channels, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def ao_benchmark_task(
    task: Task,
    real_x_series_multiplexed_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered AO task for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)

    for chan in range(num_channels):
        task.ao_channels.add_ao_voltage_chan(
            real_x_series_multiplexed_device.ao_physical_chans[chan].name,
            min_val=-10.0,
            max_val=10.0,
        )

    _configure_timing(task, num_channels, num_samples)
    _commit_output_task(task, num_channels, num_samples)

    return task


@pytest.fixture
def di_lines_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DI task using digital lines for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)
    num_lines = request.node.callspec.params.get("num_lines", 1)

    for chan in range(num_channels):
        line_names = [
            line.name
            for line in benchmark_device.di_lines[chan * num_lines : (chan + 1) * num_lines]
        ]
        physical_channel_string = ",".join(line_names)
        task.di_channels.add_di_chan(
            physical_channel_string, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
        )

    _configure_timing(task, num_channels, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def di_port32_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DI task using a full port for benchmarking."""
    num_samples = request.node.callspec.params.get("num_samples", 1)

    # Port 0 is the only port that supports buffered operations.
    task.di_channels.add_di_chan(
        benchmark_device.di_ports[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
    )

    _configure_timing(task, 1, num_samples)
    _start_input_task(task)

    return task


@pytest.fixture
def do_lines_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DO task using digital lines for benchmarking."""
    num_channels = request.node.callspec.params.get("num_channels", 1)
    num_samples = request.node.callspec.params.get("num_samples", 1)
    num_lines = request.node.callspec.params.get("num_lines", 1)

    for chan in range(num_channels):
        line_names = [
            line.name
            for line in benchmark_device.do_lines[chan * num_lines : (chan + 1) * num_lines]
        ]
        physical_channel_string = ",".join(line_names)
        task.do_channels.add_do_chan(
            physical_channel_string, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
        )

    _configure_timing(task, num_channels, num_samples)
    _commit_output_task(task, num_channels, num_samples)

    return task


@pytest.fixture
def do_port32_benchmark_task(
    task: Task,
    benchmark_device: Device,
    request: pytest.FixtureRequest,
) -> Task:
    """Configure a hardware-timed buffered DO task using a full port for benchmarking."""
    num_samples = request.node.callspec.params.get("num_samples", 1)

    # Port 0 is the only port that supports buffered operations.
    task.do_channels.add_do_chan(
        benchmark_device.do_ports[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
    )

    _configure_timing(task, 1, num_samples)
    _commit_output_task(task, 1, num_samples)

    return task
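

# A minimal sketch of a benchmark that could use these fixtures; the test name, the
# parametrization values, and the use of pytest-benchmark's "benchmark" fixture are
# assumptions, not part of this conftest:
#
#   @pytest.mark.parametrize("num_channels, num_samples", [(2, 1000)])
#   def test_ai_read(benchmark, ai_benchmark_task, num_channels, num_samples):
#       benchmark(ai_benchmark_task.read, number_of_samples_per_channel=num_samples)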