Skip to content

Commit 654557a

Browse files
authored
Allow configuring tcp-keepalive duration for rpc clients and servers
1 parent 5fd8197 commit 654557a

File tree

5 files changed

+43
-7
lines changed

5 files changed

+43
-7
lines changed

example-messagepack/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
3838
1 << 20,
3939
128,
4040
64 << 10,
41+
None,
4142
)
4243
.await?;
4344
server.set_max_queued_outbound_messages(512);

example-proto-tls/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
7373
1 << 20,
7474
128,
7575
64 << 10,
76+
None,
7677
)
7778
.await?;
7879
server.set_max_queued_outbound_messages(512);

example-proto/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
3939
1 << 20,
4040
128,
4141
64 << 10,
42+
None,
4243
)
4344
.await?;
4445
server.set_max_queued_outbound_messages(512);

protosocket-rpc/src/client/configuration.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use std::{future::Future, net::SocketAddr, sync::Arc};
2-
31
use protosocket::Connection;
2+
use socket2::TcpKeepalive;
3+
use std::{future::Future, net::SocketAddr, sync::Arc};
44
use tokio::{net::TcpStream, sync::mpsc};
55
use tokio_rustls::rustls::pki_types::ServerName;
66

@@ -172,6 +172,7 @@ pub struct Configuration<TStreamConnector> {
172172
max_buffer_length: usize,
173173
buffer_allocation_increment: usize,
174174
max_queued_outbound_messages: usize,
175+
tcp_keepalive_duration: Option<std::time::Duration>,
175176
stream_connector: TStreamConnector,
176177
}
177178

@@ -185,6 +186,7 @@ where
185186
max_buffer_length: 4 * (1 << 20), // 4 MiB
186187
buffer_allocation_increment: 1 << 20,
187188
max_queued_outbound_messages: 256,
189+
tcp_keepalive_duration: None,
188190
stream_connector,
189191
}
190192
}
@@ -209,6 +211,13 @@ where
209211
pub fn buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
210212
self.buffer_allocation_increment = buffer_allocation_increment;
211213
}
214+
215+
/// The duration to set for tcp_keepalive on the underlying socket.
216+
///
217+
/// Default: None
218+
pub fn tcp_keepalive_duration(&mut self, tcp_keepalive_duration: Option<std::time::Duration>) {
219+
self.tcp_keepalive_duration = tcp_keepalive_duration;
220+
}
212221
}
213222

214223
/// Connect a new protosocket rpc client to a server
@@ -233,8 +242,25 @@ where
233242
{
234243
log::trace!("new client {address}, {configuration:?}");
235244

236-
let stream = tokio::net::TcpStream::connect(address).await?;
237-
stream.set_nodelay(true)?;
245+
let socket = socket2::Socket::new(
246+
match address {
247+
SocketAddr::V4(_) => socket2::Domain::IPV4,
248+
SocketAddr::V6(_) => socket2::Domain::IPV6,
249+
},
250+
socket2::Type::STREAM,
251+
None,
252+
)?;
253+
254+
let mut tcp_keepalive = TcpKeepalive::new();
255+
if let Some(duration) = configuration.tcp_keepalive_duration {
256+
tcp_keepalive = tcp_keepalive.with_time(duration);
257+
}
258+
259+
socket.set_nonblocking(true)?;
260+
socket.set_tcp_nodelay(true)?;
261+
socket.set_tcp_keepalive(&tcp_keepalive)?;
262+
263+
let stream = TcpStream::from_std(socket.into())?;
238264

239265
let message_reactor: RpcCompletionReactor<
240266
Deserializer::Message,

protosocket-rpc/src/server/socket_server.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use protosocket::Connection;
2+
use socket2::TcpKeepalive;
13
use std::ffi::c_int;
24
use std::future::Future;
35
use std::io::Error;
46
use std::pin::Pin;
57
use std::task::Context;
68
use std::task::Poll;
7-
8-
use protosocket::Connection;
9+
use std::time::Duration;
910
use tokio::sync::mpsc;
1011

1112
use super::connection_server::RpcConnectionServer;
@@ -44,6 +45,7 @@ where
4445
buffer_allocation_increment: usize,
4546
max_queued_outbound_messages: usize,
4647
listen_backlog: u32,
48+
tcp_keepalive_duration: Option<Duration>,
4749
) -> crate::Result<Self> {
4850
let socket = socket2::Socket::new(
4951
match address {
@@ -54,9 +56,14 @@ where
5456
None,
5557
)?;
5658

59+
let mut tcp_keepalive = TcpKeepalive::new();
60+
if let Some(duration) = tcp_keepalive_duration {
61+
tcp_keepalive = tcp_keepalive.with_time(duration);
62+
}
63+
5764
socket.set_nonblocking(true)?;
5865
socket.set_tcp_nodelay(true)?;
59-
socket.set_keepalive(true)?;
66+
socket.set_tcp_keepalive(&tcp_keepalive)?;
6067
socket.set_reuse_port(true)?;
6168
socket.set_reuse_address(true)?;
6269

0 commit comments

Comments
 (0)