|
1 | 1 | """Data acquisition script that continuously acquires analog input data.""" |
2 | 2 |
|
| 3 | +import os |
3 | 4 | import time |
4 | 5 | from pathlib import Path |
5 | 6 |
|
6 | | -import nidaqmx |
7 | | -import nidaqmx.system |
8 | | -from nidaqmx.constants import ( |
| 7 | +os.environ["NIDAQMX_ENABLE_WAVEFORM_SUPPORT"] = "1" |
| 8 | +import nidaqmx # noqa: E402 # Must import after setting os environment variable |
| 9 | +import nidaqmx.system # noqa: E402 |
| 10 | +from nidaqmx.constants import ( # noqa: E402 |
9 | 11 | AcquisitionType, |
10 | 12 | CurrentShuntResistorLocation, |
11 | 13 | CurrentUnits, |
|
15 | 17 | Slope, |
16 | 18 | StrainGageBridgeType, |
17 | 19 | TerminalConfiguration, |
| 20 | + UsageTypeAI, |
18 | 21 | ) |
19 | | -from nidaqmx.errors import DaqError |
| 22 | +from nidaqmx.errors import DaqError # noqa: E402 |
20 | 23 |
|
21 | | -import nipanel |
| 24 | +import nipanel # noqa: E402 |
22 | 25 |
|
23 | 26 | panel_script_path = Path(__file__).with_name("nidaqmx_analog_input_filtering_panel.py") |
24 | 27 | panel = nipanel.create_streamlit_panel(panel_script_path) |
|
29 | 32 | available_channel_names = [] |
30 | 33 | for dev in system.devices: |
31 | 34 | for chan in dev.ai_physical_chans: |
32 | | - available_channel_names.append(chan.name) |
| 35 | + if UsageTypeAI.VOLTAGE in chan.ai_meas_types: |
| 36 | + available_channel_names.append(chan.name) |
33 | 37 | panel.set_value("available_channel_names", available_channel_names) |
34 | 38 |
|
35 | 39 | available_trigger_sources = [""] |
|
38 | 42 | for term in dev.terminals: |
39 | 43 | available_trigger_sources.append(term) |
40 | 44 | panel.set_value("available_trigger_sources", available_trigger_sources) |
| 45 | + |
41 | 46 | try: |
42 | | - panel.set_value("daq_error", "") |
43 | 47 | print(f"Panel URL: {panel.panel_url}") |
44 | 48 | print(f"Waiting for the 'Run' button to be pressed...") |
45 | 49 | print(f"(Press Ctrl + C to quit)") |
46 | 50 | while True: |
47 | | - panel.set_value("run_button", False) |
48 | | - while not panel.get_value("run_button", False): |
| 51 | + while not panel.get_value("is_running", False): |
49 | 52 | time.sleep(0.1) |
50 | | - # How to use nidaqmx: https://nidaqmx-python.readthedocs.io/en/stable/ |
51 | | - with nidaqmx.Task() as task: |
52 | | - |
53 | | - chan_type = panel.get_value("chan_type", "1") |
54 | | - |
55 | | - if chan_type == "2": |
56 | | - chan = task.ai_channels.add_ai_current_chan( |
57 | | - panel.get_value("physical_channel", ""), |
58 | | - max_val=panel.get_value("max_value_current", 0.01), |
59 | | - min_val=panel.get_value("min_value_current", -0.01), |
60 | | - ext_shunt_resistor_val=panel.get_value("shunt_resistor_value", 249.0), |
61 | | - shunt_resistor_loc=panel.get_value( |
62 | | - "shunt_location", CurrentShuntResistorLocation.EXTERNAL |
63 | | - ), |
64 | | - units=panel.get_value("units", CurrentUnits.AMPS), |
65 | | - ) |
66 | 53 |
|
67 | | - elif chan_type == "3": |
68 | | - chan = task.ai_channels.add_ai_strain_gage_chan( |
69 | | - panel.get_value("physical_channel", ""), |
70 | | - nominal_gage_resistance=panel.get_value("gage_resistance", 350.0), |
71 | | - voltage_excit_source=ExcitationSource.EXTERNAL, # Only mode that works |
72 | | - max_val=panel.get_value("max_value_strain", 0.001), |
73 | | - min_val=panel.get_value("min_value_strain", -0.001), |
74 | | - poisson_ratio=panel.get_value("poisson_ratio", 0.3), |
75 | | - lead_wire_resistance=panel.get_value("wire_resistance", 0.0), |
76 | | - initial_bridge_voltage=panel.get_value("initial_voltage", 0.0), |
77 | | - gage_factor=panel.get_value("gage_factor", 2.0), |
78 | | - voltage_excit_val=panel.get_value("voltage_excitation_value", 0.0), |
79 | | - strain_config=panel.get_value( |
80 | | - "strain_configuration", StrainGageBridgeType.FULL_BRIDGE_I |
81 | | - ), |
82 | | - ) |
83 | | - else: |
84 | | - chan = task.ai_channels.add_ai_voltage_chan( |
85 | | - panel.get_value("physical_channel", ""), |
86 | | - terminal_config=panel.get_value( |
87 | | - "terminal_configuration", TerminalConfiguration.DEFAULT |
88 | | - ), |
89 | | - max_val=panel.get_value("max_value_voltage", 5.0), |
90 | | - min_val=panel.get_value("min_value_voltage", -5.0), |
91 | | - ) |
92 | | - task.timing.cfg_samp_clk_timing( |
93 | | - source=panel.get_value("source", ""), # "" - means Onboard Clock (default value) |
94 | | - rate=panel.get_value("rate", 1000.0), |
95 | | - sample_mode=AcquisitionType.CONTINUOUS, |
96 | | - samps_per_chan=panel.get_value("total_samples", 100), |
97 | | - ) |
98 | | - panel.set_value("sample_rate", task.timing.samp_clk_rate) |
99 | | - # Not all hardware supports all filter types. |
100 | | - # Refer to your device documentation for more information. |
101 | | - if panel.get_value("filter", "Filter") == "Filter": |
102 | | - chan.ai_filter_enable = True |
103 | | - chan.ai_filter_freq = panel.get_value("filter_freq", 0.0) |
104 | | - chan.ai_filter_response = panel.get_value("filter_response", FilterResponse.COMB) |
105 | | - chan.ai_filter_order = panel.get_value("filter_order", 1) |
106 | | - panel.set_value("actual_filter_freq", chan.ai_filter_freq) |
107 | | - panel.set_value("actual_filter_response", chan.ai_filter_response) |
108 | | - panel.set_value("actual_filter_order", chan.ai_filter_order) |
109 | | - else: |
110 | | - panel.set_value("actual_filter_freq", 0.0) |
111 | | - panel.set_value("actual_filter_response", FilterResponse.COMB) |
112 | | - panel.set_value("actual_filter_order", 0) |
113 | | - # Not all hardware supports all filter types. |
114 | | - # Refer to your device documentation for more information. |
115 | | - trigger_type = panel.get_value("trigger_type") |
116 | | - if trigger_type == "5": |
117 | | - task.triggers.start_trigger.cfg_anlg_edge_start_trig( |
118 | | - trigger_source=panel.get_value("analog_source", ""), |
119 | | - trigger_slope=panel.get_value("slope", Slope.FALLING), |
120 | | - trigger_level=panel.get_value("level", 0.0), |
121 | | - ) |
| 54 | + print(f"Running...") |
| 55 | + try: |
| 56 | + # How to use nidaqmx: https://nidaqmx-python.readthedocs.io/en/stable/ |
| 57 | + panel.set_value("daq_error", "") |
| 58 | + with nidaqmx.Task() as task: |
122 | 59 |
|
123 | | - if trigger_type == "2": |
124 | | - task.triggers.start_trigger.cfg_dig_edge_start_trig( |
125 | | - trigger_source=panel.get_value("digital_source", ""), |
126 | | - trigger_edge=panel.get_value("edge", Edge.FALLING), |
127 | | - ) |
128 | | - task.triggers.start_trigger.anlg_edge_hyst = hysteresis = panel.get_value( |
129 | | - "hysteresis", 0.0 |
130 | | - ) |
| 60 | + chan_type = panel.get_value("chan_type", "1") |
131 | 61 |
|
132 | | - try: |
133 | | - task.start() |
134 | | - panel.set_value("is_running", True) |
| 62 | + if chan_type == "2": |
| 63 | + chan = task.ai_channels.add_ai_current_chan( |
| 64 | + panel.get_value("physical_channel", ""), |
| 65 | + max_val=panel.get_value("max_value_current", 0.01), |
| 66 | + min_val=panel.get_value("min_value_current", -0.01), |
| 67 | + ext_shunt_resistor_val=panel.get_value("shunt_resistor_value", 249.0), |
| 68 | + shunt_resistor_loc=panel.get_value( |
| 69 | + "shunt_location", CurrentShuntResistorLocation.EXTERNAL |
| 70 | + ), |
| 71 | + units=panel.get_value("units", CurrentUnits.AMPS), |
| 72 | + ) |
135 | 73 |
|
136 | | - panel.set_value("stop_button", False) |
137 | | - while not panel.get_value("stop_button", False): |
138 | | - data = task.read( |
139 | | - number_of_samples_per_channel=100 # pyright: ignore[reportArgumentType] |
| 74 | + elif chan_type == "3": |
| 75 | + chan = task.ai_channels.add_ai_strain_gage_chan( |
| 76 | + panel.get_value("physical_channel", ""), |
| 77 | + nominal_gage_resistance=panel.get_value("gage_resistance", 350.0), |
| 78 | + voltage_excit_source=ExcitationSource.EXTERNAL, # Only mode that works |
| 79 | + max_val=panel.get_value("max_value_strain", 0.001), |
| 80 | + min_val=panel.get_value("min_value_strain", -0.001), |
| 81 | + poisson_ratio=panel.get_value("poisson_ratio", 0.3), |
| 82 | + lead_wire_resistance=panel.get_value("wire_resistance", 0.0), |
| 83 | + initial_bridge_voltage=panel.get_value("initial_voltage", 0.0), |
| 84 | + gage_factor=panel.get_value("gage_factor", 2.0), |
| 85 | + voltage_excit_val=panel.get_value("voltage_excitation_value", 0.0), |
| 86 | + strain_config=panel.get_value( |
| 87 | + "strain_configuration", StrainGageBridgeType.FULL_BRIDGE_I |
| 88 | + ), |
| 89 | + ) |
| 90 | + else: |
| 91 | + chan = task.ai_channels.add_ai_voltage_chan( |
| 92 | + panel.get_value("physical_channel", ""), |
| 93 | + terminal_config=panel.get_value( |
| 94 | + "terminal_configuration", TerminalConfiguration.DEFAULT |
| 95 | + ), |
| 96 | + max_val=panel.get_value("max_value_voltage", 5.0), |
| 97 | + min_val=panel.get_value("min_value_voltage", -5.0), |
140 | 98 | ) |
141 | | - panel.set_value("acquired_data", data) |
142 | | - except KeyboardInterrupt: |
143 | | - pass |
144 | | - finally: |
145 | | - task.stop() |
146 | | - panel.set_value("is_running", False) |
147 | | - |
148 | | -except DaqError as e: |
149 | | - daq_error = str(e) |
150 | | - print(daq_error) |
151 | | - panel.set_value("daq_error", daq_error) |
| 99 | + task.timing.cfg_samp_clk_timing( |
| 100 | + source=panel.get_value( |
| 101 | + "source", "" |
| 102 | + ), # "" - means Onboard Clock (default value) |
| 103 | + rate=panel.get_value("rate", 1000.0), |
| 104 | + sample_mode=AcquisitionType.CONTINUOUS, |
| 105 | + samps_per_chan=panel.get_value("total_samples", 100), |
| 106 | + ) |
| 107 | + panel.set_value("sample_rate", task.timing.samp_clk_rate) |
| 108 | + # Not all hardware supports all filter types. |
| 109 | + # Refer to your device documentation for more information. |
| 110 | + if panel.get_value("filter", "Filter") == "Filter": |
| 111 | + chan.ai_filter_enable = True |
| 112 | + chan.ai_filter_freq = panel.get_value("filter_freq", 0.0) |
| 113 | + chan.ai_filter_response = panel.get_value( |
| 114 | + "filter_response", FilterResponse.COMB |
| 115 | + ) |
| 116 | + chan.ai_filter_order = panel.get_value("filter_order", 1) |
| 117 | + panel.set_value("actual_filter_freq", chan.ai_filter_freq) |
| 118 | + panel.set_value("actual_filter_response", chan.ai_filter_response) |
| 119 | + panel.set_value("actual_filter_order", chan.ai_filter_order) |
| 120 | + else: |
| 121 | + panel.set_value("actual_filter_freq", 0.0) |
| 122 | + panel.set_value("actual_filter_response", FilterResponse.COMB) |
| 123 | + panel.set_value("actual_filter_order", 0) |
| 124 | + # Not all hardware supports all filter types. |
| 125 | + # Refer to your device documentation for more information. |
| 126 | + trigger_type = panel.get_value("trigger_type") |
| 127 | + if trigger_type == "5": |
| 128 | + task.triggers.start_trigger.cfg_anlg_edge_start_trig( |
| 129 | + trigger_source=panel.get_value("analog_source", ""), |
| 130 | + trigger_slope=panel.get_value("slope", Slope.FALLING), |
| 131 | + trigger_level=panel.get_value("level", 0.0), |
| 132 | + ) |
| 133 | + |
| 134 | + if trigger_type == "2": |
| 135 | + task.triggers.start_trigger.cfg_dig_edge_start_trig( |
| 136 | + trigger_source=panel.get_value("digital_source", ""), |
| 137 | + trigger_edge=panel.get_value("edge", Edge.FALLING), |
| 138 | + ) |
| 139 | + task.triggers.start_trigger.anlg_edge_hyst = hysteresis = panel.get_value( |
| 140 | + "hysteresis", 0.0 |
| 141 | + ) |
| 142 | + |
| 143 | + try: |
| 144 | + task.start() |
| 145 | + while panel.get_value("is_running", False): |
| 146 | + waveform = task.read_waveform(number_of_samples_per_channel=100) |
| 147 | + panel.set_value("waveform", waveform) |
| 148 | + except KeyboardInterrupt: |
| 149 | + pass |
| 150 | + finally: |
| 151 | + task.stop() |
| 152 | + panel.set_value("is_running", False) |
| 153 | + print(f"Stopped") |
| 154 | + |
| 155 | + except DaqError as e: |
| 156 | + daq_error = str(e) |
| 157 | + panel.set_value("daq_error", daq_error) |
| 158 | + panel.set_value("is_running", False) |
| 159 | + print(f"ERRORED") |
152 | 160 |
|
153 | 161 | except KeyboardInterrupt: |
154 | 162 | pass |
0 commit comments