Skip to content

Commit b295fcd

Browse files
committed
tests: test for dual-channel waveform synchronization
- The test checks that the phase relationship between the two channels is maintained. - This test covers the issue described in [m2k-fw#20](analogdevicesinc/m2k-fw#20) Signed-off-by: Adrian-Stanea <Adrian.Stanea@analog.com>
1 parent d617ebe commit b295fcd

File tree

3 files changed

+428
-20
lines changed

3 files changed

+428
-20
lines changed

tests/analog_functions.py

Lines changed: 271 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,23 @@
2626
import libm2k
2727
import time
2828
from multiprocessing.pool import ThreadPool
29-
import threading
3029
import os
31-
from pandas import DataFrame
30+
from pathlib import Path
3231
import pandas
3332
import random
34-
import sys
3533
import reset_def_values as reset
36-
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file, plot_to_file_multiline
34+
from helpers import (
35+
get_result_files,
36+
get_sample_rate_display_format,
37+
get_time_format,
38+
save_data_to_csv,
39+
plot_to_file,
40+
plot_to_file_multiline,
41+
)
3742
from open_context import ctx_timeout, ctx
38-
from create_files import results_file, results_dir, csv
3943

4044
from shapefile import shape_gen, Shape
45+
import logging
4146

4247
# dicts that will be saved to csv files
4348
shape_csv_vals = {}
@@ -1685,4 +1690,264 @@ def configure_trigger(trig: libm2k.M2kHardwareTrigger,
16851690
ylim=(-6, 6),
16861691
)
16871692
aout.stop()
1688-
return result
1693+
return result
1694+
1695+
1696+
def test_dual_channel_sync(
1697+
ain: libm2k.M2kAnalogIn,
1698+
aout: libm2k.M2kAnalogOut,
1699+
trig: libm2k.M2kHardwareTrigger,
1700+
ctx: libm2k.M2k,
1701+
) -> tuple[bool, str]:
1702+
"""Test dual-channel waveform synchronization with hardware oversampling.
1703+
1704+
Validation Strategy:
1705+
1. Detects falling edges in the ramp signal (CH1)
1706+
2. Verifies that triangle wave (CH0) minima align with these falling edges
1707+
within a specified tolerance (200 samples)
1708+
3. Checks monotonicity of the triangle wave before and after each minimum
1709+
using filtered data with relaxed constraints (70% threshold) to account
1710+
for noise and low ADC resolution sampling (1 MHz vs 75 MHz DAC rate)
1711+
4. Ensures both channels have sufficient signal amplitude (> 1.0V range)
1712+
"""
1713+
1714+
ctx.reset()
1715+
ctx.calibrateADC()
1716+
ctx.calibrateDAC()
1717+
ctx.setTimeout(10_000) # [ms]
1718+
1719+
file_name, dir_name, csv_path = get_result_files(gen_reports)
1720+
1721+
reset.analog_in(ain)
1722+
reset.analog_out(aout)
1723+
reset.trigger(trig)
1724+
1725+
# Generate waveform data programmatically
1726+
# CH0: Triangle wave - 750,000 samples, 0-5V amplitude
1727+
n_samples = 750_000
1728+
n_rising = 375_042
1729+
amplitude = 5.0
1730+
1731+
n_falling = n_samples - n_rising
1732+
step = amplitude / (n_rising - 1)
1733+
1734+
rising = np.linspace(0, amplitude, n_rising)
1735+
falling_start = amplitude - step
1736+
falling_end = falling_start - step * (n_falling - 1)
1737+
falling = np.linspace(falling_start, falling_end, n_falling)
1738+
ch0_buffer = np.concatenate([rising, falling])
1739+
1740+
# CH1: Ramp/stair pattern - 20 samples, values 0-4 repeated
1741+
ch1_buffer = np.tile(np.arange(5), 4).astype(float)
1742+
1743+
logger = logging.getLogger(__name__)
1744+
logger.info(f"Generated waveform data: CH0={len(ch0_buffer)} samples, CH1={len(ch1_buffer)} samples")
1745+
1746+
# Hardware configuration
1747+
dac_sr = 75_000_000 # 75 MHz base sample rate
1748+
adc_sr = 1_000_000 # 1 MHz ADC sample rate
1749+
1750+
# Oversampling configuration
1751+
ch0_oversampling = 1 # native rate
1752+
ch1_oversampling = 750_000 # repeat each sample 750000 times
1753+
1754+
# Trigger configuration
1755+
trig_delay = 8_000 # Pre-trigger delay in samples
1756+
trig_level = 1.5 # [V]
1757+
trig_hysteresis = 0.25 # [V]
1758+
1759+
# Capture configuration
1760+
in_samples = 75_000 # Capture 75ms worth of data at 1MHz (fewer periods for easier visual inspection)
1761+
1762+
# Configure analog input
1763+
ain.setSampleRate(adc_sr)
1764+
actual_adc_sr = ain.getSampleRate()
1765+
assert abs(actual_adc_sr - adc_sr) / adc_sr < 0.001, (
1766+
f"ADC sample rate mismatch: expected {adc_sr}, got {actual_adc_sr}"
1767+
)
1768+
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
1769+
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
1770+
assert ain.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_1), "ADC CH1 not enabled"
1771+
assert ain.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_2), "ADC CH2 not enabled"
1772+
ain.setRange(libm2k.ANALOG_IN_CHANNEL_1, libm2k.PLUS_MINUS_25V)
1773+
ain.setRange(libm2k.ANALOG_IN_CHANNEL_2, libm2k.PLUS_MINUS_25V)
1774+
assert ain.getRange(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.PLUS_MINUS_25V, (
1775+
"ADC CH1 range mismatch"
1776+
)
1777+
assert ain.getRange(libm2k.ANALOG_IN_CHANNEL_2) == libm2k.PLUS_MINUS_25V, (
1778+
"ADC CH2 range mismatch"
1779+
)
1780+
1781+
# Configure analog output
1782+
aout.setSampleRate(libm2k.ANALOG_IN_CHANNEL_1, dac_sr)
1783+
aout.setSampleRate(libm2k.ANALOG_IN_CHANNEL_2, dac_sr)
1784+
assert aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_1) == dac_sr, (
1785+
f"DAC CH1 sample rate mismatch: expected {dac_sr}, got {aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_1)}"
1786+
)
1787+
assert aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_2) == dac_sr, (
1788+
f"DAC CH2 sample rate mismatch: expected {dac_sr}, got {aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_2)}"
1789+
)
1790+
aout.setOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1, ch0_oversampling)
1791+
aout.setOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2, ch1_oversampling)
1792+
assert aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1) == ch0_oversampling, (
1793+
f"DAC CH1 oversampling mismatch: expected {ch0_oversampling}, got {aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1)}"
1794+
)
1795+
assert aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2) == ch1_oversampling, (
1796+
f"DAC CH2 oversampling mismatch: expected {ch1_oversampling}, got {aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2)}"
1797+
)
1798+
aout.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
1799+
aout.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
1800+
assert aout.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_1), "DAC CH1 not enabled"
1801+
assert aout.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_2), "DAC CH2 not enabled"
1802+
aout.setCyclic(True)
1803+
1804+
# Configure trigger on CH0
1805+
trig.setAnalogSource(libm2k.ANALOG_IN_CHANNEL_1)
1806+
assert trig.getAnalogSource() == libm2k.ANALOG_IN_CHANNEL_1, (
1807+
"Trigger source mismatch"
1808+
)
1809+
trig.setAnalogSourceChannel(libm2k.ANALOG_IN_CHANNEL_1)
1810+
trig.setAnalogMode(libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG)
1811+
assert trig.getAnalogMode(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.ANALOG, (
1812+
"Trigger mode mismatch"
1813+
)
1814+
trig.setAnalogCondition(libm2k.ANALOG_IN_CHANNEL_1, libm2k.RISING_EDGE_ANALOG)
1815+
assert (
1816+
trig.getAnalogCondition(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.RISING_EDGE_ANALOG
1817+
), "Trigger condition mismatch"
1818+
trig.setAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1, trig_level)
1819+
assert abs(trig.getAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1) - trig_level) < 0.05, (
1820+
f"Trigger level mismatch: expected {trig_level}, got {trig.getAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1)}"
1821+
)
1822+
trig.setAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1, trig_hysteresis)
1823+
assert (
1824+
abs(trig.getAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1) - trig_hysteresis)
1825+
< 0.05
1826+
), (
1827+
f"Trigger hysteresis mismatch: expected {trig_hysteresis}, got {trig.getAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1)}"
1828+
)
1829+
trig.setAnalogDelay(-trig_delay) # Negative delay to see pre-trigger data
1830+
assert trig.getAnalogDelay() == -trig_delay, (
1831+
f"Trigger delay mismatch: expected {-trig_delay}, got {trig.getAnalogDelay()}"
1832+
)
1833+
1834+
# Start acquisition before pushing to ensure we capture the signal
1835+
ain.startAcquisition(in_samples)
1836+
1837+
# Synchronized push - both channels simultaneously
1838+
aout.push([ch0_buffer, ch1_buffer])
1839+
1840+
try:
1841+
input_data = ain.getSamples(in_samples)
1842+
except Exception as e:
1843+
ain.stopAcquisition()
1844+
aout.stop()
1845+
return False, f"Timeout during acquisition: {e}"
1846+
1847+
ain.stopAcquisition()
1848+
aout.stop()
1849+
1850+
ch0_data = np.array(input_data[libm2k.ANALOG_IN_CHANNEL_1]) # Triangle
1851+
ch1_data = np.array(input_data[libm2k.ANALOG_IN_CHANNEL_2]) # Ramp
1852+
1853+
if gen_reports:
1854+
subdir_name = f"{dir_name}/dual_channel_sync"
1855+
os.makedirs(subdir_name, exist_ok=True)
1856+
x_time, x_label = get_time_format(in_samples, adc_sr)
1857+
plot_to_file(
1858+
"Dual Channel Sync Test (HW Oversampling)",
1859+
ch0_data,
1860+
subdir_name,
1861+
"dual_channel_sync_captured.png",
1862+
data1=ch1_data,
1863+
x_data=x_time,
1864+
xlabel=x_label,
1865+
)
1866+
1867+
# Validation: Check synchronization between channels
1868+
1869+
# Detect falling edges in stair step signal using derivative
1870+
ch1_diff = np.diff(ch1_data)
1871+
1872+
# Find indices where there are large negative jumps (falling edges)
1873+
falling_threshold = -0.5 # Significant drop
1874+
falling_edges = np.where(ch1_diff < falling_threshold)[0]
1875+
1876+
if len(falling_edges) == 0:
1877+
return (
1878+
False,
1879+
"No falling edges detected in ramp signal - signal may not be outputting correctly",
1880+
)
1881+
1882+
# Verify signal ranges before running sync validation
1883+
ch0_range = ch0_data.max() - ch0_data.min()
1884+
ch1_range = ch1_data.max() - ch1_data.min()
1885+
if ch0_range < 1.0:
1886+
return False, f"CH0 (triangle) signal too weak: range = {ch0_range:.2f}V"
1887+
if ch1_range < 1.0:
1888+
return False, f"CH1 (ramp) signal too weak: range = {ch1_range:.2f}V"
1889+
1890+
# Validate synchronization: Check that triangle minima align with stair step falling edges
1891+
window_size = 250
1892+
tolerance = 10
1893+
1894+
sync_errors = []
1895+
1896+
for i, edge_idx in enumerate(falling_edges):
1897+
# Define window around the falling edge
1898+
window_start = max(0, edge_idx - window_size)
1899+
window_end = min(len(ch0_data), edge_idx + window_size)
1900+
1901+
# Extract triangle data (CH0) in this window
1902+
triangle_window = ch0_data[window_start:window_end]
1903+
1904+
if len(triangle_window) < 10:
1905+
logger.warning(
1906+
f"Edge {i}: insufficient data in window for analysis, skipping"
1907+
)
1908+
continue
1909+
1910+
# Find where triangle transitions from decreasing to increasing (local minimum)
1911+
min_idx = np.argmin(triangle_window)
1912+
min_position = window_start + min_idx
1913+
1914+
# Check if minimum is close to falling edge
1915+
distance = abs(min_position - edge_idx)
1916+
if distance > tolerance:
1917+
sync_errors.append(
1918+
f"Edge {i}: triangle minimum at {min_position} is {distance} samples "
1919+
f"from falling edge at {edge_idx} (tolerance: {tolerance})"
1920+
)
1921+
1922+
if min_idx > 0:
1923+
filtered_before = np.convolve(
1924+
triangle_window[: min_idx + 1], np.ones(25) / 25, mode="valid"
1925+
)
1926+
if len(filtered_before) > 1:
1927+
decreasing_count = np.sum(np.diff(filtered_before) <= 0)
1928+
total_count = len(np.diff(filtered_before))
1929+
if decreasing_count < total_count * 0.7:
1930+
sync_errors.append(
1931+
f"Edge {i}: triangle not decreasing before minimum at {min_position}. "
1932+
f"Decreasing: {decreasing_count}/{total_count} samples"
1933+
)
1934+
if min_idx < len(triangle_window) - 1:
1935+
filtered_after = np.convolve(
1936+
triangle_window[min_idx:], np.ones(25) / 25, mode="valid"
1937+
)
1938+
if len(filtered_after) > 1:
1939+
increasing_count = np.sum(np.diff(filtered_after) >= 0)
1940+
total_count = len(np.diff(filtered_after))
1941+
if increasing_count < total_count * 0.7:
1942+
sync_errors.append(
1943+
f"Edge {i}: triangle not increasing after minimum at {min_position}. "
1944+
f"Increasing: {increasing_count}/{total_count} samples"
1945+
)
1946+
1947+
if sync_errors:
1948+
return False, f"Sync errors ({len(sync_errors)}): " + "; ".join(sync_errors[:3])
1949+
1950+
return (
1951+
True,
1952+
f"Sync validated: {len(falling_edges)} stair step falling edges detected, all triangle minima within {tolerance} samples tolerance",
1953+
)

0 commit comments

Comments
 (0)