|
| 1 | +"""Data acquisition script that continuously acquires analog input data.""" |
| 2 | + |
| 3 | +import time |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import nidaqmx |
| 7 | +import nidaqmx.system |
| 8 | +from nidaqmx.constants import ( |
| 9 | + AcquisitionType, |
| 10 | + CurrentShuntResistorLocation, |
| 11 | + CurrentUnits, |
| 12 | + Edge, |
| 13 | + ExcitationSource, |
| 14 | + FilterResponse, |
| 15 | + Slope, |
| 16 | + StrainGageBridgeType, |
| 17 | + TerminalConfiguration, |
| 18 | +) |
| 19 | +from nidaqmx.errors import DaqError |
| 20 | + |
| 21 | +import nipanel |
| 22 | + |
| 23 | +panel_script_path = Path(__file__).with_name("nidaqmx_analog_input_filtering_panel.py") |
| 24 | +panel = nipanel.create_streamlit_panel(panel_script_path) |
| 25 | +panel.set_value("is_running", False) |
| 26 | + |
| 27 | +system = nidaqmx.system.System.local() |
| 28 | + |
| 29 | +available_channel_names = [] |
| 30 | +for dev in system.devices: |
| 31 | + for chan in dev.ai_physical_chans: |
| 32 | + available_channel_names.append(chan.name) |
| 33 | +panel.set_value("available_channel_names", available_channel_names) |
| 34 | + |
| 35 | +available_trigger_sources = [""] |
| 36 | +for dev in system.devices: |
| 37 | + if hasattr(dev, "terminals"): |
| 38 | + for term in dev.terminals: |
| 39 | + available_trigger_sources.append(term) |
| 40 | +panel.set_value("available_trigger_sources", available_trigger_sources) |
| 41 | +try: |
| 42 | + panel.set_value("daq_error", "") |
| 43 | + print(f"Panel URL: {panel.panel_url}") |
| 44 | + print(f"Waiting for the 'Run' button to be pressed...") |
| 45 | + print(f"(Press Ctrl + C to quit)") |
| 46 | + while True: |
| 47 | + panel.set_value("run_button", False) |
| 48 | + while not panel.get_value("run_button", False): |
| 49 | + time.sleep(0.1) |
| 50 | + # How to use nidaqmx: https://nidaqmx-python.readthedocs.io/en/stable/ |
| 51 | + with nidaqmx.Task() as task: |
| 52 | + |
| 53 | + chan_type = panel.get_value("chan_type", "1") |
| 54 | + |
| 55 | + if chan_type == "2": |
| 56 | + chan = task.ai_channels.add_ai_current_chan( |
| 57 | + panel.get_value("physical_channel", ""), |
| 58 | + max_val=panel.get_value("max_value_current", 0.01), |
| 59 | + min_val=panel.get_value("min_value_current", -0.01), |
| 60 | + ext_shunt_resistor_val=panel.get_value("shunt_resistor_value", 249.0), |
| 61 | + shunt_resistor_loc=panel.get_value( |
| 62 | + "shunt_location", CurrentShuntResistorLocation.EXTERNAL |
| 63 | + ), |
| 64 | + units=panel.get_value("units", CurrentUnits.AMPS), |
| 65 | + ) |
| 66 | + |
| 67 | + elif chan_type == "3": |
| 68 | + chan = task.ai_channels.add_ai_strain_gage_chan( |
| 69 | + panel.get_value("physical_channel", ""), |
| 70 | + nominal_gage_resistance=panel.get_value("gage_resistance", 350.0), |
| 71 | + voltage_excit_source=ExcitationSource.EXTERNAL, # Only mode that works |
| 72 | + max_val=panel.get_value("max_value_strain", 0.001), |
| 73 | + min_val=panel.get_value("min_value_strain", -0.001), |
| 74 | + poisson_ratio=panel.get_value("poisson_ratio", 0.3), |
| 75 | + lead_wire_resistance=panel.get_value("wire_resistance", 0.0), |
| 76 | + initial_bridge_voltage=panel.get_value("initial_voltage", 0.0), |
| 77 | + gage_factor=panel.get_value("gage_factor", 2.0), |
| 78 | + voltage_excit_val=panel.get_value("voltage_excitation_value", 0.0), |
| 79 | + strain_config=panel.get_value( |
| 80 | + "strain_configuration", StrainGageBridgeType.FULL_BRIDGE_I |
| 81 | + ), |
| 82 | + ) |
| 83 | + else: |
| 84 | + chan = task.ai_channels.add_ai_voltage_chan( |
| 85 | + panel.get_value("physical_channel", ""), |
| 86 | + terminal_config=panel.get_value( |
| 87 | + "terminal_configuration", TerminalConfiguration.DEFAULT |
| 88 | + ), |
| 89 | + max_val=panel.get_value("max_value_voltage", 5.0), |
| 90 | + min_val=panel.get_value("min_value_voltage", -5.0), |
| 91 | + ) |
| 92 | + task.timing.cfg_samp_clk_timing( |
| 93 | + source=panel.get_value("source", ""), # "" - means Onboard Clock (default value) |
| 94 | + rate=panel.get_value("rate", 1000.0), |
| 95 | + sample_mode=AcquisitionType.CONTINUOUS, |
| 96 | + samps_per_chan=panel.get_value("total_samples", 100), |
| 97 | + ) |
| 98 | + panel.set_value("sample_rate", task.timing.samp_clk_rate) |
| 99 | + # Not all hardware supports all filter types. |
| 100 | + # Refer to your device documentation for more information. |
| 101 | + if panel.get_value("filter", "Filter") == "Filter": |
| 102 | + chan.ai_filter_enable = True |
| 103 | + chan.ai_filter_freq = panel.get_value("filter_freq", 0.0) |
| 104 | + chan.ai_filter_response = panel.get_value("filter_response", FilterResponse.COMB) |
| 105 | + chan.ai_filter_order = panel.get_value("filter_order", 1) |
| 106 | + panel.set_value("actual_filter_freq", chan.ai_filter_freq) |
| 107 | + panel.set_value("actual_filter_response", chan.ai_filter_response) |
| 108 | + panel.set_value("actual_filter_order", chan.ai_filter_order) |
| 109 | + else: |
| 110 | + panel.set_value("actual_filter_freq", 0.0) |
| 111 | + panel.set_value("actual_filter_response", FilterResponse.COMB) |
| 112 | + panel.set_value("actual_filter_order", 0) |
| 113 | + # Not all hardware supports all filter types. |
| 114 | + # Refer to your device documentation for more information. |
| 115 | + trigger_type = panel.get_value("trigger_type") |
| 116 | + if trigger_type == "5": |
| 117 | + task.triggers.start_trigger.cfg_anlg_edge_start_trig( |
| 118 | + trigger_source=panel.get_value("analog_source", ""), |
| 119 | + trigger_slope=panel.get_value("slope", Slope.FALLING), |
| 120 | + trigger_level=panel.get_value("level", 0.0), |
| 121 | + ) |
| 122 | + |
| 123 | + if trigger_type == "2": |
| 124 | + task.triggers.start_trigger.cfg_dig_edge_start_trig( |
| 125 | + trigger_source=panel.get_value("digital_source", ""), |
| 126 | + trigger_edge=panel.get_value("edge", Edge.FALLING), |
| 127 | + ) |
| 128 | + task.triggers.start_trigger.anlg_edge_hyst = hysteresis = panel.get_value( |
| 129 | + "hysteresis", 0.0 |
| 130 | + ) |
| 131 | + |
| 132 | + try: |
| 133 | + task.start() |
| 134 | + panel.set_value("is_running", True) |
| 135 | + |
| 136 | + panel.set_value("stop_button", False) |
| 137 | + while not panel.get_value("stop_button", False): |
| 138 | + data = task.read( |
| 139 | + number_of_samples_per_channel=100 # pyright: ignore[reportArgumentType] |
| 140 | + ) |
| 141 | + panel.set_value("acquired_data", data) |
| 142 | + except KeyboardInterrupt: |
| 143 | + pass |
| 144 | + finally: |
| 145 | + task.stop() |
| 146 | + panel.set_value("is_running", False) |
| 147 | + |
| 148 | +except DaqError as e: |
| 149 | + daq_error = str(e) |
| 150 | + print(daq_error) |
| 151 | + panel.set_value("daq_error", daq_error) |
| 152 | + |
| 153 | +except KeyboardInterrupt: |
| 154 | + pass |
0 commit comments