Skip to content

Commit 704d0b6

Browse files
authored
Add PulseAudio audio device (#18)
* Add `PulseAudio` audio device * Install `libpulse-dev` in Linux CI * Add `alsa` and `pulse` features * Detailed message of exceeding maximum parameters * Suppress unexpected cfg from `winapi` crate * Suppress unexpected cfg from `winapi` crate
1 parent 1330a8a commit 704d0b6

File tree

6 files changed

+337
-7
lines changed

6 files changed

+337
-7
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
if: ${{ matrix.os == 'ubuntu-latest' }}
2525
run: |
2626
sudo apt-get update # Run update first or install might start failing eventually.
27-
sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev pkg-config
27+
sudo apt-get install --no-install-recommends -y libasound2-dev libudev-dev libpulse-dev pkg-config
2828
- run: rustup update
2929
- run: rustc --version && cargo --version
3030
- name: Build

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ members = ["android-examples", "wasm-examples", "ios-example/Rust-TinyAudioExamp
2828
[profile.dev.package."*"]
2929
opt-level = 3
3030

31+
[features]
32+
default = ["alsa"]
33+
alsa = ["dep:alsa-sys"]
34+
pulse = ["dep:libpulse-sys"]
35+
3136
[target.'cfg(target_os = "android")'.dependencies]
3237
ndk = { version = "0.9.0", default-features = false, features = ["audio", "api-level-27"] }
3338

@@ -54,7 +59,8 @@ features = [
5459
]
5560

5661
[target.'cfg(target_os = "linux")'.dependencies]
57-
alsa-sys = { version = "0.3.1" }
62+
alsa-sys = { version = "0.3.1", optional = true }
63+
libpulse-sys = { version = "1.23.0", optional = true }
5864

5965
[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
6066
coreaudio-sys = { version = "0.2.8" }

src/alsa.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Linux output device via `alsa`.
22
3-
#![cfg(target_os = "linux")]
3+
#![cfg(all(target_os = "linux", feature = "alsa"))]
44

55
use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters};
66
use alsa_sys::*;

src/directsound.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
#![cfg(target_os = "windows")]
44
#![allow(non_snake_case)]
5+
#![allow(unexpected_cfgs)]
56

67
use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters};
78
use std::{

src/lib.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod aaudio;
1010
mod alsa;
1111
mod coreaudio;
1212
mod directsound;
13+
mod pulse;
1314
mod web;
1415

1516
#[doc(hidden)]
@@ -134,10 +135,26 @@ where
134135

135136
#[cfg(target_os = "linux")]
136137
{
137-
return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new(
138-
params,
139-
data_callback,
140-
)?));
138+
#[cfg(feature = "alsa")]
139+
{
140+
return Ok(OutputDevice::new(alsa::AlsaSoundDevice::new(
141+
params,
142+
data_callback,
143+
)?));
144+
}
145+
146+
#[cfg(all(feature = "pulse", not(feature = "alsa")))]
147+
{
148+
return Ok(OutputDevice::new(pulse::PulseSoundDevice::new(
149+
params,
150+
data_callback,
151+
)?));
152+
}
153+
154+
#[cfg(all(not(feature = "alsa"), not(feature = "pulse")))]
155+
{
156+
compile_error!("Select \"alsa\" or \"pulse\" feature to use an audio device on Linux")
157+
}
141158
}
142159

143160
#[cfg(all(target_os = "unknown", target_arch = "wasm32"))]

src/pulse.rs

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
//! Linux output device via `PulseAudio`.
2+
3+
#![cfg(all(target_os = "linux", feature = "pulse"))]
4+
#![cfg_attr(feature = "alsa", allow(dead_code))]
5+
6+
use crate::{AudioOutputDevice, BaseAudioOutputDevice, OutputDeviceParameters};
7+
use libpulse_sys::*;
8+
use std::{
9+
any::Any,
10+
cell::Cell,
11+
error::Error,
12+
ffi::{c_void, CStr},
13+
panic::{self, AssertUnwindSafe},
14+
ptr,
15+
sync::{
16+
atomic::{AtomicBool, Ordering},
17+
Arc,
18+
},
19+
thread::JoinHandle,
20+
};
21+
22+
pub struct PulseSoundDevice {
23+
thread_handle: Option<JoinHandle<Result<(), String>>>,
24+
is_running: Arc<AtomicBool>,
25+
}
26+
27+
impl Drop for PulseSoundDevice {
28+
fn drop(&mut self) {
29+
self.is_running.store(false, Ordering::Relaxed);
30+
let res = self
31+
.thread_handle
32+
.take()
33+
.expect("PulseAudio thread must exist!")
34+
.join()
35+
// propagate panic
36+
.unwrap();
37+
38+
if let Err(_error) = res {
39+
// The error from the PulseAudio thread,
40+
// can be printed or returned if needed
41+
}
42+
}
43+
}
44+
45+
impl BaseAudioOutputDevice for PulseSoundDevice {}
46+
47+
impl AudioOutputDevice for PulseSoundDevice {
48+
fn new<C>(params: OutputDeviceParameters, data_callback: C) -> Result<Self, Box<dyn Error>>
49+
where
50+
C: FnMut(&mut [f32]) + Send + 'static,
51+
Self: Sized,
52+
{
53+
let is_running = Arc::new(AtomicBool::new(true));
54+
let thread_handle = std::thread::Builder::new()
55+
.name("PulseAudioThread".to_string())
56+
.spawn({
57+
let is_running = is_running.clone();
58+
move || run(params, is_running, data_callback)
59+
})?;
60+
61+
Ok(Self {
62+
thread_handle: Some(thread_handle),
63+
is_running,
64+
})
65+
}
66+
}
67+
68+
fn run<C>(
69+
params: OutputDeviceParameters,
70+
is_running: Arc<AtomicBool>,
71+
mut data_callback: C,
72+
) -> Result<(), String>
73+
where
74+
C: FnMut(&mut [f32]) + 'static,
75+
{
76+
unsafe {
77+
let mainloop = pa_mainloop_new();
78+
if mainloop.is_null() {
79+
return Err("failed to create PulseAudio mainloop".to_owned());
80+
}
81+
82+
let _free_mainloop = defer(|| pa_mainloop_free(mainloop));
83+
84+
let api = pa_mainloop_get_api(mainloop);
85+
if api.is_null() {
86+
return Err("failed to get PulseAudio mainloop api".to_owned());
87+
}
88+
89+
let context = pa_context_new(api, "default\0".as_ptr().cast());
90+
if context.is_null() {
91+
return Err("failed to create PulseAudio context".to_owned());
92+
}
93+
94+
let _unref_context = defer(|| {
95+
pa_context_disconnect(context);
96+
pa_context_unref(context);
97+
});
98+
99+
check(
100+
pa_context_connect(context, ptr::null(), PA_CONTEXT_NOFLAGS, ptr::null()),
101+
context,
102+
)?;
103+
104+
loop {
105+
match pa_context_get_state(context) {
106+
PA_CONTEXT_FAILED => {
107+
return Err("the connection failed or was disconnected".to_owned());
108+
}
109+
PA_CONTEXT_TERMINATED => return Ok(()),
110+
PA_CONTEXT_READY => break,
111+
_ => {}
112+
}
113+
114+
check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?;
115+
}
116+
117+
let sample_rate = u32::try_from(params.sample_rate)
118+
.ok()
119+
.filter(|&sample_rate| sample_rate <= PA_RATE_MAX)
120+
.ok_or_else(|| {
121+
format!(
122+
"sample rate {} exceeds maximum allowed {}",
123+
params.sample_rate, PA_RATE_MAX,
124+
)
125+
})?;
126+
127+
let channels_count = u8::try_from(params.channels_count)
128+
.ok()
129+
.filter(|&channels_count| channels_count <= PA_CHANNELS_MAX)
130+
.ok_or_else(|| {
131+
format!(
132+
"channels count {} exceeds maximum allowed {}",
133+
params.channels_count, PA_CHANNELS_MAX,
134+
)
135+
})?;
136+
137+
let spec = pa_sample_spec {
138+
format: PA_SAMPLE_FLOAT32LE,
139+
rate: sample_rate,
140+
channels: channels_count,
141+
};
142+
143+
if pa_sample_spec_valid(&spec) == 0 {
144+
return Err("spec is not valid".to_owned());
145+
}
146+
147+
let stream = check_ptr(
148+
pa_stream_new(
149+
context,
150+
"PulseAudio Stream\0".as_ptr().cast(),
151+
&spec,
152+
ptr::null(),
153+
),
154+
context,
155+
)?;
156+
157+
let _unref_stream = defer(|| {
158+
pa_stream_disconnect(stream);
159+
pa_stream_unref(stream);
160+
});
161+
162+
if params.channel_sample_count % usize::from(spec.channels) != 0 {
163+
return Err("the length of the data to write (in bytes) must be in multiples of the stream sample spec frame size".to_owned());
164+
}
165+
166+
let mut buffer = vec![0.; params.channel_sample_count];
167+
let mut callback = move |nbytes, stream| {
168+
let bytelen = buffer.len() * size_of::<f32>();
169+
for _ in 0..nbytes / bytelen {
170+
data_callback(&mut buffer);
171+
check(
172+
pa_stream_write(
173+
stream,
174+
buffer.as_ptr().cast(),
175+
bytelen,
176+
None,
177+
0,
178+
PA_SEEK_RELATIVE,
179+
),
180+
context,
181+
)?;
182+
}
183+
184+
Ok(())
185+
};
186+
187+
enum WriteState {
188+
Ok,
189+
PulseError(String),
190+
Panicked(Box<dyn Any + Send + 'static>),
191+
}
192+
193+
struct WriteCallback<'cb> {
194+
callback: &'cb mut dyn FnMut(usize, *mut pa_stream) -> Result<(), String>,
195+
state: &'cb Cell<WriteState>,
196+
}
197+
198+
extern "C" fn write_cb(stream: *mut pa_stream, nbytes: usize, userdata: *mut c_void) {
199+
unsafe {
200+
let cb_mut: &mut WriteCallback<'_> = &mut *userdata.cast();
201+
202+
let res =
203+
panic::catch_unwind(AssertUnwindSafe(|| (cb_mut.callback)(nbytes, stream)));
204+
205+
let state = match res {
206+
Ok(Ok(())) => WriteState::Ok,
207+
Ok(Err(error)) => WriteState::PulseError(error),
208+
Err(message) => WriteState::Panicked(message),
209+
};
210+
211+
cb_mut.state.set(state);
212+
}
213+
}
214+
215+
let state = Cell::new(WriteState::Ok);
216+
let mut write = WriteCallback {
217+
callback: &mut callback,
218+
state: &state,
219+
};
220+
221+
pa_stream_set_write_callback(
222+
stream,
223+
Some(write_cb),
224+
(&mut write as *mut WriteCallback<'_>).cast(),
225+
);
226+
227+
// Unset the pointer to `WriteCallback`
228+
// so that it isn't called after the function returns.
229+
// This also allows safely drop the `write` value from the stack after.
230+
let _unset_write_callback =
231+
defer(|| pa_stream_set_write_callback(stream, None, ptr::null_mut()));
232+
233+
check(
234+
pa_stream_connect_playback(
235+
stream,
236+
ptr::null(),
237+
ptr::null(),
238+
PA_STREAM_START_CORKED,
239+
ptr::null(),
240+
ptr::null_mut(),
241+
),
242+
context,
243+
)?;
244+
245+
while is_running.load(Ordering::Relaxed) {
246+
check(pa_mainloop_iterate(mainloop, 1, ptr::null_mut()), context)?;
247+
248+
if pa_stream_is_corked(stream) == 1 {
249+
pa_stream_cork(stream, 0, None, ptr::null_mut());
250+
}
251+
252+
match state.replace(WriteState::Ok) {
253+
WriteState::Ok => {}
254+
WriteState::PulseError(error) => return Err(error),
255+
WriteState::Panicked(message) => panic::panic_any(message),
256+
}
257+
}
258+
259+
Ok(())
260+
}
261+
}
262+
263+
fn check(code: i32, context: *const pa_context) -> Result<(), String> {
264+
if code < 0 {
265+
Err(context_error(context))
266+
} else {
267+
Ok(())
268+
}
269+
}
270+
271+
fn check_ptr<T>(ptr: *mut T, context: *const pa_context) -> Result<*mut T, String> {
272+
if ptr.is_null() {
273+
Err(context_error(context))
274+
} else {
275+
Ok(ptr)
276+
}
277+
}
278+
279+
fn context_error(context: *const pa_context) -> String {
280+
unsafe {
281+
let error = pa_context_errno(context);
282+
CStr::from_ptr(pa_strerror(error))
283+
.to_string_lossy()
284+
.into_owned()
285+
}
286+
}
287+
288+
fn defer<F>(f: F) -> impl Drop
289+
where
290+
F: Fn(),
291+
{
292+
struct Defer<F>(F)
293+
where
294+
F: Fn();
295+
296+
impl<F> Drop for Defer<F>
297+
where
298+
F: Fn(),
299+
{
300+
fn drop(&mut self) {
301+
(self.0)();
302+
}
303+
}
304+
305+
Defer(f)
306+
}

0 commit comments

Comments
 (0)