Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
if: ${{ matrix.os == 'ubuntu-latest' }}
run: |
sudo apt-get update # Run update first or install might start failing eventually.
sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev pkg-config
sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev libpulse-dev pkg-config
- run: rustup update
- run: rustc --version && cargo --version
- name: Build
Expand Down
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ members = ["android-examples", "wasm-examples", "ios-example/Rust-TinyAudioExamp
[profile.dev.package."*"]
opt-level = 3

[features]
default = ["alsa"]
alsa = ["dep:alsa-sys"]
pulse = ["dep:libpulse-sys"]

[target.'cfg(target_os = "android")'.dependencies]
ndk = { version = "0.9.0", default-features = false, features = ["audio", "api-level-27"] }

Expand All @@ -54,7 +59,8 @@ features = [
]

[target.'cfg(target_os = "linux")'.dependencies]
alsa-sys = { version = "0.3.1" }
alsa-sys = { version = "0.3.1", optional = true }
libpulse-sys = { version = "1.23.0", optional = true }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
coreaudio-sys = { version = "0.2.8" }
Expand Down
2 changes: 1 addition & 1 deletion src/alsa.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Linux output device via `alsa`.

#![cfg(target_os = "linux")]
#![cfg(all(target_os = "linux", feature = "alsa"))]

use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters};
use alsa_sys::*;
Expand Down
25 changes: 21 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod aaudio;
mod alsa;
mod coreaudio;
mod directsound;
mod pulse;
mod web;

#[doc(hidden)]
Expand Down Expand Up @@ -134,10 +135,26 @@ where

#[cfg(target_os = "linux")]
{
return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new(
params,
data_callback,
)?));
#[cfg(feature = "alsa")]
{
return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new(
params,
data_callback,
)?));
}

#[cfg(all(feature = "pulse", not(feature = "alsa")))]
{
return Ok(OutputDevice::new(pulse::PulseSoundDevice::new(
params,
data_callback,
)?));
}

#[cfg(all(not(feature = "alsa"), not(feature = "pulse")))]
{
compile_error!("Select \"alsa\" or \"pulse\" feature to use an audio device on Linux")
}
}

#[cfg(all(target_os = "unknown", target_arch = "wasm32"))]
Expand Down
306 changes: 306 additions & 0 deletions src/pulse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
//! Linux output device via `PulseAudio`.

#![cfg(all(target_os = "linux", feature = "pulse"))]
#![cfg_attr(feature = "alsa", allow(dead_code))]

use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters};
use libpulse_sys::*;
use std::{
any::Any,
cell::Cell,
error::Error,
ffi::{c_void, CStr},
panic::{self, AssertUnwindSafe},
ptr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::JoinHandle,
};

pub struct PulseSoundDevice {
thread_handle: Option<JoinHandle<Result<(), String>>>,
is_running: Arc<AtomicBool>,
}

impl Drop for PulseSoundDevice {
fn drop(&mut self) {
self.is_running.store(false, Ordering::Relaxed);
let res = self
.thread_handle
.take()
.expect("PulseAudio thread must exist!")
.join()
// propagate panic
.unwrap();

if let Err(_error) = res {
// The error from the PulseAudio thread,
// can be printed or returned if needed
}
}
}

impl BaseAudioOutputDevice for PulseSoundDevice {}

impl AudioOutputDevice for PulseSoundDevice {
fn new<C>(params: OutputDeviceParameters, data_callback: C) -> Result<Self, Box<dyn Error>>
where
C: FnMut(&mut [f32]) + Send + 'static,
Self: Sized,
{
let is_running = Arc::new(AtomicBool::new(true));
let thread_handle = std::thread::Builder::new()
.name("PulseAudioThread".to_string())
.spawn({
let is_running = is_running.clone();
move || run(params, is_running, data_callback)
})?;

Ok(Self {
thread_handle: Some(thread_handle),
is_running,
})
}
}

fn run<C>(
params: OutputDeviceParameters,
is_running: Arc<AtomicBool>,
mut data_callback: C,
) -> Result<(), String>
where
C: FnMut(&mut [f32]) + 'static,
{
unsafe {
let mainloop = pa_mainloop_new();
if mainloop.is_null() {
return Err("failed to create PulseAudio mainloop".to_owned());
}

let _free_mainloop = defer(|| pa_mainloop_free(mainloop));

let api = pa_mainloop_get_api(mainloop);
if api.is_null() {
return Err("failed to get PulseAudio mainloop api".to_owned());
}

let context = pa_context_new(api, "default\0".as_ptr().cast());
if context.is_null() {
return Err("failed to create PulseAudio context".to_owned());
}

let _unref_context = defer(|| {
pa_context_disconnect(context);
pa_context_unref(context);
});

check(
pa_context_connect(context, ptr::null(), PA_CONTEXT_NOFLAGS, ptr::null()),
context,
)?;

loop {
match pa_context_get_state(context) {
PA_CONTEXT_FAILED => {
return Err("the connection failed or was disconnected".to_owned());
}
PA_CONTEXT_TERMINATED => return Ok(()),
PA_CONTEXT_READY => break,
_ => {}
}

check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?;
}

let sample_rate = u32::try_from(params.sample_rate)
.ok()
.filter(|&sample_rate| sample_rate <= PA_RATE_MAX)
.ok_or_else(|| {
format!(
"sample rate {} exceeds maximum allowed {}",
params.sample_rate, PA_RATE_MAX,
)
})?;

let channels_count = u8::try_from(params.channels_count)
.ok()
.filter(|&channels_count| channels_count <= PA_CHANNELS_MAX)
.ok_or_else(|| {
format!(
"channels count {} exceeds maximum allowed {}",
params.channels_count, PA_CHANNELS_MAX,
)
})?;

let spec = pa_sample_spec {
format: PA_SAMPLE_FLOAT32LE,
rate: sample_rate,
channels: channels_count,
};

if pa_sample_spec_valid(&spec) == 0 {
return Err("spec is not valid".to_owned());
}

let stream = check_ptr(
pa_stream_new(
context,
"PulseAudio Stream\0".as_ptr().cast(),
&spec,
ptr::null(),
),
context,
)?;

let _unref_stream = defer(|| {
pa_stream_disconnect(stream);
pa_stream_unref(stream);
});

if params.channel_sample_count % usize::from(spec.channels) != 0 {
return Err("the length of the data to write (in bytes) must be in multiples of the stream sample spec frame size".to_owned());
}

let mut buffer = vec![0.; params.channel_sample_count];
let mut callback = move |nbytes, stream| {
let bytelen = buffer.len() * size_of::<f32>();
for _ in 0..nbytes / bytelen {
data_callback(&mut buffer);
check(
pa_stream_write(
stream,
buffer.as_ptr().cast(),
bytelen,
None,
0,
PA_SEEK_RELATIVE,
),
context,
)?;
}

Ok(())
};

enum WriteState {
Ok,
PulseError(String),
Panicked(Box<dyn Any + Send + 'static>),
}

struct WriteCallback<'cb> {
callback: &'cb mut dyn FnMut(usize, *mut pa_stream) -> Result<(), String>,
state: &'cb Cell<WriteState>,
}

extern "C" fn write_cb(stream: *mut pa_stream, nbytes: usize, userdata: *mut c_void) {
unsafe {
let cb_mut: &mut WriteCallback<'_> = &mut *userdata.cast();

let res =
panic::catch_unwind(AssertUnwindSafe(|| (cb_mut.callback)(nbytes, stream)));

let state = match res {
Ok(Ok(())) => WriteState::Ok,
Ok(Err(error)) => WriteState::PulseError(error),
Err(message) => WriteState::Panicked(message),
};

cb_mut.state.set(state);
}
}

let state = Cell::new(WriteState::Ok);
let mut write = WriteCallback {
callback: &mut callback,
state: &state,
};

pa_stream_set_write_callback(
stream,
Some(write_cb),
(&mut write as *mut WriteCallback<'_>).cast(),
);

// Unset the pointer to `WriteCallback`
// so that it isn't called after the function returns.
// This also allows safely drop the `write` value from the stack after.
let _unset_write_callback =
defer(|| pa_stream_set_write_callback(stream, None, ptr::null_mut()));

check(
pa_stream_connect_playback(
stream,
ptr::null(),
ptr::null(),
PA_STREAM_START_CORKED,
ptr::null(),
ptr::null_mut(),
),
context,
)?;

while is_running.load(Ordering::Relaxed) {
check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?;

if pa_stream_is_corked(stream) == 1 {
pa_stream_cork(stream, 0, None, ptr::null_mut());
}

match state.replace(WriteState::Ok) {
WriteState::Ok => {}
WriteState::PulseError(error) => return Err(error),
WriteState::Panicked(message) => panic::panic_any(message),
}
}

Ok(())
}
}

fn check(code: i32, context: *const pa_context) -> Result<(), String> {
if code < 0 {
Err(context_error(context))
} else {
Ok(())
}
}

fn check_ptr<T>(ptr: *mut T, context: *const pa_context) -> Result<*mut T, String> {
if ptr.is_null() {
Err(context_error(context))
} else {
Ok(ptr)
}
}

fn context_error(context: *const pa_context) -> String {
unsafe {
let error = pa_context_errno(context);
CStr::from_ptr(pa_strerror(error))
.to_string_lossy()
.into_owned()
}
}

fn defer<F>(f: F) -> impl Drop
where
F: Fn(),
{
struct Defer<F>(F)
where
F: Fn();

impl<F> Drop for Defer<F>
where
F: Fn(),
{
fn drop(&mut self) {
(self.0)();
}
}

Defer(f)
}
Loading