Skip to content

Commit b420f15

Browse files
committed
Added comments and corrected typos.
1 parent 3889cd5 commit b420f15

File tree

5 files changed

+17
-9
lines changed

5 files changed

+17
-9
lines changed

cas_client/src/adaptive_concurrency/controller.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl ConcurrencyControllerState {
4444
}
4545
}
4646

47-
/// A controller for dynamically adjusting the amount of concurrancy on upload and download paths.
47+
/// A controller for dynamically adjusting the amount of concurrency on upload and download paths.
4848
///
4949
/// This controller uses two statistical models that adapt over time using exponentially weighted
5050
/// moving averages. The first is a model that predicts the overall current bandwith, and the second is
@@ -99,7 +99,7 @@ impl AdaptiveConcurrencyController {
9999
})
100100
}
101101

102-
/// Acquire a connection permit based on the current concurrancy.
102+
/// Acquire a connection permit based on the current concurrency.
103103
pub async fn acquire_connection_permit(self: &Arc<Self>) -> Result<ConnectionPermit, CasClientError> {
104104
let permit = self.concurrency_semaphore.acquire().await?;
105105

cas_client/src/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ utils::configurable_constants! {
4545
ref MAX_CONCURRENT_UPLOADS: usize = 100;
4646

4747
/// The minimum number of simultaneous xorb and/or shard upload streams that the
48-
/// the adaptive concurrency control may reduce the concurrancy down to on slower connections.
48+
/// the adaptive concurrency control may reduce the concurrency down to on slower connections.
4949
ref MIN_CONCURRENT_UPLOADS: usize = 2;
5050

5151
/// The starting number of concurrent upload streams, which will increase up to MAX_CONCURRENT_UPLOADS

cas_client/src/retry_wrapper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ impl RetryWrapper {
228228
Ok(resp) => self_.process_ok_response(try_idx, resp),
229229
};
230230

231+
// reply_bytes is ignored if the size was specified earlier, as in the case for upload.
231232
let (reply_bytes, processing_result) = match checked_result {
232233
Ok(ok_response) => (ok_response.content_length().unwrap_or(0), process_fn(ok_response).await),
233234
Err(e) => (0, Err(e)),

data/src/file_upload_session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl FileUploadSession {
184184
let file_id = self.completion_tracker.register_new_file(file_name.clone(), file_size).await;
185185

186186
// Now, spawn a task
187-
let ingestion_concurrancy_limiter = file_parallel_limiter.clone();
187+
let ingestion_concurrency_limiter = file_parallel_limiter.clone();
188188
let session = self.clone();
189189

190190
cleaning_tasks.push(tokio::spawn(async move {
@@ -201,7 +201,7 @@ impl FileUploadSession {
201201
"file.defrag_prevented_dedup_chunks" = tracing::field::Empty,
202202
);
203203
// First, get a permit to process this file.
204-
let _processing_permit = ingestion_concurrancy_limiter.acquire().await?;
204+
let _processing_permit = ingestion_concurrency_limiter.acquire().await?;
205205

206206
async move {
207207
let mut reader = File::open(&file_path)?;

utils/src/adjustable_semaphore.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use std::sync::Arc;
55
use tokio::sync::{AcquireError, Semaphore};
66

77
/// An adjustable semaphore in which the total number of permits can be adjusted at any time
8-
/// between a minimum and a maximum bound. Adjustments do not affect any permits currently
9-
/// issued; if an adjustment cannot be permformed immediately, then it is resolved before any
10-
/// new permits are issued.
8+
/// between a minimum and a maximum bound.
9+
///
10+
/// Unlike the tokio Semaphore, decreasing the number of permits may be done at any time and
11+
/// are resolved lazily if needed; any permits currently issued remain valid, but no new permits
12+
/// are issued until any requested decreases are resolved.
1113
pub struct AdjustableSemaphore {
1214
semaphore: Arc<Semaphore>,
1315
total_permits: AtomicUsize,
@@ -17,7 +19,7 @@ pub struct AdjustableSemaphore {
1719
}
1820

1921
/// A permit issued by the AdjustableSemaphore. On drop, this attempts to
20-
/// resolve an enqueued permit decrease if one is needed.
22+
/// resolve any enqueued permit decrease if one is needed.
2123
pub struct AdjustableSemaphorePermit {
2224
permit: Option<tokio::sync::OwnedSemaphorePermit>,
2325
parent: Arc<AdjustableSemaphore>,
@@ -65,6 +67,11 @@ impl AdjustableSemaphore {
6567
// A few debug mode consistency checks.
6668
debug_assert!(self.semaphore.available_permits() <= self.max_permits);
6769

70+
// To ensure that the fairness property of the enclosed tokio semaphore is respected,
71+
// this function must only call this class and not attempt to resolve anything else.
72+
//
73+
// As a result, any decreases are resolved by the Drop trait of this permit, which may
74+
// mean that permit is forgotten to resolve an outstanding decrease.
6875
let permit = self.semaphore.clone().acquire_owned().await?;
6976

7077
Ok(AdjustableSemaphorePermit {

0 commit comments

Comments
 (0)