Skip to content

Commit 906de4a

Browse files
committed
fix(webrtc-rs): connection "leak" due to rapid connection creation
1 parent 4e3def6 commit 906de4a

File tree

1 file changed

+33
-3
lines changed
  • p2p/src/service_impl/webrtc

1 file changed

+33
-3
lines changed

p2p/src/service_impl/webrtc/mod.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{collections::BTreeMap, time::Duration};
1212

1313
use openmina_core::bug_condition;
1414
use serde::Serialize;
15+
use tokio::sync::Mutex;
1516

1617
#[cfg(not(target_arch = "wasm32"))]
1718
use tokio::task::spawn_local;
@@ -169,7 +170,7 @@ impl Default for RTCConfigIceServers {
169170
credential: Some("webrtc".to_owned()),
170171
},
171172
RTCConfigIceServer {
172-
urls: vec!["176.9.147.28:3478".to_owned()],
173+
urls: vec!["stun:176.9.147.28:3478".to_owned()],
173174
username: None,
174175
credential: None,
175176
},
@@ -261,6 +262,7 @@ async fn peer_start(api: Api, args: PeerAddArgs, abort: broadcast::Receiver<()>)
261262
Result::<_, Error>::Ok((pc, main_channel))
262263
};
263264

265+
#[allow(unused_mut)]
264266
let (mut pc, mut main_channel) = match fut.await {
265267
Ok(v) => v,
266268
Err(err) => {
@@ -476,6 +478,7 @@ impl Channels {
476478
}
477479

478480
// TODO(binier): remove unwraps
481+
#[allow(unused_mut)]
479482
async fn peer_loop(
480483
peer_id: PeerId,
481484
event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
@@ -593,6 +596,7 @@ async fn peer_loop(
593596
Err(err) => (None, Err(err.to_string())),
594597
};
595598

599+
#[allow(unused_mut)]
596600
if let Some(mut chan) = chan {
597601
fn process_msg(
598602
chan_id: ChannelId,
@@ -730,16 +734,42 @@ pub trait P2pServiceWebrtc: redux::Service {
730734
spawner.spawn_main("webrtc", async move {
731735
#[allow(clippy::all)]
732736
let api = build_api();
737+
let active_peers = Arc::new(Mutex::new(BTreeMap::new()));
733738
while let Some(cmd) = cmd_receiver.recv().await {
734739
match cmd {
735740
Cmd::PeerAdd { args, abort } => {
736741
#[allow(clippy::all)]
737-
let api_clone = api.clone();
742+
let api = api.clone();
743+
let active_peers = active_peers.clone();
738744
spawn_local(async move {
745+
let peer_id = args.peer_id;
746+
let (abort_finished_tx, abort_finished) = broadcast::channel::<()>(1);
739747
let mut aborted = abort.resubscribe();
748+
749+
if let Some(mut abort_finished) =
750+
active_peers.lock().await.insert(peer_id, abort_finished)
751+
{
752+
// wait for peer conn with same id to finish.
753+
let _ = abort_finished.recv().await;
754+
}
755+
if active_peers.lock().await.len() > 100 {
756+
sleep(Duration::from_millis(500)).await;
757+
}
758+
740759
tokio::select! {
741760
_ = aborted.recv() => {}
742-
_ = peer_start(api_clone, args, abort) => {}
761+
_ = peer_start(api, args, abort) => {}
762+
}
763+
764+
// delay removing active_peer to give some time for cleanup.
765+
sleep(Duration::from_millis(500)).await;
766+
let rx1 = abort_finished_tx.subscribe();
767+
let mut active_peers = active_peers.lock().await;
768+
if let Some(rx) = active_peers
769+
.remove(&peer_id)
770+
.filter(|rx2| !rx2.same_channel(&rx1))
771+
{
772+
active_peers.insert(peer_id, rx);
743773
}
744774
});
745775
}

0 commit comments

Comments
 (0)