Skip to content

Commit 3a12160

Browse files
More tests for the grpc behaviour, but looking good so far....
1 parent 894ed0c commit 3a12160

File tree

5 files changed

+133
-41
lines changed

5 files changed

+133
-41
lines changed

ext/hyper_ruby/src/grpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,5 +124,5 @@ pub fn create_grpc_error_response(http_status: u16, grpc_status: u32, message: &
124124
}
125125

126126
// Create response with custom body that includes trailers
127-
builder.body(BodyWithTrailers::new(Bytes::new(), trailers)).unwrap()
127+
builder.body(BodyWithTrailers::new(Bytes::new(), Some(trailers))).unwrap()
128128
}

ext/hyper_ruby/src/lib.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
2323
use crossbeam_channel;
2424

2525
use hyper::service::service_fn;
26-
use hyper::{Error, Request as HyperRequest, Response as HyperResponse, StatusCode, Method, header::HeaderMap};
26+
use hyper::{Error, Request as HyperRequest, Response as HyperResponse, StatusCode};
2727
use hyper::body::Incoming;
2828
use hyper_util::rt::TokioIo;
2929
use hyper_util::server::conn::auto;
@@ -35,6 +35,9 @@ use log::{debug, info, warn};
3535

3636
use env_logger;
3737
use crate::response::BodyWithTrailers;
38+
use std::sync::Once;
39+
40+
static LOGGER_INIT: Once = Once::new();
3841

3942
#[global_allocator]
4043
static GLOBAL: Jemalloc = Jemalloc;
@@ -43,13 +46,15 @@ static GLOBAL: Jemalloc = Jemalloc;
4346
struct ServerConfig {
4447
bind_address: String,
4548
tokio_threads: Option<usize>,
49+
debug: bool,
4650
}
4751

4852
impl ServerConfig {
4953
fn new() -> Self {
5054
Self {
5155
bind_address: String::from("127.0.0.1:3000"),
5256
tokio_threads: None,
57+
debug: false,
5358
}
5459
}
5560
}
@@ -92,6 +97,19 @@ impl Server {
9297
server_config.tokio_threads = Some(usize::try_convert(tokio_threads)?);
9398
}
9499

100+
if let Some(debug) = config.get(magnus::Symbol::new("debug")) {
101+
server_config.debug = bool::try_convert(debug)?;
102+
}
103+
104+
// Initialize logging if debug is enabled, but only do it once
105+
if server_config.debug {
106+
LOGGER_INIT.call_once(|| {
107+
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hyper=debug,h2=debug"))
108+
.write_style(env_logger::WriteStyle::Always)
109+
.init();
110+
});
111+
}
112+
95113
Ok(())
96114
}
97115

@@ -116,26 +134,26 @@ impl Server {
116134
Ok(work_request) => {
117135
let hyper_request = work_request.request;
118136

119-
println!("\nProcessing request:");
120-
println!(" Method: {}", hyper_request.method());
121-
println!(" Path: {}", hyper_request.uri().path());
122-
println!(" Headers: {:?}", hyper_request.headers());
137+
debug!("Processing request:");
138+
debug!(" Method: {}", hyper_request.method());
139+
debug!(" Path: {}", hyper_request.uri().path());
140+
debug!(" Headers: {:?}", hyper_request.headers());
123141

124142
// Convert to appropriate request type
125143
let value = if grpc::is_grpc_request(&hyper_request) {
126-
println!("Request identified as gRPC");
144+
debug!("Request identified as gRPC");
127145
if let Some(grpc_request) = GrpcRequest::new(hyper_request) {
128146
grpc_request.into_value()
129147
} else {
130-
println!("Failed to create GrpcRequest");
148+
warn!("Failed to create GrpcRequest");
131149
// Invalid gRPC request path
132150
let response = GrpcResponse::error(3_u32.into_value(), RString::new("Invalid gRPC request path")).unwrap()
133151
.into_hyper_response();
134-
work_request.response_tx.send(response).unwrap_or_else(|e| println!("Failed to send response: {:?}", e));
152+
work_request.response_tx.send(response).unwrap_or_else(|e| warn!("Failed to send response: {:?}", e));
135153
continue;
136154
}
137155
} else {
138-
println!("Request identified as HTTP");
156+
debug!("Request identified as HTTP");
139157
Request::new(hyper_request).into_value()
140158
};
141159

@@ -147,19 +165,19 @@ impl Server {
147165
} else if let Ok(http_response) = Obj::<Response>::try_convert(result) {
148166
(*http_response).clone().into_hyper_response()
149167
} else {
150-
println!("Block returned invalid response type");
168+
warn!("Block returned invalid response type");
151169
create_error_response("Internal server error")
152170
}
153171
},
154172
Err(e) => {
155-
println!("Block call failed: {:?}", e);
173+
warn!("Block call failed: {:?}", e);
156174
create_error_response("Internal server error")
157175
}
158176
};
159177

160178
match work_request.response_tx.send(hyper_response) {
161179
Ok(_) => (),
162-
Err(e) => println!("Failed to send response: {:?}", e),
180+
Err(e) => warn!("Failed to send response: {:?}", e),
163181
}
164182
}
165183
Err(_) => {
@@ -253,7 +271,7 @@ impl Server {
253271
if bind_address.starts_with("unix:") {
254272
let path = bind_address.trim_start_matches("unix:");
255273
std::fs::remove_file(path).unwrap_or_else(|e| {
256-
println!("Failed to remove socket file: {:?}", e);
274+
warn!("Failed to remove socket file: {:?}", e);
257275
});
258276
}
259277

@@ -349,20 +367,13 @@ fn create_error_response(error_message: &str) -> HyperResponse<BodyWithTrailers>
349367
let builder = HyperResponse::builder()
350368
.status(StatusCode::INTERNAL_SERVER_ERROR)
351369
.header("content-type", "text/plain");
352-
353-
let trailers = HeaderMap::new();
354370

355-
builder.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), trailers))
371+
builder.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
356372
.unwrap()
357373
}
358374

359375
#[magnus::init]
360376
fn init(ruby: &Ruby) -> Result<(), MagnusError> {
361-
// Initialize logging
362-
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hyper=debug,h2=debug"))
363-
.write_style(env_logger::WriteStyle::Always)
364-
.init();
365-
366377
let module = ruby.define_module("HyperRuby")?;
367378

368379
let server_class = module.define_class("Server", ruby.class_object())?;

ext/hyper_ruby/src/request.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use hyper::Request as HyperRequest;
88
use rb_sys::{rb_str_set_len, rb_str_modify, rb_str_modify_expand, rb_str_capacity, RSTRING_PTR, VALUE};
99

1010
use crate::grpc;
11+
use log::debug;
1112

1213
// Trait for common buffer filling behavior
1314
trait FillBuffer {
@@ -144,15 +145,15 @@ impl Request {
144145

145146
impl GrpcRequest {
146147
pub fn new(request: HyperRequest<Bytes>) -> Option<Self> {
147-
println!("Creating GrpcRequest from path: {}", request.uri().path());
148+
debug!("Creating GrpcRequest from path: {}", request.uri().path());
148149

149150
// Path format could be "/Echo" or "/echo.Echo/Echo" - handle both
150151
let path = request.uri().path();
151152
let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
152-
println!(" Path parts: {:?}", parts);
153+
debug!(" Path parts: {:?}", parts);
153154

154155
if parts.is_empty() {
155-
println!(" Failed: Empty path");
156+
debug!(" Failed: Empty path");
156157
return None;
157158
}
158159

@@ -164,7 +165,7 @@ impl GrpcRequest {
164165
(format!("echo.{}", parts[0]), parts[0].to_string())
165166
};
166167

167-
println!(" Extracted service: {}, method: {}", service, method);
168+
debug!(" Extracted service: {}, method: {}", service, method);
168169

169170
Some(Self {
170171
request,

ext/hyper_ruby/src/response.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ impl std::error::Error for ResponseError {}
2121
pub struct BodyWithTrailers {
2222
data: Bytes,
2323
trailers_sent: bool,
24-
trailers: HeaderMap,
24+
trailers: Option<HeaderMap>,
2525
}
2626

2727
impl BodyWithTrailers {
28-
pub fn new(data: Bytes, trailers: HeaderMap) -> Self {
28+
pub fn new(data: Bytes, trailers: Option<HeaderMap>) -> Self {
2929
Self {
3030
data,
3131
trailers_sent: false,
@@ -54,7 +54,9 @@ impl Body for BodyWithTrailers {
5454

5555
if !self.trailers_sent {
5656
self.trailers_sent = true;
57-
return std::task::Poll::Ready(Some(Ok(Frame::trailers(self.trailers.clone()))));
57+
if let Some(trailers) = &self.trailers {
58+
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers.clone()))));
59+
}
5860
}
5961

6062
std::task::Poll::Ready(None)
@@ -85,19 +87,16 @@ impl Response {
8587
Ok(ForEach::Continue)
8688
}).unwrap();
8789

88-
let mut trailers = HeaderMap::new();
89-
trailers.insert("grpc-status", "0".parse().unwrap());
90-
9190
if body.len() > 0 {
9291
unsafe {
9392
let rust_body = Bytes::copy_from_slice(body.as_slice());
94-
match builder.body(BodyWithTrailers::new(rust_body, trailers)) {
93+
match builder.body(BodyWithTrailers::new(rust_body, None)) {
9594
Ok(response) => Ok(Self { response }),
9695
Err(_) => Err(MagnusError::new(magnus::exception::runtime_error(), "Failed to create response"))
9796
}
9897
}
9998
} else {
100-
match builder.body(BodyWithTrailers::new(Bytes::new(), trailers)) {
99+
match builder.body(BodyWithTrailers::new(Bytes::new(), None)) {
101100
Ok(response) => Ok(Self { response }),
102101
Err(_) => Err(MagnusError::new(magnus::exception::runtime_error(), "Failed to create response"))
103102
}
@@ -141,7 +140,7 @@ impl GrpcResponse {
141140
trailers.insert("grpc-accept-encoding", "identity".parse().unwrap());
142141
trailers.insert("accept-encoding", "identity".parse().unwrap());
143142

144-
Ok(Self { response: builder.body(BodyWithTrailers::new(framed_message, trailers)).unwrap() })
143+
Ok(Self { response: builder.body(BodyWithTrailers::new(framed_message, Some(trailers))).unwrap() })
145144
}
146145

147146
pub fn error(status: Value, message: RString) -> Result<Self, MagnusError> {
@@ -161,7 +160,7 @@ impl GrpcResponse {
161160
trailers.insert("grpc-message", message_str.parse().unwrap());
162161
}
163162

164-
Ok(Self { response: builder.body(BodyWithTrailers::new(Bytes::new(), trailers)).unwrap() })
163+
Ok(Self { response: builder.body(BodyWithTrailers::new(Bytes::new(), Some(trailers))).unwrap() })
165164
}
166165

167166
pub fn status(&self) -> u16 {

test/test_hyper_ruby.rb

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def test_head
9999

100100
def test_blocking
101101
buffer = String.new(capacity: 1024)
102-
with_server(-> (request) { handler_grpc(request, buffer) }) do |client|
102+
with_server(-> (request) { handler_accept(request, buffer) }) do |client|
103103
gets
104104
end
105105
end
@@ -146,20 +146,77 @@ def test_grpc_request
146146
# Create request message
147147
request = Echo::EchoRequest.new(message: "Hello GRPC")
148148

149-
puts "\n=== Starting gRPC request ==="
150149
# Make the gRPC call
151150
response = stub.echo(request)
152-
puts "=== gRPC request complete ===\n"
153151

154152
# Check the response
155153
assert_instance_of Echo::EchoResponse, response
156154
assert_equal "Hello GRPC response", response.message
157155
end
158156
end
159157

158+
def test_concurrent_grpc_requests
159+
buffer = String.new(capacity: 1024)
160+
with_server(-> (request) { handler_grpc(request, buffer) }) do |_client|
161+
# Create a gRPC stub using the standard Ruby gRPC client
162+
stub = Echo::Echo::Stub.new(
163+
"127.0.0.1:3010",
164+
:this_channel_is_insecure,
165+
channel_args: {
166+
'grpc.enable_http_proxy' => 0
167+
}
168+
)
169+
170+
# Create multiple threads to send requests concurrently
171+
threads = 5.times.map do |i|
172+
Thread.new do
173+
request = Echo::EchoRequest.new(message: "Hello GRPC #{i}")
174+
response = stub.echo(request)
175+
[i, response]
176+
end
177+
end
178+
179+
# Collect and verify all responses
180+
responses = threads.map(&:value)
181+
responses.each do |i, response|
182+
assert_instance_of Echo::EchoResponse, response
183+
assert_equal "Hello GRPC #{i} response", response.message
184+
end
185+
end
186+
end
187+
188+
def test_request_type_detection
189+
with_server(-> (request) { handler_detect_type(request) }) do |client|
190+
# Test regular HTTP request
191+
http_response = client.post("/echo", body: "Hello HTTP")
192+
assert_equal 200, http_response.status
193+
assert_equal "text/plain", http_response.headers["content-type"]
194+
assert_equal "HTTP request: Hello HTTP", http_response.body
195+
196+
# Test gRPC request using the gRPC client
197+
stub = Echo::Echo::Stub.new(
198+
"127.0.0.1:3010",
199+
:this_channel_is_insecure,
200+
channel_args: {
201+
'grpc.enable_http_proxy' => 0
202+
}
203+
)
204+
205+
request = Echo::EchoRequest.new(message: "Hello gRPC")
206+
grpc_response = stub.echo(request)
207+
208+
assert_instance_of Echo::EchoResponse, grpc_response
209+
assert_equal "gRPC request: Hello gRPC", grpc_response.message
210+
end
211+
end
212+
160213
def with_server(request_handler, &block)
161214
server = HyperRuby::Server.new
162-
server.configure({ bind_address: "127.0.0.1:3010",tokio_threads: 1 })
215+
server.configure({
216+
bind_address: "127.0.0.1:3010",
217+
tokio_threads: 1,
218+
#debug: true
219+
})
163220
server.start
164221

165222
# Create ruby worker threads that process requests;
@@ -185,7 +242,11 @@ def with_server(request_handler, &block)
185242

186243
def with_unix_socket_server(request_handler, &block)
187244
server = HyperRuby::Server.new
188-
server.configure({ bind_address: "unix:/tmp/hyper_ruby_test.sock", tokio_threads: 1 })
245+
server.configure({
246+
bind_address: "unix:/tmp/hyper_ruby_test.sock",
247+
tokio_threads: 1,
248+
#debug: true
249+
})
189250
server.start
190251

191252
# Create ruby worker threads that process requests;
@@ -252,4 +313,24 @@ def handler_grpc(request, buffer)
252313
# Return gRPC response
253314
HyperRuby::GrpcResponse.new(0, response_data)
254315
end
316+
317+
def handler_detect_type(request)
318+
if request.is_a?(HyperRuby::GrpcRequest)
319+
# Handle gRPC request
320+
buffer = String.new(capacity: 1024)
321+
request.fill_body(buffer)
322+
echo_request = Echo::EchoRequest.decode(buffer)
323+
324+
echo_response = Echo::EchoResponse.new(message: "gRPC request: #{echo_request.message}")
325+
response_data = Echo::EchoResponse.encode(echo_response)
326+
327+
HyperRuby::GrpcResponse.new(0, response_data)
328+
else
329+
# Handle regular HTTP request
330+
buffer = String.new(capacity: 1024)
331+
request.fill_body(buffer)
332+
333+
HyperRuby::Response.new(200, { 'Content-Type' => 'text/plain' }, "HTTP request: #{buffer}")
334+
end
335+
end
255336
end

0 commit comments

Comments
 (0)