Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ impl Source for MixerSource {
// Ok(())
// }
}

fn set_pause_handle(&mut self, pause_handle: crate::source::PauseHandle) {
// TODO create MixerController in pausable. It will only set paused on this
// pause handle when all sources in the mixer have been paused
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have been paused or exhausted.

// Create one here, then call set_pause_handle on all sources in this
// mixer with that mixercontroller.
todo!()
}
}

impl Iterator for MixerSource {
Expand Down
60 changes: 48 additions & 12 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::math::nz;
use crate::source::{Empty, SeekError, Source, Zero};
use crate::source::{Empty, PauseHandle, SeekError, Source};
use crate::Sample;

use crate::common::{ChannelCount, SampleRate};
Expand All @@ -26,15 +25,18 @@ use std::sync::mpsc::{channel, Receiver, Sender};
/// - If you pass `false`, then the queue will report that it has finished playing.
///
pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
let pause_handle = Arc::new(Mutex::new(None));
let input = Arc::new(SourcesQueueInput {
next_sounds: Mutex::new(Vec::new()),
keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
pause_handle: Arc::clone(&pause_handle),
});

let output = SourcesQueueOutput {
current: Box::new(Empty::new()) as Box<_>,
signal_after_end: None,
input: input.clone(),
pause_handle,
};

(input, output)
Expand All @@ -51,6 +53,7 @@ pub struct SourcesQueueInput {

// See constructor.
keep_alive_if_empty: AtomicBool,
pause_handle: Arc<Mutex<Option<PauseHandle>>>,
}

impl SourcesQueueInput {
Expand All @@ -60,10 +63,21 @@ impl SourcesQueueInput {
where
T: Source + Send + 'static,
{
let is_paused = source.is_paused();
self.next_sounds
.lock()
.unwrap()
.push((Box::new(source) as Box<_>, None));
if is_paused {
if let Some(pause_handle) = self
.pause_handle
.lock()
.expect("audio thread should not panic")
.as_ref()
{
pause_handle.unpause();
}
}
}

/// Adds a new source to the end of the queue.
Expand Down Expand Up @@ -104,12 +118,12 @@ impl SourcesQueueInput {
pub struct SourcesQueueOutput {
// The current iterator that produces samples.
current: Box<dyn Source + Send>,

// Signal this sender before picking from `next`.
signal_after_end: Option<Sender<()>>,

// The next sounds.
input: Arc<SourcesQueueInput>,
// Signal whether this source is paused to the output
pause_handle: Arc<Mutex<Option<PauseHandle>>>,
}

const THRESHOLD: usize = 512;
Expand Down Expand Up @@ -180,6 +194,17 @@ impl Source for SourcesQueueOutput {
fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
self.current.try_seek(pos)
}

fn set_pause_handle(&mut self, pause_handle: PauseHandle) {
*self
.pause_handle
.lock()
.expect("audio thread should not panic") = Some(pause_handle);
}

fn is_paused(&self) -> bool {
false
}
}

impl Iterator for SourcesQueueOutput {
Expand All @@ -195,8 +220,9 @@ impl Iterator for SourcesQueueOutput {

// Since `self.current` has finished, we need to pick the next sound.
// In order to avoid inlining this expensive operation, the code is in another function.
if self.go_next().is_err() {
return None;
match self.go_next() {
Next::Paused | Next::ShouldEnd => return None,
Next::IsNowCurrent => (),
}
}
}
Expand All @@ -207,12 +233,18 @@ impl Iterator for SourcesQueueOutput {
}
}

enum Next {
Paused,
ShouldEnd,
IsNowCurrent,
}

impl SourcesQueueOutput {
// Called when `current` is empty, and we must jump to the next element.
// Returns `Ok` if the sound should continue playing, or an error if it should stop.
//
// This method is separate so that it is not inlined.
fn go_next(&mut self) -> Result<(), ()> {
fn go_next(&mut self) -> Next {
if let Some(signal_after_end) = self.signal_after_end.take() {
let _ = signal_after_end.send(());
}
Expand All @@ -221,12 +253,16 @@ impl SourcesQueueOutput {
let mut next = self.input.next_sounds.lock().unwrap();

if next.is_empty() {
let silence = Box::new(Zero::new_samples(nz!(1), nz!(44100), THRESHOLD)) as Box<_>;
if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
// Play a short silence in order to avoid spinlocking.
(silence, None)
self.pause_handle
.lock()
.expect("audio thread should not panic")
.as_ref()
.expect("should be set before next is called")
.pause();
return Next::Paused;
} else {
return Err(());
return Next::ShouldEnd;
}
} else {
next.remove(0)
Expand All @@ -235,7 +271,7 @@ impl SourcesQueueOutput {

self.current = next;
self.signal_after_end = signal_after_end;
Ok(())
Next::IsNowCurrent
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub mod noise;
#[cfg_attr(docsrs, doc(cfg(feature = "noise")))]
pub use self::noise::{Pink, WhiteUniform};

pub(crate) mod pausing;
pub(crate) use pausing::{PauseControl, PauseHandle};
/// A source of samples.
///
/// # A quick lesson about sounds
Expand Down Expand Up @@ -181,6 +183,15 @@ pub trait Source: Iterator<Item = Sample> {
/// `None` indicates at the same time "infinite" or "unknown".
fn total_duration(&self) -> Option<Duration>;

#[expect(unused_variables)]
fn set_pause_handle(&mut self, pause_handle: PauseHandle) {
todo!()
}

fn is_paused(&self) -> bool {
todo!()
}

/// Stores the source in a buffer in addition to returning it. This iterator can be cloned.
#[inline]
fn buffered(self) -> Buffered<Self>
Expand Down
12 changes: 12 additions & 0 deletions src/source/pausable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::time::Duration;

use super::SeekError;
use crate::common::{ChannelCount, SampleRate};
use crate::source::pausing;
use crate::source::PauseHandle;
use crate::Source;

/// Builds a `Pausable` object.
Expand All @@ -18,6 +20,7 @@ where
input: source,
paused_channels,
remaining_paused_samples: 0,
pause_handle: None,
}
}

Expand All @@ -32,6 +35,7 @@ pub struct Pausable<I> {
input: I,
paused_channels: Option<ChannelCount>,
remaining_paused_samples: u16,
pause_handle: Option<pausing::PauseHandle>,
}

impl<I> Pausable<I>
Expand Down Expand Up @@ -130,4 +134,12 @@ where
fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
self.input.try_seek(pos)
}

fn set_pause_handle(&mut self, pause_handle: PauseHandle) {
self.pause_handle = Some(pause_handle)
}

fn is_paused(&self) -> bool {
self.paused_channels.is_some()
}
}
72 changes: 72 additions & 0 deletions src/source/pausing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use cpal::traits::StreamTrait;
use std::sync::{Arc, Mutex, Weak};

// TODO add more such as one for sources passed to a mixer (should only call
// pause on the downstream PauseHandle when all stream in the mixer have been
// baused
#[derive(Debug, Clone)]
pub(crate) enum PauseControl {
StreamMixer(StreamMixerControl),
}

impl PauseControl {
fn pause(&self) {
match self {
PauseControl::StreamMixer(stream_mixer_control) => stream_mixer_control.pause(),
}
}
fn unpause(&self) {
match self {
PauseControl::StreamMixer(stream_mixer_control) => stream_mixer_control.unpause(),
}
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct StreamMixerControl {
pub(crate) stream: Arc<Mutex<Option<Weak<cpal::Stream>>>>,
}

impl StreamMixerControl {
fn pause(&self) {
let stream = self.stream.lock().expect("audio thread should not panic");
let stream = stream.as_ref().expect("should be set just after creation");
let Some(stream) = stream.upgrade() else {
return; // stream has been dropped
};
stream.pause().unwrap(); // TODO (defer till design done and working) errors
}
fn unpause(&self) {
let stream = self.stream.lock().expect("audio thread should not panic");
let stream = stream.as_ref().expect("should be set just after creation");
let Some(stream) = stream.upgrade() else {
return; // stream has been dropped
};
stream.play().unwrap(); // TODO (defer till design done and working) errors
}
}

impl StreamMixerControl {
pub(crate) fn set_stream(&self, stream: std::sync::Weak<cpal::Stream>) {
*self.stream.lock().expect("audio thread should not panic") = Some(stream);
}
}

#[derive(Debug, Clone)]
pub struct PauseHandle {
pub(crate) control: Arc<PauseControl>,
}

impl PauseHandle {
pub(crate) fn pause(&self) {
self.control.pause();
}

pub(crate) fn unpause(&self) {
self.control.unpause();
}

pub(crate) fn new(control: PauseControl) -> Self {
Self { control: Arc::new(control) }
}
}
19 changes: 16 additions & 3 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
//! There is also a convenience function `play` for using that output mixer to
//! play a single sound.
use crate::common::{assert_error_traits, ChannelCount, SampleRate};
use crate::decoder;
use crate::math::nz;
use crate::mixer::{mixer, Mixer, MixerSource};
use crate::sink::Sink;
use crate::source::{pausing, PauseControl, PauseHandle};
use crate::{decoder, Source};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{BufferSize, Sample, SampleFormat, StreamConfig};
use std::fmt;
use std::io::{Read, Seek};
use std::marker::Sync;
use std::num::NonZero;
use std::sync::Arc;

const HZ_44100: SampleRate = nz!(44_100);

Expand Down Expand Up @@ -46,7 +48,7 @@ pub struct OutputStream {
config: OutputStreamConfig,
mixer: Mixer,
log_on_drop: bool,
_stream: cpal::Stream,
_stream: Arc<cpal::Stream>,
}

impl OutputStream {
Expand Down Expand Up @@ -450,9 +452,20 @@ impl OutputStream {
E: FnMut(cpal::StreamError) + Send + 'static,
{
Self::validate_config(config);
let (controller, source) = mixer(config.channel_count, config.sample_rate);
let pause_handle = PauseHandle::new(PauseControl::StreamMixer(
pausing::StreamMixerControl::default(),
));
let (controller, mut source) = mixer(config.channel_count, config.sample_rate);
source.set_pause_handle(pause_handle.clone());
Self::init_stream(device, config, source, error_callback).and_then(|stream| {
stream.play().map_err(StreamError::PlayStreamError)?;
let stream = Arc::new(stream);
#[expect(irrefutable_let_patterns, reason = "more controllers will follow")]
let PauseControl::StreamMixer(ref control) = *pause_handle.control
else {
unreachable!("just set pause_handle to StreamMixer")
};
control.set_stream(Arc::downgrade(&stream));
Ok(Self {
_stream: stream,
mixer: controller,
Expand Down
Loading