Skip to content

Commit f44cd4e

Browse files
committed
make some socket and buffer parameters configurable
hack: use raw sockets, set config like gRPC make socket and buffer paramaters a bit more configurable
1 parent c9d7a18 commit f44cd4e

File tree

8 files changed

+67
-9
lines changed

8 files changed

+67
-9
lines changed

Cargo.lock

Lines changed: 14 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ rmp-serde = { version = "1.3" }
4444
rustls-pemfile = { version = "2.2" }
4545
rustls-pki-types = { version = "1.12" }
4646
serde = { version = "1.0" }
47+
socket2 = { version = "0.6" }
4748
thiserror = { version = "1.0" }
4849
tokio = { version = "1.39", features = ["net", "rt"] }
4950
tokio-rustls = { version = "0.26" }

protosocket-connection/src/connection.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct Connection<Bindings: ConnectionBindings> {
3838
receive_buffer_unread_index: usize,
3939
receive_buffer: Vec<u8>,
4040
max_buffer_length: usize,
41+
buffer_allocation_increment: usize,
4142
deserializer: Bindings::Deserializer,
4243
serializer: Bindings::Serializer,
4344
reactor: Bindings::Reactor,
@@ -125,6 +126,7 @@ where
125126
deserializer: Bindings::Deserializer,
126127
serializer: Bindings::Serializer,
127128
max_buffer_length: usize,
129+
buffer_allocation_increment: usize,
128130
max_queued_send_messages: usize,
129131
outbound_messages: mpsc::Receiver<<Bindings::Serializer as Serializer>::Message>,
130132
reactor: Bindings::Reactor,
@@ -140,6 +142,7 @@ where
140142
receive_buffer: Vec::new(),
141143
max_buffer_length,
142144
receive_buffer_unread_index: 0,
145+
buffer_allocation_increment,
143146
deserializer,
144147
serializer,
145148
reactor,
@@ -148,12 +151,11 @@ where
148151

149152
/// ensure buffer state and read from the inbound stream
150153
fn poll_read_inbound(&mut self, context: &mut Context<'_>) -> ReadBufferState {
151-
const BUFFER_INCREMENT: usize = 1 << 20;
152154
if self.receive_buffer.len() < self.max_buffer_length
153-
&& self.receive_buffer.len() - self.receive_buffer_unread_index < BUFFER_INCREMENT
155+
&& self.receive_buffer.len() - self.receive_buffer_unread_index < self.buffer_allocation_increment
154156
{
155157
self.receive_buffer
156-
.resize(self.receive_buffer.len() + BUFFER_INCREMENT, 0);
158+
.resize(self.receive_buffer.len() + self.buffer_allocation_increment, 0);
157159
}
158160

159161
if 0 < self.receive_buffer.len() - self.receive_buffer_unread_index {

protosocket-prost/src/prost_client_registry.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{ProstClientConnectionBindings, ProstSerializer};
99
#[derive(Debug, Clone)]
1010
pub struct ClientRegistry<TConnector = TcpConnector> {
1111
max_buffer_length: usize,
12+
buffer_allocation_interval: usize,
1213
max_queued_outbound_messages: usize,
1314
runtime: tokio::runtime::Handle,
1415
stream_connector: TConnector,
@@ -44,6 +45,7 @@ where
4445
Self {
4546
max_buffer_length: 4 * (1 << 20),
4647
max_queued_outbound_messages: 256,
48+
buffer_allocation_interval: 1 << 20,
4749
runtime,
4850
stream_connector: connector,
4951
}
@@ -91,6 +93,7 @@ where
9193
ProstSerializer::default(),
9294
ProstSerializer::default(),
9395
self.max_buffer_length,
96+
self.buffer_allocation_interval,
9497
self.max_queued_outbound_messages,
9598
outbound_messages,
9699
message_reactor,

protosocket-rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ futures = { workspace = true }
1717
k-lock = { workspace = true }
1818
log = { workspace = true }
1919
rustls-pki-types = { workspace = true }
20+
socket2 = { workspace = true, features = ["all"] }
2021
tokio = { workspace = true }
2122
tokio-rustls = { workspace = true }
2223
tokio-util = { workspace = true }

protosocket-rpc/src/client/configuration.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ impl tokio_rustls::rustls::client::danger::ServerCertVerifier for DoNothingVerif
170170
#[derive(Debug, Clone)]
171171
pub struct Configuration<TStreamConnector> {
172172
max_buffer_length: usize,
173+
buffer_allocation_interval: usize,
173174
max_queued_outbound_messages: usize,
174175
stream_connector: TStreamConnector,
175176
}
@@ -182,6 +183,7 @@ where
182183
log::trace!("new client configuration");
183184
Self {
184185
max_buffer_length: 4 * (1 << 20), // 4 MiB
186+
buffer_allocation_interval: 1 << 20,
185187
max_queued_outbound_messages: 256,
186188
stream_connector,
187189
}
@@ -200,6 +202,13 @@ where
200202
pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
201203
self.max_queued_outbound_messages = max_queued_outbound_messages;
202204
}
205+
206+
/// Amount of buffer to allocate at one time when buffer needs extension.
207+
///
208+
/// Default: 1MiB
209+
pub fn buffer_allocation_interval(&mut self, buffer_allocation_interval: usize) {
210+
self.buffer_allocation_interval = buffer_allocation_interval;
211+
}
203212
}
204213

205214
/// Connect a new protosocket rpc client to a server
@@ -247,6 +256,7 @@ where
247256
Deserializer::default(),
248257
Serializer::default(),
249258
configuration.max_buffer_length,
259+
configuration.buffer_allocation_interval,
250260
configuration.max_queued_outbound_messages,
251261
outbound_messages,
252262
message_reactor,

protosocket-rpc/src/server/socket_server.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::ffi::c_int;
12
use std::future::Future;
23
use std::io::Error;
34
use std::pin::Pin;
@@ -27,6 +28,7 @@ where
2728
socket_server: TSocketService,
2829
listener: tokio::net::TcpListener,
2930
max_buffer_length: usize,
31+
buffer_allocation_increment: usize,
3032
max_queued_outbound_messages: usize,
3133
}
3234

@@ -38,13 +40,36 @@ where
3840
pub async fn new(
3941
address: std::net::SocketAddr,
4042
socket_server: TSocketService,
43+
max_buffer_length: usize,
44+
buffer_allocation_increment: usize,
45+
max_queued_outbound_messages: usize,
46+
listen_backlog: u32,
4147
) -> crate::Result<Self> {
42-
let listener = tokio::net::TcpListener::bind(address).await?;
48+
let socket = socket2::Socket::new(
49+
match address {
50+
std::net::SocketAddr::V4(_) => socket2::Domain::IPV4,
51+
std::net::SocketAddr::V6(_) => socket2::Domain::IPV6,
52+
},
53+
socket2::Type::STREAM,
54+
None,
55+
)?;
56+
57+
socket.set_nonblocking(true)?;
58+
socket.set_tcp_nodelay(true)?;
59+
socket.set_keepalive(true)?;
60+
socket.set_reuse_port(true)?;
61+
socket.set_reuse_address(true)?;
62+
63+
socket.bind(&address.into())?;
64+
socket.listen(listen_backlog as c_int)?;
65+
66+
let listener = tokio::net::TcpListener::from_std(socket.into())?;
4367
Ok(Self {
4468
socket_server,
4569
listener,
46-
max_buffer_length: 16 * (2 << 20),
47-
max_queued_outbound_messages: 128,
70+
max_buffer_length,
71+
buffer_allocation_increment,
72+
max_queued_outbound_messages,
4873
})
4974
}
5075

@@ -84,6 +109,7 @@ where
84109
let serializer = self.socket_server.serializer();
85110
let max_buffer_length = self.max_buffer_length;
86111
let max_queued_outbound_messages = self.max_queued_outbound_messages;
112+
let buffer_allocation_increment = self.buffer_allocation_increment;
87113

88114
let stream_future = self.socket_server.accept_stream(stream);
89115

@@ -97,6 +123,7 @@ where
97123
deserializer,
98124
serializer,
99125
max_buffer_length,
126+
buffer_allocation_increment,
100127
max_queued_outbound_messages,
101128
outbound_messages_receiver,
102129
submitter,

protosocket-server/src/connection_server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub struct ProtosocketServer<Connector: ServerConnector> {
5757
connector: Connector,
5858
listener: tokio::net::TcpListener,
5959
max_buffer_length: usize,
60+
buffer_allocation_interval: usize,
6061
max_queued_outbound_messages: usize,
6162
runtime: tokio::runtime::Handle,
6263
}
@@ -78,6 +79,7 @@ impl<Connector: ServerConnector> ProtosocketServer<Connector> {
7879
listener,
7980
max_buffer_length: 16 * (2 << 20),
8081
max_queued_outbound_messages: 128,
82+
buffer_allocation_interval: 1 << 20,
8183
runtime,
8284
})
8385
}
@@ -114,6 +116,7 @@ impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
114116
self.connector.deserializer(),
115117
self.connector.serializer(),
116118
self.max_buffer_length,
119+
self.buffer_allocation_interval,
117120
self.max_queued_outbound_messages,
118121
outbound_messages,
119122
reactor,

0 commit comments

Comments
 (0)