diff --git a/src/io/manager.rs b/src/io/manager.rs index 41746ea..02d0144 100644 --- a/src/io/manager.rs +++ b/src/io/manager.rs @@ -1,15 +1,15 @@ use anyhow::{Context, Result}; use crossbeam::channel::{Sender, bounded}; -use jack::{AsyncClient, Client, ClientOptions, contrib::ClosureProcessHandler}; +use jack::{AsyncClient, Client, ClientOptions}; use log::error; -use crate::io::processor::{ProcessHandler, Processor}; +use crate::io::processor::Processor; use crate::io::recorder::Recorder; use crate::sim::chain::AmplifierChain; /// Manages the audio processing chain and JACK client pub struct ProcessorManager { - _active_client: AsyncClient>, + _active_client: AsyncClient, recorder: Option, /// GUI → audio thread: push a completely new preset amp_tx: Sender>, @@ -24,18 +24,18 @@ impl ProcessorManager { /// Creates a new ProcessorManager pub fn new() -> Result { let (client, _) = Client::new("rustortion", ClientOptions::NO_START_SERVER) - .context("Failed to create JACK client")?; + .context("failed to create JACK client")?; let (tx_amp, rx_amp) = bounded::>(1); - let processor = Processor::new_with_channel(&client, rx_amp, None); - let handler = ClosureProcessHandler::new(processor.into_process_handler()); + let processor = + Processor::new(&client, rx_amp, None).context("error creating processor")?; let sample_rate = client.sample_rate() as f32; let _active_client = client - .activate_async(Notifications, handler) - .with_context(|| "Failed to activate async")?; + .activate_async(Notifications, processor) + .context("failed to activate async client")?; Ok(Self { sample_rate, @@ -62,7 +62,7 @@ impl ProcessorManager { } let recorder = Recorder::new(self.sample_rate as u32, record_dir) - .context("Failed to start recorder")?; + .context("failed to start recorder")?; let _ = recorder.sender(); self.recorder = Some(recorder); diff --git a/src/io/processor.rs b/src/io/processor.rs index 846fdac..b05dba5 100644 --- a/src/io/processor.rs +++ b/src/io/processor.rs @@ -1,46 +1,62 @@ use crate::io::recorder::{AudioBlock, BLOCK_FRAMES}; use crate::sim::chain::AmplifierChain; +use anyhow::{Context, Result}; use crossbeam::channel::{Receiver, Sender}; -use jack::{AudioIn, AudioOut, Client, Control, Port, ProcessScope}; -use log::{error, info}; +use jack::{AudioIn, AudioOut, Client, Control, Frames, Port, ProcessHandler, ProcessScope}; +use log::{debug, error}; use rubato::{ Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction, }; -pub type ProcessHandler = Box Control + Send + 'static>; +const CHANNELS: usize = 1; +const OVERSAMPLE_FACTOR: f64 = 4.0; +const MAX_BLOCK_SIZE: usize = 4096; pub struct Processor { - /// Amplifier. + /// Amplifier chain, used for processing amp simulations on the input. chain: Box, + /// Channel for updating the amplifier chain. rx_chain: Receiver>, - /// Optional recorder. + /// Optional recorder channel. tx_audio: Option>, in_port: Port, - out_l: Port, - out_r: Port, + out_port_left: Port, + out_port_right: Port, upsampler: SincFixedIn, downsampler: SincFixedIn, /// Reusable buffer for input frames. - input_frames: Vec>, + input_buffer: Vec>, + /// Reusable buffer for upsampled frames. + upsampled_buffer: Vec>, + /// Reusable buffer for downsampled frames. + downsampled_buffer: Vec>, } impl Processor { - pub fn new_with_channel( + pub fn new( client: &Client, rx_chain: Receiver>, tx_audio: Option>, - ) -> Self { - let in_port = client.register_port("in", AudioIn::default()).unwrap(); - let out_l = client.register_port("out_l", AudioOut::default()).unwrap(); - let out_r = client.register_port("out_r", AudioOut::default()).unwrap(); - - let _ = client.connect_ports_by_name("system:capture_1", "rustortion:in"); - let _ = client.connect_ports_by_name("rustortion:out_l", "system:playback_1"); - let _ = client.connect_ports_by_name("rustortion:out_r", "system:playback_2"); - - let channels = 1; - let oversample_factor: f32 = 4.0; - let max_chunk_size = 128; + ) -> Result { + let in_port = client + .register_port("in_port", AudioIn::default()) + .context("failed to register in port")?; + let out_port_left = client + .register_port("out_port_left", AudioOut::default()) + .context("failed to register out port left")?; + let out_port_right = client + .register_port("out_port_right", AudioOut::default()) + .context("failed to register out port right")?; + + client + .connect_ports_by_name("system:capture_1", "rustortion:in_port") + .context("failed to connect to in port")?; + client + .connect_ports_by_name("rustortion:out_port_left", "system:playback_1") + .context("failed to connect to out port left")?; + client + .connect_ports_by_name("rustortion:out_port_right", "system:playback_2") + .context("failed to connect to out port right")?; let interp_params = SincInterpolationParameters { sinc_len: 256, @@ -57,107 +73,151 @@ impl Processor { window: WindowFunction::BlackmanHarris2, }; - let upsampler = SincFixedIn::::new( - oversample_factor as f64, + let mut upsampler = SincFixedIn::::new( + OVERSAMPLE_FACTOR, 1.0, interp_params, - max_chunk_size, - channels, + MAX_BLOCK_SIZE, + CHANNELS, ) - .unwrap(); + .context("failed to create upsampler")?; - let downsampler = SincFixedIn::::new( - 1.0 / oversample_factor as f64, + let mut downsampler = SincFixedIn::::new( + 1.0 / OVERSAMPLE_FACTOR, 1.0, down_interp_params, - (max_chunk_size as f32 * oversample_factor) as usize, - channels, + MAX_BLOCK_SIZE * OVERSAMPLE_FACTOR as usize, + CHANNELS, ) - .unwrap(); + .context("failed to create downsampler")?; - let input_frames = vec![Vec::with_capacity(client.buffer_size() as usize)]; + let buffer_size = client.buffer_size() as usize; + upsampler + .set_chunk_size(buffer_size) + .context("failed to set upsampler chunk size")?; + downsampler + .set_chunk_size(buffer_size * OVERSAMPLE_FACTOR as usize) + .context("failed to set downsampler chunk size")?; - Self { + let input_buffer = vec![Vec::with_capacity(buffer_size)]; + let upsampled_buffer = upsampler.output_buffer_allocate(true); + let downsampled_buffer = downsampler.output_buffer_allocate(true); + + Ok(Self { chain: Box::new(AmplifierChain::new("Default")), rx_chain, tx_audio, in_port, - out_l, - out_r, + out_port_left, + out_port_right, upsampler, downsampler, - input_frames, - } + input_buffer, + upsampled_buffer, + downsampled_buffer, + }) } +} - pub fn into_process_handler(mut self) -> ProcessHandler { - Box::new(move |_client: &Client, ps: &ProcessScope| -> Control { - if let Ok(new_chain) = self.rx_chain.try_recv() { - self.chain = new_chain; - info!("Received new chain"); - } +impl ProcessHandler for Processor { + fn process(&mut self, _c: &Client, ps: &ProcessScope) -> Control { + if let Ok(new_chain) = self.rx_chain.try_recv() { + self.chain = new_chain; + debug!("Received new chain"); + } - let n_frames = ps.n_frames() as usize; - let in_buf = self.in_port.as_slice(ps); + let n_frames = ps.n_frames() as usize; + let input = self.in_port.as_slice(ps); - let out_buf_l = self.out_l.as_mut_slice(ps); - let out_buf_r = self.out_r.as_mut_slice(ps); + self.input_buffer[0].clear(); - { - let input_channel = &mut self.input_frames[0]; - input_channel.clear(); + debug_assert!( + self.input_buffer[0].capacity() >= n_frames, + "input_buffer too small; buffer_size callback missing an allocation" + ); - if input_channel.capacity() < n_frames { - input_channel.reserve(n_frames - input_channel.capacity()); - } + self.input_buffer[0].extend_from_slice(input); - input_channel.extend_from_slice(in_buf); + let (_, upsampled_frames) = match self.upsampler.process_into_buffer( + &self.input_buffer, + &mut self.upsampled_buffer, + None, + ) { + Ok(data) => data, + Err(e) => { + error!("Upsampler error: {e}"); + return Control::Continue; } + }; - let mut upsampled = match self.upsampler.process(&self.input_frames, None) { - Ok(data) => data, - Err(e) => { - error!("Upsampler error: {e}"); - return Control::Continue; - } - }; - - let chain = self.chain.as_mut(); - for s in &mut upsampled[0] { - *s = chain.process(*s); - } + let chain = self.chain.as_mut(); + for s in &mut self.upsampled_buffer[0][..upsampled_frames] { + *s = chain.process(*s); + } - let downsampled = match self.downsampler.process(&upsampled, None) { - Ok(data) => data, - Err(e) => { - error!("Downsampler error: {e}"); - return Control::Continue; - } - }; - - let final_samples = &downsampled[0]; - let frames_to_copy = final_samples.len().min(n_frames); - - out_buf_l[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]); - out_buf_r[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]); - for i in frames_to_copy..n_frames { - out_buf_l[i] = 0.0; - out_buf_r[i] = 0.0; + let (_, downsampled_frames) = match self.downsampler.process_into_buffer( + &self.upsampled_buffer, + &mut self.downsampled_buffer, + None, + ) { + Ok(data) => data, + Err(e) => { + error!("Downsampler error: {e}"); + return Control::Continue; } + }; + + let final_samples = &self.downsampled_buffer[0][..downsampled_frames]; + let frames_to_copy = final_samples.len().min(n_frames); + + let out_buffer_left = self.out_port_left.as_mut_slice(ps); + let out_buffer_right = self.out_port_right.as_mut_slice(ps); - if let Some(ref tx) = self.tx_audio { - let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2); - for &s in final_samples.iter().take(BLOCK_FRAMES) { - let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16; - block.extend_from_slice(&[v, v]); - } + out_buffer_left[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]); + out_buffer_right[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]); + for i in frames_to_copy..n_frames { + out_buffer_left[i] = 0.0; + out_buffer_right[i] = 0.0; + } - if let Err(e) = tx.try_send(block) { - error!("Error sending audio block: {e}"); - } + // If the recording channel is available, handle sending audio blocks to the recorder. + if let Some(ref tx) = self.tx_audio { + let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2); + for &s in final_samples.iter().take(BLOCK_FRAMES) { + let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16; + block.extend_from_slice(&[v, v]); } - Control::Continue - }) + if let Err(e) = tx.try_send(block) { + error!("Error sending audio block: {e}"); + } + } + + Control::Continue + } + + fn buffer_size(&mut self, _c: &Client, frames: Frames) -> Control { + debug!("JACK buffer size changed to {frames} frames"); + let new_size = frames as usize; + let cap = self.input_buffer[0].capacity(); + + if cap < new_size { + self.input_buffer[0].reserve_exact(new_size - cap); + } + + if let Err(e) = self.upsampler.set_chunk_size(new_size) { + error!("Upsampler cannot grow to {new_size}: {e}"); + } else { + self.upsampled_buffer = self.upsampler.output_buffer_allocate(true); + } + + let needed_down = new_size * OVERSAMPLE_FACTOR as usize; + if let Err(e) = self.downsampler.set_chunk_size(needed_down) { + error!("Downsampler cannot grow to {needed_down}: {e}"); + } else { + self.downsampled_buffer = self.downsampler.output_buffer_allocate(true); + } + + Control::Continue } } diff --git a/src/main.rs b/src/main.rs index 574b8d7..895d536 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,19 +53,19 @@ __________ __ __ .__ for &key in &required_vars { match std::env::var(key) { Ok(val) => info!("{} = {}", key, val), - Err(_) => anyhow::bail!("Environment variable '{}' must be set.", key), + Err(_) => anyhow::bail!("environment variable '{}' must be set.", key), } } let mut processor_manager = - ProcessorManager::new().context("Failed to create ProcessorManager")?; + ProcessorManager::new().context("failed to create ProcessorManager")?; let chain = create_mesa_boogie_dual_rectifier(processor_manager.sample_rate()); if recording { processor_manager .enable_recording(&args.recording_dir) - .with_context(|| format!("Failed to enable recording in '{}'", args.recording_dir))?; + .with_context(|| format!("failed to enable recording in '{}'", args.recording_dir))?; } processor_manager.set_amp_chain(chain); @@ -77,7 +77,7 @@ __________ __ __ .__ info!("Ctrl+C received, shutting down..."); shutdown_flag.store(false, Ordering::SeqCst); }) - .expect("Error setting Ctrl+C handler"); + .expect("error setting Ctrl+C handler"); while running.load(Ordering::SeqCst) { thread::sleep(Duration::from_secs(1));