diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9229df..ffea4fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,7 @@ jobs: if: ${{ matrix.os == 'ubuntu-latest' }} run: | sudo apt-get update # Run update first or install might start failing eventually. - sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev pkg-config + sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev libpulse-dev pkg-config - run: rustup update - run: rustc --version && cargo --version - name: Build diff --git a/Cargo.toml b/Cargo.toml index 4c788f2..79933f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,11 @@ members = ["android-examples", "wasm-examples", "ios-example/Rust-TinyAudioExamp [profile.dev.package."*"] opt-level = 3 +[features] +default = ["alsa"] +alsa = ["dep:alsa-sys"] +pulse = ["dep:libpulse-sys"] + [target.'cfg(target_os = "android")'.dependencies] ndk = { version = "0.9.0", default-features = false, features = ["audio", "api-level-27"] } @@ -54,7 +59,8 @@ features = [ ] [target.'cfg(target_os = "linux")'.dependencies] -alsa-sys = { version = "0.3.1" } +alsa-sys = { version = "0.3.1", optional = true } +libpulse-sys = { version = "1.23.0", optional = true } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] coreaudio-sys = { version = "0.2.8" } diff --git a/src/alsa.rs b/src/alsa.rs index 31c0809..258b996 100644 --- a/src/alsa.rs +++ b/src/alsa.rs @@ -1,6 +1,6 @@ //! Linux output device via `alsa`. -#![cfg(target_os = "linux")] +#![cfg(all(target_os = "linux", feature = "alsa"))] use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters}; use alsa_sys::*; diff --git a/src/directsound.rs b/src/directsound.rs index 35aba57..f36c7c9 100644 --- a/src/directsound.rs +++ b/src/directsound.rs @@ -2,6 +2,7 @@ #![cfg(target_os = "windows")] #![allow(non_snake_case)] +#![allow(unexpected_cfgs)] use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters}; use std::{ diff --git a/src/lib.rs b/src/lib.rs index 0cb0e93..9a3a4e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod aaudio; mod alsa; mod coreaudio; mod directsound; +mod pulse; mod web; #[doc(hidden)] @@ -134,10 +135,26 @@ where #[cfg(target_os = "linux")] { - return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new( - params, - data_callback, - )?)); + #[cfg(feature = "alsa")] + { + return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new( + params, + data_callback, + )?)); + } + + #[cfg(all(feature = "pulse", not(feature = "alsa")))] + { + return Ok(OutputDevice::new(pulse::PulseSoundDevice::new( + params, + data_callback, + )?)); + } + + #[cfg(all(not(feature = "alsa"), not(feature = "pulse")))] + { + compile_error!("Select \"alsa\" or \"pulse\" feature to use an audio device on Linux") + } } #[cfg(all(target_os = "unknown", target_arch = "wasm32"))] diff --git a/src/pulse.rs b/src/pulse.rs new file mode 100644 index 0000000..60b0149 --- /dev/null +++ b/src/pulse.rs @@ -0,0 +1,306 @@ +//! Linux output device via `PulseAudio`. + +#![cfg(all(target_os = "linux", feature = "pulse"))] +#![cfg_attr(feature = "alsa", allow(dead_code))] + +use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters}; +use libpulse_sys::*; +use std::{ + any::Any, + cell::Cell, + error::Error, + ffi::{c_void, CStr}, + panic::{self, AssertUnwindSafe}, + ptr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::JoinHandle, +}; + +pub struct PulseSoundDevice { + thread_handle: Option>>, + is_running: Arc, +} + +impl Drop for PulseSoundDevice { + fn drop(&mut self) { + self.is_running.store(false, Ordering::Relaxed); + let res = self + .thread_handle + .take() + .expect("PulseAudio thread must exist!") + .join() + // propagate panic + .unwrap(); + + if let Err(_error) = res { + // The error from the PulseAudio thread, + // can be printed or returned if needed + } + } +} + +impl BaseAudioOutputDevice for PulseSoundDevice {} + +impl AudioOutputDevice for PulseSoundDevice { + fn new(params: OutputDeviceParameters, data_callback: C) -> Result> + where + C: FnMut(&mut [f32]) + Send + 'static, + Self: Sized, + { + let is_running = Arc::new(AtomicBool::new(true)); + let thread_handle = std::thread::Builder::new() + .name("PulseAudioThread".to_string()) + .spawn({ + let is_running = is_running.clone(); + move || run(params, is_running, data_callback) + })?; + + Ok(Self { + thread_handle: Some(thread_handle), + is_running, + }) + } +} + +fn run( + params: OutputDeviceParameters, + is_running: Arc, + mut data_callback: C, +) -> Result<(), String> +where + C: FnMut(&mut [f32]) + 'static, +{ + unsafe { + let mainloop = pa_mainloop_new(); + if mainloop.is_null() { + return Err("failed to create PulseAudio mainloop".to_owned()); + } + + let _free_mainloop = defer(|| pa_mainloop_free(mainloop)); + + let api = pa_mainloop_get_api(mainloop); + if api.is_null() { + return Err("failed to get PulseAudio mainloop api".to_owned()); + } + + let context = pa_context_new(api, "default\0".as_ptr().cast()); + if context.is_null() { + return Err("failed to create PulseAudio context".to_owned()); + } + + let _unref_context = defer(|| { + pa_context_disconnect(context); + pa_context_unref(context); + }); + + check( + pa_context_connect(context, ptr::null(), PA_CONTEXT_NOFLAGS, ptr::null()), + context, + )?; + + loop { + match pa_context_get_state(context) { + PA_CONTEXT_FAILED => { + return Err("the connection failed or was disconnected".to_owned()); + } + PA_CONTEXT_TERMINATED => return Ok(()), + PA_CONTEXT_READY => break, + _ => {} + } + + check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?; + } + + let sample_rate = u32::try_from(params.sample_rate) + .ok() + .filter(|&sample_rate| sample_rate <= PA_RATE_MAX) + .ok_or_else(|| { + format!( + "sample rate {} exceeds maximum allowed {}", + params.sample_rate, PA_RATE_MAX, + ) + })?; + + let channels_count = u8::try_from(params.channels_count) + .ok() + .filter(|&channels_count| channels_count <= PA_CHANNELS_MAX) + .ok_or_else(|| { + format!( + "channels count {} exceeds maximum allowed {}", + params.channels_count, PA_CHANNELS_MAX, + ) + })?; + + let spec = pa_sample_spec { + format: PA_SAMPLE_FLOAT32LE, + rate: sample_rate, + channels: channels_count, + }; + + if pa_sample_spec_valid(&spec) == 0 { + return Err("spec is not valid".to_owned()); + } + + let stream = check_ptr( + pa_stream_new( + context, + "PulseAudio Stream\0".as_ptr().cast(), + &spec, + ptr::null(), + ), + context, + )?; + + let _unref_stream = defer(|| { + pa_stream_disconnect(stream); + pa_stream_unref(stream); + }); + + if params.channel_sample_count % usize::from(spec.channels) != 0 { + return Err("the length of the data to write (in bytes) must be in multiples of the stream sample spec frame size".to_owned()); + } + + let mut buffer = vec![0.; params.channel_sample_count]; + let mut callback = move |nbytes, stream| { + let bytelen = buffer.len() * size_of::(); + for _ in 0..nbytes / bytelen { + data_callback(&mut buffer); + check( + pa_stream_write( + stream, + buffer.as_ptr().cast(), + bytelen, + None, + 0, + PA_SEEK_RELATIVE, + ), + context, + )?; + } + + Ok(()) + }; + + enum WriteState { + Ok, + PulseError(String), + Panicked(Box), + } + + struct WriteCallback<'cb> { + callback: &'cb mut dyn FnMut(usize, *mut pa_stream) -> Result<(), String>, + state: &'cb Cell, + } + + extern "C" fn write_cb(stream: *mut pa_stream, nbytes: usize, userdata: *mut c_void) { + unsafe { + let cb_mut: &mut WriteCallback<'_> = &mut *userdata.cast(); + + let res = + panic::catch_unwind(AssertUnwindSafe(|| (cb_mut.callback)(nbytes, stream))); + + let state = match res { + Ok(Ok(())) => WriteState::Ok, + Ok(Err(error)) => WriteState::PulseError(error), + Err(message) => WriteState::Panicked(message), + }; + + cb_mut.state.set(state); + } + } + + let state = Cell::new(WriteState::Ok); + let mut write = WriteCallback { + callback: &mut callback, + state: &state, + }; + + pa_stream_set_write_callback( + stream, + Some(write_cb), + (&mut write as *mut WriteCallback<'_>).cast(), + ); + + // Unset the pointer to `WriteCallback` + // so that it isn't called after the function returns. + // This also allows safely drop the `write` value from the stack after. + let _unset_write_callback = + defer(|| pa_stream_set_write_callback(stream, None, ptr::null_mut())); + + check( + pa_stream_connect_playback( + stream, + ptr::null(), + ptr::null(), + PA_STREAM_START_CORKED, + ptr::null(), + ptr::null_mut(), + ), + context, + )?; + + while is_running.load(Ordering::Relaxed) { + check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?; + + if pa_stream_is_corked(stream) == 1 { + pa_stream_cork(stream, 0, None, ptr::null_mut()); + } + + match state.replace(WriteState::Ok) { + WriteState::Ok => {} + WriteState::PulseError(error) => return Err(error), + WriteState::Panicked(message) => panic::panic_any(message), + } + } + + Ok(()) + } +} + +fn check(code: i32, context: *const pa_context) -> Result<(), String> { + if code < 0 { + Err(context_error(context)) + } else { + Ok(()) + } +} + +fn check_ptr(ptr: *mut T, context: *const pa_context) -> Result<*mut T, String> { + if ptr.is_null() { + Err(context_error(context)) + } else { + Ok(ptr) + } +} + +fn context_error(context: *const pa_context) -> String { + unsafe { + let error = pa_context_errno(context); + CStr::from_ptr(pa_strerror(error)) + .to_string_lossy() + .into_owned() + } +} + +fn defer(f: F) -> impl Drop +where + F: Fn(), +{ + struct Defer(F) + where + F: Fn(); + + impl Drop for Defer + where + F: Fn(), + { + fn drop(&mut self) { + (self.0)(); + } + } + + Defer(f) +}