From 54c95c4e0f9ebd82dfeb2a991ee219e48fa4b678 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 2 Apr 2025 01:41:13 -0300 Subject: [PATCH] chore: enable nodelay on replicas --- Cargo.lock | 3 ++- crates/fluvio-service/src/server.rs | 4 ++++ crates/fluvio-socket/Cargo.toml | 3 ++- crates/fluvio-socket/src/socket.rs | 17 +++++++++++++++++ .../src/replication/follower/controller.rs | 1 + 5 files changed, 26 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d3891cce2..e177aeb33d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3060,7 +3060,7 @@ dependencies = [ [[package]] name = "fluvio-socket" -version = "0.15.2" +version = "0.15.3" dependencies = [ "async-channel 1.9.0", "async-lock", @@ -3073,6 +3073,7 @@ dependencies = [ "fluvio-protocol", "fluvio-types", "futures-util", + "libc", "nix", "once_cell", "pin-project", diff --git a/crates/fluvio-service/src/server.rs b/crates/fluvio-service/src/server.rs index 61dfe149b2..611c002ba5 100644 --- a/crates/fluvio-service/src/server.rs +++ b/crates/fluvio-service/src/server.rs @@ -128,6 +128,10 @@ where .unwrap_or_else(|_| "".to_owned()); debug!(%peer_addr, "Handling request"); + stream.set_nodelay(true).unwrap_or_else(|err| { + error!("Error setting nodelay: {}", err); + }); + let socket = { let fd = stream.as_raw_fd(); FluvioSocket::from_stream(Box::new(stream.clone()), Box::new(stream), fd) diff --git a/crates/fluvio-socket/Cargo.toml b/crates/fluvio-socket/Cargo.toml index 3b0323398f..82dbb64fb1 100644 --- a/crates/fluvio-socket/Cargo.toml +++ b/crates/fluvio-socket/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-socket" -version = "0.15.2" +version = "0.15.3" edition = "2021" authors = ["Fluvio Contributors "] description = "Provide TCP socket wrapper for fluvio protocol" @@ -31,6 +31,7 @@ pin-project = { workspace = true } thiserror = { workspace = true } semver = { workspace = true } nix = { workspace = true, features = ["uio"]} +libc = { workspace = true } # Fluvio dependencies fluvio-future = { workspace = true, features = ["net", "task", "retry"] } diff --git a/crates/fluvio-socket/src/socket.rs b/crates/fluvio-socket/src/socket.rs index c1a09a86eb..54df9c610f 100644 --- a/crates/fluvio-socket/src/socket.rs +++ b/crates/fluvio-socket/src/socket.rs @@ -119,6 +119,23 @@ impl FluvioSocket { let connector = DefaultDomainConnector::new(); Self::connect_with_connector(addr, &connector).await } + + #[cfg(not(target_arch = "wasm32"))] + pub fn set_nodelay(&self, value: bool) { + let fd = self.id(); + let opt = libc::IPPROTO_TCP; + let val = libc::TCP_NODELAY; + let value: i32 = if value { 1 } else { 0 }; + let payload = std::ptr::addr_of!(value).cast(); + unsafe { + libc::setsockopt(fd, opt, val, payload, std::mem::size_of::() as u32); + } + } + + #[cfg(target_arch = "wasm32")] + pub fn set_nodelay(&self, _value: bool) { + // No-op for WASM + } } cfg_if::cfg_if! { diff --git a/crates/fluvio-spu/src/replication/follower/controller.rs b/crates/fluvio-spu/src/replication/follower/controller.rs index 6894fb01aa..17998f3749 100644 --- a/crates/fluvio-spu/src/replication/follower/controller.rs +++ b/crates/fluvio-spu/src/replication/follower/controller.rs @@ -320,6 +320,7 @@ mod inner { match FluvioSocket::connect(&leader_endpoint).await { Ok(mut socket) => { + socket.set_nodelay(true); debug!("connected to leader"); match self.send_fetch_stream_request(&mut socket).await {