Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/io/manager.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use anyhow::{Context, Result};
use crossbeam::channel::{Sender, bounded};
use jack::{AsyncClient, Client, ClientOptions, contrib::ClosureProcessHandler};
use jack::{AsyncClient, Client, ClientOptions};
use log::error;

use crate::io::processor::{ProcessHandler, Processor};
use crate::io::processor::Processor;
use crate::io::recorder::Recorder;
use crate::sim::chain::AmplifierChain;

/// Manages the audio processing chain and JACK client
pub struct ProcessorManager {
_active_client: AsyncClient<Notifications, ClosureProcessHandler<(), ProcessHandler>>,
_active_client: AsyncClient<Notifications, Processor>,
recorder: Option<Recorder>,
/// GUI → audio thread: push a completely new preset
amp_tx: Sender<Box<AmplifierChain>>,
Expand All @@ -24,18 +24,18 @@ impl ProcessorManager {
/// Creates a new ProcessorManager
pub fn new() -> Result<Self> {
let (client, _) = Client::new("rustortion", ClientOptions::NO_START_SERVER)
.context("Failed to create JACK client")?;
.context("failed to create JACK client")?;

let (tx_amp, rx_amp) = bounded::<Box<AmplifierChain>>(1);

let processor = Processor::new_with_channel(&client, rx_amp, None);
let handler = ClosureProcessHandler::new(processor.into_process_handler());
let processor =
Processor::new(&client, rx_amp, None).context("error creating processor")?;

let sample_rate = client.sample_rate() as f32;

let _active_client = client
.activate_async(Notifications, handler)
.with_context(|| "Failed to activate async")?;
.activate_async(Notifications, processor)
.context("failed to activate async client")?;

Ok(Self {
sample_rate,
Expand All @@ -62,7 +62,7 @@ impl ProcessorManager {
}

let recorder = Recorder::new(self.sample_rate as u32, record_dir)
.context("Failed to start recorder")?;
.context("failed to start recorder")?;
let _ = recorder.sender();

self.recorder = Some(recorder);
Expand Down
246 changes: 153 additions & 93 deletions src/io/processor.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,62 @@
use crate::io::recorder::{AudioBlock, BLOCK_FRAMES};
use crate::sim::chain::AmplifierChain;
use anyhow::{Context, Result};
use crossbeam::channel::{Receiver, Sender};
use jack::{AudioIn, AudioOut, Client, Control, Port, ProcessScope};
use log::{error, info};
use jack::{AudioIn, AudioOut, Client, Control, Frames, Port, ProcessHandler, ProcessScope};
use log::{debug, error};
use rubato::{
Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
};

pub type ProcessHandler = Box<dyn FnMut(&Client, &ProcessScope) -> Control + Send + 'static>;
const CHANNELS: usize = 1;
const OVERSAMPLE_FACTOR: f64 = 4.0;
const MAX_BLOCK_SIZE: usize = 4096;

pub struct Processor {
/// Amplifier.
/// Amplifier chain, used for processing amp simulations on the input.
chain: Box<AmplifierChain>,
/// Channel for updating the amplifier chain.
rx_chain: Receiver<Box<AmplifierChain>>,
/// Optional recorder.
/// Optional recorder channel.
tx_audio: Option<Sender<AudioBlock>>,
in_port: Port<AudioIn>,
out_l: Port<AudioOut>,
out_r: Port<AudioOut>,
out_port_left: Port<AudioOut>,
out_port_right: Port<AudioOut>,
upsampler: SincFixedIn<f32>,
downsampler: SincFixedIn<f32>,
/// Reusable buffer for input frames.
input_frames: Vec<Vec<f32>>,
input_buffer: Vec<Vec<f32>>,
/// Reusable buffer for upsampled frames.
upsampled_buffer: Vec<Vec<f32>>,
/// Reusable buffer for downsampled frames.
downsampled_buffer: Vec<Vec<f32>>,
}

impl Processor {
pub fn new_with_channel(
pub fn new(
client: &Client,
rx_chain: Receiver<Box<AmplifierChain>>,
tx_audio: Option<Sender<AudioBlock>>,
) -> Self {
let in_port = client.register_port("in", AudioIn::default()).unwrap();
let out_l = client.register_port("out_l", AudioOut::default()).unwrap();
let out_r = client.register_port("out_r", AudioOut::default()).unwrap();

let _ = client.connect_ports_by_name("system:capture_1", "rustortion:in");
let _ = client.connect_ports_by_name("rustortion:out_l", "system:playback_1");
let _ = client.connect_ports_by_name("rustortion:out_r", "system:playback_2");

let channels = 1;
let oversample_factor: f32 = 4.0;
let max_chunk_size = 128;
) -> Result<Self> {
let in_port = client
.register_port("in_port", AudioIn::default())
.context("failed to register in port")?;
let out_port_left = client
.register_port("out_port_left", AudioOut::default())
.context("failed to register out port left")?;
let out_port_right = client
.register_port("out_port_right", AudioOut::default())
.context("failed to register out port right")?;

client
.connect_ports_by_name("system:capture_1", "rustortion:in_port")
.context("failed to connect to in port")?;
client
.connect_ports_by_name("rustortion:out_port_left", "system:playback_1")
.context("failed to connect to out port left")?;
client
.connect_ports_by_name("rustortion:out_port_right", "system:playback_2")
.context("failed to connect to out port right")?;

let interp_params = SincInterpolationParameters {
sinc_len: 256,
Expand All @@ -57,107 +73,151 @@ impl Processor {
window: WindowFunction::BlackmanHarris2,
};

let upsampler = SincFixedIn::<f32>::new(
oversample_factor as f64,
let mut upsampler = SincFixedIn::<f32>::new(
OVERSAMPLE_FACTOR,
1.0,
interp_params,
max_chunk_size,
channels,
MAX_BLOCK_SIZE,
CHANNELS,
)
.unwrap();
.context("failed to create upsampler")?;

let downsampler = SincFixedIn::<f32>::new(
1.0 / oversample_factor as f64,
let mut downsampler = SincFixedIn::<f32>::new(
1.0 / OVERSAMPLE_FACTOR,
1.0,
down_interp_params,
(max_chunk_size as f32 * oversample_factor) as usize,
channels,
MAX_BLOCK_SIZE * OVERSAMPLE_FACTOR as usize,
CHANNELS,
)
.unwrap();
.context("failed to create downsampler")?;

let input_frames = vec![Vec::with_capacity(client.buffer_size() as usize)];
let buffer_size = client.buffer_size() as usize;
upsampler
.set_chunk_size(buffer_size)
.context("failed to set upsampler chunk size")?;
downsampler
.set_chunk_size(buffer_size * OVERSAMPLE_FACTOR as usize)
.context("failed to set downsampler chunk size")?;

Self {
let input_buffer = vec![Vec::with_capacity(buffer_size)];
let upsampled_buffer = upsampler.output_buffer_allocate(true);
let downsampled_buffer = downsampler.output_buffer_allocate(true);

Ok(Self {
chain: Box::new(AmplifierChain::new("Default")),
rx_chain,
tx_audio,
in_port,
out_l,
out_r,
out_port_left,
out_port_right,
upsampler,
downsampler,
input_frames,
}
input_buffer,
upsampled_buffer,
downsampled_buffer,
})
}
}

pub fn into_process_handler(mut self) -> ProcessHandler {
Box::new(move |_client: &Client, ps: &ProcessScope| -> Control {
if let Ok(new_chain) = self.rx_chain.try_recv() {
self.chain = new_chain;
info!("Received new chain");
}
impl ProcessHandler for Processor {
fn process(&mut self, _c: &Client, ps: &ProcessScope) -> Control {
if let Ok(new_chain) = self.rx_chain.try_recv() {
self.chain = new_chain;
debug!("Received new chain");
}

let n_frames = ps.n_frames() as usize;
let in_buf = self.in_port.as_slice(ps);
let n_frames = ps.n_frames() as usize;
let input = self.in_port.as_slice(ps);

let out_buf_l = self.out_l.as_mut_slice(ps);
let out_buf_r = self.out_r.as_mut_slice(ps);
self.input_buffer[0].clear();

{
let input_channel = &mut self.input_frames[0];
input_channel.clear();
debug_assert!(
self.input_buffer[0].capacity() >= n_frames,
"input_buffer too small; buffer_size callback missing an allocation"
);

if input_channel.capacity() < n_frames {
input_channel.reserve(n_frames - input_channel.capacity());
}
self.input_buffer[0].extend_from_slice(input);

input_channel.extend_from_slice(in_buf);
let (_, upsampled_frames) = match self.upsampler.process_into_buffer(
&self.input_buffer,
&mut self.upsampled_buffer,
None,
) {
Ok(data) => data,
Err(e) => {
error!("Upsampler error: {e}");
return Control::Continue;
}
};

let mut upsampled = match self.upsampler.process(&self.input_frames, None) {
Ok(data) => data,
Err(e) => {
error!("Upsampler error: {e}");
return Control::Continue;
}
};

let chain = self.chain.as_mut();
for s in &mut upsampled[0] {
*s = chain.process(*s);
}
let chain = self.chain.as_mut();
for s in &mut self.upsampled_buffer[0][..upsampled_frames] {
*s = chain.process(*s);
}

let downsampled = match self.downsampler.process(&upsampled, None) {
Ok(data) => data,
Err(e) => {
error!("Downsampler error: {e}");
return Control::Continue;
}
};

let final_samples = &downsampled[0];
let frames_to_copy = final_samples.len().min(n_frames);

out_buf_l[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
out_buf_r[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
for i in frames_to_copy..n_frames {
out_buf_l[i] = 0.0;
out_buf_r[i] = 0.0;
let (_, downsampled_frames) = match self.downsampler.process_into_buffer(
&self.upsampled_buffer,
&mut self.downsampled_buffer,
None,
) {
Ok(data) => data,
Err(e) => {
error!("Downsampler error: {e}");
return Control::Continue;
}
};

let final_samples = &self.downsampled_buffer[0][..downsampled_frames];
let frames_to_copy = final_samples.len().min(n_frames);

let out_buffer_left = self.out_port_left.as_mut_slice(ps);
let out_buffer_right = self.out_port_right.as_mut_slice(ps);

if let Some(ref tx) = self.tx_audio {
let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2);
for &s in final_samples.iter().take(BLOCK_FRAMES) {
let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
block.extend_from_slice(&[v, v]);
}
out_buffer_left[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
out_buffer_right[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
for i in frames_to_copy..n_frames {
out_buffer_left[i] = 0.0;
out_buffer_right[i] = 0.0;
}

if let Err(e) = tx.try_send(block) {
error!("Error sending audio block: {e}");
}
// If the recording channel is available, handle sending audio blocks to the recorder.
if let Some(ref tx) = self.tx_audio {
let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2);
for &s in final_samples.iter().take(BLOCK_FRAMES) {
let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
block.extend_from_slice(&[v, v]);
}

Control::Continue
})
if let Err(e) = tx.try_send(block) {
error!("Error sending audio block: {e}");
}
}

Control::Continue
}

fn buffer_size(&mut self, _c: &Client, frames: Frames) -> Control {
debug!("JACK buffer size changed to {frames} frames");
let new_size = frames as usize;
let cap = self.input_buffer[0].capacity();

if cap < new_size {
self.input_buffer[0].reserve_exact(new_size - cap);
}

if let Err(e) = self.upsampler.set_chunk_size(new_size) {
error!("Upsampler cannot grow to {new_size}: {e}");
} else {
self.upsampled_buffer = self.upsampler.output_buffer_allocate(true);
}

let needed_down = new_size * OVERSAMPLE_FACTOR as usize;
if let Err(e) = self.downsampler.set_chunk_size(needed_down) {
error!("Downsampler cannot grow to {needed_down}: {e}");
} else {
self.downsampled_buffer = self.downsampler.output_buffer_allocate(true);
}

Control::Continue
}
}
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ __________ __ __ .__
for &key in &required_vars {
match std::env::var(key) {
Ok(val) => info!("{} = {}", key, val),
Err(_) => anyhow::bail!("Environment variable '{}' must be set.", key),
Err(_) => anyhow::bail!("environment variable '{}' must be set.", key),
}
}

let mut processor_manager =
ProcessorManager::new().context("Failed to create ProcessorManager")?;
ProcessorManager::new().context("failed to create ProcessorManager")?;

let chain = create_mesa_boogie_dual_rectifier(processor_manager.sample_rate());

if recording {
processor_manager
.enable_recording(&args.recording_dir)
.with_context(|| format!("Failed to enable recording in '{}'", args.recording_dir))?;
.with_context(|| format!("failed to enable recording in '{}'", args.recording_dir))?;
}

processor_manager.set_amp_chain(chain);
Expand All @@ -77,7 +77,7 @@ __________ __ __ .__
info!("Ctrl+C received, shutting down...");
shutdown_flag.store(false, Ordering::SeqCst);
})
.expect("Error setting Ctrl+C handler");
.expect("error setting Ctrl+C handler");

while running.load(Ordering::SeqCst) {
thread::sleep(Duration::from_secs(1));
Expand Down