Skip to content

Commit 52c20c6

Browse files
authored
Merge pull request #1080 from openmina/fix/p2p/webrtc/wasm_panic
Fix(p2p/webrtc/wasm): in rare cases, use of some tokio channels can cause panic if used in main thread
2 parents d03842c + a12cc87 commit 52c20c6

File tree

2 files changed

+79
-31
lines changed

2 files changed

+79
-31
lines changed

core/src/channels.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,56 @@
1-
pub use tokio::sync::{broadcast, mpsc, oneshot, watch};
1+
pub use tokio::sync::{mpsc, oneshot};
2+
3+
pub mod broadcast {
4+
pub use tokio::sync::broadcast::*;
5+
6+
#[deprecated(note = "don't use across threads as it can cause panic in WASM")]
7+
#[inline(always)]
8+
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
9+
tokio::sync::broadcast::channel(capacity)
10+
}
11+
}
12+
13+
pub mod watch {
14+
pub use tokio::sync::watch::*;
15+
16+
#[deprecated(note = "don't use across threads as it can cause panic in WASM")]
17+
#[inline(always)]
18+
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
19+
tokio::sync::watch::channel(init)
20+
}
21+
}
22+
23+
#[allow(dead_code)]
24+
pub struct Aborter(mpsc::Receiver<()>, mpsc::Sender<()>);
25+
26+
#[derive(Clone)]
27+
pub struct Aborted(mpsc::Sender<()>);
28+
29+
impl Default for Aborter {
30+
fn default() -> Self {
31+
let (tx, rx) = mpsc::channel(1);
32+
Self(rx, tx)
33+
}
34+
}
35+
36+
impl Aborter {
37+
pub fn listener_count(&self) -> usize {
38+
self.1.strong_count().saturating_sub(1)
39+
}
40+
41+
/// Simply drops the object. No need to call manually, unless you
42+
/// temporarily have to retain object for some reason.
43+
pub fn abort_mut(&mut self) {
44+
std::mem::take(self);
45+
}
46+
47+
pub fn aborted(&self) -> Aborted {
48+
Aborted(self.1.clone())
49+
}
50+
}
51+
52+
impl Aborted {
53+
pub async fn wait(&self) {
54+
self.0.closed().await;
55+
}
56+
}

p2p/src/service_impl/webrtc/mod.rs

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tokio::task::spawn_local;
1919
#[cfg(target_arch = "wasm32")]
2020
use wasm_bindgen_futures::spawn_local;
2121

22-
use openmina_core::channels::{broadcast, mpsc, oneshot};
22+
use openmina_core::channels::{mpsc, oneshot, Aborted, Aborter};
2323

2424
use crate::identity::{EncryptableType, PublicKey};
2525
use crate::webrtc::{ConnectionAuth, ConnectionAuthEncrypted};
@@ -61,10 +61,7 @@ use super::TaskSpawner;
6161
const CHUNK_SIZE: usize = 16 * 1024;
6262

6363
pub enum Cmd {
64-
PeerAdd {
65-
args: PeerAddArgs,
66-
abort: broadcast::Receiver<()>,
67-
},
64+
PeerAdd { args: PeerAddArgs, aborted: Aborted },
6865
}
6966

7067
#[derive(Debug)]
@@ -105,7 +102,7 @@ pub enum PeerConnectionKind {
105102

106103
pub struct PeerState {
107104
pub cmd_sender: mpsc::UnboundedSender<PeerCmd>,
108-
pub abort: broadcast::Sender<()>,
105+
pub abort: Aborter,
109106
}
110107

111108
#[derive(thiserror::Error, derive_more::From, Debug)]
@@ -226,12 +223,7 @@ async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
226223
}
227224
}
228225

229-
async fn peer_start(
230-
api: Api,
231-
args: PeerAddArgs,
232-
abort: broadcast::Receiver<()>,
233-
closed: broadcast::Sender<()>,
234-
) {
226+
async fn peer_start(api: Api, args: PeerAddArgs, abort: Aborted, closed: mpsc::Sender<()>) {
235227
let PeerAddArgs {
236228
peer_id,
237229
kind,
@@ -353,7 +345,7 @@ async fn peer_start(
353345
if let Some(connected_tx) = connected_tx.take() {
354346
let _ = connected_tx.send(Err("disconnected"));
355347
} else {
356-
let _ = closed.send(());
348+
let _ = closed.try_send(());
357349
}
358350
}
359351
_ => {}
@@ -489,7 +481,7 @@ async fn peer_loop(
489481
event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
490482
mut cmd_receiver: mpsc::UnboundedReceiver<PeerCmd>,
491483
mut pc: RTCConnection,
492-
abort: broadcast::Receiver<()>,
484+
aborted: Aborted,
493485
) {
494486
// TODO(binier): maybe use small_vec (stack allocated) or something like that.
495487
let mut channels = Channels::new();
@@ -566,10 +558,10 @@ async fn peer_loop(
566558
let _ =
567559
internal_cmd_sender.send(PeerCmdInternal::ChannelOpened(id, result.await));
568560
};
569-
let mut aborted = abort.resubscribe();
561+
let mut aborted = aborted.clone();
570562
spawn_local(async move {
571563
tokio::select! {
572-
_ = aborted.recv() => {}
564+
_ = aborted.wait() => {}
573565
_ = fut => {}
574566
}
575567
});
@@ -698,10 +690,10 @@ async fn peer_loop(
698690
}
699691
};
700692

701-
let mut aborted = abort.resubscribe();
693+
let mut aborted = aborted.clone();
702694
spawn_local(async move {
703695
tokio::select! {
704-
_ = aborted.recv() => {}
696+
_ = aborted.wait() => {}
705697
_ = fut => {}
706698
}
707699
});
@@ -743,7 +735,7 @@ pub trait P2pServiceWebrtc: redux::Service {
743735
let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
744736
while let Some(cmd) = cmd_receiver.recv().await {
745737
match cmd {
746-
Cmd::PeerAdd { args, mut abort } => {
738+
Cmd::PeerAdd { args, aborted } => {
747739
#[allow(clippy::all)]
748740
let api = api.clone();
749741
let conn_permits = conn_permits.clone();
@@ -755,22 +747,21 @@ pub trait P2pServiceWebrtc: redux::Service {
755747
bug_condition!("P2P WebRTC Semaphore acquisition failed!");
756748
return;
757749
};
758-
let (closed_tx, mut closed) = broadcast::channel(1);
750+
let (closed_tx, mut closed) = mpsc::channel(1);
759751
let event_sender_clone = event_sender.clone();
760752
spawn_local(async move {
761753
// to avoid sending closed multiple times
762754
let _ = closed.recv().await;
763755
event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
764756
});
765757
tokio::select! {
766-
_ = peer_start(api, args, abort.resubscribe(), closed_tx.clone()) => {}
767-
_ = abort.recv() => {
768-
}
758+
_ = peer_start(api, args, aborted.clone(), closed_tx.clone()) => {}
759+
_ = aborted.wait() => {}
769760
}
770761

771762
// delay dropping permit to give some time for cleanup.
772763
sleep(Duration::from_millis(100)).await;
773-
let _ = closed_tx.send(());
764+
let _ = closed_tx.send(()).await;
774765
});
775766
}
776767
}
@@ -785,13 +776,14 @@ pub trait P2pServiceWebrtc: redux::Service {
785776

786777
fn outgoing_init(&mut self, peer_id: PeerId) {
787778
let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel();
788-
let (abort_sender, abort_receiver) = broadcast::channel(1);
779+
let aborter = Aborter::default();
780+
let aborted = aborter.aborted();
789781

790782
self.peers().insert(
791783
peer_id,
792784
PeerState {
793785
cmd_sender: peer_cmd_sender,
794-
abort: abort_sender,
786+
abort: aborter,
795787
},
796788
);
797789
let event_sender = self.event_sender().clone();
@@ -804,19 +796,20 @@ pub trait P2pServiceWebrtc: redux::Service {
804796
event_sender,
805797
cmd_receiver: peer_cmd_receiver,
806798
},
807-
abort: abort_receiver,
799+
aborted,
808800
});
809801
}
810802

811803
fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
812804
let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel();
813-
let (abort_sender, abort_receiver) = broadcast::channel(1);
805+
let aborter = Aborter::default();
806+
let aborted = aborter.aborted();
814807

815808
self.peers().insert(
816809
peer_id,
817810
PeerState {
818811
cmd_sender: peer_cmd_sender,
819-
abort: abort_sender,
812+
abort: aborter,
820813
},
821814
);
822815
let event_sender = self.event_sender().clone();
@@ -829,7 +822,7 @@ pub trait P2pServiceWebrtc: redux::Service {
829822
event_sender,
830823
cmd_receiver: peer_cmd_receiver,
831824
},
832-
abort: abort_receiver,
825+
aborted,
833826
});
834827
}
835828

0 commit comments

Comments
 (0)