Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions p2p/src/service_impl/webrtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use super::TaskSpawner;
const CHUNK_SIZE: usize = 16 * 1024;

pub enum Cmd {
PeerAdd(PeerAddArgs),
PeerAdd {
args: PeerAddArgs,
abort: oneshot::Receiver<()>,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -83,6 +86,7 @@ pub enum PeerConnectionKind {

pub struct PeerState {
pub cmd_sender: mpsc::UnboundedSender<PeerCmd>,
pub abort: oneshot::Sender<()>,
}

#[derive(thiserror::Error, derive_more::From, Debug)]
Expand Down Expand Up @@ -250,6 +254,15 @@ async fn peer_start(args: PeerAddArgs) {
}
};

let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>();
let mut main_channel_open_tx = Some(main_channel_open_tx);
main_channel.on_open(move || {
if let Some(tx) = main_channel_open_tx.take() {
let _ = tx.send(());
}
std::future::ready(())
});

let answer = if is_outgoing {
let answer_fut = async {
let sdp = pc.local_sdp().await.unwrap();
Expand Down Expand Up @@ -357,6 +370,8 @@ async fn peer_start(args: PeerAddArgs) {
return;
}
Some(PeerCmd::ConnectionAuthorizationSend(Some(auth))) => {
let _ = main_channel_open.await;

// Add a delay for sending messages after channel
// was opened. Some initial messages get lost otherwise.
// TODO(binier): find deeper cause and fix it.
Expand Down Expand Up @@ -691,8 +706,13 @@ pub trait P2pServiceWebrtc: redux::Service {
spawner.spawn_main("webrtc", async move {
while let Some(cmd) = cmd_receiver.recv().await {
match cmd {
Cmd::PeerAdd(args) => {
spawn_local(peer_start(args));
Cmd::PeerAdd { args, abort } => {
spawn_local(async move {
tokio::select! {
_ = abort => {}
_ = peer_start(args) => {}
}
});
}
}
}
Expand All @@ -706,42 +726,52 @@ pub trait P2pServiceWebrtc: redux::Service {

fn outgoing_init(&mut self, peer_id: PeerId) {
let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel();
let (abort_sender, abort_receiver) = oneshot::channel();

self.peers().insert(
peer_id,
PeerState {
cmd_sender: peer_cmd_sender,
abort: abort_sender,
},
);
let event_sender = self.event_sender().clone();
let event_sender =
Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
let _ = self.cmd_sender().send(Cmd::PeerAdd(PeerAddArgs {
peer_id,
kind: PeerConnectionKind::Outgoing,
event_sender,
cmd_receiver: peer_cmd_receiver,
}));
let _ = self.cmd_sender().send(Cmd::PeerAdd {
args: PeerAddArgs {
peer_id,
kind: PeerConnectionKind::Outgoing,
event_sender,
cmd_receiver: peer_cmd_receiver,
},
abort: abort_receiver,
});
}

fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
let (peer_cmd_sender, peer_cmd_receiver) = mpsc::unbounded_channel();
let (abort_sender, abort_receiver) = oneshot::channel();

self.peers().insert(
peer_id,
PeerState {
cmd_sender: peer_cmd_sender,
abort: abort_sender,
},
);
let event_sender = self.event_sender().clone();
let event_sender =
Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
let _ = self.cmd_sender().send(Cmd::PeerAdd(PeerAddArgs {
peer_id,
kind: PeerConnectionKind::Incoming(Box::new(offer)),
event_sender,
cmd_receiver: peer_cmd_receiver,
}));
let _ = self.cmd_sender().send(Cmd::PeerAdd {
args: PeerAddArgs {
peer_id,
kind: PeerConnectionKind::Incoming(Box::new(offer)),
event_sender,
cmd_receiver: peer_cmd_receiver,
},
abort: abort_receiver,
});
}

fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {
Expand Down
Loading