diff --git a/Cargo.lock b/Cargo.lock index 0ad277732..a75a9875b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,16 @@ dependencies = [ "libc", ] +[[package]] +name = "annotate-snippets" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "710e8eae58854cdc1790fcb56cca04d712a17be849eeb81da2a724bf4bae2bc4" +dependencies = [ + "anstyle", + "unicode-width", +] + [[package]] name = "anstream" version = "0.6.20" @@ -227,6 +237,25 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "annotate-snippets", + "bitflags 2.9.4", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -288,6 +317,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "cfg-expr" version = "0.20.3" @@ -334,6 +372,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.9", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -365,6 +414,21 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" + [[package]] name = "core-foundation" version = "0.9.4" @@ -992,6 +1056,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "gobject-sys" version = "0.21.2" @@ -1034,7 +1104,7 @@ dependencies = [ "futures-util", "glib", "gstreamer-sys", - "itertools", + "itertools 0.14.0", "kstring", "libc", "muldiv", @@ -1637,6 +1707,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1674,7 +1753,7 @@ dependencies = [ "bitflags 1.3.2", "lazy_static", "libc", - "libloading", + "libloading 0.7.4", "log", "pkg-config", ] @@ -1769,6 +1848,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.0", +] + [[package]] name = "libm" version = "0.2.15" @@ -2034,12 +2123,15 @@ dependencies = [ "librespot-audio", "librespot-core", "librespot-metadata", + "libspa", "log", "ogg", + "pipewire", "portable-atomic", "portaudio-rs", "rand 0.9.2", "rand_distr", + "ringbuf", "rodio", "sdl2", "shell-words", @@ -2057,6 +2149,34 @@ dependencies = [ "protobuf-codegen", ] +[[package]] +name = "libspa" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b8cfa2a7656627b4c92c6b9ef929433acd673d5ab3708cda1b18478ac00df4" +dependencies = [ + "bitflags 2.9.4", + "cc", + "convert_case", + "cookie-factory", + "libc", + "libspa-sys", + "nix", + "nom 8.0.0", + "system-deps", +] + +[[package]] +name = "libspa-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "901049455d2eb6decf9058235d745237952f4804bc584c5fcb41412e6adcc6e0" +dependencies = [ + "bindgen", + "cc", + "system-deps", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -2127,6 +2247,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2218,6 +2344,25 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -2633,6 +2778,34 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pipewire" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9688b89abf11d756499f7c6190711d6dbe5a3acdb30c8fbf001d6596d06a8d44" +dependencies = [ + "anyhow", + "bitflags 2.9.4", + "libc", + "libspa", + "libspa-sys", + "nix", + "once_cell", + "pipewire-sys", + "thiserror 2.0.16", +] + +[[package]] +name = "pipewire-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb028afee0d6ca17020b090e3b8fa2d7de23305aef975c7e5192a5050246ea36" +dependencies = [ + "bindgen", + "libspa-sys", + "system-deps", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -3055,6 +3228,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuf" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" +dependencies = [ + "crossbeam-utils", + "portable-atomic", + "portable-atomic-util", +] + [[package]] name = "rodio" version = "0.21.1" @@ -4106,6 +4290,12 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-width" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 63a5927a7..216cb25fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,6 +102,11 @@ alsa-backend = ["librespot-playback/alsa-backend"] # Integrates with the PulseAudio sound server for advanced audio routing. pulseaudio-backend = ["librespot-playback/pulseaudio-backend"] +# pipewire-backend: PipeWire backend (Linux only). +# Modern audio server that provides low-latency audio with advanced routing capabilities. +# Designed as a replacement for both PulseAudio and JACK. +pipewire-backend = ["librespot-playback/pipewire-backend"] + # jackaudio-backend: JACK Audio Connection Kit backend. # Professional audio backend for low-latency, high-quality audio routing. jackaudio-backend = ["librespot-playback/jackaudio-backend"] diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 2001c680b..01b394a38 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -20,6 +20,7 @@ gstreamer-backend = [ "dep:gstreamer-audio", ] jackaudio-backend = ["dep:jack"] +pipewire-backend = ["dep:pipewire", "dep:libspa", "dep:ringbuf"] portaudio-backend = ["dep:portaudio-rs"] pulseaudio-backend = ["dep:libpulse-binding", "dep:libpulse-simple-binding"] rodio-backend = ["dep:cpal", "dep:rodio"] @@ -74,6 +75,12 @@ gstreamer-audio = { version = "0.24", optional = true } libpulse-binding = { version = "2", optional = true, default-features = false } libpulse-simple-binding = { version = "2", optional = true, default-features = false } +# Pipewire dependencies +libspa = { version = "0.9", optional = true } +pipewire = { version = "0.9", optional = true } +ringbuf = { version = "0.4", optional = true } + + # Rodio dependencies cpal = { version = "0.16", optional = true } rodio = { version = "0.21", optional = true, default-features = false, features = [ diff --git a/playback/src/audio_backend/mod.rs b/playback/src/audio_backend/mod.rs index f8f43e3fa..73c4d86ea 100644 --- a/playback/src/audio_backend/mod.rs +++ b/playback/src/audio_backend/mod.rs @@ -95,6 +95,11 @@ mod pulseaudio; #[cfg(feature = "pulseaudio-backend")] use self::pulseaudio::PulseAudioSink; +#[cfg(feature = "pipewire-backend")] +mod pipewire; +#[cfg(feature = "pipewire-backend")] +use self::pipewire::PipeWireSink; + #[cfg(feature = "jackaudio-backend")] mod jackaudio; #[cfg(feature = "jackaudio-backend")] @@ -130,6 +135,8 @@ pub const BACKENDS: &[(&str, SinkBuilder)] = &[ (PortAudioSink::NAME, mk_sink::>), #[cfg(feature = "pulseaudio-backend")] (PulseAudioSink::NAME, mk_sink::), + #[cfg(feature = "pipewire-backend")] + (PipeWireSink::NAME, mk_sink::), #[cfg(feature = "jackaudio-backend")] (JackSink::NAME, mk_sink::), #[cfg(feature = "gstreamer-backend")] diff --git a/playback/src/audio_backend/pipewire.rs b/playback/src/audio_backend/pipewire.rs new file mode 100644 index 000000000..ba7c1df77 --- /dev/null +++ b/playback/src/audio_backend/pipewire.rs @@ -0,0 +1,338 @@ +use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; +use crate::config::AudioFormat; +use crate::convert::Converter; +use crate::decoder::AudioPacket; +use crate::{NUM_CHANNELS, SAMPLE_RATE}; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use std::thread; +use thiserror::Error; + +use libspa::sys as spa_sys; +use pipewire as pw; +use pw::{properties::properties, spa}; +use ringbuf::{ + HeapRb, + traits::{Consumer, Producer, Split}, +}; +use spa::pod::Pod; + +type RingProducer = ringbuf::wrap::caching::Caching< + std::sync::Arc>>, + true, + false, +>; +type RingConsumer = ringbuf::wrap::caching::Caching< + std::sync::Arc>>, + false, + true, +>; + +// Ring buffer size: 1 second of audio at max quality (F64, stereo, 44.1kHz) +const RING_BUFFER_SIZE: usize = 44100 * 2 * 8; + +#[derive(Debug, Error)] +enum PipeWireError { + #[error(" Failed to Create Main Loop: {0}")] + MainLoopCreation(String), + + #[error(" Failed to Create Stream: {0}")] + StreamCreation(String), + + #[error(" Failed to Connect Stream: {0}")] + StreamConnect(String), + + #[error(" Stream Not Connected")] + NotConnected, +} + +impl From for SinkError { + fn from(e: PipeWireError) -> SinkError { + use PipeWireError::*; + let es = e.to_string(); + match e { + MainLoopCreation(_) | StreamCreation(_) | StreamConnect(_) => { + SinkError::ConnectionRefused(es) + } + NotConnected => SinkError::NotConnected(es), + } + } +} + +pub struct PipeWireSink { + format: AudioFormat, + // Lock-free ring buffer producer for audio data + producer: Option, + // Flag to signal thread to stop + quit_flag: Arc, + initialized: bool, + _main_loop_handle: Option>, +} + +fn calculate_sample_size(format: AudioFormat) -> usize { + use AudioFormat::*; + match format { + F64 => 8, + F32 | S32 | S24 => 4, + S24_3 => 3, + S16 => 2, + } +} + +fn convert_audio_format(format: AudioFormat) -> spa::param::audio::AudioFormat { + use AudioFormat::*; + match format { + F64 => spa::param::audio::AudioFormat::F64LE, + F32 => spa::param::audio::AudioFormat::F32LE, + S32 => spa::param::audio::AudioFormat::S32LE, + S24 => spa::param::audio::AudioFormat::S24_32LE, + S24_3 => spa::param::audio::AudioFormat::S24LE, + S16 => spa::param::audio::AudioFormat::S16LE, + } +} + +impl Open for PipeWireSink { + fn open(_device: Option, format: AudioFormat) -> Self { + info!("Using PipeWireSink with format: {format:?}"); + + Self { + format, + producer: None, + quit_flag: Arc::new(AtomicBool::new(false)), + initialized: false, + _main_loop_handle: None, + } + } +} + +impl Sink for PipeWireSink { + fn start(&mut self) -> SinkResult<()> { + if self.initialized { + return Ok(()); + } + + info!("Starting PipeWire sink..."); + + let format = self.format; + let quit_flag = Arc::clone(&self.quit_flag); + + // Create a lock-free ring buffer for real-time audio transfer + let ring_buffer = HeapRb::::new(RING_BUFFER_SIZE); + let (producer, consumer) = ring_buffer.split(); + + // Store the producer for write_bytes + self.producer = Some(producer); + + // Run PipeWire main loop in a separate thread with the consumer + let handle = thread::spawn(move || { + if let Err(e) = run_pipewire_loop(consumer, quit_flag, format) { + error!("PipeWire loop error: {}", e); + } + }); + + self._main_loop_handle = Some(handle); + self.initialized = true; + + // Give the thread a moment to initialize + thread::sleep(std::time::Duration::from_millis(100)); + + info!("PipeWire sink started successfully"); + Ok(()) + } + + fn stop(&mut self) -> SinkResult<()> { + if !self.initialized { + return Ok(()); + } + + info!("Stopping PipeWire sink..."); + + // Signal the thread to quit + self.quit_flag.store(true, Ordering::Relaxed); + + // Drop the producer to signal the consumer thread to exit + self.producer = None; + + // Wait for the thread to finish with a timeout + if let Some(handle) = self._main_loop_handle.take() { + // Give it a moment to exit gracefully + thread::sleep(std::time::Duration::from_millis(100)); + let _ = handle.join(); + } + + // Reset the quit flag for potential restart + self.quit_flag.store(false, Ordering::Relaxed); + self.initialized = false; + + info!("PipeWire sink stopped"); + Ok(()) + } + + sink_as_bytes!(); +} + +impl SinkAsBytes for PipeWireSink { + fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { + if !self.initialized { + return Err(PipeWireError::NotConnected.into()); + } + + if let Some(ref mut producer) = self.producer { + // Push data to the lock-free ring buffer in chunks + // This is much more efficient than byte-by-byte and is wait-free + let mut offset = 0; + while offset < data.len() { + let written = producer.push_slice(&data[offset..]); + if written == 0 { + // Ring buffer is full, wait a tiny bit for consumer to catch up + thread::sleep(std::time::Duration::from_micros(100)); + } else { + offset += written; + } + } + Ok(()) + } else { + Err(PipeWireError::NotConnected.into()) + } + } +} + +impl Drop for PipeWireSink { + fn drop(&mut self) { + let _ = self.stop(); + } +} + +impl PipeWireSink { + pub const NAME: &'static str = "pipewire"; +} + +fn run_pipewire_loop( + consumer: RingConsumer, + quit_flag: Arc, + format: AudioFormat, +) -> Result<(), PipeWireError> { + // Initialize PipeWire + pw::init(); + + let mainloop = pw::main_loop::MainLoopRc::new(None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let context = pw::context::ContextRc::new(&mainloop, None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let core = context + .connect_rc(None) + .map_err(|e| PipeWireError::MainLoopCreation(format!("{:?}", e)))?; + + let stream = pw::stream::StreamBox::new( + &core, + "librespot-playback", + properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_ROLE => "Music", + *pw::keys::MEDIA_CATEGORY => "Playback", + *pw::keys::AUDIO_CHANNELS => NUM_CHANNELS.to_string().as_str(), + *pw::keys::APP_NAME => "librespot", + }, + ) + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))?; + + let sample_size = calculate_sample_size(format); + let stride = sample_size * NUM_CHANNELS as usize; + + // Clone mainloop for use in the listener callback + let mainloop_quit = mainloop.clone(); + + // Use PipeWire's real-time callback with lock-free ring buffer consumer + // This ensures optimal performance and real-time safety + let _listener = stream + .add_local_listener_with_user_data((consumer, quit_flag.clone())) + .process(move |stream, (consumer, quit_flag)| { + // Check if we should quit + if quit_flag.load(Ordering::Relaxed) { + mainloop_quit.quit(); + return; + } + + match stream.dequeue_buffer() { + None => { + // No buffer available, this is normal + } + Some(mut buffer) => { + let datas = buffer.datas_mut(); + let data = &mut datas[0]; + + let n_frames = if let Some(slice) = data.data() { + let n_frames = slice.len() / stride; + let total_bytes = n_frames * stride; + + // Pop data from the lock-free ring buffer + // This is wait-free and real-time safe + let bytes_read = consumer.pop_slice(slice); + + // Fill any remaining space with silence if underrun + if bytes_read < total_bytes { + slice[bytes_read..total_bytes].fill(0); + } + + n_frames + } else { + 0 + }; + + // Configure the buffer chunk metadata + let chunk = data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as _; + *chunk.size_mut() = (stride * n_frames) as _; + } + } + }) + .register() + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))?; + + // Setup audio format parameters - matches pipewire_tone_test.rs + let mut audio_info = spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(convert_audio_format(format)); + audio_info.set_rate(SAMPLE_RATE); + audio_info.set_channels(NUM_CHANNELS as u32); + + let mut position = [0; spa::param::audio::MAX_CHANNELS]; + position[0] = spa_sys::SPA_AUDIO_CHANNEL_FL; + position[1] = spa_sys::SPA_AUDIO_CHANNEL_FR; + audio_info.set_position(position); + + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(pw::spa::pod::Object { + type_: spa_sys::SPA_TYPE_OBJECT_Format, + id: spa_sys::SPA_PARAM_EnumFormat, + properties: audio_info.into(), + }), + ) + .map_err(|e| PipeWireError::StreamCreation(format!("{:?}", e)))? + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + // Connect stream - matches pipewire_tone_test.rs + stream + .connect( + spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + ) + .map_err(|e| PipeWireError::StreamConnect(format!("{:?}", e)))?; + + // Run the main loop + mainloop.run(); + + Ok(()) +}