Skip to content

Commit ac6001f

Browse files
committed
Updates; clippy fixes.
1 parent ae7d766 commit ac6001f

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

cas_client/src/adaptive_concurrency/controller.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl ConnectionPermit {
239239
}
240240

241241
pub(crate) async fn report_retryable_failure(&self) {
242-
self.controller.clone().report_and_update(&self, None, false).await;
242+
self.controller.clone().report_and_update(self, None, false).await;
243243
}
244244
}
245245

@@ -248,8 +248,8 @@ impl ConnectionPermit {
248248
mod test_constants {
249249

250250
pub const TR_HALF_LIFE_MS: u64 = 10;
251-
pub const INCR_SPACING_MS: u64 = 4;
252-
pub const DECR_SPACING_MS: u64 = 2;
251+
pub const INCR_SPACING_MS: u64 = 200;
252+
pub const DECR_SPACING_MS: u64 = 100;
253253

254254
pub const TARGET_TIME_MS_S: u64 = 5;
255255
pub const TARGET_TIME_MS_L: u64 = 20;
@@ -303,10 +303,12 @@ mod tests {
303303

304304
let controller = AdaptiveConcurrencyController::new_testing(1, (1, 4));
305305

306-
for _ in 0..10 {
306+
for i in 0..10 {
307307
let permit = controller.acquire_connection_permit().await.unwrap();
308-
advance(Duration::from_millis(1)).await;
308+
// Increase the duration, so we're always going faster than predicted
309+
advance(Duration::from_millis(12 - i)).await;
309310
permit.report_completion(B, true).await;
311+
310312
advance(Duration::from_millis(INCR_SPACING_MS + 1)).await;
311313
}
312314

@@ -324,19 +326,38 @@ mod tests {
324326
// Advance on so that the first success will trigger an adjustment.
325327
advance(Duration::from_millis(INCR_SPACING_MS + 1)).await;
326328

327-
let t = Instant::now();
329+
for i in 0..5 {
330+
let permit = controller.acquire_connection_permit().await.unwrap();
331+
// Increase the duration, so we're always going faster than predicted
332+
advance(Duration::from_millis(12 - i)).await;
333+
permit.report_completion(B, true).await;
334+
335+
// Don't advance, so it should have only incremented by one as not enough time
336+
// will have passed for more.
337+
}
338+
339+
assert_eq!(controller.available_permits(), 2);
340+
assert_eq!(controller.total_permits(), 2);
341+
342+
// Now, advance the clock by enough time to allow another change.
343+
advance(Duration::from_millis(INCR_SPACING_MS + 1)).await;
328344

329-
while t.elapsed() < Duration::from_millis(INCR_SPACING_MS + 2) {
345+
for i in 5..10 {
330346
let permit = controller.acquire_connection_permit().await.unwrap();
331-
advance(Duration::from_millis(1)).await;
347+
// Increase the duration, so we're always going faster than predicted
348+
advance(Duration::from_millis(12 - i)).await;
332349
permit.report_completion(B, true).await;
350+
351+
// Don't advance, so it should have only incremented by one as not enough time
352+
// will have passed for more.
333353
}
334354

335355
// The window above should have had exactly two increases; one at the first success and one within the next
336356
// interval.
337357
assert_eq!(controller.available_permits(), 3);
338358
assert_eq!(controller.total_permits(), 3);
339359
}
360+
340361
#[tokio::test]
341362
async fn test_permit_increase_on_slow_but_good_enough() {
342363
time::pause();

cas_client/src/adaptive_concurrency/latency_prediction.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,7 @@ impl LatencyPredictor {
7878
///
7979
/// - `size_bytes`: the size of the completed transmission.
8080
/// - `duration`: the time taken to complete the transmission.
81-
/// Updates the latency model with a new observation.
82-
///
83-
/// Applies exponential decay to prior statistics and incorporates the new sample
84-
/// using a numerically stable linear regression formula.
85-
///
86-
/// - `size_bytes`: the size of the completed transmission.
87-
/// - `duration`: the time taken to complete the transmission.
88-
/// - `n_concurrent`: the number of concurrent connections at the time.
81+
/// - `avg_concurrent`: an estimate of the average number of concurrent connections during transfer.
8982
pub fn update(&mut self, size_bytes: u64, duration: Duration, avg_concurrent: f64) {
9083
let now = Instant::now();
9184
let elapsed = now.duration_since(self.last_update).as_secs_f64();
@@ -188,9 +181,7 @@ impl LatencyPredictor {
188181
let query_bytes = 10 * 1024 * 1024;
189182

190183
// How long would it take to transmit this at full bandwidth
191-
let Some(min_latency) = self.predicted_latency(query_bytes, 1.) else {
192-
return None;
193-
};
184+
let min_latency = self.predicted_latency(query_bytes, 1.)?;
194185

195186
// Report bytes per sec in this model.
196187
Some(query_bytes as f64 / min_latency.max(1e-6))

0 commit comments

Comments
 (0)