Skip to content

Commit 579b75a

Browse files
committed
Rework sync callback mechanism
1 parent 2407441 commit 579b75a

File tree

4 files changed

+213
-40
lines changed

4 files changed

+213
-40
lines changed

src/backend/aggregate_device.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ impl AggregateDevice {
152152
debug_assert_running_serially();
153153
let waiting_time = Duration::new(5, 0);
154154

155-
let condvar_pair = Arc::new((Mutex::new(()), Condvar::new()));
156-
let mut cloned_condvar_pair = condvar_pair.clone();
157-
let data_ptr = &mut cloned_condvar_pair as *mut Arc<(Mutex<()>, Condvar)>;
155+
let condvar_pair = Box::new((Mutex::new(()), Condvar::new()));
156+
let data_ptr = Box::into_raw(condvar_pair);
157+
sync_callback_registry_register(data_ptr as usize);
158158

159159
let address = get_property_address(
160160
Property::HardwareDevices,
@@ -177,12 +177,14 @@ impl AggregateDevice {
177177
data_ptr as *mut c_void,
178178
);
179179
assert_eq!(status, NO_ERR);
180+
sync_callback_registry_unregister(data_ptr as usize);
181+
unsafe { drop(Box::from_raw(data_ptr)) };
180182
});
181183

182184
let device = Self::create_blank_device(plugin_id)?;
183185

184186
// Wait until the aggregate is created.
185-
let (lock, cvar) = &*condvar_pair;
187+
let (lock, cvar) = unsafe { &*data_ptr };
186188
let guard = lock.lock().unwrap();
187189
let (_guard, timeout_res) = cvar
188190
.wait_timeout_while(guard, waiting_time, |()| !get_devices().contains(&device))
@@ -202,10 +204,12 @@ impl AggregateDevice {
202204
data: *mut c_void,
203205
) -> OSStatus {
204206
assert_eq!(id, kAudioObjectSystemObject);
205-
let pair = unsafe { &mut *(data as *mut Arc<(Mutex<()>, Condvar)>) };
206-
let (lock, cvar) = &**pair;
207-
let _guard = lock.lock().unwrap();
208-
cvar.notify_one();
207+
with_sync_callback_ptr(data, || {
208+
let pair = unsafe { &*(data as *const (Mutex<()>, Condvar)) };
209+
let (lock, cvar) = pair;
210+
let _guard = lock.lock().unwrap();
211+
cvar.notify_one();
212+
});
209213
NO_ERR
210214
}
211215

@@ -314,9 +318,9 @@ impl AggregateDevice {
314318

315319
let waiting_time = Duration::new(5, 0);
316320

317-
let condvar_pair = Arc::new((Mutex::new(AudioObjectID::default()), Condvar::new()));
318-
let mut cloned_condvar_pair = condvar_pair.clone();
319-
let data_ptr = &mut cloned_condvar_pair as *mut Arc<(Mutex<AudioObjectID>, Condvar)>;
321+
let condvar_pair = Box::new((Mutex::new(AudioObjectID::default()), Condvar::new()));
322+
let data_ptr = Box::into_raw(condvar_pair);
323+
sync_callback_registry_register(data_ptr as usize);
320324

321325
let status = audio_object_add_property_listener(
322326
device_id,
@@ -325,22 +329,27 @@ impl AggregateDevice {
325329
data_ptr as *mut c_void,
326330
);
327331
if status != NO_ERR {
332+
sync_callback_registry_unregister(data_ptr as usize);
333+
unsafe { drop(Box::from_raw(data_ptr)) };
328334
return Err(Error::from(status));
329335
}
330336

331-
let remove_listener = || -> OSStatus {
332-
audio_object_remove_property_listener(
337+
let cleanup = || {
338+
let status = audio_object_remove_property_listener(
333339
device_id,
334340
&address,
335341
devices_changed_callback,
336342
data_ptr as *mut c_void,
337-
)
343+
);
344+
sync_callback_registry_unregister(data_ptr as usize);
345+
unsafe { drop(Box::from_raw(data_ptr)) };
346+
status
338347
};
339348

340349
Self::set_sub_devices(device_id, input_id, output_id)?;
341350

342351
// Wait until the sub devices are added.
343-
let (lock, cvar) = &*condvar_pair;
352+
let (lock, cvar) = unsafe { &*data_ptr };
344353
let device = lock.lock().unwrap();
345354
if *device != device_id {
346355
let (dev, timeout_res) = cvar.wait_timeout(device, waiting_time).unwrap();
@@ -353,13 +362,10 @@ impl AggregateDevice {
353362
);
354363
}
355364
if *dev != device_id {
356-
let status = remove_listener();
365+
let status = cleanup();
357366
// If the error is kAudioHardwareBadObjectError, it implies `device_id` is somehow
358367
// dead, so its listener should receive nothing. It's ok to leave here.
359368
assert!(status == NO_ERR || status == (kAudioHardwareBadObjectError as OSStatus));
360-
// TODO: Destroy the aggregate device immediately if error is not
361-
// kAudioHardwareBadObjectError. Otherwise the `devices_changed_callback` is able
362-
// to touch the `cloned_condvar_pair` after it's freed.
363369
return Err(Error::from(waiting_time));
364370
}
365371
}
@@ -370,15 +376,17 @@ impl AggregateDevice {
370376
_addresses: *const AudioObjectPropertyAddress,
371377
data: *mut c_void,
372378
) -> OSStatus {
373-
let pair = unsafe { &mut *(data as *mut Arc<(Mutex<AudioObjectID>, Condvar)>) };
374-
let (lock, cvar) = &**pair;
375-
let mut device = lock.lock().unwrap();
376-
*device = id;
377-
cvar.notify_one();
379+
with_sync_callback_ptr(data, || {
380+
let pair = unsafe { &*(data as *const (Mutex<AudioObjectID>, Condvar)) };
381+
let (lock, cvar) = pair;
382+
let mut device = lock.lock().unwrap();
383+
*device = id;
384+
cvar.notify_one();
385+
});
378386
NO_ERR
379387
}
380388

381-
let status = remove_listener();
389+
let status = cleanup();
382390
assert_eq!(status, NO_ERR);
383391
Ok(())
384392
}

src/backend/mod.rs

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use cubeb_backend::{
4141
};
4242
use mach::mach_time::{mach_absolute_time, mach_timebase_info};
4343
use std::cmp;
44+
use std::collections::HashSet;
4445
use std::ffi::{CStr, CString};
4546
use std::fmt;
4647
use std::mem;
@@ -49,7 +50,7 @@ use std::ptr;
4950
use std::slice;
5051
use std::str::FromStr;
5152
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
52-
use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
53+
use std::sync::{Arc, Condvar, LazyLock, Mutex, MutexGuard, Weak};
5354
use std::time::{Duration, Instant};
5455
const NO_ERR: OSStatus = 0;
5556

@@ -70,6 +71,28 @@ const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
7071

7172
const MACOS_KERNEL_MAJOR_VERSION_MONTEREY: u32 = 21;
7273

74+
// Global registry for tracking valid sync callback pointers.
75+
static SYNC_CALLBACK_REGISTRY: LazyLock<Mutex<HashSet<usize>>> =
76+
LazyLock::new(|| Mutex::new(HashSet::new()));
77+
78+
fn sync_callback_registry_register(ptr: usize) {
79+
SYNC_CALLBACK_REGISTRY.lock().unwrap().insert(ptr);
80+
}
81+
82+
fn sync_callback_registry_unregister(ptr: usize) {
83+
SYNC_CALLBACK_REGISTRY.lock().unwrap().remove(&ptr);
84+
}
85+
86+
fn with_sync_callback_ptr<F>(ptr: *mut c_void, f: F)
87+
where
88+
F: FnOnce(),
89+
{
90+
let guard = SYNC_CALLBACK_REGISTRY.lock().unwrap();
91+
if guard.contains(&(ptr as usize)) {
92+
f();
93+
}
94+
}
95+
7396
#[derive(Debug, PartialEq)]
7497
enum ParseMacOSKernelVersionError {
7598
SysCtl,
@@ -1508,16 +1531,16 @@ fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Re
15081531
}
15091532

15101533
let waiting_time = Duration::from_millis(100);
1511-
let pair = Arc::new((Mutex::new(false), Condvar::new()));
1512-
let mut pair2 = pair.clone();
1513-
let pair_ptr = &mut pair2;
1534+
let pair = Box::new((Mutex::new(false), Condvar::new()));
1535+
let pair_ptr = Box::into_raw(pair);
1536+
sync_callback_registry_register(pair_ptr as usize);
15141537

15151538
assert_eq!(
15161539
audio_unit_add_property_listener(
15171540
unit,
15181541
kAudioDevicePropertyBufferFrameSize,
15191542
buffer_size_changed_callback,
1520-
pair_ptr,
1543+
pair_ptr as *mut c_void,
15211544
),
15221545
NO_ERR
15231546
);
@@ -1528,10 +1551,14 @@ fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Re
15281551
unit,
15291552
kAudioDevicePropertyBufferFrameSize,
15301553
buffer_size_changed_callback,
1531-
pair_ptr,
1554+
pair_ptr as *mut c_void,
15321555
),
15331556
NO_ERR
15341557
);
1558+
// Unregister blocks if callback is in-flight (holds registry lock)
1559+
sync_callback_registry_unregister(pair_ptr as usize);
1560+
// Safe to drop: callback has either completed or will exit early
1561+
unsafe { drop(Box::from_raw(pair_ptr)) };
15351562
});
15361563

15371564
set_buffer_size(unit, devtype, frames).map_err(|e| {
@@ -1544,7 +1571,7 @@ fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Re
15441571
Error::error()
15451572
})?;
15461573

1547-
let (lock, cvar) = &*pair;
1574+
let (lock, cvar) = unsafe { &*pair_ptr };
15481575
let changed = lock.lock().unwrap();
15491576
if !*changed {
15501577
let (chg, timeout_res) = cvar.wait_timeout(changed, waiting_time).unwrap();
@@ -1584,16 +1611,17 @@ fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Re
15841611
in_element: AudioUnitElement,
15851612
) {
15861613
if in_scope == 0 {
1587-
// filter out the callback for global scope.
15881614
return;
15891615
}
1590-
assert!(in_element == AU_IN_BUS || in_element == AU_OUT_BUS);
1591-
assert_eq!(in_property_id, kAudioDevicePropertyBufferFrameSize);
1592-
let pair = unsafe { &mut *(in_client_data as *mut Arc<(Mutex<bool>, Condvar)>) };
1593-
let (lock, cvar) = &**pair;
1594-
let mut changed = lock.lock().unwrap();
1595-
*changed = true;
1596-
cvar.notify_one();
1616+
with_sync_callback_ptr(in_client_data, || {
1617+
assert!(in_element == AU_IN_BUS || in_element == AU_OUT_BUS);
1618+
assert_eq!(in_property_id, kAudioDevicePropertyBufferFrameSize);
1619+
let pair = unsafe { &*(in_client_data as *const (Mutex<bool>, Condvar)) };
1620+
let (lock, cvar) = pair;
1621+
let mut changed = lock.lock().unwrap();
1622+
*changed = true;
1623+
cvar.notify_one();
1624+
});
15971625
}
15981626

15991627
Ok(())

src/backend/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ mod device_property;
88
mod interfaces;
99
mod manual;
1010
mod parallel;
11+
mod sync_callback;
1112
mod tone;
1213
mod utils;

src/backend/tests/sync_callback.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright © 2026 Mozilla Foundation
2+
//
3+
// This program is made available under an ISC-style license. See the
4+
// accompanying file LICENSE for details.
5+
6+
use super::*;
7+
use std::sync::atomic::{AtomicBool, Ordering};
8+
use std::sync::Arc;
9+
10+
// Test basic registration and unregistration
11+
#[test]
12+
fn test_sync_callback_register_unregister() {
13+
let dummy_data = Box::new(42u32);
14+
let ptr = Box::into_raw(dummy_data);
15+
let ptr_usize = ptr as usize;
16+
17+
// Register the pointer
18+
sync_callback_registry_register(ptr_usize);
19+
20+
// Verify the callback is called when registered
21+
let called = Arc::new(AtomicBool::new(false));
22+
let called_clone = called.clone();
23+
with_sync_callback_ptr(ptr as *mut c_void, || {
24+
called_clone.store(true, Ordering::SeqCst);
25+
});
26+
assert!(
27+
called.load(Ordering::SeqCst),
28+
"Callback should be called when pointer is registered"
29+
);
30+
31+
// Unregister the pointer
32+
sync_callback_registry_unregister(ptr_usize);
33+
34+
// Verify the callback is NOT called after unregistration
35+
let called2 = Arc::new(AtomicBool::new(false));
36+
let called2_clone = called2.clone();
37+
with_sync_callback_ptr(ptr as *mut c_void, || {
38+
called2_clone.store(true, Ordering::SeqCst);
39+
});
40+
assert!(
41+
!called2.load(Ordering::SeqCst),
42+
"Callback should NOT be called after unregistration"
43+
);
44+
45+
// Clean up
46+
unsafe { drop(Box::from_raw(ptr)) };
47+
}
48+
49+
// Test that unregistered pointers don't trigger callbacks
50+
#[test]
51+
fn test_sync_callback_unregistered_pointer() {
52+
let dummy_data = Box::new(100u32);
53+
let ptr = Box::into_raw(dummy_data);
54+
55+
// Try to use callback without registration
56+
let called = Arc::new(AtomicBool::new(false));
57+
let called_clone = called.clone();
58+
with_sync_callback_ptr(ptr as *mut c_void, || {
59+
called_clone.store(true, Ordering::SeqCst);
60+
});
61+
assert!(
62+
!called.load(Ordering::SeqCst),
63+
"Callback should NOT be called for unregistered pointer"
64+
);
65+
66+
// Clean up
67+
unsafe { drop(Box::from_raw(ptr)) };
68+
}
69+
70+
// Test multiple registrations don't cause issues
71+
#[test]
72+
fn test_sync_callback_multiple_pointers() {
73+
let data1 = Box::new(1u32);
74+
let ptr1 = Box::into_raw(data1);
75+
let ptr1_usize = ptr1 as usize;
76+
77+
let data2 = Box::new(2u32);
78+
let ptr2 = Box::into_raw(data2);
79+
let ptr2_usize = ptr2 as usize;
80+
81+
// Register both pointers
82+
sync_callback_registry_register(ptr1_usize);
83+
sync_callback_registry_register(ptr2_usize);
84+
85+
// Both should work
86+
let called1 = Arc::new(AtomicBool::new(false));
87+
let called1_clone = called1.clone();
88+
with_sync_callback_ptr(ptr1 as *mut c_void, || {
89+
called1_clone.store(true, Ordering::SeqCst);
90+
});
91+
assert!(
92+
called1.load(Ordering::SeqCst),
93+
"First pointer callback should be called"
94+
);
95+
96+
let called2 = Arc::new(AtomicBool::new(false));
97+
let called2_clone = called2.clone();
98+
with_sync_callback_ptr(ptr2 as *mut c_void, || {
99+
called2_clone.store(true, Ordering::SeqCst);
100+
});
101+
assert!(
102+
called2.load(Ordering::SeqCst),
103+
"Second pointer callback should be called"
104+
);
105+
106+
// Unregister first pointer
107+
sync_callback_registry_unregister(ptr1_usize);
108+
109+
// First should not work, second should still work
110+
let called1_after = Arc::new(AtomicBool::new(false));
111+
let called1_after_clone = called1_after.clone();
112+
with_sync_callback_ptr(ptr1 as *mut c_void, || {
113+
called1_after_clone.store(true, Ordering::SeqCst);
114+
});
115+
assert!(
116+
!called1_after.load(Ordering::SeqCst),
117+
"First pointer callback should NOT be called after unregister"
118+
);
119+
120+
let called2_after = Arc::new(AtomicBool::new(false));
121+
let called2_after_clone = called2_after.clone();
122+
with_sync_callback_ptr(ptr2 as *mut c_void, || {
123+
called2_after_clone.store(true, Ordering::SeqCst);
124+
});
125+
assert!(
126+
called2_after.load(Ordering::SeqCst),
127+
"Second pointer callback should still be called"
128+
);
129+
130+
// Clean up
131+
sync_callback_registry_unregister(ptr2_usize);
132+
unsafe {
133+
drop(Box::from_raw(ptr1));
134+
drop(Box::from_raw(ptr2));
135+
};
136+
}

0 commit comments

Comments
 (0)