diff --git a/p2p/src/service_impl/webrtc/mod.rs b/p2p/src/service_impl/webrtc/mod.rs index 0fd03da2f9..c363f39b6b 100644 --- a/p2p/src/service_impl/webrtc/mod.rs +++ b/p2p/src/service_impl/webrtc/mod.rs @@ -42,7 +42,10 @@ use super::TaskSpawner; const CHUNK_SIZE: usize = 16 * 1024; pub enum Cmd { - PeerAdd(PeerAddArgs), + PeerAdd { + args: PeerAddArgs, + abort: oneshot::Receiver<()>, + }, } #[derive(Debug)] @@ -83,6 +86,7 @@ pub enum PeerConnectionKind { pub struct PeerState { pub cmd_sender: mpsc::UnboundedSender, + pub abort: oneshot::Sender<()>, } #[derive(thiserror::Error, derive_more::From, Debug)] @@ -250,6 +254,15 @@ async fn peer_start(args: PeerAddArgs) { } }; + let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>(); + let mut main_channel_open_tx = Some(main_channel_open_tx); + main_channel.on_open(move || { + if let Some(tx) = main_channel_open_tx.take() { + let _ = tx.send(()); + } + std::future::ready(()) + }); + let answer = if is_outgoing { let answer_fut = async { let sdp = pc.local_sdp().await.unwrap(); @@ -357,6 +370,8 @@ async fn peer_start(args: PeerAddArgs) { return; } Some(PeerCmd::ConnectionAuthorizationSend(Some(auth))) => { + let _ = main_channel_open.await; + // Add a delay for sending messages after channel // was opened. Some initial messages get lost otherwise. // TODO(binier): find deeper cause and fix it. @@ -691,8 +706,13 @@ pub trait P2pServiceWebrtc: redux::Service { spawner.spawn_main("webrtc", async move { while let Some(cmd) = cmd_receiver.recv().await { match cmd { - Cmd::PeerAdd(args) => { - spawn_local(peer_start(args)); + Cmd::PeerAdd { args, abort } => { + spawn_local(async move { + tokio::select! { + _ = abort => {} + _ = peer_start(args) => {} + } + }); } } } @@ -706,42 +726,52 @@ pub trait P2pServiceWebrtc: redux::Service { fn outgoing_init(&mut self, peer_id: PeerId) { let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel(); + let (abort_sender, abort_receiver) = oneshot::channel(); self.peers().insert( peer_id, PeerState { cmd_sender: peer_cmd_sender, + abort: abort_sender, }, ); let event_sender = self.event_sender().clone(); let event_sender = Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok()); - let _ = self.cmd_sender().send(Cmd::PeerAdd(PeerAddArgs { - peer_id, - kind: PeerConnectionKind::Outgoing, - event_sender, - cmd_receiver: peer_cmd_receiver, - })); + let _ = self.cmd_sender().send(Cmd::PeerAdd { + args: PeerAddArgs { + peer_id, + kind: PeerConnectionKind::Outgoing, + event_sender, + cmd_receiver: peer_cmd_receiver, + }, + abort: abort_receiver, + }); } fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) { let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel(); + let (abort_sender, abort_receiver) = oneshot::channel(); self.peers().insert( peer_id, PeerState { cmd_sender: peer_cmd_sender, + abort: abort_sender, }, ); let event_sender = self.event_sender().clone(); let event_sender = Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok()); - let _ = self.cmd_sender().send(Cmd::PeerAdd(PeerAddArgs { - peer_id, - kind: PeerConnectionKind::Incoming(Box::new(offer)), - event_sender, - cmd_receiver: peer_cmd_receiver, - })); + let _ = self.cmd_sender().send(Cmd::PeerAdd { + args: PeerAddArgs { + peer_id, + kind: PeerConnectionKind::Incoming(Box::new(offer)), + event_sender, + cmd_receiver: peer_cmd_receiver, + }, + abort: abort_receiver, + }); } fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {