Skip to content

Commit 0fd0954

Browse files
committed
chore: simplify ws close handling
1 parent d1b00ee commit 0fd0954

File tree

1 file changed

+164
-128
lines changed
  • crates/rproxy/src/server/proxy/ws

1 file changed

+164
-128
lines changed

crates/rproxy/src/server/proxy/ws/proxy.rs

Lines changed: 164 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use time::{UtcDateTime, format_description::well_known::Iso8601};
3131
use tokio::{net::TcpStream, sync::broadcast};
3232
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
3333
use tracing::{debug, error, info, trace, warn};
34-
use tungstenite::Utf8Bytes;
3534
use uuid::Uuid;
3635
use x509_parser::asn1_rs::ToStatic;
3736

@@ -55,6 +54,9 @@ const WS_CLNT_ERROR: &str = "client error";
5554
const WS_BKND_ERROR: &str = "backend error";
5655
const WS_CLOSE_OK: &str = "";
5756

57+
const WS_CLOSE_REASON_NORMAL: &str = "normal-close";
58+
const WS_CLOSE_REASON_UNSPECIFIED: &str = "unexpected-close";
59+
5860
const WS_LABEL_BKND: &str = "backend";
5961
const WS_LABEL_CLNT: &str = "client";
6062

@@ -668,87 +670,33 @@ where
668670
}
669671
}
670672

671-
if let Err(msg) = pumping &&
672-
msg != WS_CLOSE_OK
673-
{
674-
debug!(
675-
proxy = P::name(),
676-
connection_id = %self.info.conn_id(),
677-
worker_id = %self.worker_id,
678-
msg = %msg,
679-
"Closing client websocket session..."
680-
);
681-
let _ = self // only 1 possible error (i.e. "already closed")
682-
.clnt_tx
683-
.clone() // .close() consumes it
684-
.close(Some(actix_ws::CloseReason {
685-
code: awc::ws::CloseCode::Error,
686-
description: Some(String::from(WS_BKND_ERROR)),
687-
}))
688-
.await;
689-
690-
debug!(
691-
proxy = P::name(),
692-
connection_id = %self.info.conn_id(),
693-
worker_id = %self.worker_id,
694-
msg = %msg,
695-
"Closing backend websocket session..."
696-
);
697-
if let Err(err) = self
698-
.bknd_tx
699-
.send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame {
700-
code: tungstenite::protocol::frame::coding::CloseCode::Error,
701-
reason: msg.into(),
702-
})))
703-
.await
704-
{
705-
error!(
706-
proxy = P::name(),
707-
connection_id = %self.info.conn_id(),
708-
worker_id = %self.worker_id,
709-
msg = %msg,
710-
error = ?err,
711-
"Failed to close backend websocket session"
712-
);
713-
}
714-
} else {
715-
debug!(
716-
proxy = P::name(),
717-
connection_id = %self.info.conn_id(),
718-
worker_id = %self.worker_id,
719-
"Closing client websocket session..."
720-
);
721-
let _ = self // only 1 possible error (i.e. "already closed")
722-
.clnt_tx
723-
.clone() // .close() consumes it
724-
.close(Some(actix_ws::CloseReason {
725-
code: awc::ws::CloseCode::Normal,
726-
description: None,
727-
}))
728-
.await;
729-
730-
debug!(
731-
proxy = P::name(),
732-
connection_id = %self.info.conn_id(),
733-
worker_id = %self.worker_id,
734-
"Closing backend websocket session..."
735-
);
736-
if let Err(err) = self
737-
.bknd_tx
738-
.send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame {
739-
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
740-
reason: Utf8Bytes::default(),
741-
})))
742-
.await
743-
{
744-
error!(
745-
proxy = P::name(),
746-
connection_id = %self.info.conn_id(),
747-
worker_id = %self.worker_id,
748-
error = ?err,
749-
"Failed to close backend websocket session"
750-
);
751-
}
673+
if let Err(reason) = pumping {
674+
let (frame_clnt, frame_bknd) = match reason {
675+
WS_CLOSE_OK => (
676+
actix_ws::CloseReason {
677+
code: awc::ws::CloseCode::Normal,
678+
description: WS_CLOSE_REASON_NORMAL.to_string().into(),
679+
},
680+
tungstenite::protocol::CloseFrame {
681+
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
682+
reason: WS_CLOSE_REASON_NORMAL.into(),
683+
},
684+
),
685+
686+
_ => (
687+
actix_ws::CloseReason {
688+
code: awc::ws::CloseCode::Error,
689+
description: reason.to_string().into(),
690+
},
691+
tungstenite::protocol::CloseFrame {
692+
code: tungstenite::protocol::frame::coding::CloseCode::Error,
693+
reason: reason.into(),
694+
},
695+
),
696+
};
697+
698+
_ = self.close_clnt_session(frame_clnt).await;
699+
_ = self.close_bknd_session(frame_bknd).await;
752700
}
753701

754702
info!(
@@ -849,7 +797,7 @@ where
849797
match clnt_msg {
850798
Some(Ok(msg)) => {
851799
match msg {
852-
// binary
800+
// binary msg from client
853801
actix_ws::Message::Binary(bytes) => {
854802
#[cfg(feature = "chaos")]
855803
if self.chaos.stream_is_blocked.load(Ordering::Relaxed) {
@@ -885,7 +833,7 @@ where
885833
Ok(())
886834
}
887835

888-
// text
836+
// text msg from client
889837
actix_ws::Message::Text(text) => {
890838
#[cfg(feature = "chaos")]
891839
if self.chaos.stream_is_blocked.load(Ordering::Relaxed) {
@@ -928,8 +876,16 @@ where
928876
Ok(())
929877
}
930878

931-
// ping
879+
// ping msg from client
932880
actix_ws::Message::Ping(bytes) => {
881+
#[cfg(debug_assertions)]
882+
debug!(
883+
proxy = P::name(),
884+
connection_id = %self.info.conn_id(),
885+
worker_id = %self.worker_id,
886+
"Handling client's ping..."
887+
);
888+
933889
#[cfg(feature = "chaos")]
934890
if self.chaos.stream_is_blocked.load(Ordering::Relaxed) {
935891
return Ok(());
@@ -961,8 +917,16 @@ where
961917
Ok(())
962918
}
963919

964-
// pong
920+
// pong msg from client
965921
actix_ws::Message::Pong(bytes) => {
922+
#[cfg(debug_assertions)]
923+
debug!(
924+
proxy = P::name(),
925+
connection_id = %self.info.conn_id(),
926+
worker_id = %self.worker_id,
927+
"Received pong form client"
928+
);
929+
966930
if let Some(pong) = ProxyWsPing::from_bytes(bytes) &&
967931
let Some((_, ping)) = self.pings.remove_sync(&pong.id) &&
968932
pong == ping
@@ -990,30 +954,26 @@ where
990954
Ok(())
991955
}
992956

993-
// close
957+
// close msg from client
994958
actix_ws::Message::Close(reason) => {
995-
if let Err(err) = self
996-
.bknd_tx
997-
.send(tungstenite::Message::Close(reason.map(|r| {
998-
tungstenite::protocol::CloseFrame {
999-
code: tungstenite::protocol::frame::coding::CloseCode::from(
1000-
u16::from(r.code),
1001-
),
1002-
reason: r.description.unwrap_or_default().into(),
1003-
}
1004-
})))
1005-
.await
1006-
{
1007-
error!(
1008-
proxy = P::name(),
1009-
connection_id = %self.info.conn_id(),
1010-
worker_id = %self.worker_id,
1011-
error = ?err,
1012-
"Failed to close backend websocket session"
1013-
);
1014-
return Err(WS_BKND_ERROR);
1015-
}
1016-
Err(WS_CLOSE_OK)
959+
return self
960+
.close_bknd_session(
961+
reason
962+
.map(|r| tungstenite::protocol::CloseFrame {
963+
code: tungstenite::protocol::frame::coding::CloseCode::from(
964+
u16::from(r.code),
965+
),
966+
reason: r
967+
.description
968+
.map_or(WS_CLOSE_REASON_NORMAL.into(), |r| r.into()),
969+
})
970+
.unwrap_or(tungstenite::protocol::CloseFrame {
971+
code:
972+
tungstenite::protocol::frame::coding::CloseCode::Normal,
973+
reason: WS_CLOSE_REASON_NORMAL.into(),
974+
}),
975+
)
976+
.await;
1017977
}
1018978

1019979
_ => Ok(()),
@@ -1134,6 +1094,14 @@ where
11341094

11351095
// ping
11361096
tungstenite::Message::Ping(bytes) => {
1097+
#[cfg(debug_assertions)]
1098+
debug!(
1099+
proxy = P::name(),
1100+
connection_id = %self.info.conn_id(),
1101+
worker_id = %self.worker_id,
1102+
"Handling backend's ping..."
1103+
);
1104+
11371105
#[cfg(feature = "chaos")]
11381106
if self.chaos.stream_is_blocked.load(Ordering::Relaxed) {
11391107
return Ok(());
@@ -1168,6 +1136,14 @@ where
11681136

11691137
// pong
11701138
tungstenite::Message::Pong(bytes) => {
1139+
#[cfg(debug_assertions)]
1140+
debug!(
1141+
proxy = P::name(),
1142+
connection_id = %self.info.conn_id(),
1143+
worker_id = %self.worker_id,
1144+
"Received pong form backend"
1145+
);
1146+
11711147
if let Some(pong) = ProxyWsPing::from_bytes(bytes) &&
11721148
let Some((_, ping)) = self.pings.remove_sync(&pong.id) &&
11731149
pong == ping
@@ -1197,25 +1173,19 @@ where
11971173

11981174
// close
11991175
tungstenite::Message::Close(reason) => {
1200-
if let Err(err) = self
1201-
.clnt_tx
1202-
.clone() // .close() consumes it
1203-
.close(reason.map(|reason| actix_ws::CloseReason {
1204-
code: u16::from(reason.code).into(),
1205-
description: reason.reason.to_string().into(),
1206-
}))
1207-
.await
1208-
{
1209-
error!(
1210-
proxy = P::name(),
1211-
connection_id = %self.info.conn_id(),
1212-
worker_id = %self.worker_id,
1213-
error = ?err,
1214-
"Failed to proxy close websocket message to client"
1215-
);
1216-
return Err(WS_CLNT_ERROR);
1217-
}
1218-
Err(WS_CLOSE_OK)
1176+
return self
1177+
.close_clnt_session(
1178+
reason
1179+
.map(|reason| actix_ws::CloseReason {
1180+
code: u16::from(reason.code).into(),
1181+
description: reason.reason.to_string().into(),
1182+
})
1183+
.unwrap_or(actix_ws::CloseReason {
1184+
code: awc::ws::CloseCode::Normal,
1185+
description: WS_CLOSE_REASON_UNSPECIFIED.to_string().into(),
1186+
}),
1187+
)
1188+
.await;
12191189
}
12201190

12211191
_ => Ok(()),
@@ -1244,6 +1214,72 @@ where
12441214
}
12451215
}
12461216
}
1217+
1218+
async fn close_clnt_session(
1219+
&mut self,
1220+
frame: actix_ws::CloseReason,
1221+
) -> Result<(), &'static str> {
1222+
debug!(
1223+
proxy = P::name(),
1224+
connection_id = %self.info.conn_id(),
1225+
worker_id = %self.worker_id,
1226+
msg = ?frame.description,
1227+
"Closing client websocket session..."
1228+
);
1229+
let _ = self // only 1 possible "already closed" error (which we ignore)
1230+
.clnt_tx
1231+
.clone() // .close() consumes it
1232+
.close(Some(frame))
1233+
.await;
1234+
Ok(())
1235+
}
1236+
1237+
async fn close_bknd_session(
1238+
&mut self,
1239+
frame: tungstenite::protocol::CloseFrame,
1240+
) -> Result<(), &'static str> {
1241+
debug!(
1242+
proxy = P::name(),
1243+
connection_id = %self.info.conn_id(),
1244+
worker_id = %self.worker_id,
1245+
msg = %frame.reason,
1246+
"Closing backend websocket session..."
1247+
);
1248+
1249+
if let Err(err) = self
1250+
.bknd_tx
1251+
.send(tungstenite::Message::Close(Some(
1252+
frame.clone(), // it's cheap to clone
1253+
)))
1254+
.await
1255+
{
1256+
if let tungstenite::error::Error::Protocol(protocol_err) = err {
1257+
if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing {
1258+
return Ok(());
1259+
}
1260+
error!(
1261+
proxy = P::name(),
1262+
connection_id = %self.info.conn_id(),
1263+
worker_id = %self.worker_id,
1264+
msg = %frame.reason,
1265+
error = ?protocol_err,
1266+
"Failed to close backend websocket session"
1267+
);
1268+
} else {
1269+
error!(
1270+
proxy = P::name(),
1271+
connection_id = %self.info.conn_id(),
1272+
worker_id = %self.worker_id,
1273+
msg = %frame.reason,
1274+
error = ?err,
1275+
"Failed to close backend websocket session"
1276+
);
1277+
}
1278+
return Err(WS_BKND_ERROR);
1279+
}
1280+
1281+
Ok(())
1282+
}
12471283
}
12481284

12491285
// ProxyWsPostprocessor ------------------------------------------------

0 commit comments

Comments
 (0)