Skip to content

Commit 9dd1b3b

Browse files
committed
Update; includes new logic.
1 parent 5c60275 commit 9dd1b3b

File tree

13 files changed

+433
-213
lines changed

13 files changed

+433
-213
lines changed

cas_client/src/adaptive_concurrency_control.rs renamed to cas_client/src/adaptive_concurrency/controller.rs

Lines changed: 107 additions & 191 deletions
Large diffs are not rendered by default.
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use std::time::Duration;
2+
3+
use tokio::time::Instant;
4+
5+
/// A latency predictor using a numerically stable, exponentially decayed linear regression:
///
/// We fit a model of the form:
/// duration_secs ≈ base_time_secs + size_bytes * inv_throughput
/// which is equivalent to:
/// duration_secs ≈ intercept + slope * size_bytes
///
/// Internally, we use a stable, online update method based on weighted means and covariances:
/// - mean_x, mean_y: weighted means of size and duration
/// - s_xx, s_xy: exponentially decayed sums of (x - mean_x)^2 and (x - mean_x)(y - mean_y)
///
/// We apply decay on each update using exp2(-elapsed / half_life).
///
/// This avoids numerical instability from large sums and is robust to shifting distributions.
pub struct LatencyPredictor {
    // Total (decayed) weight of all observations incorporated so far.
    sum_w: f64,
    // Weighted mean of the feature x (bytes, scaled by concurrency in `update`).
    mean_x: f64,
    // Weighted mean of the target y (observed duration in seconds).
    mean_y: f64,
    // Decayed sum of (x - mean_x)^2; the regression's x-variance numerator.
    s_xx: f64,
    // Decayed sum of (x - mean_x)(y - mean_y); the regression's covariance numerator.
    s_xy: f64,

    // Fitted intercept: predicted seconds for a zero-byte transfer.
    base_time_secs: f64,
    // Fitted slope: predicted seconds per byte.
    inv_throughput: f64,
    // Half-life (seconds) controlling how fast old observations lose weight.
    decay_half_life_secs: f64,
    // Timestamp of the most recent `update`, used to compute elapsed decay time.
    // NOTE: tokio's Instant, so tests can pause/advance the clock.
    last_update: Instant,
}
31+
32+
impl LatencyPredictor {
33+
pub fn new(decay_half_life: Duration) -> Self {
34+
Self {
35+
sum_w: 0.0,
36+
mean_x: 0.0,
37+
mean_y: 0.0,
38+
s_xx: 0.0,
39+
s_xy: 0.0,
40+
base_time_secs: 120.0, // 2 minutes, but no real weight on this.
41+
inv_throughput: 0.0,
42+
decay_half_life_secs: decay_half_life.as_secs_f64(),
43+
last_update: Instant::now(),
44+
}
45+
}
46+
47+
/// Updates the latency model with a new observation.
///
/// Applies exponential decay to prior statistics and incorporates the new sample
/// using a numerically stable linear regression formula.
///
/// - `size_bytes`: the size of the completed transmission.
/// - `duration`: the time taken to complete the transmission.
/// - `avg_concurrent`: the average number of concurrent connections at the time.
pub fn update(&mut self, size_bytes: usize, duration: Duration, avg_concurrent: f64) {
    let now = Instant::now();
    let elapsed = now.duration_since(self.last_update).as_secs_f64();
    // Weight multiplier for all prior statistics: 2^(-elapsed / half_life),
    // i.e. an observation's influence halves once per half-life period.
    let decay = (-elapsed / self.decay_half_life_secs).exp2();

    // Feature x: number of bytes transferred in this time, assuming that multiple similar
    // connections are active. Scaling by concurrency treats the link as one shared
    // resource: x approximates the total bytes the link moved during this transfer,
    // making samples taken at different concurrency levels comparable.
    let x = (size_bytes as f64) * avg_concurrent.max(1.);

    // Target y: the observed duration in seconds — roughly the time for the link
    // to move x bytes in total. Clamped away from zero to keep the fit sane.
    let y = duration.as_secs_f64().max(1e-6);

    // Decay previous statistics. The means themselves need no decay: shrinking
    // sum_w implicitly down-weights their history when the new sample folds in.
    self.sum_w *= decay;
    self.s_xx *= decay;
    self.s_xy *= decay;

    // Update means with numerically stable method (weighted Welford-style update).
    let weight = 1.0;
    let new_sum_w = self.sum_w + weight;
    let delta_x = x - self.mean_x;
    let delta_y = y - self.mean_y;

    let mean_x_new = self.mean_x + (weight * delta_x) / new_sum_w;
    let mean_y_new = self.mean_y + (weight * delta_y) / new_sum_w;

    // Covariance accumulators use the pre-update delta and post-update mean,
    // which avoids the catastrophic cancellation of the naive sum-of-products form.
    self.s_xx += weight * delta_x * (x - mean_x_new);
    self.s_xy += weight * delta_x * (y - mean_y_new);

    self.mean_x = mean_x_new;
    self.mean_y = mean_y_new;
    self.sum_w = new_sum_w;

    // Refit the line only when x has enough spread; otherwise the slope is
    // ill-conditioned, so fall back to predicting the mean duration flat-rate.
    if self.s_xx > 1e-8 {
        let slope = self.s_xy / self.s_xx;
        let intercept = self.mean_y - slope * self.mean_x;

        self.base_time_secs = intercept;
        self.inv_throughput = slope;
    } else {
        self.base_time_secs = self.mean_y;
        self.inv_throughput = 0.0;
    }

    self.last_update = now;
}
108+
109+
/// Predicts the expected completion time for a given transfer size and concurrency level.
110+
///
111+
/// First predicts the overall latency of a transfer, assuming that there is no concurrency and
112+
/// connections scale with
113+
///
114+
/// to reflect how concurrency reduces per-transfer time under stable throughput.
115+
///
116+
/// - `size_bytes`: the size of the transfer.
117+
/// - `n_concurrent`: the number of concurrent connections.
118+
pub fn predicted_latency(&self, size_bytes: u64, avg_concurrent: f64) -> Duration {
119+
let predicted_secs_without_concurrency = self.base_time_secs + size_bytes as f64 * self.inv_throughput;
120+
let predicted_secs = predicted_secs_without_concurrency * avg_concurrent.max(1.);
121+
Duration::from_secs_f64(predicted_secs)
122+
}
123+
124+
pub fn predicted_bandwidth(&self) -> f64 {
125+
let query_bytes = 10 * 1024 * 1024;
126+
127+
// How long would it take to transmit this at full bandwidth
128+
let min_latency = self.predicted_latency(query_bytes, 1.);
129+
130+
// Report bytes per sec in this model.
131+
query_bytes as f64 / min_latency.as_secs_f64().max(1e-6)
132+
}
133+
}
134+
135+
#[cfg(test)]
mod tests {
    use tokio::time::{self, Duration as TokioDuration};

    use super::*;

    // Smoke test: a single observation yields a positive latency prediction.
    #[test]
    fn test_estimator_update() {
        let mut estimator = LatencyPredictor::new(Duration::from_secs_f64(10.0));
        estimator.update(1_000_000, Duration::from_millis(500), 1.);
        let expected = estimator.predicted_latency(1_000_000, 1.);
        assert!(expected.as_secs_f64() > 0.0);
    }

    // Repeated identical observations should drive the prediction to the
    // observed value (1 second), within a small tolerance.
    #[test]
    fn test_converges_to_constant_observation() {
        let mut predictor = LatencyPredictor::new(Duration::from_secs_f64(10.0));
        for _ in 0..10 {
            predictor.update(1000, Duration::from_secs_f64(1.0), 1.);
        }
        let prediction = predictor.predicted_latency(1000, 1.);
        assert!((prediction.as_secs_f64() - 1.0).abs() < 0.01);
    }

    // Uses tokio's paused clock to advance exactly one half-life (2s) between a
    // 2s and a 1s observation: the newer sample carries twice the weight, so the
    // prediction should land between the two, closer to 1s.
    #[tokio::test]
    async fn test_decay_weighting_effect() {
        time::pause();
        let mut predictor = LatencyPredictor::new(Duration::from_secs_f64(2.0));
        predictor.update(1000, Duration::from_secs_f64(2.0), 1.);
        time::advance(TokioDuration::from_secs(2)).await;
        predictor.update(1000, Duration::from_secs_f64(1.0), 1.);
        let predicted = predictor.predicted_latency(1000, 1.).as_secs_f64();
        assert!(predicted > 1.0 && predicted < 1.6);
    }

    // Predicted latency should increase monotonically with the concurrency
    // level, since concurrent transfers share throughput.
    #[test]
    fn test_scaling_with_concurrency() {
        let mut predictor = LatencyPredictor::new(Duration::from_secs_f64(10.0));
        for _ in 0..10 {
            predictor.update(1000, Duration::from_secs_f64(1.0), 1.);
        }
        let predicted_1 = predictor.predicted_latency(1000, 1.).as_secs_f64();
        let predicted_2 = predictor.predicted_latency(1000, 2.).as_secs_f64();
        let predicted_4 = predictor.predicted_latency(1000, 4.).as_secs_f64();
        assert!(predicted_2 > predicted_1);
        assert!(predicted_4 > predicted_2);
    }
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
mod controller;
2+
mod latency_prediction;
3+
4+
pub use controller::{AdaptiveConcurrencyController, ConnectionPermit};

cas_client/src/constants.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,35 @@ utils::configurable_constants! {
1111
/// no more retries are attempted.
1212
ref CLIENT_RETRY_MAX_DURATION_MS: u64 = 6 * 60 * 1000; // 6m
1313

14-
/// The target time for a small transfer to complete.
15-
ref CONCURRENCY_CONTROL_TARGET_TIME_SMALL_TRANSFER_MS : u64 = 10 * 1000;
16-
17-
/// The target time for a large transfer to complete. Default is 20 seconds.
18-
ref CONCURRENCY_CONTROL_TARGET_TIME_LARGE_TRANSFER_MS : u64 = 20 * 1000;
19-
20-
/// The size of a large transfer.
21-
ref CONCURRENCY_CONTROL_LARGE_TRANSFER_NUM_BYTES : u64 = 64_000_000;
14+
/// The maximum amount of time for a transfer to be deemed within target. Set to 45 sec.
15+
ref CONCURRENCY_CONTROL_MAX_WITHIN_TARGET_TRANSFER_TIME_MS: u64 = 45 * 1000;
2216

2317
/// The minimum time in milliseconds between adjustments when increasing the concurrency.
2418
ref CONCURRENCY_CONTROL_MIN_INCREASE_WINDOW_MS : u64 = 500;
2519

2620
/// The minimum time in milliseconds between adjustments when decreasing the concurrency.
2721
ref CONCURRENCY_CONTROL_MIN_DECREASE_WINDOW_MS : u64 = 250;
2822

29-
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
30-
ref CONCURRENCY_CONTROL_TRACKING_SIZE : usize = 20;
23+
/// Observations of observed transfer time and deviances are tracked using exponentially
24+
/// weighted decay. This is parameterized by the half life of a weighting for an observation.
25+
/// Thus if this value is 30 sec, it means that observations count for 50% weight after 30 seconds, 25% weight
26+
/// after 1 min, etc. This allows us to adapt to changing network conditions and give more
27+
/// weight to newer observations, but still maintain history.
28+
///
29+
/// There are two things being tracked in this model; a prediction of the latency and a record of
30+
/// how accurate the model is. The primary assumption here is the following:
31+
///
32+
///
33+
ref CONCURRENCY_CONTROL_TRACKING_HALF_LIFE_MS : u64 = 30 * 1000;
34+
35+
ref CONCURRENCY_CONTROL_ACCEPTABLE_DEVIANCE : f64 = 1.1;
36+
37+
ref CONCURRENCY_CONTROL_CONCURRENCY_INCREASABLE_DEVIANCE: f64 = 1.05;
3138

32-
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
33-
ref CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_LOWER: f64 = 0.7;
39+
/// A failure -- a retry or outright failure -- counts as this weight.
40+
ref CONCURRENCY_CONTROL_FAILURE_DEVIANCE_PENALTY : f64 = 1.4;
3441

35-
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
36-
ref CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_UPPER: f64 = 0.9;
3742

38-
/// The maximum time window within which to examine successes and failures when adjusting the concurrancy.
39-
ref CONCURRENCY_CONTROL_TRACKING_WINDOW_MS : u64 = 30 * 1000;
4043

4144
/// Log the concurrency on this interval.
4245
ref CONCURRENCY_CONTROL_LOGGING_INTERVAL_MS: u64 = 10 * 1000;

cas_client/src/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use merklehash::MerkleHash;
99
use progress_tracking::item_tracking::SingleItemProgressUpdater;
1010
use progress_tracking::upload_tracking::CompletionTracker;
1111

12-
use crate::adaptive_concurrency_control::{AdaptiveConcurrencyController, ConnectionPermit};
12+
use crate::adaptive_concurrency::{AdaptiveConcurrencyController, ConnectionPermit};
1313
use crate::constants::{MAX_CONCURRENT_UPLOADS, MIN_CONCURRENT_UPLOADS, NUM_INITIAL_CONCURRENT_UPLOADS};
1414
use crate::error::Result;
1515
#[cfg(not(target_family = "wasm"))]

cas_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ pub mod remote_client;
2626
mod retry_wrapper;
2727
mod upload_progress_stream;
2828

29-
mod adaptive_concurrency_control;
29+
mod adaptive_concurrency;

cas_client/src/local_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tempfile::TempDir;
2222
use tokio::runtime::Handle;
2323
use tracing::{debug, error, info, warn};
2424

25-
use crate::adaptive_concurrency_control::ConnectionPermit;
25+
use crate::adaptive_concurrency::ConnectionPermit;
2626
use crate::error::{CasClientError, Result};
2727
use crate::output_provider::OutputProvider;
2828
use crate::Client;

cas_client/src/remote_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use utils::auth::AuthConfig;
2828
#[cfg(not(target_family = "wasm"))]
2929
use utils::singleflight::Group;
3030

31-
use crate::adaptive_concurrency_control::ConnectionPermit;
31+
use crate::adaptive_concurrency::ConnectionPermit;
3232
#[cfg(not(target_family = "wasm"))]
3333
use crate::download_utils::*;
3434
use crate::error::{CasClientError, Result};

cas_client/src/retry_wrapper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
99
use tokio_retry::RetryIf;
1010
use tracing::{error, info};
1111

12-
use crate::adaptive_concurrency_control::ConnectionPermit;
12+
use crate::adaptive_concurrency::ConnectionPermit;
1313
use crate::constants::{CLIENT_RETRY_BASE_DELAY_MS, CLIENT_RETRY_MAX_ATTEMPTS};
1414
use crate::error::CasClientError;
1515
use crate::http_client::request_id_from_response;

utils/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ lazy_static = { workspace = true }
1818
parking_lot = { workspace = true }
1919
pin-project = { workspace = true }
2020
thiserror = { workspace = true }
21-
tokio = { workspace = true, features = ["time", "rt", "macros"] }
21+
tokio = { workspace = true, features = ["time", "rt", "macros", "test-util"] }
2222
tracing = { workspace = true }
2323

2424
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]

0 commit comments

Comments
 (0)