Skip to content

Commit 020be3c

Browse files
committed
Tidy pairing websocket loops.
1 parent cc704ba commit 020be3c

File tree

1 file changed

+36
-32
lines changed

1 file changed

+36
-32
lines changed

crates/net/src/pairing/websocket.rs

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
use super::{DeviceEnrollment, Error, Result, ServerPairUrl};
33
use crate::NetworkAccount;
44
use futures::{
5-
select,
65
stream::{SplitSink, SplitStream},
7-
FutureExt, SinkExt, StreamExt,
6+
SinkExt, StreamExt,
87
};
98
use prost::bytes::Bytes;
109
use snow::{Builder, HandshakeState, Keypair, TransportState};
@@ -70,7 +69,7 @@ enum IncomingAction {
7069
HandleMessage(PairingMessage),
7170
}
7271

73-
/// Listen for incoming messages on the stream.
72+
/// Listen for incoming messages on the websocket stream.
7473
async fn listen(
7574
mut rx: WsStream,
7675
tx: mpsc::Sender<RelayPacket>,
@@ -102,6 +101,7 @@ async fn listen(
102101
}
103102
}
104103
}
104+
tracing::debug!("pairing::websocket::connection_closed");
105105
}
106106

107107
/// Offer is the device that is authenticated and can
@@ -217,26 +217,29 @@ impl<'a> OfferPairing<'a> {
217217
let (close_tx, mut close_rx) = mpsc::channel::<()>(1);
218218
tokio::task::spawn(listen(stream, offer_tx, close_tx));
219219
loop {
220-
select! {
221-
event = offer_rx.recv().fuse() => {
222-
if let Some(event) = event {
223-
self.incoming(event).await?;
224-
if self.is_finished() {
225-
break;
226-
}
220+
tokio::select! {
221+
biased;
222+
// Explicit shutdown notification
223+
Some(_) = shutdown_rx.recv() => {
224+
tracing::debug!("pairing::offer::shutdown_received");
225+
if let Err(error) = self.tx.send(Message::Close(Some(CloseFrame {
226+
code: CloseCode::Normal,
227+
reason: Utf8Bytes::from_static("closed"),
228+
}))).await {
229+
tracing::error!(
230+
error = %error,
231+
"pairing::offer::websocket_close_frame::error");
227232
}
233+
break;
228234
}
229-
event = close_rx.recv().fuse() => {
230-
if event.is_some() {
231-
break;
232-
}
235+
// Close signal from the websocket stream
236+
Some(_) = close_rx.recv() => {
237+
break;
233238
}
234-
event = shutdown_rx.recv().fuse() => {
235-
if event.is_some() {
236-
let _ = self.tx.send(Message::Close(Some(CloseFrame {
237-
code: CloseCode::Normal,
238-
reason: Utf8Bytes::from_static("closed"),
239-
}))).await;
239+
// Incoming event
240+
Some(event) = offer_rx.recv() => {
241+
self.incoming(event).await?;
242+
if self.is_finished() {
240243
break;
241244
}
242245
}
@@ -633,26 +636,27 @@ impl<'a> AcceptPairing<'a> {
633636
tokio::task::spawn(listen(stream, offer_tx, close_tx));
634637

635638
loop {
636-
select! {
637-
event = offer_rx.recv().fuse() => {
639+
tokio::select! {
640+
biased;
641+
event = shutdown_rx.recv() => {
642+
if event.is_some() {
643+
let _ = self.tx.send(Message::Close(Some(CloseFrame {
644+
code: CloseCode::Normal,
645+
reason: Utf8Bytes::from_static("closed"),
646+
}))).await;
647+
break;
648+
}
649+
}
650+
event = offer_rx.recv() => {
638651
if let Some(event) = event {
639652
self.incoming(event).await?;
640653
if self.is_finished() {
641654
break;
642655
}
643656
}
644657
}
645-
event = close_rx.recv().fuse() => {
646-
if event.is_some() {
647-
break;
648-
}
649-
}
650-
event = shutdown_rx.recv().fuse() => {
658+
event = close_rx.recv() => {
651659
if event.is_some() {
652-
let _ = self.tx.send(Message::Close(Some(CloseFrame {
653-
code: CloseCode::Normal,
654-
reason: Utf8Bytes::from_static("closed"),
655-
}))).await;
656660
break;
657661
}
658662
}

0 commit comments

Comments
 (0)