Skip to content

Commit b7423e4

Browse files
committed
Adaptive Concurrancy Controller.
1 parent 225f4b0 commit b7423e4

File tree

14 files changed

+1011
-52
lines changed

14 files changed

+1011
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cas_client/src/adaptive_concurrency_control.rs

Lines changed: 511 additions & 0 deletions
Large diffs are not rendered by default.

cas_client/src/constants.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,39 @@ utils::configurable_constants! {
1111
/// no more retries are attempted.
1212
ref CLIENT_RETRY_MAX_DURATION_MS: u64 = 6 * 60 * 1000; // 6m
1313

14+
/// The target time for a small transfer to complete.
15+
ref CONCURRENCY_CONTROL_TARGET_TIME_SMALL_TRANSFER_MS : u64 = 5 * 1000;
16+
17+
/// The target time for a large transfer to complete. Default is 20 seconds.
18+
ref CONCURRENCY_CONTROL_TARGET_TIME_LARGE_TRANSFER_MS : u64 = 20 * 1000;
19+
20+
/// The size of a large transfer.
21+
ref CONCURRENCY_CONTROL_LARGE_TRANSFER_NUM_BYTES : u64 = 64_000_000;
22+
23+
/// The minimum time in milliseconds between adjustments when increasing the concurrency.
24+
ref CONCURRENCY_CONTROL_MIN_INCREASE_WINDOW_MS : u64 = 500;
25+
26+
/// The minimum time in milliseconds between adjustments when decreasing the concurrency.
27+
ref CONCURRENCY_CONTROL_MIN_DECREASE_WINDOW_MS : u64 = 250;
28+
29+
/// The maximum number of connection successes and failures to examine when adjusting the concurrancy.
30+
ref CONCURRENCY_CONTROL_TRACKING_SIZE : usize = 32;
31+
32+
/// The maximum time window within which to examine successes and failures when adjusting the concurrancy.
33+
ref CONCURRENCY_CONTROL_TRACKING_WINDOW_MS : u64 = 30 * 1000;
34+
35+
/// Log the concurrency on this interval.
36+
ref CONCURRENCY_CONTROL_LOGGING_INTERVAL_MS: u64 = 10 * 1000;
37+
38+
/// The maximum number of simultaneous xorb and/or shard upload streams permitted by
39+
/// the adaptive concurrency control. Can be overwritten by environment variable "HF_XET_MAX_CONCURRENT_UPLOADS".
40+
ref MAX_CONCURRENT_UPLOADS: usize = 100;
41+
42+
/// The minimum number of simultaneous xorb and/or shard upload streams that the
43+
/// the adaptive concurrency control may reduce the concurrancy down to on slower connections.
44+
ref MIN_CONCURRENT_UPLOADS: usize = 2;
45+
46+
/// The starting number of concurrent upload streams, which will increase up to MAX_CONCURRENT_UPLOADS
47+
/// on successful completions.
48+
ref NUM_INITIAL_CONCURRENT_UPLOADS: usize = 16;
1449
}

cas_client/src/interface.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@ use merklehash::MerkleHash;
99
use progress_tracking::item_tracking::SingleItemProgressUpdater;
1010
use progress_tracking::upload_tracking::CompletionTracker;
1111

12+
use crate::adaptive_concurrency_control::{AdaptiveConcurrencyController, ConnectionPermit};
13+
use crate::constants::{MAX_CONCURRENT_UPLOADS, MIN_CONCURRENT_UPLOADS, NUM_INITIAL_CONCURRENT_UPLOADS};
1214
use crate::error::Result;
1315
#[cfg(not(target_family = "wasm"))]
1416
use crate::OutputProvider;
1517

18+
// The upload concurrency controller
19+
lazy_static::lazy_static! {
20+
static ref UPLOAD_CONCURRENCY_CONTROLLER : Arc<AdaptiveConcurrencyController>
21+
= AdaptiveConcurrencyController::new("uploads", *NUM_INITIAL_CONCURRENT_UPLOADS, (*MIN_CONCURRENT_UPLOADS, *MAX_CONCURRENT_UPLOADS));
22+
}
23+
1624
/// A Client to the Shard service. The shard service
1725
/// provides for
1826
/// 1. upload shard to the shard service
@@ -57,23 +65,56 @@ pub trait Client {
5765
) -> Result<Option<Bytes>>;
5866

5967
/// Upload a new shard.
60-
async fn upload_shard(
68+
async fn upload_shard_with_permit(
6169
&self,
6270
prefix: &str,
6371
hash: &MerkleHash,
6472
force_sync: bool,
6573
shard_data: bytes::Bytes,
6674
salt: &[u8; 32],
75+
upload_permit: ConnectionPermit,
6776
) -> Result<bool>;
6877

6978
/// Upload a new xorb.
70-
async fn upload_xorb(
79+
async fn upload_xorb_with_permit(
7180
&self,
7281
prefix: &str,
7382
serialized_cas_object: SerializedCasObject,
7483
upload_tracker: Option<Arc<CompletionTracker>>,
84+
upload_permit: ConnectionPermit,
7585
) -> Result<u64>;
7686

87+
/// Acquire an upload permit.
88+
async fn acquire_upload_permit(&self) -> Result<ConnectionPermit> {
89+
UPLOAD_CONCURRENCY_CONTROLLER.acquire_connection_permit().await
90+
}
91+
92+
/// Upload a new shard, acquiring the permit.
93+
async fn upload_shard(
94+
&self,
95+
prefix: &str,
96+
hash: &MerkleHash,
97+
force_sync: bool,
98+
shard_data: bytes::Bytes,
99+
salt: &[u8; 32],
100+
) -> Result<bool> {
101+
let permit = self.acquire_upload_permit().await?;
102+
self.upload_shard_with_permit(prefix, hash, force_sync, shard_data, salt, permit)
103+
.await
104+
}
105+
106+
/// Upload a new xorb, acquiring the permit.
107+
async fn upload_xorb(
108+
&self,
109+
prefix: &str,
110+
serialized_cas_object: SerializedCasObject,
111+
upload_tracker: Option<Arc<CompletionTracker>>,
112+
) -> Result<u64> {
113+
let permit = self.acquire_upload_permit().await?;
114+
self.upload_xorb_with_permit(prefix, serialized_cas_object, upload_tracker, permit)
115+
.await
116+
}
117+
77118
/// Check if a XORB already exists.
78119
async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
79120

cas_client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ mod output_provider;
2525
pub mod remote_client;
2626
mod retry_wrapper;
2727
mod upload_progress_stream;
28+
29+
mod adaptive_concurrency_control;

cas_client/src/local_client.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tempfile::TempDir;
2222
use tokio::runtime::Handle;
2323
use tracing::{debug, error, info, warn};
2424

25+
use crate::adaptive_concurrency_control::ConnectionPermit;
2526
use crate::error::{CasClientError, Result};
2627
use crate::output_provider::OutputProvider;
2728
use crate::Client;
@@ -214,11 +215,12 @@ impl LocalClient {
214215
/// LocalClient is responsible for writing/reading Xorbs on local disk.
215216
#[async_trait]
216217
impl Client for LocalClient {
217-
async fn upload_xorb(
218+
async fn upload_xorb_with_permit(
218219
&self,
219220
_prefix: &str,
220221
serialized_cas_object: SerializedCasObject,
221222
upload_tracker: Option<Arc<CompletionTracker>>,
223+
_upload_permit: ConnectionPermit,
222224
) -> Result<u64> {
223225
// moved hash validation into [CasObject::serialize], so removed from here.
224226
let hash = &serialized_cas_object.hash;
@@ -302,13 +304,14 @@ impl Client for LocalClient {
302304
true
303305
}
304306

305-
async fn upload_shard(
307+
async fn upload_shard_with_permit(
306308
&self,
307309
_prefix: &str, // Prefix not used in current implementation
308310
shard_hash: &MerkleHash,
309311
_force_sync: bool,
310312
shard_data: bytes::Bytes,
311313
salt: &[u8; 32],
314+
_upload_permit: ConnectionPermit,
312315
) -> Result<bool> {
313316
// Write out the shard to the shard directory.
314317
let shard = MDBShardFile::write_out_from_reader(&self.shard_dir, &mut Cursor::new(&shard_data))?;

cas_client/src/remote_client.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use utils::auth::AuthConfig;
2828
#[cfg(not(target_family = "wasm"))]
2929
use utils::singleflight::Group;
3030

31+
use crate::adaptive_concurrency_control::ConnectionPermit;
3132
#[cfg(not(target_family = "wasm"))]
3233
use crate::download_utils::*;
3334
use crate::error::{CasClientError, Result};
@@ -596,11 +597,12 @@ impl Client for RemoteClient {
596597
#[instrument(skip_all, name = "RemoteClient::upload_xorb", fields(key = Key{prefix : prefix.to_string(), hash : serialized_cas_object.hash}.to_string(),
597598
xorb.len = serialized_cas_object.serialized_data.len(), xorb.num_chunks = serialized_cas_object.num_chunks
598599
))]
599-
async fn upload_xorb(
600+
async fn upload_xorb_with_permit(
600601
&self,
601602
prefix: &str,
602603
serialized_cas_object: SerializedCasObject,
603604
upload_tracker: Option<Arc<CompletionTracker>>,
605+
upload_permit: ConnectionPermit,
604606
) -> Result<u64> {
605607
let key = Key {
606608
prefix: prefix.to_string(),
@@ -639,6 +641,7 @@ impl Client for RemoteClient {
639641
let api_tag = "cas::upload_xorb";
640642

641643
let response: UploadXorbResponse = RetryWrapper::new(api_tag)
644+
.with_connection_permit(upload_permit)
642645
.run_and_extract_json(move || {
643646
let upload_stream = upload_stream.clone_with_reset();
644647
let url = url.clone();
@@ -773,13 +776,14 @@ impl Client for RemoteClient {
773776

774777
#[instrument(skip_all, name = "RemoteClient::upload_shard", fields(shard.hash = hash.hex(), shard.len = shard_data.len()
775778
))]
776-
async fn upload_shard(
779+
async fn upload_shard_with_permit(
777780
&self,
778781
prefix: &str,
779782
hash: &MerkleHash,
780783
force_sync: bool,
781784
shard_data: bytes::Bytes,
782785
_salt: &[u8; 32],
786+
upload_permit: ConnectionPermit,
783787
) -> Result<bool> {
784788
if self.dry_run {
785789
return Ok(true);
@@ -801,6 +805,7 @@ impl Client for RemoteClient {
801805
let url = Url::parse(&format!("{}/shard/{key}", self.endpoint))?;
802806

803807
let response: UploadShardResponse = RetryWrapper::new(api_tag)
808+
.with_connection_permit(upload_permit)
804809
.run_and_extract_json(move || {
805810
client
806811
.request(method.clone(), url.clone())

cas_client/src/retry_wrapper.rs

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use std::sync::atomic::{AtomicUsize, Ordering};
22
use std::sync::Arc;
33

4+
use bytes::Bytes;
45
use reqwest::{Response, StatusCode};
56
use reqwest_retry::{default_on_request_failure, default_on_request_success, Retryable};
7+
use tokio::sync::Mutex;
68
use tokio_retry::strategy::{jitter, ExponentialBackoff};
79
use tokio_retry::RetryIf;
810
use tracing::{error, info};
911

12+
use crate::adaptive_concurrency_control::ConnectionPermit;
1013
use crate::constants::{CLIENT_RETRY_BASE_DELAY_MS, CLIENT_RETRY_MAX_ATTEMPTS};
1114
use crate::error::CasClientError;
1215
use crate::http_client::request_id_from_response;
@@ -23,6 +26,7 @@ pub struct RetryWrapper {
2326
no_retry_on_429: bool,
2427
log_errors_as_info: bool,
2528
api_tag: &'static str,
29+
connection_permit: Option<Mutex<Option<ConnectionPermit>>>,
2630
}
2731

2832
impl RetryWrapper {
@@ -33,6 +37,7 @@ impl RetryWrapper {
3337
no_retry_on_429: false,
3438
log_errors_as_info: false,
3539
api_tag,
40+
connection_permit: None,
3641
}
3742
}
3843

@@ -56,6 +61,11 @@ impl RetryWrapper {
5661
self
5762
}
5863

64+
pub fn with_connection_permit(mut self, permit: ConnectionPermit) -> Self {
65+
self.connection_permit = Some(Mutex::new(Some(permit)));
66+
self
67+
}
68+
5969
fn process_error_response(&self, try_idx: usize, err: reqwest_middleware::Error) -> RetryableReqwestError {
6070
let api = &self.api_tag;
6171

@@ -194,6 +204,12 @@ impl RetryWrapper {
194204
async move {
195205
let (make_request, process_fn, try_count, self_) = retry_info.as_ref();
196206

207+
if let Some(p) = &self_.connection_permit {
208+
if let Some(p) = p.lock().await.as_mut() {
209+
p.transfer_starting()
210+
}
211+
}
212+
197213
let resp_result = make_request().await;
198214
let try_idx = try_count.fetch_add(1, Ordering::Relaxed);
199215

@@ -204,10 +220,35 @@ impl RetryWrapper {
204220
Ok(resp) => self_.process_ok_response(try_idx, resp),
205221
};
206222

207-
match checked_result {
208-
Ok(ok_response) => process_fn(ok_response).await,
209-
Err(e) => Err(e),
223+
let (n_bytes, processing_result) = match checked_result {
224+
Ok(ok_response) => (ok_response.content_length().unwrap_or(0), process_fn(ok_response).await),
225+
Err(e) => (0, Err(e)),
226+
};
227+
228+
// Now, possibly adjust the connection permit.
229+
if let Some(permit_holder) = &self_.connection_permit {
230+
let mut maybe_permit = permit_holder.lock().await;
231+
232+
match &processing_result {
233+
Ok(_) => {
234+
if let Some(permit) = maybe_permit.take() {
235+
permit.report_completion(n_bytes, true).await;
236+
}
237+
},
238+
Err(RetryableReqwestError::FatalError(_)) => {
239+
if let Some(permit) = maybe_permit.take() {
240+
permit.report_completion(0, false).await;
241+
}
242+
},
243+
Err(RetryableReqwestError::RetryableError(_)) => {
244+
if let Some(permit) = maybe_permit.as_ref() {
245+
permit.report_retryable_failure().await;
246+
}
247+
},
248+
}
210249
}
250+
251+
processing_result
211252
}
212253
},
213254
|err: &RetryableReqwestError| matches!(err, RetryableReqwestError::RetryableError(_)),
@@ -278,6 +319,47 @@ impl RetryWrapper {
278319
.await
279320
}
280321

322+
/// Run a connection and process the result as bytes, retrying on transient errors or on issues not getting the
323+
/// full object.
324+
///
325+
/// The `make_request` function returns a future that resolves to a Result<Response> object as is returned by the
326+
/// client middleware. For example, `|| client.clone().get(url).send()` returns a future (as `send()` is async)
327+
/// that will then be evaluatated to get the response.
328+
///
329+
/// This functions acts just like the json() function on a client response, but retries the entire connection on
330+
/// transient errors.
331+
pub async fn run_and_extract_bytes<ReqFut, ReqFn>(self, make_request: ReqFn) -> Result<Bytes, CasClientError>
332+
where
333+
ReqFn: Fn() -> ReqFut + Send + 'static,
334+
ReqFut: std::future::Future<Output = Result<Response, reqwest_middleware::Error>> + 'static,
335+
{
336+
self.run_and_process(make_request, |resp: Response| {
337+
async move {
338+
// Extract the bytes from the final result.
339+
let r: Result<Bytes, reqwest::Error> = resp.bytes().await;
340+
341+
match r {
342+
Ok(v) => Ok(v),
343+
Err(e) => {
344+
#[cfg(not(target_arch = "wasm32"))]
345+
let is_connect = e.is_connect();
346+
#[cfg(target_arch = "wasm32")]
347+
let is_connect = false;
348+
349+
if is_connect || e.is_decode() || e.is_body() || e.is_timeout() {
350+
// We got an incomplete or corrupted response from the server, possibly due to a dropped
351+
// connection. Presumably this error is transient.
352+
Err(RetryableReqwestError::RetryableError(e.into()))
353+
} else {
354+
Err(RetryableReqwestError::FatalError(e.into()))
355+
}
356+
},
357+
}
358+
}
359+
})
360+
.await
361+
}
362+
281363
/// Run a connection and process the result object, retrying on transient errors.
282364
///
283365
/// The `make_request` function returns a future that resolves to a Result<Response> object as is returned by the

data/src/constants.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@ utils::configurable_constants! {
1313
/// set to 3 weeks.
1414
ref MDB_SHARD_LOCAL_CACHE_EXPIRATION_SECS: u64 = 3 * 7 * 24 * 3600;
1515

16-
/// The maximum number of simultaneous xorb upload streams.
17-
/// can be overwritten by environment variable "HF_XET_MAX_CONCURRENT_UPLOADS".
18-
/// The default value changes from 8 to 100 when "High Performance Mode" is enabled
19-
ref MAX_CONCURRENT_UPLOADS: usize = GlobalConfigMode::HighPerformanceOption {
20-
standard: 8,
21-
high_performance: 100,
22-
};
23-
2416
/// The maximum number of files to ingest at once on the upload path
2517
ref MAX_CONCURRENT_FILE_INGESTION: usize = GlobalConfigMode::HighPerformanceOption {
2618
standard: 8,

0 commit comments

Comments
 (0)