Skip to content

Commit 838afc6

Browse files
committed
Update to parameters; added window to reduce swaying.
1 parent cbe5b3b commit 838afc6

File tree

4 files changed

+27
-28
lines changed

4 files changed

+27
-28
lines changed

cas_client/src/adaptive_concurrency_control.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ impl ConcurrencyControllerState {
4343

4444
/// A controller for robustly adjusting the amount of concurrancy on upload and download paths.
4545
///
46-
/// By default, the controller dynamically adjusts the concurrency within bounds so that 80% of the
47-
/// transfers are completed within 20 seconds; it increases the concurrency as long as this criteria
48-
/// is met. When more than 20% of the transfers begin taking longer than that, concurrency is reduced.
49-
/// Concurrency adjustments are throttled so that increasing the concurrency happens only every
46+
/// By default, the controller dynamically adjusts the concurrency within bounds so that between 70%
47+
/// and 90% of the transfers are completed within 20 seconds; it increases the concurrency as long as
48+
/// this criteria is met. When more than 20% of the transfers begin taking longer than that, concurrency
49+
/// is reduced. Concurrency adjustments are throttled so that increasing the concurrency happens only every
5050
/// 500ms, and decreasing it happens at most once every 250ms. (These values are all defaults; see
5151
/// the constants and their definitions in constants.rs).
5252
///
@@ -169,7 +169,7 @@ impl AdaptiveConcurrencyController {
169169

170170
let success_ratio = state_lg.add_report(success);
171171

172-
if success && (success_ratio > 0.8) {
172+
if success && (success_ratio > *CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_UPPER) {
173173
// Consider adjusting the concurrency
174174
if state_lg.last_adjustment_time.elapsed() > self.min_concurrency_increase_delay {
175175
// Enough time has passed, so add a new permit.
@@ -183,22 +183,14 @@ impl AdaptiveConcurrencyController {
183183
);
184184
}
185185
}
186-
} else {
186+
} else if !success && (success_ratio < *CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_LOWER) {
187187
// Had a failure, so attempt to decrease the number of permits.
188188
if state_lg.last_adjustment_time.elapsed() > self.min_concurrency_decrease_delay {
189-
let decrease_reason = {
190-
if success {
191-
format!("success_ratio = {success_ratio:.2} < 0.8")
192-
} else {
193-
format!("failed transfer with success_ratio = {success_ratio:.2}")
194-
}
195-
};
196-
197189
// Enough time has passed, so add a new permit.
198190
if self.concurrency_semaphore.decrement_total_permits() {
199191
state_lg.last_adjustment_time = Instant::now();
200192
debug!(
201-
"Decreasing concurrency for {} to {} due to {decrease_reason}",
193+
"Decreasing concurrency for {} to {} due to failed transfer with success_ration = {success_ratio:.2}",
202194
self.logging_tag,
203195
self.concurrency_semaphore.total_permits()
204196
);

cas_client/src/constants.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ utils::configurable_constants! {
1212
ref CLIENT_RETRY_MAX_DURATION_MS: u64 = 6 * 60 * 1000; // 6m
1313

1414
/// The target time for a small transfer to complete.
15-
ref CONCURRENCY_CONTROL_TARGET_TIME_SMALL_TRANSFER_MS : u64 = 5 * 1000;
15+
ref CONCURRENCY_CONTROL_TARGET_TIME_SMALL_TRANSFER_MS : u64 = 10 * 1000;
1616

1717
/// The target time for a large transfer to complete. Default is 20 seconds.
1818
ref CONCURRENCY_CONTROL_TARGET_TIME_LARGE_TRANSFER_MS : u64 = 20 * 1000;
@@ -27,7 +27,13 @@ utils::configurable_constants! {
2727
ref CONCURRENCY_CONTROL_MIN_DECREASE_WINDOW_MS : u64 = 250;
2828

2929
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
30-
ref CONCURRENCY_CONTROL_TRACKING_SIZE : usize = 32;
30+
ref CONCURRENCY_CONTROL_TRACKING_SIZE : usize = 20;
31+
32+
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
33+
ref CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_LOWER: f64 = 0.7;
34+
35+
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
36+
ref CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_UPPER: f64 = 0.9;
3137

3238
/// The maximum time window within which to examine successes and failures when adjusting the concurrancy.
3339
ref CONCURRENCY_CONTROL_TRACKING_WINDOW_MS : u64 = 30 * 1000;

cas_client/src/remote_client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ impl Client for RemoteClient {
618618

619619
let n_raw_bytes = serialized_cas_object.raw_num_bytes;
620620
let xorb_hash = serialized_cas_object.hash;
621+
let n_transfer_bytes = serialized_cas_object.serialized_data.len() as u64;
621622

622623
let progress_callback = move |bytes_sent: u64| {
623624
if let Some(utr) = upload_tracker.as_ref() {
@@ -641,7 +642,7 @@ impl Client for RemoteClient {
641642
let api_tag = "cas::upload_xorb";
642643

643644
let response: UploadXorbResponse = RetryWrapper::new(api_tag)
644-
.with_connection_permit(upload_permit)
645+
.with_connection_permit(upload_permit, Some(n_transfer_bytes))
645646
.run_and_extract_json(move || {
646647
let upload_stream = upload_stream.clone_with_reset();
647648
let url = url.clone();
@@ -806,7 +807,7 @@ impl Client for RemoteClient {
806807
let url = Url::parse(&format!("{}/shard/{key}", self.endpoint))?;
807808

808809
let response: UploadShardResponse = RetryWrapper::new(api_tag)
809-
.with_connection_permit(upload_permit)
810+
.with_connection_permit(upload_permit, Some(shard_data.len() as u64))
810811
.run_and_extract_json(move || {
811812
client
812813
.request(method.clone(), url.clone())

cas_client/src/retry_wrapper.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub struct RetryWrapper {
2626
no_retry_on_429: bool,
2727
log_errors_as_info: bool,
2828
api_tag: &'static str,
29-
connection_permit: Option<Mutex<Option<ConnectionPermit>>>,
29+
connection_permit: Option<Mutex<Option<(ConnectionPermit, Option<u64>)>>>,
3030
}
3131

3232
impl RetryWrapper {
@@ -61,8 +61,8 @@ impl RetryWrapper {
6161
self
6262
}
6363

64-
pub fn with_connection_permit(mut self, permit: ConnectionPermit) -> Self {
65-
self.connection_permit = Some(Mutex::new(Some(permit)));
64+
pub fn with_connection_permit(mut self, permit: ConnectionPermit, n_bytes: Option<u64>) -> Self {
65+
self.connection_permit = Some(Mutex::new(Some((permit, n_bytes))));
6666
self
6767
}
6868

@@ -206,7 +206,7 @@ impl RetryWrapper {
206206

207207
if let Some(p) = &self_.connection_permit {
208208
if let Some(p) = p.lock().await.as_mut() {
209-
p.transfer_starting()
209+
p.0.transfer_starting()
210210
}
211211
}
212212

@@ -220,7 +220,7 @@ impl RetryWrapper {
220220
Ok(resp) => self_.process_ok_response(try_idx, resp),
221221
};
222222

223-
let (n_bytes, processing_result) = match checked_result {
223+
let (reply_bytes, processing_result) = match checked_result {
224224
Ok(ok_response) => (ok_response.content_length().unwrap_or(0), process_fn(ok_response).await),
225225
Err(e) => (0, Err(e)),
226226
};
@@ -231,17 +231,17 @@ impl RetryWrapper {
231231

232232
match &processing_result {
233233
Ok(_) => {
234-
if let Some(permit) = maybe_permit.take() {
235-
permit.report_completion(n_bytes, true).await;
234+
if let Some((permit, maybe_size)) = maybe_permit.take() {
235+
permit.report_completion(maybe_size.unwrap_or(reply_bytes), true).await;
236236
}
237237
},
238238
Err(RetryableReqwestError::FatalError(_)) => {
239-
if let Some(permit) = maybe_permit.take() {
239+
if let Some((permit, _)) = maybe_permit.take() {
240240
permit.report_completion(0, false).await;
241241
}
242242
},
243243
Err(RetryableReqwestError::RetryableError(_)) => {
244-
if let Some(permit) = maybe_permit.as_ref() {
244+
if let Some((permit, _)) = maybe_permit.as_ref() {
245245
permit.report_retryable_failure().await;
246246
}
247247
},

0 commit comments

Comments
 (0)