Skip to content

Commit de5dc23

Browse files
committed
fix(ws): enable hyper upgrades and drain request body for WebSocket support
1 parent 9c220a7 commit de5dc23

File tree

4 files changed

+24
-4
lines changed

4 files changed

+24
-4
lines changed

crates/rustapi-core/src/server.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ impl Server {
6161
}
6262
});
6363

64-
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
64+
if let Err(err) = http1::Builder::new()
65+
.serve_connection(io, service)
66+
.with_upgrades()
67+
.await
68+
{
6569
error!("Connection error: {}", err);
6670
}
6771
});
@@ -144,7 +148,8 @@ async fn handle_request(
144148
fn log_request(method: &http::Method, path: &str, status: StatusCode, start: std::time::Instant) {
145149
let elapsed = start.elapsed();
146150

147-
if status.is_success() {
151+
// 1xx (Informational), 2xx (Success), 3xx (Redirection) are considered successful requests
152+
if status.is_success() || status.is_redirection() || status.is_informational() {
148153
info!(
149154
method = %method,
150155
path = %path,

crates/rustapi-ws/src/extractor.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ impl FromRequest for WebSocket {
7979
let method = req.method();
8080

8181
// Validate the upgrade request
82-
let sec_key = validate_upgrade_request(method, headers).map_err(ApiError::from)?;
82+
// Note: we clone sec_key to avoid keeping borrow of headers
83+
let sec_key = validate_upgrade_request(method, headers)
84+
.map_err(ApiError::from)?
85+
.to_string();
8386

8487
// Parse requested protocols
8588
let protocols = headers
@@ -97,6 +100,12 @@ impl FromRequest for WebSocket {
97100
// Capture OnUpgrade future
98101
let on_upgrade = req.extensions_mut().remove::<OnUpgrade>();
99102

103+
// IMPORTANT: Consume the request body to ensure hyper allows the upgrade.
104+
if let Some(stream) = req.take_stream() {
105+
use http_body_util::BodyExt;
106+
let _ = stream.collect().await;
107+
}
108+
100109
Ok(Self {
101110
sec_key,
102111
protocols,
@@ -106,6 +115,7 @@ impl FromRequest for WebSocket {
106115
}
107116
}
108117

118+
109119
impl OperationModifier for WebSocket {
110120
fn update_operation(_op: &mut Operation) {
111121
// WebSocket endpoints don't have regular request body parameters

crates/rustapi-ws/src/upgrade.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,11 @@ impl IntoResponse for WebSocketUpgrade {
192192
callback(socket).await;
193193
}
194194
Err(e) => {
195-
tracing::error!("WebSocket upgrade failed: {}", e);
195+
tracing::error!("WebSocket upgrade failed: {:?}", e);
196+
// Also try to print the source if available
197+
if let Some(source) = std::error::Error::source(&e) {
198+
tracing::error!("Cause: {:?}", source);
199+
}
196200
}
197201
}
198202
});

examples/websocket/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sy
311311
tracing_subscriber::fmt()
312312
.with_env_filter(
313313
tracing_subscriber::EnvFilter::from_default_env()
314+
.add_directive("rustapi_ws=debug".parse().unwrap())
314315
.add_directive("websocket_example=debug".parse().unwrap())
315316
.add_directive("info".parse().unwrap()),
316317
)

0 commit comments

Comments
 (0)