Skip to content

Commit 868fc77

Browse files
rogurotusalexlapa
andauthored
Fix deadlock in on_track callback (#131)
Additionally: - fix `set_send`, `set_recv` race - fix false track deduplication Co-authored-by: alexlapa <[email protected]>
1 parent 1f3fc1e commit 868fc77

File tree

11 files changed

+618
-262
lines changed

11 files changed

+618
-262
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,6 @@ jobs:
247247
if: ${{ matrix.platform != 'android'
248248
&& matrix.platform != 'ios' }}
249249

250-
- name: Set up JDK 11
251-
uses: actions/setup-java@v3
252-
with:
253-
distribution: zulu
254-
java-version: 11
255-
if: ${{ matrix.platform == 'android' }}
256250
- name: Test on `${{ matrix.platform }}` platform with emulator
257251
uses: reactivecircus/android-emulator-runner@v2
258252
with:

crates/native/src/api.rs

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use libwebrtc_sys as sys;
1111

1212
use crate::{
1313
devices::{self, DeviceState},
14+
pc::PeerConnectionId,
1415
renderer::FrameHandler,
16+
user_media::TrackOrigin,
1517
Webrtc,
1618
};
1719

@@ -1657,6 +1659,12 @@ pub struct MediaStreamTrack {
16571659
/// Unique identifier (GUID) of this [`MediaStreamTrack`].
16581660
pub id: String,
16591661

1662+
/// Unique identifier of the [`PeerConnection`] from which this
1663+
/// [`MediaStreamTrack`] was received.
1664+
///
1665+
/// Always [`None`] for local [`MediaStreamTrack`]s.
1666+
pub peer_id: Option<u64>,
1667+
16601668
/// Label identifying the track source, as in "internal microphone".
16611669
pub device_id: String,
16621670

@@ -2170,8 +2178,13 @@ pub fn microphone_volume() -> anyhow::Result<u32> {
21702178
}
21712179

21722180
/// Disposes the specified [`MediaStreamTrack`].
2173-
pub fn dispose_track(track_id: String, kind: MediaType) {
2174-
WEBRTC.lock().unwrap().dispose_track(track_id, kind);
2181+
pub fn dispose_track(track_id: String, peer_id: Option<u64>, kind: MediaType) {
2182+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2183+
2184+
WEBRTC
2185+
.lock()
2186+
.unwrap()
2187+
.dispose_track(track_origin, track_id, kind);
21752188
}
21762189

21772190
/// Returns the [readyState][0] property of the [`MediaStreamTrack`] by its ID
@@ -2180,9 +2193,15 @@ pub fn dispose_track(track_id: String, kind: MediaType) {
21802193
/// [0]: https://w3.org/TR/mediacapture-streams#dfn-readystate
21812194
pub fn track_state(
21822195
track_id: String,
2196+
peer_id: Option<u64>,
21832197
kind: MediaType,
21842198
) -> anyhow::Result<TrackState> {
2185-
WEBRTC.lock().unwrap().track_state(track_id, kind)
2199+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2200+
2201+
WEBRTC
2202+
.lock()
2203+
.unwrap()
2204+
.track_state(track_id, track_origin, kind)
21862205
}
21872206

21882207
/// Changes the [enabled][1] property of the [`MediaStreamTrack`] by its ID and
@@ -2191,33 +2210,49 @@ pub fn track_state(
21912210
/// [1]: https://w3.org/TR/mediacapture-streams#track-enabled
21922211
pub fn set_track_enabled(
21932212
track_id: String,
2213+
peer_id: Option<u64>,
21942214
kind: MediaType,
21952215
enabled: bool,
21962216
) -> anyhow::Result<()> {
2197-
WEBRTC
2198-
.lock()
2199-
.unwrap()
2200-
.set_track_enabled(track_id, kind, enabled)
2217+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2218+
2219+
WEBRTC.lock().unwrap().set_track_enabled(
2220+
track_id,
2221+
track_origin,
2222+
kind,
2223+
enabled,
2224+
)
22012225
}
22022226

22032227
/// Clones the specified [`MediaStreamTrack`].
22042228
pub fn clone_track(
22052229
track_id: String,
2230+
peer_id: Option<u64>,
22062231
kind: MediaType,
22072232
) -> anyhow::Result<MediaStreamTrack> {
2208-
WEBRTC.lock().unwrap().clone_track(track_id, kind)
2233+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2234+
2235+
WEBRTC
2236+
.lock()
2237+
.unwrap()
2238+
.clone_track(track_id, track_origin, kind)
22092239
}
22102240

22112241
/// Registers an observer to the [`MediaStreamTrack`] events.
22122242
pub fn register_track_observer(
22132243
cb: StreamSink<TrackEvent>,
2244+
peer_id: Option<u64>,
22142245
track_id: String,
22152246
kind: MediaType,
22162247
) -> anyhow::Result<()> {
2217-
WEBRTC
2218-
.lock()
2219-
.unwrap()
2220-
.register_track_observer(track_id, kind, cb.into())
2248+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2249+
2250+
WEBRTC.lock().unwrap().register_track_observer(
2251+
track_id,
2252+
track_origin,
2253+
kind,
2254+
cb.into(),
2255+
)
22212256
}
22222257

22232258
/// Sets the provided [`OnDeviceChangeCallback`] as the callback to be called
@@ -2241,16 +2276,20 @@ pub fn set_on_device_changed(cb: StreamSink<()>) -> anyhow::Result<()> {
22412276
pub fn create_video_sink(
22422277
cb: StreamSink<TextureEvent>,
22432278
sink_id: i64,
2279+
peer_id: Option<u64>,
22442280
track_id: String,
22452281
callback_ptr: u64,
22462282
texture_id: i64,
22472283
) -> anyhow::Result<()> {
22482284
let handler = FrameHandler::new(callback_ptr as _, cb.into(), texture_id);
2249-
2250-
WEBRTC
2251-
.lock()
2252-
.unwrap()
2253-
.create_video_sink(sink_id, track_id, handler)
2285+
let track_origin = TrackOrigin::from(peer_id.map(PeerConnectionId::from));
2286+
2287+
WEBRTC.lock().unwrap().create_video_sink(
2288+
sink_id,
2289+
track_id,
2290+
track_origin,
2291+
handler,
2292+
)
22542293
}
22552294

22562295
/// Destroys the [`VideoSink`] by the provided ID.

crates/native/src/bridge_generated.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ fn wire_microphone_volume_impl(port_: MessagePort) {
587587
fn wire_dispose_track_impl(
588588
port_: MessagePort,
589589
track_id: impl Wire2Api<String> + UnwindSafe,
590+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
590591
kind: impl Wire2Api<MediaType> + UnwindSafe,
591592
) {
592593
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, ()>(
@@ -597,14 +598,18 @@ fn wire_dispose_track_impl(
597598
},
598599
move || {
599600
let api_track_id = track_id.wire2api();
601+
let api_peer_id = peer_id.wire2api();
600602
let api_kind = kind.wire2api();
601-
move |task_callback| Ok(dispose_track(api_track_id, api_kind))
603+
move |task_callback| {
604+
Ok(dispose_track(api_track_id, api_peer_id, api_kind))
605+
}
602606
},
603607
)
604608
}
605609
fn wire_track_state_impl(
606610
port_: MessagePort,
607611
track_id: impl Wire2Api<String> + UnwindSafe,
612+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
608613
kind: impl Wire2Api<MediaType> + UnwindSafe,
609614
) {
610615
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, TrackState>(
@@ -615,14 +620,18 @@ fn wire_track_state_impl(
615620
},
616621
move || {
617622
let api_track_id = track_id.wire2api();
623+
let api_peer_id = peer_id.wire2api();
618624
let api_kind = kind.wire2api();
619-
move |task_callback| track_state(api_track_id, api_kind)
625+
move |task_callback| {
626+
track_state(api_track_id, api_peer_id, api_kind)
627+
}
620628
},
621629
)
622630
}
623631
fn wire_set_track_enabled_impl(
624632
port_: MessagePort,
625633
track_id: impl Wire2Api<String> + UnwindSafe,
634+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
626635
kind: impl Wire2Api<MediaType> + UnwindSafe,
627636
enabled: impl Wire2Api<bool> + UnwindSafe,
628637
) {
@@ -634,17 +643,24 @@ fn wire_set_track_enabled_impl(
634643
},
635644
move || {
636645
let api_track_id = track_id.wire2api();
646+
let api_peer_id = peer_id.wire2api();
637647
let api_kind = kind.wire2api();
638648
let api_enabled = enabled.wire2api();
639649
move |task_callback| {
640-
set_track_enabled(api_track_id, api_kind, api_enabled)
650+
set_track_enabled(
651+
api_track_id,
652+
api_peer_id,
653+
api_kind,
654+
api_enabled,
655+
)
641656
}
642657
},
643658
)
644659
}
645660
fn wire_clone_track_impl(
646661
port_: MessagePort,
647662
track_id: impl Wire2Api<String> + UnwindSafe,
663+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
648664
kind: impl Wire2Api<MediaType> + UnwindSafe,
649665
) {
650666
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, MediaStreamTrack>(
@@ -655,13 +671,17 @@ fn wire_clone_track_impl(
655671
},
656672
move || {
657673
let api_track_id = track_id.wire2api();
674+
let api_peer_id = peer_id.wire2api();
658675
let api_kind = kind.wire2api();
659-
move |task_callback| clone_track(api_track_id, api_kind)
676+
move |task_callback| {
677+
clone_track(api_track_id, api_peer_id, api_kind)
678+
}
660679
},
661680
)
662681
}
663682
fn wire_register_track_observer_impl(
664683
port_: MessagePort,
684+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
665685
track_id: impl Wire2Api<String> + UnwindSafe,
666686
kind: impl Wire2Api<MediaType> + UnwindSafe,
667687
) {
@@ -672,11 +692,13 @@ fn wire_register_track_observer_impl(
672692
mode: FfiCallMode::Stream,
673693
},
674694
move || {
695+
let api_peer_id = peer_id.wire2api();
675696
let api_track_id = track_id.wire2api();
676697
let api_kind = kind.wire2api();
677698
move |task_callback| {
678699
register_track_observer(
679700
task_callback.stream_sink::<_, TrackEvent>(),
701+
api_peer_id,
680702
api_track_id,
681703
api_kind,
682704
)
@@ -701,6 +723,7 @@ fn wire_set_on_device_changed_impl(port_: MessagePort) {
701723
fn wire_create_video_sink_impl(
702724
port_: MessagePort,
703725
sink_id: impl Wire2Api<i64> + UnwindSafe,
726+
peer_id: impl Wire2Api<Option<u64>> + UnwindSafe,
704727
track_id: impl Wire2Api<String> + UnwindSafe,
705728
callback_ptr: impl Wire2Api<u64> + UnwindSafe,
706729
texture_id: impl Wire2Api<i64> + UnwindSafe,
@@ -713,13 +736,15 @@ fn wire_create_video_sink_impl(
713736
},
714737
move || {
715738
let api_sink_id = sink_id.wire2api();
739+
let api_peer_id = peer_id.wire2api();
716740
let api_track_id = track_id.wire2api();
717741
let api_callback_ptr = callback_ptr.wire2api();
718742
let api_texture_id = texture_id.wire2api();
719743
move |task_callback| {
720744
create_video_sink(
721745
task_callback.stream_sink::<_, TextureEvent>(),
722746
api_sink_id,
747+
api_peer_id,
723748
api_track_id,
724749
api_callback_ptr,
725750
api_texture_id,
@@ -1055,6 +1080,7 @@ impl support::IntoDart for MediaStreamTrack {
10551080
fn into_dart(self) -> support::DartAbi {
10561081
vec![
10571082
self.id.into_into_dart().into_dart(),
1083+
self.peer_id.into_dart(),
10581084
self.device_id.into_into_dart().into_dart(),
10591085
self.kind.into_into_dart().into_dart(),
10601086
self.enabled.into_into_dart().into_dart(),
@@ -1950,46 +1976,51 @@ mod io {
19501976
pub extern "C" fn wire_dispose_track(
19511977
port_: i64,
19521978
track_id: *mut wire_uint_8_list,
1979+
peer_id: *mut u64,
19531980
kind: i32,
19541981
) {
1955-
wire_dispose_track_impl(port_, track_id, kind)
1982+
wire_dispose_track_impl(port_, track_id, peer_id, kind)
19561983
}
19571984

19581985
#[no_mangle]
19591986
pub extern "C" fn wire_track_state(
19601987
port_: i64,
19611988
track_id: *mut wire_uint_8_list,
1989+
peer_id: *mut u64,
19621990
kind: i32,
19631991
) {
1964-
wire_track_state_impl(port_, track_id, kind)
1992+
wire_track_state_impl(port_, track_id, peer_id, kind)
19651993
}
19661994

19671995
#[no_mangle]
19681996
pub extern "C" fn wire_set_track_enabled(
19691997
port_: i64,
19701998
track_id: *mut wire_uint_8_list,
1999+
peer_id: *mut u64,
19712000
kind: i32,
19722001
enabled: bool,
19732002
) {
1974-
wire_set_track_enabled_impl(port_, track_id, kind, enabled)
2003+
wire_set_track_enabled_impl(port_, track_id, peer_id, kind, enabled)
19752004
}
19762005

19772006
#[no_mangle]
19782007
pub extern "C" fn wire_clone_track(
19792008
port_: i64,
19802009
track_id: *mut wire_uint_8_list,
2010+
peer_id: *mut u64,
19812011
kind: i32,
19822012
) {
1983-
wire_clone_track_impl(port_, track_id, kind)
2013+
wire_clone_track_impl(port_, track_id, peer_id, kind)
19842014
}
19852015

19862016
#[no_mangle]
19872017
pub extern "C" fn wire_register_track_observer(
19882018
port_: i64,
2019+
peer_id: *mut u64,
19892020
track_id: *mut wire_uint_8_list,
19902021
kind: i32,
19912022
) {
1992-
wire_register_track_observer_impl(port_, track_id, kind)
2023+
wire_register_track_observer_impl(port_, peer_id, track_id, kind)
19932024
}
19942025

19952026
#[no_mangle]
@@ -2001,13 +2032,15 @@ mod io {
20012032
pub extern "C" fn wire_create_video_sink(
20022033
port_: i64,
20032034
sink_id: i64,
2035+
peer_id: *mut u64,
20042036
track_id: *mut wire_uint_8_list,
20052037
callback_ptr: u64,
20062038
texture_id: i64,
20072039
) {
20082040
wire_create_video_sink_impl(
20092041
port_,
20102042
sink_id,
2043+
peer_id,
20112044
track_id,
20122045
callback_ptr,
20132046
texture_id,
@@ -2085,6 +2118,11 @@ mod io {
20852118
support::new_leak_box_ptr(wire_RtcConfiguration::new_with_null_ptr())
20862119
}
20872120

2121+
#[no_mangle]
2122+
pub extern "C" fn new_box_autoadd_u64_0(value: u64) -> *mut u64 {
2123+
support::new_leak_box_ptr(value)
2124+
}
2125+
20882126
#[no_mangle]
20892127
pub extern "C" fn new_box_autoadd_video_constraints_0(
20902128
) -> *mut wire_VideoConstraints {
@@ -2261,6 +2299,11 @@ mod io {
22612299
Wire2Api::<RtcConfiguration>::wire2api(*wrap).into()
22622300
}
22632301
}
2302+
impl Wire2Api<u64> for *mut u64 {
2303+
fn wire2api(self) -> u64 {
2304+
unsafe { *support::box_from_leak_ptr(self) }
2305+
}
2306+
}
22642307
impl Wire2Api<VideoConstraints> for *mut wire_VideoConstraints {
22652308
fn wire2api(self) -> VideoConstraints {
22662309
let wrap = unsafe { support::box_from_leak_ptr(self) };

0 commit comments

Comments
 (0)