diff --git a/Cargo.toml b/Cargo.toml index fd873391..3ad98c38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ rubato = "0.16" smallvec = "1.11" symphonia = { version = "0.5", default-features = false } vecmath = "1.0" +aec-rs = "1.0.0" +lazy_static = "1.5.0" [target.'cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64"))'.dependencies] no_denormals = "0.2.0" diff --git a/examples/microphone.rs b/examples/microphone.rs index 7b112fcc..f3521e63 100644 --- a/examples/microphone.rs +++ b/examples/microphone.rs @@ -4,7 +4,10 @@ use web_audio_api::context::{ use web_audio_api::media_devices; use web_audio_api::media_devices::{enumerate_devices_sync, MediaDeviceInfo, MediaDeviceInfoKind}; use web_audio_api::media_devices::{MediaStreamConstraints, MediaTrackConstraints}; +use web_audio_api::media_recorder::{MediaRecorder, MediaRecorderOptions}; use web_audio_api::node::AudioNode; +use std::fs::File; +use std::io::Write; // Pipe microphone stream into audio context // @@ -73,18 +76,53 @@ fn main() { let mut constraints = MediaTrackConstraints::default(); constraints.device_id = source_id; // constraints.channel_count = Some(2); + constraints.echo_cancellation = Some(false); // Enable echo cancellation let stream_constraints = MediaStreamConstraints::AudioWithConstraints(constraints); let mic = media_devices::get_user_media_sync(stream_constraints); + println!("\n✓ Microphone stream created with echo cancellation enabled"); + println!("You should be able to speak without hearing feedback/echo.\n"); + // create media stream source node with mic stream let stream_source = context.create_media_stream_source(&mic); + + // Create a media stream destination to capture audio + let dest = context.create_media_stream_destination(); + stream_source.connect(&dest); + + // Also connect to speakers so you can hear yourself stream_source.connect(&context.destination()); - loop { - std::thread::sleep(std::time::Duration::from_secs(1)); - } + // Create media recorder + let options = MediaRecorderOptions::default(); // default to audio/wav + let recorder = MediaRecorder::new(dest.stream(), options); + + // Create a file to write the recording + let mut file = File::create("recording.wav").expect("Failed to create file"); + + recorder.set_ondataavailable(move |event| { + eprintln!( + "Recording... timecode {:.3}s, chunk size {} bytes", + event.timecode, + event.blob.size() + ); + file.write_all(&event.blob.data).unwrap(); + }); + + // Start recording + recorder.start(); + println!("Recording to 'recording.wav' for 10 seconds..."); + + // Record for 10 seconds + std::thread::sleep(std::time::Duration::from_secs(10)); + + // Stop recording and wait for final data + let (send, recv) = crossbeam_channel::bounded(1); + recorder.set_onstop(move |_| { + let _ = send.send(()); + }); + recorder.stop(); + let _ = recv.recv(); - // println!("Closing microphone"); - // mic.get_tracks()[0].close(); - // std::thread::sleep(std::time::Duration::from_secs(2)); + println!("Recording saved successfully!"); } diff --git a/src/io/echo_cancellation.rs b/src/io/echo_cancellation.rs new file mode 100644 index 00000000..5ce4018b --- /dev/null +++ b/src/io/echo_cancellation.rs @@ -0,0 +1,145 @@ +use crate::buffer::AudioBuffer; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use super::echo_reference::ECHO_REFERENCE_MANAGER; +use std::cell::RefCell; + +// Wrapper to make EchoCanceller Send + Sync +pub struct EchoCanceller { + inner: RefCell, + reference_buffer: Arc>>, +} + +struct EchoCancellerInner { + aec: aec_rs::Aec, + config: aec_rs::AecConfig, + frame_buffer: Vec, + reference_frame_buffer: Vec, + input_buffer: VecDeque, + output_buffer: VecDeque, +} + +// SAFETY: EchoCanceller is only used from the audio thread +unsafe impl Send for EchoCanceller {} +unsafe impl Sync for EchoCanceller {} + +impl EchoCanceller { + pub fn new(sample_rate: f32, frame_size: usize) -> Self { + let config = aec_rs::AecConfig { + sample_rate: sample_rate as u32, + filter_length: (sample_rate * 0.4) as i32, // 200ms filter (typical for room echo) + frame_size, + enable_preprocess: true, // Disable preprocessing to reduce artifacts + }; + + let aec = aec_rs::Aec::new(&config); + + let reference_buffer = Arc::new(Mutex::new(VecDeque::new())); + + // Register this buffer with the global echo reference manager + ECHO_REFERENCE_MANAGER.register_buffer(Arc::downgrade(&reference_buffer)); + + let inner = EchoCancellerInner { + aec, + config, + frame_buffer: vec![0.0; frame_size], + reference_frame_buffer: vec![0.0; frame_size], + input_buffer: VecDeque::new(), + output_buffer: VecDeque::new(), + }; + + Self { + inner: RefCell::new(inner), + reference_buffer, + } + } + + pub fn get_reference_buffer(&self) -> Arc>> { + Arc::clone(&self.reference_buffer) + } + + pub fn process(&self, input: &[f32]) -> Vec { + let mut output = Vec::with_capacity(input.len()); + let mut inner = self.inner.borrow_mut(); + + // Add input to buffer + inner.input_buffer.extend(input); + + // Process complete frames + while inner.input_buffer.len() >= inner.config.frame_size { + // Fill frame buffer from input buffer + for i in 0..inner.config.frame_size { + inner.frame_buffer[i] = inner.input_buffer.pop_front().unwrap(); + } + + // Get reference samples from the buffer + { + let mut ref_buffer = self.reference_buffer.lock().unwrap(); + for i in 0..inner.config.frame_size { + inner.reference_frame_buffer[i] = ref_buffer.pop_front().unwrap_or(0.0); + } + } + + // Convert f32 to i16 for AEC processing + let mut input_i16: Vec = inner.frame_buffer + .iter() + .map(|&x| (x * 32767.0).clamp(-32768.0, 32767.0) as i16) + .collect(); + + let mut reference_i16: Vec = inner.reference_frame_buffer + .iter() + .map(|&x| (x * 32767.0).clamp(-32768.0, 32767.0) as i16) + .collect(); + + let mut output_i16 = vec![0i16; inner.config.frame_size]; + + // Apply echo cancellation + inner.aec.cancel_echo(&mut input_i16, &mut reference_i16, &mut output_i16); + + // Convert back to f32 and add to output buffer + for &sample in output_i16.iter() { + inner.output_buffer.push_back(sample as f32 / 32767.0); + } + } + + // Return available output, maintaining input size + for _ in 0..input.len() { + if let Some(sample) = inner.output_buffer.pop_front() { + output.push(sample); + } else { + // If we don't have enough output yet, return the input unprocessed + // This happens during initial buffering + let idx = output.len(); + if idx < input.len() { + output.push(input[idx]); + } + } + } + + output + } + + pub fn add_reference_audio(&self, audio: &AudioBuffer) { + let mut ref_buffer = self.reference_buffer.lock().unwrap(); + let inner = self.inner.borrow(); + + // Mix all channels to mono for reference + let num_samples = audio.length(); + let num_channels = audio.number_of_channels(); + + for sample_idx in 0..num_samples { + let mut mixed_sample = 0.0; + for ch in 0..num_channels { + mixed_sample += audio.get_channel_data(ch)[sample_idx]; + } + mixed_sample /= num_channels as f32; + + ref_buffer.push_back(mixed_sample); + + // Keep buffer size reasonable (max 1 second) + if ref_buffer.len() > inner.config.sample_rate as usize { + ref_buffer.pop_front(); + } + } + } +} diff --git a/src/io/echo_reference.rs b/src/io/echo_reference.rs new file mode 100644 index 00000000..b150aad9 --- /dev/null +++ b/src/io/echo_reference.rs @@ -0,0 +1,62 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex, Weak}; + +/// Global echo reference manager to share audio output with echo cancellers +pub struct EchoReferenceManager { + reference_buffers: Arc>>>>>, +} + +impl EchoReferenceManager { + pub fn new() -> Self { + Self { + reference_buffers: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Register a reference buffer for echo cancellation + pub fn register_buffer(&self, buffer: Weak>>) { + let mut buffers = self.reference_buffers.lock().unwrap(); + // Clean up any dead weak references + buffers.retain(|b| b.upgrade().is_some()); + buffers.push(buffer); + } + + /// Send reference audio to all registered echo cancellers + pub fn send_reference(&self, audio: &[f32], channels: usize) { + let mut buffers = self.reference_buffers.lock().unwrap(); + + // Clean up dead references and send to alive ones + buffers.retain(|weak_buffer| { + if let Some(buffer) = weak_buffer.upgrade() { + if let Ok(mut buf) = buffer.lock() { + // Mix to mono if needed + if channels > 1 { + for frame in audio.chunks(channels) { + let mono_sample: f32 = frame.iter().sum::() / channels as f32; + buf.push_back(mono_sample); + + // Keep buffer size reasonable (max 1 second at 48kHz) + if buf.len() > 48000 { + buf.pop_front(); + } + } + } else { + buf.extend(audio); + // Keep buffer size reasonable + while buf.len() > 48000 { + buf.pop_front(); + } + } + } + true + } else { + false + } + }); + } +} + +// Global instance +lazy_static::lazy_static! { + pub static ref ECHO_REFERENCE_MANAGER: EchoReferenceManager = EchoReferenceManager::new(); +} \ No newline at end of file diff --git a/src/io/microphone.rs b/src/io/microphone.rs index 76aa32fe..92da7c89 100644 --- a/src/io/microphone.rs +++ b/src/io/microphone.rs @@ -1,4 +1,5 @@ use std::error::Error; +use std::sync::{Arc, Mutex}; use crate::buffer::{AudioBuffer, AudioBufferOptions}; use crate::io::AudioBackendManager; @@ -6,11 +7,14 @@ use crate::RENDER_QUANTUM_SIZE; use crossbeam_channel::{Receiver, Sender, TryRecvError}; +use super::echo_cancellation::EchoCanceller; + pub(crate) struct MicrophoneStream { receiver: Receiver, number_of_channels: usize, sample_rate: f32, stream: Box, + echo_canceller: Option>>, } impl MicrophoneStream { @@ -23,8 +27,27 @@ impl MicrophoneStream { number_of_channels: backend.number_of_channels(), sample_rate: backend.sample_rate(), stream: backend, + echo_canceller: None, + } + } + + pub(crate) fn with_echo_canceller( + receiver: Receiver, + backend: Box, + echo_canceller: Arc>, + ) -> Self { + Self { + receiver, + number_of_channels: backend.number_of_channels(), + sample_rate: backend.sample_rate(), + stream: backend, + echo_canceller: Some(echo_canceller), } } + + pub(crate) fn echo_canceller(&self) -> Option<&Arc>> { + self.echo_canceller.as_ref() + } } impl Drop for MicrophoneStream { @@ -38,7 +61,7 @@ impl Iterator for MicrophoneStream { type Item = Result>; fn next(&mut self) -> Option { - let next = match self.receiver.try_recv() { + let mut next = match self.receiver.try_recv() { Ok(buffer) => { // new frame was ready buffer @@ -60,6 +83,23 @@ impl Iterator for MicrophoneStream { return None; } }; + + // Apply echo cancellation if enabled + if let Some(echo_canceller) = &self.echo_canceller { + let canceller = echo_canceller.lock().unwrap(); + + // Process each channel through echo cancellation + let mut processed_channels = Vec::with_capacity(self.number_of_channels); + + for ch in 0..self.number_of_channels { + let input_data = next.get_channel_data(ch); + let processed = canceller.process(input_data); + processed_channels.push(processed); + } + + // Create new buffer with processed audio + next = AudioBuffer::from(processed_channels, self.sample_rate); + } Some(Ok(next)) } diff --git a/src/io/mod.rs b/src/io/mod.rs index d4952910..b652d9a8 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -25,6 +25,12 @@ mod cubeb; #[cfg(any(feature = "cubeb", feature = "cpal"))] mod microphone; +#[cfg(any(feature = "cubeb", feature = "cpal"))] +mod echo_cancellation; + +#[cfg(any(feature = "cubeb", feature = "cpal"))] +pub(crate) mod echo_reference; + #[derive(Debug)] pub(crate) struct ControlThreadInit { pub state: Arc, @@ -116,6 +122,7 @@ pub(crate) fn build_output( pub(crate) fn build_input( options: AudioContextOptions, number_of_channels: Option, + echo_cancellation: Option, ) -> MediaStream { #[cfg(all(not(feature = "cubeb"), not(feature = "cpal")))] { @@ -124,19 +131,38 @@ pub(crate) fn build_input( #[cfg(any(feature = "cubeb", feature = "cpal"))] { + use std::sync::{Arc, Mutex}; + let (backend, receiver) = { #[cfg(feature = "cubeb")] { - cubeb::CubebBackend::build_input(options, number_of_channels) + cubeb::CubebBackend::build_input(options.clone(), number_of_channels) } #[cfg(all(not(feature = "cubeb"), feature = "cpal"))] { - cpal::CpalBackend::build_input(options, number_of_channels) + cpal::CpalBackend::build_input(options.clone(), number_of_channels) } }; - let media_iter = microphone::MicrophoneStream::new(receiver, Box::new(backend)); + let media_iter = if echo_cancellation == Some(true) { + // Create echo canceller with appropriate frame size + let sample_rate = backend.sample_rate(); + // Use 20ms frame size (common for echo cancellation) + let frame_size = ((sample_rate * 0.02) as usize).max(160); + let echo_canceller = Arc::new(Mutex::new( + echo_cancellation::EchoCanceller::new(sample_rate, frame_size) + )); + + microphone::MicrophoneStream::with_echo_canceller( + receiver, + Box::new(backend), + echo_canceller + ) + } else { + microphone::MicrophoneStream::new(receiver, Box::new(backend)) + }; + let track = MediaStreamTrack::from_iter(media_iter); MediaStream::from_tracks(vec![track]) } diff --git a/src/media_devices/mod.rs b/src/media_devices/mod.rs index 61df0613..5de3a261 100644 --- a/src/media_devices/mod.rs +++ b/src/media_devices/mod.rs @@ -145,7 +145,7 @@ pub struct MediaTrackConstraints { // ConstrainDOMString resizeMode; pub sample_rate: Option, // ConstrainULong sampleSize; - // ConstrainBoolean echoCancellation; + pub echo_cancellation: Option, // ConstrainBoolean autoGainControl; // ConstrainBoolean noiseSuppression; pub latency: Option, @@ -219,9 +219,11 @@ fn is_valid_device_id(device_id: &str) -> bool { /// std::thread::sleep(std::time::Duration::from_secs(4)); /// ``` pub fn get_user_media_sync(constraints: MediaStreamConstraints) -> MediaStream { - let (channel_count, mut options) = match constraints { - MediaStreamConstraints::Audio => (None, AudioContextOptions::default()), - MediaStreamConstraints::AudioWithConstraints(cs) => (cs.channel_count, cs.into()), + let (channel_count, echo_cancellation, mut options) = match constraints { + MediaStreamConstraints::Audio => (None, None, AudioContextOptions::default()), + MediaStreamConstraints::AudioWithConstraints(ref cs) => { + (cs.channel_count, cs.echo_cancellation, cs.clone().into()) + } }; if !is_valid_device_id(&options.sink_id) { @@ -229,5 +231,5 @@ pub fn get_user_media_sync(constraints: MediaStreamConstraints) -> MediaStream { options.sink_id = String::from(""); } - crate::io::build_input(options, channel_count) + crate::io::build_input(options, channel_count, echo_cancellation) } diff --git a/src/render/thread.rs b/src/render/thread.rs index 9fa63be9..d09ccd25 100644 --- a/src/render/thread.rs +++ b/src/render/thread.rs @@ -502,6 +502,19 @@ impl RenderThread { *sample = value; } } + + // Send reference audio for echo cancellation + #[cfg(any(feature = "cubeb", feature = "cpal"))] + { + use crate::io::echo_reference::ECHO_REFERENCE_MANAGER; + let mut reference_data = Vec::with_capacity(RENDER_QUANTUM_SIZE * self.number_of_channels); + for sample_idx in 0..RENDER_QUANTUM_SIZE { + for ch in 0..self.number_of_channels { + reference_data.push(destination_buffer.channel_data(ch)[sample_idx]); + } + } + ECHO_REFERENCE_MANAGER.send_reference(&reference_data, self.number_of_channels); + } if data.len() != chunk_size { // this is the last chunk, and it contained less than RENDER_QUANTUM_SIZE samples