Skip to content

Commit e2a2cb8

Browse files
OpenSauceCopilot
andauthored
Preallocate buffers (#8)
* Pre-allocate buffers, clean-up types etc * Be consistent with error messages * Add Channels and Oversample factors as consts * Update src/io/processor.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/io/processor.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * typos * Handle resizing off buffers --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 422effc commit e2a2cb8

File tree

3 files changed

+166
-106
lines changed

3 files changed

+166
-106
lines changed

src/io/manager.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use anyhow::{Context, Result};
22
use crossbeam::channel::{Sender, bounded};
3-
use jack::{AsyncClient, Client, ClientOptions, contrib::ClosureProcessHandler};
3+
use jack::{AsyncClient, Client, ClientOptions};
44
use log::error;
55

6-
use crate::io::processor::{ProcessHandler, Processor};
6+
use crate::io::processor::Processor;
77
use crate::io::recorder::Recorder;
88
use crate::sim::chain::AmplifierChain;
99

1010
/// Manages the audio processing chain and JACK client
1111
pub struct ProcessorManager {
12-
_active_client: AsyncClient<Notifications, ClosureProcessHandler<(), ProcessHandler>>,
12+
_active_client: AsyncClient<Notifications, Processor>,
1313
recorder: Option<Recorder>,
1414
/// GUI → audio thread: push a completely new preset
1515
amp_tx: Sender<Box<AmplifierChain>>,
@@ -24,18 +24,18 @@ impl ProcessorManager {
2424
/// Creates a new ProcessorManager
2525
pub fn new() -> Result<Self> {
2626
let (client, _) = Client::new("rustortion", ClientOptions::NO_START_SERVER)
27-
.context("Failed to create JACK client")?;
27+
.context("failed to create JACK client")?;
2828

2929
let (tx_amp, rx_amp) = bounded::<Box<AmplifierChain>>(1);
3030

31-
let processor = Processor::new_with_channel(&client, rx_amp, None);
32-
let handler = ClosureProcessHandler::new(processor.into_process_handler());
31+
let processor =
32+
Processor::new(&client, rx_amp, None).context("error creating processor")?;
3333

3434
let sample_rate = client.sample_rate() as f32;
3535

3636
let _active_client = client
37-
.activate_async(Notifications, handler)
38-
.with_context(|| "Failed to activate async")?;
37+
.activate_async(Notifications, processor)
38+
.context("failed to activate async client")?;
3939

4040
Ok(Self {
4141
sample_rate,
@@ -62,7 +62,7 @@ impl ProcessorManager {
6262
}
6363

6464
let recorder = Recorder::new(self.sample_rate as u32, record_dir)
65-
.context("Failed to start recorder")?;
65+
.context("failed to start recorder")?;
6666
let _ = recorder.sender();
6767

6868
self.recorder = Some(recorder);

src/io/processor.rs

Lines changed: 153 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,62 @@
11
use crate::io::recorder::{AudioBlock, BLOCK_FRAMES};
22
use crate::sim::chain::AmplifierChain;
3+
use anyhow::{Context, Result};
34
use crossbeam::channel::{Receiver, Sender};
4-
use jack::{AudioIn, AudioOut, Client, Control, Port, ProcessScope};
5-
use log::{error, info};
5+
use jack::{AudioIn, AudioOut, Client, Control, Frames, Port, ProcessHandler, ProcessScope};
6+
use log::{debug, error};
67
use rubato::{
78
Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
89
};
910

10-
pub type ProcessHandler = Box<dyn FnMut(&Client, &ProcessScope) -> Control + Send + 'static>;
11+
const CHANNELS: usize = 1;
12+
const OVERSAMPLE_FACTOR: f64 = 4.0;
13+
const MAX_BLOCK_SIZE: usize = 4096;
1114

1215
pub struct Processor {
13-
/// Amplifier.
16+
/// Amplifier chain, used for processing amp simulations on the input.
1417
chain: Box<AmplifierChain>,
18+
/// Channel for updating the amplifier chain.
1519
rx_chain: Receiver<Box<AmplifierChain>>,
16-
/// Optional recorder.
20+
/// Optional recorder channel.
1721
tx_audio: Option<Sender<AudioBlock>>,
1822
in_port: Port<AudioIn>,
19-
out_l: Port<AudioOut>,
20-
out_r: Port<AudioOut>,
23+
out_port_left: Port<AudioOut>,
24+
out_port_right: Port<AudioOut>,
2125
upsampler: SincFixedIn<f32>,
2226
downsampler: SincFixedIn<f32>,
2327
/// Reusable buffer for input frames.
24-
input_frames: Vec<Vec<f32>>,
28+
input_buffer: Vec<Vec<f32>>,
29+
/// Reusable buffer for upsampled frames.
30+
upsampled_buffer: Vec<Vec<f32>>,
31+
/// Reusable buffer for downsampled frames.
32+
downsampled_buffer: Vec<Vec<f32>>,
2533
}
2634

2735
impl Processor {
28-
pub fn new_with_channel(
36+
pub fn new(
2937
client: &Client,
3038
rx_chain: Receiver<Box<AmplifierChain>>,
3139
tx_audio: Option<Sender<AudioBlock>>,
32-
) -> Self {
33-
let in_port = client.register_port("in", AudioIn::default()).unwrap();
34-
let out_l = client.register_port("out_l", AudioOut::default()).unwrap();
35-
let out_r = client.register_port("out_r", AudioOut::default()).unwrap();
36-
37-
let _ = client.connect_ports_by_name("system:capture_1", "rustortion:in");
38-
let _ = client.connect_ports_by_name("rustortion:out_l", "system:playback_1");
39-
let _ = client.connect_ports_by_name("rustortion:out_r", "system:playback_2");
40-
41-
let channels = 1;
42-
let oversample_factor: f32 = 4.0;
43-
let max_chunk_size = 128;
40+
) -> Result<Self> {
41+
let in_port = client
42+
.register_port("in_port", AudioIn::default())
43+
.context("failed to register in port")?;
44+
let out_port_left = client
45+
.register_port("out_port_left", AudioOut::default())
46+
.context("failed to register out port left")?;
47+
let out_port_right = client
48+
.register_port("out_port_right", AudioOut::default())
49+
.context("failed to register out port right")?;
50+
51+
client
52+
.connect_ports_by_name("system:capture_1", "rustortion:in_port")
53+
.context("failed to connect to in port")?;
54+
client
55+
.connect_ports_by_name("rustortion:out_port_left", "system:playback_1")
56+
.context("failed to connect to out port left")?;
57+
client
58+
.connect_ports_by_name("rustortion:out_port_right", "system:playback_2")
59+
.context("failed to connect to out port right")?;
4460

4561
let interp_params = SincInterpolationParameters {
4662
sinc_len: 256,
@@ -57,107 +73,151 @@ impl Processor {
5773
window: WindowFunction::BlackmanHarris2,
5874
};
5975

60-
let upsampler = SincFixedIn::<f32>::new(
61-
oversample_factor as f64,
76+
let mut upsampler = SincFixedIn::<f32>::new(
77+
OVERSAMPLE_FACTOR,
6278
1.0,
6379
interp_params,
64-
max_chunk_size,
65-
channels,
80+
MAX_BLOCK_SIZE,
81+
CHANNELS,
6682
)
67-
.unwrap();
83+
.context("failed to create upsampler")?;
6884

69-
let downsampler = SincFixedIn::<f32>::new(
70-
1.0 / oversample_factor as f64,
85+
let mut downsampler = SincFixedIn::<f32>::new(
86+
1.0 / OVERSAMPLE_FACTOR,
7187
1.0,
7288
down_interp_params,
73-
(max_chunk_size as f32 * oversample_factor) as usize,
74-
channels,
89+
MAX_BLOCK_SIZE * OVERSAMPLE_FACTOR as usize,
90+
CHANNELS,
7591
)
76-
.unwrap();
92+
.context("failed to create downsampler")?;
7793

78-
let input_frames = vec![Vec::with_capacity(client.buffer_size() as usize)];
94+
let buffer_size = client.buffer_size() as usize;
95+
upsampler
96+
.set_chunk_size(buffer_size)
97+
.context("failed to set upsampler chunk size")?;
98+
downsampler
99+
.set_chunk_size(buffer_size * OVERSAMPLE_FACTOR as usize)
100+
.context("failed to set downsampler chunk size")?;
79101

80-
Self {
102+
let input_buffer = vec![Vec::with_capacity(buffer_size)];
103+
let upsampled_buffer = upsampler.output_buffer_allocate(true);
104+
let downsampled_buffer = downsampler.output_buffer_allocate(true);
105+
106+
Ok(Self {
81107
chain: Box::new(AmplifierChain::new("Default")),
82108
rx_chain,
83109
tx_audio,
84110
in_port,
85-
out_l,
86-
out_r,
111+
out_port_left,
112+
out_port_right,
87113
upsampler,
88114
downsampler,
89-
input_frames,
90-
}
115+
input_buffer,
116+
upsampled_buffer,
117+
downsampled_buffer,
118+
})
91119
}
120+
}
92121

93-
pub fn into_process_handler(mut self) -> ProcessHandler {
94-
Box::new(move |_client: &Client, ps: &ProcessScope| -> Control {
95-
if let Ok(new_chain) = self.rx_chain.try_recv() {
96-
self.chain = new_chain;
97-
info!("Received new chain");
98-
}
122+
impl ProcessHandler for Processor {
123+
fn process(&mut self, _c: &Client, ps: &ProcessScope) -> Control {
124+
if let Ok(new_chain) = self.rx_chain.try_recv() {
125+
self.chain = new_chain;
126+
debug!("Received new chain");
127+
}
99128

100-
let n_frames = ps.n_frames() as usize;
101-
let in_buf = self.in_port.as_slice(ps);
129+
let n_frames = ps.n_frames() as usize;
130+
let input = self.in_port.as_slice(ps);
102131

103-
let out_buf_l = self.out_l.as_mut_slice(ps);
104-
let out_buf_r = self.out_r.as_mut_slice(ps);
132+
self.input_buffer[0].clear();
105133

106-
{
107-
let input_channel = &mut self.input_frames[0];
108-
input_channel.clear();
134+
debug_assert!(
135+
self.input_buffer[0].capacity() >= n_frames,
136+
"input_buffer too small; buffer_size callback missing an allocation"
137+
);
109138

110-
if input_channel.capacity() < n_frames {
111-
input_channel.reserve(n_frames - input_channel.capacity());
112-
}
139+
self.input_buffer[0].extend_from_slice(input);
113140

114-
input_channel.extend_from_slice(in_buf);
141+
let (_, upsampled_frames) = match self.upsampler.process_into_buffer(
142+
&self.input_buffer,
143+
&mut self.upsampled_buffer,
144+
None,
145+
) {
146+
Ok(data) => data,
147+
Err(e) => {
148+
error!("Upsampler error: {e}");
149+
return Control::Continue;
115150
}
151+
};
116152

117-
let mut upsampled = match self.upsampler.process(&self.input_frames, None) {
118-
Ok(data) => data,
119-
Err(e) => {
120-
error!("Upsampler error: {e}");
121-
return Control::Continue;
122-
}
123-
};
124-
125-
let chain = self.chain.as_mut();
126-
for s in &mut upsampled[0] {
127-
*s = chain.process(*s);
128-
}
153+
let chain = self.chain.as_mut();
154+
for s in &mut self.upsampled_buffer[0][..upsampled_frames] {
155+
*s = chain.process(*s);
156+
}
129157

130-
let downsampled = match self.downsampler.process(&upsampled, None) {
131-
Ok(data) => data,
132-
Err(e) => {
133-
error!("Downsampler error: {e}");
134-
return Control::Continue;
135-
}
136-
};
137-
138-
let final_samples = &downsampled[0];
139-
let frames_to_copy = final_samples.len().min(n_frames);
140-
141-
out_buf_l[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
142-
out_buf_r[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
143-
for i in frames_to_copy..n_frames {
144-
out_buf_l[i] = 0.0;
145-
out_buf_r[i] = 0.0;
158+
let (_, downsampled_frames) = match self.downsampler.process_into_buffer(
159+
&self.upsampled_buffer,
160+
&mut self.downsampled_buffer,
161+
None,
162+
) {
163+
Ok(data) => data,
164+
Err(e) => {
165+
error!("Downsampler error: {e}");
166+
return Control::Continue;
146167
}
168+
};
169+
170+
let final_samples = &self.downsampled_buffer[0][..downsampled_frames];
171+
let frames_to_copy = final_samples.len().min(n_frames);
172+
173+
let out_buffer_left = self.out_port_left.as_mut_slice(ps);
174+
let out_buffer_right = self.out_port_right.as_mut_slice(ps);
147175

148-
if let Some(ref tx) = self.tx_audio {
149-
let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2);
150-
for &s in final_samples.iter().take(BLOCK_FRAMES) {
151-
let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
152-
block.extend_from_slice(&[v, v]);
153-
}
176+
out_buffer_left[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
177+
out_buffer_right[..frames_to_copy].copy_from_slice(&final_samples[..frames_to_copy]);
178+
for i in frames_to_copy..n_frames {
179+
out_buffer_left[i] = 0.0;
180+
out_buffer_right[i] = 0.0;
181+
}
154182

155-
if let Err(e) = tx.try_send(block) {
156-
error!("Error sending audio block: {e}");
157-
}
183+
// If the recording channel is available, handle sending audio blocks to the recorder.
184+
if let Some(ref tx) = self.tx_audio {
185+
let mut block = AudioBlock::with_capacity(BLOCK_FRAMES * 2);
186+
for &s in final_samples.iter().take(BLOCK_FRAMES) {
187+
let v = (s * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
188+
block.extend_from_slice(&[v, v]);
158189
}
159190

160-
Control::Continue
161-
})
191+
if let Err(e) = tx.try_send(block) {
192+
error!("Error sending audio block: {e}");
193+
}
194+
}
195+
196+
Control::Continue
197+
}
198+
199+
fn buffer_size(&mut self, _c: &Client, frames: Frames) -> Control {
200+
debug!("JACK buffer size changed to {frames} frames");
201+
let new_size = frames as usize;
202+
let cap = self.input_buffer[0].capacity();
203+
204+
if cap < new_size {
205+
self.input_buffer[0].reserve_exact(new_size - cap);
206+
}
207+
208+
if let Err(e) = self.upsampler.set_chunk_size(new_size) {
209+
error!("Upsampler cannot grow to {new_size}: {e}");
210+
} else {
211+
self.upsampled_buffer = self.upsampler.output_buffer_allocate(true);
212+
}
213+
214+
let needed_down = new_size * OVERSAMPLE_FACTOR as usize;
215+
if let Err(e) = self.downsampler.set_chunk_size(needed_down) {
216+
error!("Downsampler cannot grow to {needed_down}: {e}");
217+
} else {
218+
self.downsampled_buffer = self.downsampler.output_buffer_allocate(true);
219+
}
220+
221+
Control::Continue
162222
}
163223
}

src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,19 @@ __________ __ __ .__
5353
for &key in &required_vars {
5454
match std::env::var(key) {
5555
Ok(val) => info!("{} = {}", key, val),
56-
Err(_) => anyhow::bail!("Environment variable '{}' must be set.", key),
56+
Err(_) => anyhow::bail!("environment variable '{}' must be set.", key),
5757
}
5858
}
5959

6060
let mut processor_manager =
61-
ProcessorManager::new().context("Failed to create ProcessorManager")?;
61+
ProcessorManager::new().context("failed to create ProcessorManager")?;
6262

6363
let chain = create_mesa_boogie_dual_rectifier(processor_manager.sample_rate());
6464

6565
if recording {
6666
processor_manager
6767
.enable_recording(&args.recording_dir)
68-
.with_context(|| format!("Failed to enable recording in '{}'", args.recording_dir))?;
68+
.with_context(|| format!("failed to enable recording in '{}'", args.recording_dir))?;
6969
}
7070

7171
processor_manager.set_amp_chain(chain);
@@ -77,7 +77,7 @@ __________ __ __ .__
7777
info!("Ctrl+C received, shutting down...");
7878
shutdown_flag.store(false, Ordering::SeqCst);
7979
})
80-
.expect("Error setting Ctrl+C handler");
80+
.expect("error setting Ctrl+C handler");
8181

8282
while running.load(Ordering::SeqCst) {
8383
thread::sleep(Duration::from_secs(1));

0 commit comments

Comments
 (0)