Skip to content

Commit 05e1758

Browse files
committed
updated.
1 parent c35f7d2 commit 05e1758

File tree

3 files changed

+196
-133
lines changed

3 files changed

+196
-133
lines changed

cas_client/src/adaptive_concurrency/controller.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tracing::{debug, info};
77
use utils::adjustable_semaphore::{AdjustableSemaphore, AdjustableSemaphorePermit};
88
use utils::ExpWeightedMovingAvg;
99

10-
use crate::adaptive_concurrency::latency_prediction::LatencyPredictor;
10+
use crate::adaptive_concurrency::latency_prediction::{LatencyPredictor, LatencyPredictorSnapshot};
1111
use crate::constants::*;
1212
use crate::CasClientError;
1313

@@ -21,7 +21,7 @@ struct ConcurrencyControllerState {
2121
// i.e. actual < predicted
2222
// -- then we increase concurrency; when this is greater than ln(1.1)
2323
// -- i.e. actual > 1.1 * predicted -- we decrease concurrency.
24-
prediction_deviance: ExpWeightedMovingAvg,
24+
deviance_tracking: ExpWeightedMovingAvg,
2525

2626
// The last time we adjusted the permits.
2727
last_adjustment_time: Instant,
@@ -32,10 +32,12 @@ struct ConcurrencyControllerState {
3232

3333
impl ConcurrencyControllerState {
3434
fn new() -> Self {
35-
let emwa_half_life = Duration::from_millis(*CONCURRENCY_CONTROL_TRACKING_HALF_LIFE_MS);
35+
let latency_half_life = Duration::from_millis(*CONCURRENCY_CONTROL_LATENCY_TRACKING_HALF_LIFE_MS);
36+
let success_half_life = Duration::from_millis(*CONCURRENCY_CONTROL_SUCCESS_TRACKING_HALF_LIFE_MS);
37+
3638
Self {
37-
latency_predictor: LatencyPredictor::new(emwa_half_life),
38-
prediction_deviance: ExpWeightedMovingAvg::new(emwa_half_life),
39+
latency_predictor: LatencyPredictor::new(latency_half_life),
40+
deviance_tracking: ExpWeightedMovingAvg::new(success_half_life),
3941
last_adjustment_time: Instant::now(),
4042
last_logging_time: Instant::now(),
4143
}
@@ -112,6 +114,7 @@ impl AdaptiveConcurrencyController {
112114
controller: Arc::clone(self),
113115
transfer_start_time: Instant::now(),
114116
starting_concurrency: self.concurrency_semaphore.active_permits(),
117+
latency_model_at_start: self.state.lock().await.latency_predictor.model_snapshot(),
115118
})
116119
}
117120

@@ -127,66 +130,72 @@ impl AdaptiveConcurrencyController {
127130
}
128131

129132
/// Update
130-
async fn report_and_update(
131-
&self,
132-
actual_completion_time: Duration,
133-
starting_concurrency: usize,
134-
n_bytes_if_known: Option<u64>,
135-
is_succes: bool,
136-
) {
133+
async fn report_and_update(&self, permit: &ConnectionPermit, n_bytes_if_known: Option<u64>, is_success: bool) {
134+
let actual_completion_time = permit.transfer_start_time.elapsed();
135+
137136
let mut state_lg = self.state.lock().await;
138137

138+
let max_dev = 1. + CONCURRENCY_CONTROL_DEVIANCE_MAX_SPREAD.max(0.);
139+
let min_dev = 1. / max_dev;
140+
141+
let ok_dev = 1. + CONCURRENCY_CONTROL_DEVIANCE_TARGET_SPREAD.max(0.);
142+
let incr_dev = 1. / ok_dev;
143+
139144
// First, calculate the predicted vs. actual time completion for this model.
140145
let deviance_ratio = {
141146
if let Some(n_bytes) = n_bytes_if_known {
142147
let cur_concurrency = self.concurrency_semaphore.active_permits();
143-
let avg_concurrency = ((cur_concurrency + starting_concurrency) as f64) / 2.;
148+
let avg_concurrency = ((cur_concurrency + permit.starting_concurrency) as f64) / 2.;
144149

145-
if is_succes {
150+
if is_success {
146151
let t_actual = actual_completion_time.as_secs_f64().max(1e-4);
147-
let t_pred = state_lg
148-
.latency_predictor
149-
.predicted_latency(n_bytes, avg_concurrency)
150-
.as_secs_f64()
151-
.max(1e-4);
152152

153-
let dev_ratio = (t_actual / t_pred).min(*CONCURRENCY_CONTROL_FAILURE_DEVIANCE_PENALTY);
153+
let concurrency = permit.starting_concurrency as f64;
154154

155-
eprintln!(
156-
"success = {is_succes}; t_pred = {t_pred}; t_actual = {t_actual}; dev_ratio = {dev_ratio}"
157-
);
155+
// Get the predicted time using the model when this started.
156+
let t_pred = permit
157+
.latency_model_at_start
158+
.as_ref()
159+
.map(|lm| lm.predicted_latency(n_bytes, concurrency))
160+
.unwrap_or(t_actual);
161+
162+
let dev_ratio = t_actual / t_pred.max(1e-6);
158163

159164
state_lg
160165
.latency_predictor
161166
.update(n_bytes, actual_completion_time, avg_concurrency);
162167

168+
eprintln!(
169+
"success = {is_success}; n_bytes={n_bytes}, avg_con = {avg_concurrency}, t_pred = {t_pred}; t_actual = {t_actual}; dev_ratio = {dev_ratio}"
170+
);
171+
163172
dev_ratio
164173
} else {
165174
eprintln!("failure, bytes known.");
166175

167176
// If it's not a success, then update the deviance with the penalty factor.
168-
*CONCURRENCY_CONTROL_FAILURE_DEVIANCE_PENALTY
177+
max_dev
169178
}
170179
} else {
171180
// This would be a failure case, so update the
172-
debug_assert!(!is_succes);
181+
debug_assert!(!is_success);
173182

174183
eprintln!("failure, bytes unknown.");
175184

176-
*CONCURRENCY_CONTROL_FAILURE_DEVIANCE_PENALTY
185+
max_dev
177186
}
178187
};
179188

180189
eprintln!("dev_ratio = {}; ln = {}", deviance_ratio, deviance_ratio.ln());
181190

182191
// Update the deviance with this value; we're tracking the log of the ratio due
183192
// to the additive averaging.
184-
state_lg.prediction_deviance.update(deviance_ratio.ln());
193+
state_lg.deviance_tracking.update(deviance_ratio.clamp(min_dev, max_dev).ln()); // deviance_ratio.ln());
185194

186195
// Now, get the current predicted deviance and see what the range is.
187-
let cur_deviance = state_lg.prediction_deviance.value().exp();
196+
let cur_deviance = state_lg.deviance_tracking.value().exp();
188197

189-
if is_succes && cur_deviance < *CONCURRENCY_CONTROL_CONCURRENCY_INCREASABLE_DEVIANCE {
198+
if is_success && cur_deviance < incr_dev {
190199
// Attempt to increase the deviance.
191200
if state_lg.last_adjustment_time.elapsed() > self.min_concurrency_increase_delay {
192201
self.concurrency_semaphore.increment_total_permits();
@@ -198,14 +207,14 @@ impl AdaptiveConcurrencyController {
198207
self.concurrency_semaphore.total_permits()
199208
);
200209
}
201-
} else if cur_deviance > *CONCURRENCY_CONTROL_ACCEPTABLE_DEVIANCE {
210+
} else if !is_success && cur_deviance > ok_dev {
202211
// Attempt to decrease the deviance.
203212
if state_lg.last_adjustment_time.elapsed() > self.min_concurrency_decrease_delay {
204213
self.concurrency_semaphore.decrement_total_permits();
205214
state_lg.last_adjustment_time = Instant::now();
206215

207216
eprintln!(
208-
"Concurrency control for {}:Lowered concurrency to {}; latency deviance = {cur_deviance}.",
217+
"Concurrency control for {}: Lowered concurrency to {}; latency deviance = {cur_deviance}.",
209218
self.logging_tag,
210219
self.concurrency_semaphore.total_permits()
211220
);
@@ -217,8 +226,8 @@ impl AdaptiveConcurrencyController {
217226
"Concurrency control for {}: Current concurrency = {}; predicted bandwidth = {}; deviance = {}",
218227
self.logging_tag,
219228
self.concurrency_semaphore.total_permits(),
220-
state_lg.latency_predictor.predicted_bandwidth(),
221-
state_lg.prediction_deviance.value().exp()
229+
state_lg.latency_predictor.predicted_bandwidth().unwrap_or_default(),
230+
state_lg.deviance_tracking.value().exp()
222231
);
223232
}
224233
}
@@ -231,6 +240,7 @@ pub struct ConnectionPermit {
231240
controller: Arc<AdaptiveConcurrencyController>,
232241
transfer_start_time: Instant,
233242
starting_concurrency: usize,
243+
latency_model_at_start: Option<LatencyPredictorSnapshot>,
234244
}
235245

236246
impl ConnectionPermit {
@@ -241,17 +251,11 @@ impl ConnectionPermit {
241251

242252
/// Call this after a successful transfer, providing the byte count.
243253
pub(crate) async fn report_completion(self, n_bytes: u64, success: bool) {
244-
self.controller
245-
.clone()
246-
.report_and_update(self.transfer_start_time.elapsed(), self.starting_concurrency, Some(n_bytes), success)
247-
.await;
254+
self.controller.clone().report_and_update(&self, Some(n_bytes), success).await;
248255
}
249256

250257
pub(crate) async fn report_retryable_failure(&self) {
251-
self.controller
252-
.clone()
253-
.report_and_update(self.transfer_start_time.elapsed(), self.starting_concurrency, None, false)
254-
.await;
258+
self.controller.clone().report_and_update(&self, None, false).await;
255259
}
256260
}
257261

@@ -279,7 +283,7 @@ impl ConcurrencyControllerState {
279283

280284
Self {
281285
latency_predictor: LatencyPredictor::new(emwa_half_life),
282-
prediction_deviance: ExpWeightedMovingAvg::new(emwa_half_life),
286+
deviance_tracking: ExpWeightedMovingAvg::new(emwa_half_life),
283287
last_adjustment_time: Instant::now(),
284288
last_logging_time: Instant::now(),
285289
}

0 commit comments

Comments
 (0)