Skip to content

Commit 76b0957

Browse files
authored
E2E audio test (#724)
1 parent 6c9e3d0 commit 76b0957

File tree

4 files changed

+334
-1
lines changed

4 files changed

+334
-1
lines changed

livekit/tests/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,11 @@ E2E test feature:
66

77
```sh
88
livekit-server --dev
9-
cargo test --features __lk-e2e-test
9+
cargo test --features default,__lk-e2e-test -- --nocapture
10+
```
11+
12+
Tip: If you are using Rust Analyzer in Visual Studio Code, you can enable this feature to get code completion for these tests. Add the following setting to *.vscode/settings.json*:
13+
14+
```json
15+
"rust-analyzer.cargo.features": ["default", "__lk-e2e-test"]
1016
```

livekit/tests/audio_test.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#[cfg(feature = "__lk-e2e-test")]
2+
use {
3+
anyhow::{anyhow, Ok, Result},
4+
common::{
5+
audio::{ChannelIterExt, FreqAnalyzer, SineParameters, SineTrack},
6+
test_rooms,
7+
},
8+
futures_util::StreamExt,
9+
libwebrtc::audio_stream::native::NativeAudioStream,
10+
livekit::prelude::*,
11+
std::{sync::Arc, time::Duration},
12+
tokio::time::timeout,
13+
};
14+
15+
mod common;
16+
17+
struct TestParams {
18+
pub_rate_hz: u32,
19+
pub_channels: u32,
20+
sub_rate_hz: u32,
21+
sub_channels: u32,
22+
}
23+
24+
#[cfg(feature = "__lk-e2e-test")]
25+
#[test_log::test(tokio::test)]
26+
async fn test_audio() -> Result<()> {
27+
let test_params = [
28+
TestParams { pub_rate_hz: 48_000, pub_channels: 1, sub_rate_hz: 48_000, sub_channels: 1 },
29+
TestParams { pub_rate_hz: 48_000, pub_channels: 2, sub_rate_hz: 48_000, sub_channels: 2 },
30+
TestParams { pub_rate_hz: 48_000, pub_channels: 2, sub_rate_hz: 24_000, sub_channels: 2 },
31+
TestParams { pub_rate_hz: 24_000, pub_channels: 2, sub_rate_hz: 24_000, sub_channels: 1 },
32+
];
33+
for params in test_params {
34+
log::info!("Testing with {}", params);
35+
test_audio_with(params).await?;
36+
}
37+
Ok(())
38+
}
39+
40+
/// Tests audio transfer between two participants.
41+
///
42+
/// Verifies that audio can be published and received correctly
43+
/// between two participants by detecting the frequency of the sine wave on the subscriber end.
44+
///
45+
#[cfg(feature = "__lk-e2e-test")]
46+
async fn test_audio_with(params: TestParams) -> Result<()> {
47+
let mut rooms = test_rooms(2).await?;
48+
let (pub_room, _) = rooms.pop().unwrap();
49+
let (_, mut sub_room_events) = rooms.pop().unwrap();
50+
51+
const SINE_FREQ: f64 = 60.0;
52+
const SINE_AMPLITUDE: f64 = 1.0;
53+
const FRAMES_TO_ANALYZE: usize = 100;
54+
55+
let sine_params = SineParameters {
56+
freq: SINE_FREQ,
57+
amplitude: SINE_AMPLITUDE,
58+
sample_rate: params.pub_rate_hz,
59+
num_channels: params.pub_channels,
60+
};
61+
let mut sine_track = SineTrack::new(Arc::new(pub_room), sine_params);
62+
sine_track.publish().await?;
63+
64+
let analyze_frames = async move {
65+
let track: RemoteTrack = loop {
66+
let Some(event) = sub_room_events.recv().await else {
67+
Err(anyhow!("Never received track"))?
68+
};
69+
let RoomEvent::TrackSubscribed { track, publication: _, participant: _ } = event else {
70+
continue;
71+
};
72+
break track.into();
73+
};
74+
let RemoteTrack::Audio(track) = track else { Err(anyhow!("Expected audio track"))? };
75+
let mut stream = NativeAudioStream::new(
76+
track.rtc_track(),
77+
params.sub_rate_hz as i32,
78+
params.sub_channels as i32,
79+
);
80+
81+
tokio::spawn(async move {
82+
let mut frames_analyzed = 0;
83+
let mut analyzers = vec![FreqAnalyzer::new(); params.sub_channels as usize];
84+
85+
while let Some(frame) = stream.next().await {
86+
assert!(frame.data.len() > 0);
87+
assert_eq!(frame.num_channels, params.sub_channels);
88+
assert_eq!(frame.sample_rate, params.sub_rate_hz);
89+
assert_eq!(frame.samples_per_channel, frame.data.len() as u32 / frame.num_channels);
90+
91+
for channel_idx in 0..params.sub_channels as usize {
92+
analyzers[channel_idx].analyze(frame.channel_iter(channel_idx));
93+
}
94+
frames_analyzed += 1;
95+
if frames_analyzed >= FRAMES_TO_ANALYZE {
96+
break;
97+
}
98+
}
99+
assert_eq!(frames_analyzed, FRAMES_TO_ANALYZE);
100+
101+
for (channel_idx, detected_freq) in analyzers
102+
.into_iter()
103+
.map(|analyzer| analyzer.estimated_freq(params.sub_rate_hz))
104+
.enumerate()
105+
{
106+
assert!(
107+
(detected_freq - SINE_FREQ).abs() < 20.0, // Expect within 20Hz
108+
"Detected sine frequency not within range for channel {}: {}Hz",
109+
channel_idx,
110+
detected_freq
111+
);
112+
}
113+
})
114+
.await?;
115+
Ok(())
116+
};
117+
timeout(Duration::from_secs(15), analyze_frames).await??;
118+
Ok(())
119+
}
120+
121+
impl std::fmt::Display for TestParams {
122+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123+
write!(
124+
f,
125+
"{}Hz, {}ch. -> {}Hz, {}ch.",
126+
self.pub_rate_hz, self.pub_channels, self.sub_rate_hz, self.sub_channels
127+
)
128+
}
129+
}

livekit/tests/common/e2e/audio.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use libwebrtc::{
2+
audio_source::native::NativeAudioSource,
3+
prelude::{AudioFrame, AudioSourceOptions, RtcAudioSource},
4+
};
5+
use livekit::{
6+
options::TrackPublishOptions,
7+
track::{LocalAudioTrack, LocalTrack},
8+
Room, RoomResult,
9+
};
10+
use std::sync::Arc;
11+
use tokio::{sync::oneshot, task::JoinHandle};
12+
13+
/// Parameters for the sine wave generated with [`SineTrack`].
14+
#[derive(Clone, Debug)]
15+
pub struct SineParameters {
16+
pub sample_rate: u32,
17+
pub freq: f64,
18+
pub amplitude: f64,
19+
pub num_channels: u32,
20+
}
21+
22+
/// Audio track which generates and publishes a sine wave.
23+
///
24+
/// This implementation was taken from the *wgpu_room* example.
25+
///
26+
pub struct SineTrack {
27+
rtc_source: NativeAudioSource,
28+
params: SineParameters,
29+
room: Arc<Room>,
30+
handle: Option<TrackHandle>,
31+
}
32+
33+
struct TrackHandle {
34+
close_tx: oneshot::Sender<()>,
35+
track: LocalAudioTrack,
36+
task: JoinHandle<()>,
37+
}
38+
39+
impl SineTrack {
40+
pub fn new(room: Arc<Room>, params: SineParameters) -> Self {
41+
Self {
42+
rtc_source: NativeAudioSource::new(
43+
AudioSourceOptions::default(),
44+
params.sample_rate,
45+
params.num_channels,
46+
1000,
47+
),
48+
params,
49+
room,
50+
handle: None,
51+
}
52+
}
53+
54+
pub async fn publish(&mut self) -> RoomResult<()> {
55+
let (close_tx, close_rx) = oneshot::channel();
56+
let track = LocalAudioTrack::create_audio_track(
57+
"sine-track",
58+
RtcAudioSource::Native(self.rtc_source.clone()),
59+
);
60+
let task =
61+
tokio::spawn(Self::track_task(close_rx, self.rtc_source.clone(), self.params.clone()));
62+
self.room
63+
.local_participant()
64+
.publish_track(LocalTrack::Audio(track.clone()), TrackPublishOptions::default())
65+
.await?;
66+
let handle = TrackHandle { close_tx, track, task };
67+
self.handle = Some(handle);
68+
Ok(())
69+
}
70+
71+
pub async fn unpublish(&mut self) -> RoomResult<()> {
72+
if let Some(handle) = self.handle.take() {
73+
handle.close_tx.send(()).ok();
74+
handle.task.await.ok();
75+
self.room.local_participant().unpublish_track(&handle.track.sid()).await?;
76+
}
77+
Ok(())
78+
}
79+
80+
async fn track_task(
81+
mut close_rx: oneshot::Receiver<()>,
82+
rtc_source: NativeAudioSource,
83+
params: SineParameters,
84+
) {
85+
let num_channels = params.num_channels as usize;
86+
let samples_count = (params.sample_rate / 100) as usize * num_channels;
87+
let mut samples_10ms = vec![0; samples_count];
88+
let mut phase = 0;
89+
loop {
90+
if close_rx.try_recv().is_ok() {
91+
break;
92+
}
93+
for i in (0..samples_count).step_by(num_channels) {
94+
let val = params.amplitude
95+
* f64::sin(
96+
std::f64::consts::PI
97+
* 2.0
98+
* params.freq
99+
* (phase as f64 / params.sample_rate as f64),
100+
);
101+
phase += 1;
102+
for c in 0..num_channels {
103+
// WebRTC uses 16-bit signed PCM
104+
samples_10ms[i + c] = (val * 32768.0) as i16;
105+
}
106+
}
107+
let frame = AudioFrame {
108+
data: samples_10ms.as_slice().into(),
109+
sample_rate: params.sample_rate,
110+
num_channels: params.num_channels,
111+
samples_per_channel: samples_count as u32 / params.num_channels,
112+
};
113+
rtc_source.capture_frame(&frame).await.unwrap();
114+
}
115+
}
116+
}
117+
118+
/// Analyzes samples to estimate the frequency of the signal using the zero crossing method.
119+
#[derive(Clone)]
120+
pub struct FreqAnalyzer {
121+
zero_crossings: usize,
122+
samples_analyzed: usize,
123+
}
124+
125+
impl FreqAnalyzer {
126+
pub fn new() -> Self {
127+
Self { zero_crossings: 0, samples_analyzed: 0 }
128+
}
129+
130+
pub fn analyze(&mut self, samples: impl IntoIterator<Item = i16>) {
131+
let mut iter = samples.into_iter();
132+
let mut prev = match iter.next() {
133+
Some(v) => v,
134+
None => return,
135+
};
136+
let mut count = 0;
137+
for curr in iter {
138+
if (prev >= 0 && curr < 0) || (prev < 0 && curr >= 0) {
139+
self.zero_crossings += 1;
140+
}
141+
prev = curr;
142+
count += 1;
143+
}
144+
self.samples_analyzed += count + 1;
145+
}
146+
147+
pub fn estimated_freq(&self, sample_rate: u32) -> f64 {
148+
let num_cycles = self.zero_crossings as f64 / 2.0;
149+
let duration_seconds = self.samples_analyzed as f64 / sample_rate as f64;
150+
if duration_seconds == 0.0 {
151+
return 0.0;
152+
}
153+
num_cycles / duration_seconds
154+
}
155+
}
156+
157+
pub trait ChannelIterExt<'a> {
158+
/// Returns an iterator over the samples in a specific channel.
159+
///
160+
/// # Arguments
161+
/// * `channel_index` - Index of the channel to iterate over (must be less than `num_channels`).
162+
///
163+
/// # Panics
164+
/// Panics if `channel_index` is greater than or equal to `num_channels`.
165+
///
166+
fn channel_iter(&'a self, channel_index: usize) -> ChannelIter<'a>;
167+
}
168+
169+
impl<'a> ChannelIterExt<'a> for AudioFrame<'a> {
170+
fn channel_iter(&'a self, channel_index: usize) -> ChannelIter<'a> {
171+
assert!(channel_index < self.num_channels as usize);
172+
ChannelIter { frame: self, channel_index, index: 0 }
173+
}
174+
}
175+
176+
/// Iterator over an individual channel in an interleaved [`AudioFrame`].
177+
pub struct ChannelIter<'a> {
178+
frame: &'a AudioFrame<'a>,
179+
channel_index: usize,
180+
index: usize,
181+
}
182+
183+
impl<'a> Iterator for ChannelIter<'a> {
184+
type Item = i16;
185+
186+
fn next(&mut self) -> Option<Self::Item> {
187+
let inner_index =
188+
self.index * (self.frame.num_channels as usize) + (self.channel_index as usize);
189+
if inner_index >= self.frame.data.len() {
190+
return None;
191+
}
192+
let sample = self.frame.data[inner_index];
193+
self.index += 1;
194+
Some(sample)
195+
}
196+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use tokio::{
99
time::{self, timeout},
1010
};
1111

12+
pub mod audio;
13+
1214
struct TestEnvironment {
1315
api_key: String,
1416
api_secret: String,

0 commit comments

Comments
 (0)