Skip to content

Commit b8a7928

Browse files
Merge pull request #3 from BetterStackHQ/alistair/max-connection-age
Add support for max_connection_age to limit the lifetime of gRPC connections.
2 parents 81eafe3 + 5c3ddb2 commit b8a7928

File tree

7 files changed

+271
-18
lines changed

7 files changed

+271
-18
lines changed

.node-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
22.18.0

Gemfile.lock

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ GEM
77
remote: https://rubygems.org/
88
specs:
99
concurrent-ruby (1.3.4)
10-
google-protobuf (3.25.6)
11-
google-protobuf (3.25.6-arm64-darwin)
10+
google-protobuf (3.25.8)
1211
googleapis-common-protos-types (1.18.0)
1312
google-protobuf (>= 3.18, < 5.a)
1413
grpc (1.70.1)

bin/bench-server.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
bind_address: ENV.fetch("BIND_ADDRESS", "127.0.0.1:3000"),
1515
tokio_threads: ENV.fetch("TOKIO_THREADS", "1").to_i,
1616
debug: ENV.fetch("DEBUG", "false") == "true",
17-
recv_timeout: ENV.fetch("RECV_TIMEOUT", "30000").to_i
17+
recv_timeout: ENV.fetch("RECV_TIMEOUT", "30000").to_i,
18+
max_connection_age: ENV.fetch("MAX_CONNECTION_AGE", "30000").to_i
1819
}
1920
server.configure(config)
2021

2122
puts "Starting server with config: #{config}"
2223

2324
accept_response = HyperRuby::Response.new(
24-
200,
25+
202,
2526
{ "Content-Type" => "application/json" },
2627
{ "message" => "Accepted" }.to_json
2728
)

ext/hyper_ruby/src/lib.rs

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
1616

1717
use std::cell::RefCell;
1818
use std::net::SocketAddr;
19+
use std::sync::atomic::{AtomicU64, Ordering};
1920

2021
use tokio::net::{TcpListener, UnixListener};
2122

@@ -80,6 +81,7 @@ struct ServerConfig {
8081
recv_timeout: u64,
8182
channel_capacity: usize,
8283
send_timeout: u64,
84+
max_connection_age: Option<u64>,
8385
}
8486

8587
impl ServerConfig {
@@ -91,6 +93,7 @@ impl ServerConfig {
9193
recv_timeout: 30000, // Default 30 second timeout
9294
channel_capacity: 5000, // Default capacity for worker channel
9395
send_timeout: 1000, // Default 1 second timeout for send backpressure
96+
max_connection_age: None, // No limit by default
9497
}
9598
}
9699
}
@@ -109,6 +112,7 @@ struct Server {
109112
work_tx: RefCell<Option<Arc<crossbeam_channel::Sender<RequestWithCompletion>>>>,
110113
runtime: RefCell<Option<Arc<tokio::runtime::Runtime>>>,
111114
shutdown: RefCell<Option<broadcast::Sender<()>>>,
115+
total_connections: Arc<AtomicU64>,
112116
}
113117

114118
impl Server {
@@ -121,9 +125,14 @@ impl Server {
121125
work_tx: RefCell::new(None),
122126
runtime: RefCell::new(None),
123127
shutdown: RefCell::new(None),
128+
total_connections: Arc::new(AtomicU64::new(0)),
124129
}
125130
}
126131

132+
pub fn total_connections(&self) -> u64 {
133+
self.total_connections.load(Ordering::Relaxed)
134+
}
135+
127136
pub fn configure(&self, config: magnus::RHash) -> Result<(), MagnusError> {
128137
let mut server_config = self.config.borrow_mut();
129138
if let Some(bind_address) = config.get(magnus::Symbol::new("bind_address")) {
@@ -150,6 +159,10 @@ impl Server {
150159
server_config.send_timeout = u64::try_convert(send_timeout)?;
151160
}
152161

162+
if let Some(max_connection_age) = config.get(magnus::Symbol::new("max_connection_age")) {
163+
server_config.max_connection_age = Some(u64::try_convert(max_connection_age)?);
164+
}
165+
153166
// Initialize logging if not already initialized
154167
LOGGER_INIT.call_once(|| {
155168
let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
@@ -263,7 +276,9 @@ impl Server {
263276
*self.work_tx.borrow_mut() = Some(work_tx.clone());
264277

265278
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
266-
*self.shutdown.borrow_mut() = Some(shutdown_tx);
279+
*self.shutdown.borrow_mut() = Some(shutdown_tx.clone());
280+
281+
let total_connections = self.total_connections.clone();
267282

268283
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
269284

@@ -346,17 +361,19 @@ impl Server {
346361
};
347362

348363
// Now that we have successfully bound, spawn the server task
364+
let max_connection_age = config.max_connection_age;
349365
let server_task = tokio::spawn(async move {
350366
let graceful_shutdown = GracefulShutdown::new();
351367
let mut shutdown_rx = shutdown_rx;
352368

353369
loop {
354370
tokio::select! {
355-
Ok((stream, _)) = listener.accept() => {
371+
Ok((stream, _)) = listener.accept() => {
372+
total_connections.fetch_add(1, Ordering::Relaxed);
356373
info!("New connection established");
357-
374+
358375
let io = TokioIo::new(stream);
359-
376+
360377
debug!("Setting up connection");
361378

362379
let builder = builder.clone();
@@ -365,13 +382,50 @@ impl Server {
365382
debug!("Service handling request");
366383
handle_request(req, work_tx.clone(), config.recv_timeout, config.send_timeout)
367384
}));
368-
let fut = graceful_shutdown.watch(conn.into_owned());
369-
tokio::task::spawn(async move {
370-
if let Err(err) = fut.await {
371-
warn!("Error serving connection: {:?}", err);
372-
}
373-
});
374-
},
385+
// If max_connection_age is set, handle the connection with a timeout
386+
// but still integrate with server-wide graceful shutdown via broadcast channel
387+
if let Some(max_age_ms) = max_connection_age {
388+
let conn = conn.into_owned();
389+
let mut conn_shutdown_rx = shutdown_tx.subscribe();
390+
tokio::task::spawn(async move {
391+
tokio::pin!(conn);
392+
let sleep = tokio::time::sleep(std::time::Duration::from_millis(max_age_ms));
393+
tokio::pin!(sleep);
394+
let mut graceful_shutdown_started = false;
395+
396+
loop {
397+
tokio::select! {
398+
result = conn.as_mut() => {
399+
if let Err(err) = result {
400+
warn!("Error serving connection: {:?}", err);
401+
}
402+
break;
403+
}
404+
_ = &mut sleep, if !graceful_shutdown_started => {
405+
debug!("Connection reached max age ({}ms), sending GOAWAY", max_age_ms);
406+
conn.as_mut().graceful_shutdown();
407+
graceful_shutdown_started = true;
408+
// Continue the loop to let the connection drain
409+
}
410+
_ = conn_shutdown_rx.recv(), if !graceful_shutdown_started => {
411+
debug!("Server shutdown requested, sending GOAWAY to connection");
412+
conn.as_mut().graceful_shutdown();
413+
graceful_shutdown_started = true;
414+
// Continue the loop to let the connection drain
415+
}
416+
}
417+
}
418+
});
419+
} else {
420+
// No max age, use the graceful shutdown watcher
421+
let fut = graceful_shutdown.watch(conn.into_owned());
422+
tokio::task::spawn(async move {
423+
if let Err(err) = fut.await {
424+
warn!("Error serving connection: {:?}", err);
425+
}
426+
});
427+
}
428+
},
375429
_ = shutdown_rx.recv() => {
376430
debug!("Graceful shutdown requested; shutting down");
377431
break;
@@ -589,6 +643,7 @@ fn init(ruby: &Ruby) -> Result<(), MagnusError> {
589643
server_class.define_method("start", method!(Server::start, 0))?;
590644
server_class.define_method("stop", method!(Server::stop, 0))?;
591645
server_class.define_method("run_worker", method!(Server::run_worker, 0))?;
646+
server_class.define_method("total_connections", method!(Server::total_connections, 0))?;
592647

593648
let response_class = module.define_class("Response", ruby.class_object())?;
594649
response_class.define_singleton_method("new", function!(Response::new, 3))?;

test/test_grpc.rb

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,101 @@ def test_grpc_compression
144144
'grpc.enable_http_proxy' => 0,
145145
}.merge(compression_channel_args)
146146
)
147-
147+
148148
request = Echo::EchoRequest.new(message: "Hello Compressed GRPC " + ("a" * 10000))
149149
response = stub.echo(request)
150-
150+
151151
assert_instance_of Echo::EchoResponse, response
152152
assert_equal "Decompressed: Hello Compressed GRPC " + ("a" * 10000), response.message
153153
end
154154
end
155155

156+
def test_max_connection_age_sends_goaway
157+
# Test that max_connection_age causes the server to send GOAWAY after the configured time,
158+
# forcing the client to establish a new connection
159+
buffer = String.new(capacity: 1024)
160+
server_config = {
161+
bind_address: "127.0.0.1:3010",
162+
tokio_threads: 1,
163+
max_connection_age: 500 # 500ms max connection age
164+
}
165+
166+
with_configured_server(server_config, -> (request) { handler_grpc(request, buffer) }) do |_client, server|
167+
stub = Echo::Echo::Stub.new(
168+
"127.0.0.1:3010",
169+
:this_channel_is_insecure,
170+
channel_args: {
171+
'grpc.enable_http_proxy' => 0
172+
}
173+
)
174+
175+
# Record initial connection count
176+
initial_connections = server.total_connections
177+
178+
# First request establishes a connection
179+
request = Echo::EchoRequest.new(message: "Request 1")
180+
response = stub.echo(request)
181+
assert_equal "Request 1 response", response.message
182+
183+
# Should have one connection now
184+
assert_equal initial_connections + 1, server.total_connections, "First request should establish one connection"
185+
186+
# Wait for max_connection_age to expire and GOAWAY to be sent
187+
sleep 0.7
188+
189+
# Make another request - gRPC client should establish a new connection after GOAWAY
190+
request = Echo::EchoRequest.new(message: "Request 2")
191+
response = stub.echo(request)
192+
assert_equal "Request 2 response", response.message
193+
194+
# Should have a second connection now (client reconnected after GOAWAY)
195+
assert_equal initial_connections + 2, server.total_connections, "Second request after GOAWAY should establish a new connection"
196+
end
197+
end
198+
199+
def test_long_max_connection_age_reuses_connection
200+
# Test that with a long max_connection_age, the connection is reused
201+
# (opposite of test_max_connection_age_sends_goaway)
202+
buffer = String.new(capacity: 1024)
203+
server_config = {
204+
bind_address: "127.0.0.1:3010",
205+
tokio_threads: 1,
206+
max_connection_age: 60000 # 60 seconds - much longer than test duration
207+
}
208+
209+
with_configured_server(server_config, -> (request) { handler_grpc(request, buffer) }) do |_client, server|
210+
stub = Echo::Echo::Stub.new(
211+
"127.0.0.1:3010",
212+
:this_channel_is_insecure,
213+
channel_args: {
214+
'grpc.enable_http_proxy' => 0
215+
}
216+
)
217+
218+
# Record initial connection count
219+
initial_connections = server.total_connections
220+
221+
# First request establishes a connection
222+
request = Echo::EchoRequest.new(message: "Request 1")
223+
response = stub.echo(request)
224+
assert_equal "Request 1 response", response.message
225+
226+
# Should have one connection now
227+
assert_equal initial_connections + 1, server.total_connections, "First request should establish one connection"
228+
229+
# Wait a bit (but less than max_connection_age)
230+
sleep 0.3
231+
232+
# Make another request - should reuse the same connection
233+
request = Echo::EchoRequest.new(message: "Request 2")
234+
response = stub.echo(request)
235+
assert_equal "Request 2 response", response.message
236+
237+
# Should still have only one connection (connection reused, no GOAWAY sent)
238+
assert_equal initial_connections + 1, server.total_connections, "Second request should reuse existing connection"
239+
end
240+
end
241+
156242
private
157243

158244
def handler_grpc(request, buffer)

test/test_helper.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def with_configured_server(config, request_handler, &block)
3131
end
3232

3333
client = HTTPX.with(origin: "http://127.0.0.1:3010")
34-
block.call(client)
34+
block.call(client, server)
3535

3636
ensure
3737
server.stop if server

0 commit comments

Comments
 (0)