Skip to content

Commit 4dfa975

Browse files
More tests; recv header and keep-alive timeouts.
1 parent 3a12160 commit 4dfa975

File tree

10 files changed

+671
-356
lines changed

10 files changed

+671
-356
lines changed

bin/run-server.rb

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
$LOAD_PATH.unshift File.expand_path("../lib", __dir__)
5+
6+
puts "Loading hyper_ruby"
7+
8+
require "hyper_ruby"
9+
require "json"
10+
11+
# Create and configure the server
12+
server = HyperRuby::Server.new
13+
config = {
14+
bind_address: ENV.fetch("BIND_ADDRESS", "127.0.0.1:3000"),
15+
tokio_threads: ENV.fetch("TOKIO_THREADS", "1").to_i,
16+
debug: ENV.fetch("DEBUG", "false") == "true",
17+
recv_timeout: ENV.fetch("RECV_TIMEOUT", "30000").to_i
18+
}
19+
server.configure(config)
20+
21+
puts "Starting server with config: #{config}"
22+
23+
# Start the server
24+
server.start
25+
26+
puts "Server started"
27+
28+
# Create a worker thread to handle requests
29+
worker = Thread.new do
30+
server.run_worker do |request|
31+
buffer = String.new(capacity: 1024)
32+
request.fill_body(buffer)
33+
34+
# Create a response that echoes back request details
35+
response_data = {
36+
method: request.http_method,
37+
path: request.path,
38+
headers: request.headers,
39+
body: buffer
40+
}
41+
42+
HyperRuby::Response.new(
43+
200,
44+
{ "Content-Type" => "application/json" },
45+
JSON.pretty_generate(response_data)
46+
)
47+
end
48+
end
49+
50+
puts "Server running at #{config[:bind_address]}"
51+
puts "Press Ctrl+C to stop"
52+
53+
# Wait for Ctrl+C
54+
begin
55+
sleep
56+
rescue Interrupt
57+
puts "\nShutting down..."
58+
server.stop
59+
worker.join
60+
end

ext/hyper_ruby/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
1717
crossbeam-channel = "0.5.14"
1818
rb-sys = "0.9.110"
1919
hyper = { version = "1.0", features = ["http1", "http2", "server"] }
20-
hyper-util = { version = "0.1", features = ["tokio", "server", "http1", "http2"] }
20+
hyper-util = { version = "0.1", features = ["tokio", "server", "server-auto", "http1", "http2"] }
2121
http-body-util = "0.1.2"
2222
jemallocator = { version = "0.5.4", features = ["disable_initial_exec_tls"] }
2323
futures = "0.3.31"

ext/hyper_ruby/src/grpc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ pub fn create_grpc_error_response(http_status: u16, grpc_status: u32, message: &
115115
// Create trailers
116116
let mut trailers = HeaderMap::new();
117117
trailers.insert("grpc-status", grpc_status.to_string().parse().unwrap());
118-
trailers.insert("grpc-accept-encoding", "identity".parse().unwrap());
119-
trailers.insert("accept-encoding", "identity".parse().unwrap());
118+
trailers.insert("grpc-accept-encoding", "identity,gzip,deflate,zstd".parse().unwrap());
120119

121120
// Add grpc-message if provided
122121
if !message.is_empty() {

ext/hyper_ruby/src/lib.rs

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ use log::{debug, info, warn};
3636
use env_logger;
3737
use crate::response::BodyWithTrailers;
3838
use std::sync::Once;
39+
use tokio::time::timeout;
40+
use std::future::Future;
41+
use std::pin::Pin;
42+
use std::task::{Context, Poll};
43+
use tokio::time::{Sleep, sleep};
3944

4045
static LOGGER_INIT: Once = Once::new();
4146

@@ -47,6 +52,7 @@ struct ServerConfig {
4752
bind_address: String,
4853
tokio_threads: Option<usize>,
4954
debug: bool,
55+
recv_timeout: u64,
5056
}
5157

5258
impl ServerConfig {
@@ -55,6 +61,7 @@ impl ServerConfig {
5561
bind_address: String::from("127.0.0.1:3000"),
5662
tokio_threads: None,
5763
debug: false,
64+
recv_timeout: 30000, // Default 30 second timeout
5865
}
5966
}
6067
}
@@ -101,6 +108,10 @@ impl Server {
101108
server_config.debug = bool::try_convert(debug)?;
102109
}
103110

111+
if let Some(recv_timeout) = config.get(magnus::Symbol::new("recv_timeout")) {
112+
server_config.recv_timeout = u64::try_convert(recv_timeout)?;
113+
}
114+
104115
// Initialize logging if debug is enabled, but only do it once
105116
if server_config.debug {
106117
LOGGER_INIT.call_once(|| {
@@ -215,16 +226,19 @@ impl Server {
215226
let work_tx = work_tx.clone();
216227

217228
let server_task = tokio::spawn(async move {
229+
let timer = hyper_util::rt::TokioTimer::new();
230+
218231
if config.bind_address.starts_with("unix:") {
219232
let path = config.bind_address.trim_start_matches("unix:");
220233
let listener = UnixListener::bind(path).unwrap();
221234

222235
loop {
223236
let (stream, _) = listener.accept().await.unwrap();
224237
let work_tx = work_tx.clone();
238+
let timer = timer.clone();
225239

226240
tokio::task::spawn(async move {
227-
handle_connection(stream, work_tx).await;
241+
handle_connection(stream, work_tx, config.recv_timeout, timer).await;
228242
});
229243
}
230244
} else {
@@ -235,9 +249,10 @@ impl Server {
235249
loop {
236250
let (stream, _) = listener.accept().await.unwrap();
237251
let work_tx = work_tx.clone();
252+
let timer = timer.clone();
238253

239254
tokio::task::spawn(async move {
240-
handle_connection(stream, work_tx).await;
255+
handle_connection(stream, work_tx, config.recv_timeout, timer).await;
241256
});
242257
}
243258
}
@@ -282,19 +297,27 @@ impl Server {
282297
async fn handle_request(
283298
req: HyperRequest<Incoming>,
284299
work_tx: Arc<crossbeam_channel::Sender<RequestWithCompletion>>,
300+
recv_timeout: u64,
285301
) -> Result<HyperResponse<BodyWithTrailers>, Error> {
286302
debug!("Received request: {:?}", req);
287303
debug!("HTTP version: {:?}", req.version());
288304
debug!("Headers: {:?}", req.headers());
289305

290306
let (parts, body) = req.into_parts();
291307

292-
// Collect the body
293-
let body_bytes = match body.collect().await {
294-
Ok(collected) => collected.to_bytes(),
295-
Err(e) => {
308+
// Collect the body with timeout
309+
let body_bytes = match timeout(
310+
std::time::Duration::from_millis(recv_timeout),
311+
body.collect()
312+
).await {
313+
Ok(Ok(collected)) => collected.to_bytes(),
314+
Ok(Err(e)) => {
296315
debug!("Error collecting body: {:?}", e);
297316
return Err(e);
317+
},
318+
Err(_) => {
319+
debug!("Timeout collecting body");
320+
return Ok(create_timeout_response());
298321
}
299322
};
300323

@@ -336,23 +359,42 @@ async fn handle_request(
336359
}
337360
}
338361

362+
fn create_timeout_response() -> HyperResponse<BodyWithTrailers> {
363+
let builder = HyperResponse::builder()
364+
.status(StatusCode::REQUEST_TIMEOUT)
365+
.header("content-type", "text/plain");
366+
367+
builder.body(BodyWithTrailers::new(Bytes::from("Request timed out while receiving body"), None))
368+
.unwrap()
369+
}
370+
339371
async fn handle_connection(
340372
stream: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
341373
work_tx: Arc<crossbeam_channel::Sender<RequestWithCompletion>>,
374+
recv_timeout: u64,
375+
timer: hyper_util::rt::TokioTimer,
342376
) {
343377
info!("New connection established");
344378

345379
let service = service_fn(move |req: HyperRequest<Incoming>| {
346380
debug!("Service handling request");
347381
let work_tx = work_tx.clone();
348-
handle_request(req, work_tx)
382+
handle_request(req, work_tx, recv_timeout)
349383
});
350384

351385
let io = TokioIo::new(stream);
352386

353-
debug!("Setting up HTTP/2 connection");
354-
let builder = auto::Builder::new(hyper_util::rt::TokioExecutor::new());
355-
387+
debug!("Setting up connection");
388+
let mut builder = auto::Builder::new(hyper_util::rt::TokioExecutor::new());
389+
390+
builder.http1()
391+
.header_read_timeout(std::time::Duration::from_millis(recv_timeout))
392+
.timer(timer.clone());
393+
394+
builder.http2()
395+
.keep_alive_interval(std::time::Duration::from_secs(10))
396+
.timer(timer);
397+
356398
if let Err(err) = builder
357399
.serve_connection(io, service)
358400
.await

ext/hyper_ruby/src/response.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ impl std::error::Error for ResponseError {}
2020
#[derive(Debug, Clone)]
2121
pub struct BodyWithTrailers {
2222
data: Bytes,
23+
data_sent: bool,
2324
trailers_sent: bool,
2425
trailers: Option<HeaderMap>,
2526
}
@@ -28,6 +29,7 @@ impl BodyWithTrailers {
2829
pub fn new(data: Bytes, trailers: Option<HeaderMap>) -> Self {
2930
Self {
3031
data,
32+
data_sent: false,
3133
trailers_sent: false,
3234
trailers,
3335
}
@@ -46,9 +48,9 @@ impl Body for BodyWithTrailers {
4648
mut self: Pin<&mut Self>,
4749
_cx: &mut std::task::Context<'_>,
4850
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
49-
if !self.data.is_empty() {
51+
if !self.data_sent && !self.data.is_empty() {
52+
self.data_sent = true;
5053
let data = self.data.clone();
51-
self.data = Bytes::new();
5254
return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
5355
}
5456

@@ -90,12 +92,20 @@ impl Response {
9092
if body.len() > 0 {
9193
unsafe {
9294
let rust_body = Bytes::copy_from_slice(body.as_slice());
95+
builder_headers.insert(
96+
HeaderName::from_static("content-length"),
97+
rust_body.len().to_string().try_into().unwrap()
98+
);
9399
match builder.body(BodyWithTrailers::new(rust_body, None)) {
94100
Ok(response) => Ok(Self { response }),
95101
Err(_) => Err(MagnusError::new(magnus::exception::runtime_error(), "Failed to create response"))
96102
}
97103
}
98104
} else {
105+
builder_headers.insert(
106+
HeaderName::from_static("content-length"),
107+
"0".try_into().unwrap()
108+
);
99109
match builder.body(BodyWithTrailers::new(Bytes::new(), None)) {
100110
Ok(response) => Ok(Self { response }),
101111
Err(_) => Err(MagnusError::new(magnus::exception::runtime_error(), "Failed to create response"))
@@ -137,8 +147,7 @@ impl GrpcResponse {
137147

138148
let mut trailers = HeaderMap::new();
139149
trailers.insert("grpc-status", status.to_string().parse().unwrap());
140-
trailers.insert("grpc-accept-encoding", "identity".parse().unwrap());
141-
trailers.insert("accept-encoding", "identity".parse().unwrap());
150+
trailers.insert("grpc-accept-encoding", "identity,gzip,deflate,zstd".parse().unwrap());
142151

143152
Ok(Self { response: builder.body(BodyWithTrailers::new(framed_message, Some(trailers))).unwrap() })
144153
}
@@ -153,8 +162,7 @@ impl GrpcResponse {
153162

154163
let mut trailers = HeaderMap::new();
155164
trailers.insert("grpc-status", status_num.to_string().parse().unwrap());
156-
trailers.insert("grpc-accept-encoding", "identity".parse().unwrap());
157-
trailers.insert("accept-encoding", "identity".parse().unwrap());
165+
trailers.insert("grpc-accept-encoding", "identity,gzip,deflate,zstd".parse().unwrap());
158166

159167
if !message_str.is_empty() {
160168
trailers.insert("grpc-message", message_str.parse().unwrap());

0 commit comments

Comments
 (0)