Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/rpc/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct KrpcSocket {

impl KrpcSocket {
pub(crate) fn new(config: &Config) -> Result<Self, std::io::Error> {
let request_timeout = config.request_timeout;
let port = config.port;
Comment on lines 42 to 43
Copy link
Contributor

@SeverinAlexB SeverinAlexB Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `request_timeout` is still configurable. Is it used somewhere else? Should it be removed? Or should the constant timeout be kept for backward compatibility?


let socket = if let Some(port) = port {
Expand All @@ -63,7 +62,7 @@ impl KrpcSocket {
socket,
next_tid: 0,
server_mode: config.server_mode,
inflight_requests: InflightRequests::new(request_timeout),
inflight_requests: InflightRequests::new(),
last_cleanup: Instant::now(),
local_addr,
poll_interval: MIN_POLL_INTERVAL,
Expand Down
69 changes: 56 additions & 13 deletions src/rpc/socket/inflight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ use std::collections::BTreeMap;
use std::net::SocketAddrV4;
use std::time::{Duration, Instant};

/// Lower bound, in milliseconds, applied to the adaptive request timeout.
const MIN_TIMEOUT_MS: u64 = 200;
/// Starting value, in milliseconds, of the estimated round-trip time before any sample is recorded.
const INITIAL_ESTIMATED_RTT_MS: u64 = 2000;
/// Starting value, in milliseconds, of the round-trip time deviation before any sample is recorded.
const DEVIATION_RTT_MS: u64 = 500;
/// Conservative learning rate for estimated RTT (lower = more stable, higher = faster adaptation)
const ALPHA: f64 = 0.05;
/// Conservative learning rate for RTT deviation (lower = more stable, higher = faster adaptation)
const BETA: f64 = 0.1;
Comment on lines +8 to +11
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling them ALPHA and BETA, which nobody except devs familiar with the RTT calculation will understand, why not give them more descriptive names?

What comes to my mind: RTT_LEARNING_RATE_ALPHA, or RTT_LEARNING_RATE_ESTIMATE, or RTT_LEARNING_RATE_DEVIATION.


#[derive(Debug, Clone)]
pub struct InflightRequest {
pub to: SocketAddrV4,
Expand All @@ -26,14 +34,16 @@ impl InflightRequest {
pub struct InflightRequests {
// BTreeMap provides O(log n) lookup, insertion, and deletion keyed by transaction_id.
requests: BTreeMap<u32, InflightRequest>,
timeout: Duration,
estimated_rtt: Duration,
deviation_rtt: Duration,
}

impl InflightRequests {
pub fn new(timeout: Duration) -> Self {
pub fn new() -> Self {
Self {
requests: BTreeMap::new(),
timeout,
estimated_rtt: Duration::from_millis(INITIAL_ESTIMATED_RTT_MS),
deviation_rtt: Duration::from_millis(DEVIATION_RTT_MS),
Comment on lines +45 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract all RTT methods/variables into their own struct so we have a clear separation between the inflight request struct and the RTT logic?

This way, it would also be easier to write tests.

}
}

Expand All @@ -51,7 +61,7 @@ impl InflightRequests {
/// Check if a transaction_id is still inflight and not expired O(log n)
pub fn contains(&self, transaction_id: u32) -> bool {
if let Some(request) = self.requests.get(&transaction_id) {
return request.sent_at.elapsed() < self.timeout;
return request.sent_at.elapsed() < self.request_timeout();
}
false
}
Expand All @@ -61,8 +71,8 @@ impl InflightRequests {
pub fn remove(&mut self, transaction_id: u32, from: &SocketAddrV4) -> Option<InflightRequest> {
let request = self.requests.get(&transaction_id)?;

// Drop immediately if expired; avoid accepting late responses
if request.sent_at.elapsed() >= self.timeout {
let elapsed = request.sent_at.elapsed();
if elapsed >= self.request_timeout() {
self.requests.remove(&transaction_id);
return None;
}
Expand All @@ -71,20 +81,53 @@ impl InflightRequests {
return None;
}

self.requests.remove(&transaction_id)
let request = self.requests.remove(&transaction_id)?;

self.update_rtt_estimates(elapsed);

Some(request)
}

/// Returns `true` when no requests are being tracked.
///
/// Only checks whether the underlying map is empty; entries that have already
/// timed out but have not yet been dropped by `cleanup` or `remove` still count.
pub fn is_empty(&self) -> bool {
self.requests.is_empty()
}

fn request_timeout(&self) -> Duration {
let timeout = self.estimated_rtt + self.deviation_rtt.mul_f64(4.0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`4.0` is a magic number. Please extract it into a named constant and document why this multiplier was chosen.

timeout.max(Duration::from_millis(MIN_TIMEOUT_MS))
}

/// Cleanup expired requests based on timeout
/// Updates RTT estimates using exponentially weighted moving averages (EWMA).
/// - Estimated RTT = (1-α) * old_estimate + α * sample
/// - Deviation RTT = (1-β) * old_deviation + β * |sample - new_estimate|
///
/// Conservative learning rates (α=0.05, β=0.1) make the algorithm less sensitive to
/// temporary network fluctuations for stable DHT timeout calculations.
Comment on lines +101 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to link an RTT guide here so the next developer can easily familiarize themselves with the concept:
https://how.dev/answers/how-to-compute-devrtt-estimated-rtt-time-out-interval-in-ccn

fn update_rtt_estimates(&mut self, sample_rtt: Duration) {
    // Work in fractional seconds so the weighted averages can use plain f64 math.
    let sample = sample_rtt.as_secs_f64();

    // Smoothed RTT: blend the previous estimate with the fresh sample (weight ALPHA).
    let smoothed = (1.0 - ALPHA) * self.estimated_rtt.as_secs_f64() + ALPHA * sample;

    // Deviation: blend the previous deviation with the sample's distance from the
    // *updated* estimate (weight BETA).
    let spread =
        (1.0 - BETA) * self.deviation_rtt.as_secs_f64() + BETA * (sample - smoothed).abs();

    self.estimated_rtt = Duration::from_secs_f64(smoothed);
    self.deviation_rtt = Duration::from_secs_f64(spread);
}

/// Cleanup expired requests based on adaptive timeout
/// O(n) scans all requests to remove expired ones
pub fn cleanup(&mut self) {
let timeout = self.request_timeout();
let now = Instant::now();
let cutoff = now - self.timeout;
let cutoff = now - timeout;

// Remove expired requests in a single pass using retain
self.requests.retain(|_, request| request.sent_at > cutoff);
}

/// Check if there are no inflight requests being tracked.
pub fn is_empty(&self) -> bool {
self.requests.is_empty()
}
}