Skip to content

Commit 0757cda

Browse files
authored
fix(engineio): fix Noop packet emission when upgrading transport (#554)
1 parent 7ab9071 commit 0757cda

File tree

5 files changed

+75
-18
lines changed

5 files changed

+75
-18
lines changed

crates/engineioxide/src/packet.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ impl From<Packet> for tokio_tungstenite::tungstenite::Utf8Bytes {
146146
String::from(value).into()
147147
}
148148
}
149+
impl From<Packet> for Bytes {
150+
fn from(value: Packet) -> Self {
151+
String::from(value).into()
152+
}
153+
}
149154
/// Deserialize a [Packet] from a [String] according to the Engine.IO protocol
150155
impl TryFrom<Str> for Packet {
151156
type Error = Error;

crates/engineioxide/src/transport/polling/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,19 @@ where
102102
return Err(Error::TransportMismatch);
103103
}
104104

105+
if socket.is_upgrading() {
106+
#[cfg(feature = "tracing")]
107+
tracing::debug!(?sid, "socket is upgrading, sending NOOP packet");
108+
109+
#[cfg(feature = "v3")]
110+
let data = payload::packet_encoder(Packet::Noop, socket.protocol, socket.supports_binary);
111+
#[cfg(not(feature = "v3"))]
112+
let data = payload::packet_encoder(Packet::Noop, socket.protocol);
113+
114+
let is_binary = false; // The noop packet is guaranteed to be serialized as text
115+
return Ok(http_response(StatusCode::OK, data, is_binary)?);
116+
}
117+
105118
// If the socket is already locked, it means that the socket is being used by another request
106119
// In case of multiple http polling, session should be closed
107120
let rx = match socket.internal_rx.try_lock() {

crates/engineioxide/src/transport/polling/payload/encoder.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ pub async fn v4_encoder(
112112
/// Encode one packet into a *binary* payload according to the
113113
/// [engine.io v3 protocol](https://github.com/socketio/engine.io-protocol/tree/v3#payload)
114114
#[cfg(feature = "v3")]
115-
pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> {
115+
pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) {
116116
use crate::transport::polling::payload::BINARY_PACKET_SEPARATOR_V3;
117117
use bytes::BufMut;
118118

@@ -147,13 +147,12 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Resu
147147
data.extend_from_slice(packet.as_bytes()); // packet
148148
}
149149
};
150-
Ok(())
151150
}
152151

153152
/// Encode one packet into a *string* payload according to the
154153
/// [engine.io v3 protocol](https://github.com/socketio/engine.io-protocol/tree/v3#payload)
155154
#[cfg(feature = "v3")]
156-
pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> {
155+
pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) {
157156
use crate::transport::polling::payload::STRING_PACKET_SEPARATOR_V3;
158157
use bytes::BufMut;
159158
let packet: String = packet.into();
@@ -164,7 +163,6 @@ pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> R
164163
packet
165164
);
166165
data.put_slice(packet.as_bytes());
167-
Ok(())
168166
}
169167

170168
/// Encode multiple packet packet into a *string* payload if there is no binary packet or into a *binary* payload if there are binary packets
@@ -202,11 +200,11 @@ pub async fn v3_binary_encoder(
202200

203201
if has_binary {
204202
for packet in packet_buffer {
205-
v3_bin_packet_encoder(packet, &mut data)?;
203+
v3_bin_packet_encoder(packet, &mut data);
206204
}
207205
} else {
208206
for packet in packet_buffer {
209-
v3_string_packet_encoder(packet, &mut data)?;
207+
v3_string_packet_encoder(packet, &mut data);
210208
}
211209
}
212210

@@ -216,11 +214,11 @@ pub async fn v3_binary_encoder(
216214
for packet in packets {
217215
match packet {
218216
Packet::BinaryV3(_) | Packet::Binary(_) => {
219-
v3_bin_packet_encoder(packet, &mut data)?;
217+
v3_bin_packet_encoder(packet, &mut data);
220218
has_binary = true;
221219
}
222220
packet => {
223-
v3_string_packet_encoder(packet, &mut data)?;
221+
v3_string_packet_encoder(packet, &mut data);
224222
}
225223
};
226224
}
@@ -250,15 +248,15 @@ pub async fn v3_string_encoder(
250248
let current_size = data.len() + PUNCTUATION_LEN + max_packet_size_len;
251249
while let Some(packets) = try_recv_packet(&mut rx, current_size, max_payload, true) {
252250
for packet in packets {
253-
v3_string_packet_encoder(packet, &mut data)?;
251+
v3_string_packet_encoder(packet, &mut data);
254252
}
255253
}
256254

257255
// If there is no packet in the buffer, wait for the next packet
258256
if data.is_empty() {
259257
let packets = recv_packet(&mut rx).await?;
260258
for packet in packets {
261-
v3_string_packet_encoder(packet, &mut data)?;
259+
v3_string_packet_encoder(packet, &mut data);
262260
}
263261
}
264262

crates/engineioxide/src/transport/polling/payload/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,36 @@ pub async fn encoder(
8787
encoder::v4_encoder(rx, max_payload).await
8888
}
8989
}
90+
91+
/// Encodes a single packet into a byte array.
92+
pub fn packet_encoder(
93+
packet: Packet,
94+
#[allow(unused_variables)] protocol: ProtocolVersion,
95+
#[cfg(feature = "v3")] supports_binary: bool,
96+
) -> Bytes {
97+
#[cfg(feature = "v3")]
98+
{
99+
match protocol {
100+
ProtocolVersion::V4 => packet.into(),
101+
ProtocolVersion::V3 if supports_binary => {
102+
use bytes::BytesMut;
103+
104+
let size_hint = packet.get_size_hint(!supports_binary) + 2;
105+
let mut bytes = BytesMut::with_capacity(size_hint);
106+
encoder::v3_bin_packet_encoder(packet, &mut bytes);
107+
bytes.freeze()
108+
}
109+
ProtocolVersion::V3 => {
110+
use bytes::BytesMut;
111+
112+
let size_hint = packet.get_size_hint(!supports_binary) + 2;
113+
let mut bytes = BytesMut::with_capacity(size_hint);
114+
encoder::v3_string_packet_encoder(packet, &mut bytes);
115+
bytes.freeze()
116+
}
117+
}
118+
}
119+
120+
#[cfg(not(feature = "v3"))]
121+
packet.into()
122+
}

crates/engineioxide/src/transport/ws.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,18 @@ where
320320
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
321321
{
322322
#[cfg(feature = "tracing")]
323-
tracing::debug!("websocket connection upgrade");
323+
tracing::debug!("starting websocket connection upgrade");
324+
325+
// The socket is upgrading, from now on all polling
326+
// request will return a `Packet::Noop`
327+
socket.start_upgrade();
328+
329+
// We send a last Noop request to close a potential waiting polling request.
330+
socket.send(Packet::Noop)?;
331+
332+
// wait for any current polling connection to finish by waiting for the socket to be unlocked
333+
// All other polling connection will be immediately closed with a NOOP packet.
334+
let _lock = socket.internal_rx.lock().await;
324335

325336
// Fetch the next packet from the ws stream, it should be a PingUpgrade packet
326337
let msg = match ws.next().await {
@@ -329,16 +340,15 @@ where
329340
};
330341
match Packet::try_from(msg)? {
331342
Packet::PingUpgrade => {
332-
socket.start_upgrade();
343+
#[cfg(feature = "tracing")]
344+
tracing::debug!("received first ping upgrade");
345+
333346
// Respond with a PongUpgrade packet
334347
ws.send(Message::Text(Packet::PongUpgrade.into())).await?;
335348
}
336349
p => Err(Error::BadPacket(p))?,
337350
};
338351

339-
// send a NOOP packet to any pending polling request so it closes gracefully
340-
socket.send(Packet::Noop)?;
341-
342352
// Fetch the next packet from the ws stream, it should be an Upgrade packet
343353
let msg = match ws.next().await {
344354
Some(Ok(Message::Text(d))) => d,
@@ -356,13 +366,11 @@ where
356366
match Packet::try_from(msg)? {
357367
Packet::Upgrade => {
358368
#[cfg(feature = "tracing")]
359-
tracing::debug!("ws upgraded successful")
369+
tracing::debug!("ws upgraded successfully")
360370
}
361371
p => Err(Error::BadPacket(p))?,
362372
};
363373

364-
// wait for any polling connection to finish by waiting for the socket to be unlocked
365-
let _ = socket.internal_rx.lock().await;
366374
socket.upgrade_to_websocket();
367375
Ok(())
368376
}

0 commit comments

Comments
 (0)