Skip to content

Commit daedbb3

Browse files
committed
Update.
1 parent edb828f commit daedbb3

File tree

11 files changed

+483
-167
lines changed

11 files changed

+483
-167
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cas_client/src/adaptive_concurrency_control.rs

Lines changed: 119 additions & 152 deletions
Large diffs are not rendered by default.

cas_client/src/interface.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use crate::constants::{MAX_CONCURRENT_UPLOADS, MIN_CONCURRENT_UPLOADS, NUM_INITI
1414
use crate::error::Result;
1515
#[cfg(not(target_family = "wasm"))]
1616
use crate::OutputProvider;
17-
use crate::OutputProvider;
1817

1918
// The upload concurrency controller
2019
lazy_static::lazy_static! {
@@ -66,28 +65,56 @@ pub trait Client {
6665
) -> Result<Option<Bytes>>;
6766

6867
/// Upload a new shard.
69-
async fn upload_shard(
68+
async fn upload_shard_with_permit(
7069
&self,
7170
prefix: &str,
7271
hash: &MerkleHash,
7372
force_sync: bool,
7473
shard_data: bytes::Bytes,
7574
salt: &[u8; 32],
75+
upload_permit: ConnectionPermit,
7676
) -> Result<bool>;
7777

7878
/// Upload a new xorb.
79-
async fn upload_xorb(
79+
async fn upload_xorb_with_permit(
8080
&self,
8181
prefix: &str,
8282
serialized_cas_object: SerializedCasObject,
8383
upload_tracker: Option<Arc<CompletionTracker>>,
84+
upload_permit: ConnectionPermit,
8485
) -> Result<u64>;
8586

8687
/// Acquire an upload permit.
8788
async fn acquire_upload_permit(&self) -> Result<ConnectionPermit> {
8889
UPLOAD_CONCURRENCY_CONTROLLER.acquire_connection_permit().await
8990
}
9091

92+
/// Upload a new shard, acquiring the permit.
93+
async fn upload_shard(
94+
&self,
95+
prefix: &str,
96+
hash: &MerkleHash,
97+
force_sync: bool,
98+
shard_data: bytes::Bytes,
99+
salt: &[u8; 32],
100+
) -> Result<bool> {
101+
let permit = self.acquire_upload_permit().await?;
102+
self.upload_shard_with_permit(prefix, hash, force_sync, shard_data, salt, permit)
103+
.await
104+
}
105+
106+
/// Upload a new xorb, acquiring the permit.
107+
async fn upload_xorb(
108+
&self,
109+
prefix: &str,
110+
serialized_cas_object: SerializedCasObject,
111+
upload_tracker: Option<Arc<CompletionTracker>>,
112+
) -> Result<u64> {
113+
let permit = self.acquire_upload_permit().await?;
114+
self.upload_xorb_with_permit(prefix, serialized_cas_object, upload_tracker, permit)
115+
.await
116+
}
117+
91118
/// Check if a XORB already exists.
92119
async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
93120

cas_client/src/local_client.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tempfile::TempDir;
2222
use tokio::runtime::Handle;
2323
use tracing::{debug, error, info, warn};
2424

25+
use crate::adaptive_concurrency_control::ConnectionPermit;
2526
use crate::error::{CasClientError, Result};
2627
use crate::output_provider::OutputProvider;
2728
use crate::Client;
@@ -214,11 +215,12 @@ impl LocalClient {
214215
/// LocalClient is responsible for writing/reading Xorbs on local disk.
215216
#[async_trait]
216217
impl Client for LocalClient {
217-
async fn upload_xorb(
218+
async fn upload_xorb_with_permit(
218219
&self,
219220
_prefix: &str,
220221
serialized_cas_object: SerializedCasObject,
221222
upload_tracker: Option<Arc<CompletionTracker>>,
223+
_upload_permit: ConnectionPermit,
222224
) -> Result<u64> {
223225
// moved hash validation into [CasObject::serialize], so removed from here.
224226
let hash = &serialized_cas_object.hash;
@@ -302,13 +304,14 @@ impl Client for LocalClient {
302304
true
303305
}
304306

305-
async fn upload_shard(
307+
async fn upload_shard_with_permit(
306308
&self,
307309
_prefix: &str, // Prefix not used in current implementation
308310
shard_hash: &MerkleHash,
309311
_force_sync: bool,
310312
shard_data: bytes::Bytes,
311313
salt: &[u8; 32],
314+
_upload_permit: ConnectionPermit,
312315
) -> Result<bool> {
313316
// Write out the shard to the shard directory.
314317
let shard = MDBShardFile::write_out_from_reader(&self.shard_dir, &mut Cursor::new(&shard_data))?;

cas_client/src/remote_client.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use utils::auth::AuthConfig;
2828
#[cfg(not(target_family = "wasm"))]
2929
use utils::singleflight::Group;
3030

31+
use crate::adaptive_concurrency_control::ConnectionPermit;
3132
#[cfg(not(target_family = "wasm"))]
3233
use crate::download_utils::*;
3334
use crate::error::{CasClientError, Result};
@@ -596,11 +597,12 @@ impl Client for RemoteClient {
596597
#[instrument(skip_all, name = "RemoteClient::upload_xorb", fields(key = Key{prefix : prefix.to_string(), hash : serialized_cas_object.hash}.to_string(),
597598
xorb.len = serialized_cas_object.serialized_data.len(), xorb.num_chunks = serialized_cas_object.num_chunks
598599
))]
599-
async fn upload_xorb(
600+
async fn upload_xorb_with_permit(
600601
&self,
601602
prefix: &str,
602603
serialized_cas_object: SerializedCasObject,
603604
upload_tracker: Option<Arc<CompletionTracker>>,
605+
upload_permit: ConnectionPermit,
604606
) -> Result<u64> {
605607
let key = Key {
606608
prefix: prefix.to_string(),
@@ -639,6 +641,7 @@ impl Client for RemoteClient {
639641
let api_tag = "cas::upload_xorb";
640642

641643
let response: UploadXorbResponse = RetryWrapper::new(api_tag)
644+
.with_connection_permit(upload_permit)
642645
.run_and_extract_json(move || {
643646
let upload_stream = upload_stream.clone_with_reset();
644647
let url = url.clone();
@@ -773,13 +776,14 @@ impl Client for RemoteClient {
773776

774777
#[instrument(skip_all, name = "RemoteClient::upload_shard", fields(shard.hash = hash.hex(), shard.len = shard_data.len()
775778
))]
776-
async fn upload_shard(
779+
async fn upload_shard_with_permit(
777780
&self,
778781
prefix: &str,
779782
hash: &MerkleHash,
780783
force_sync: bool,
781784
shard_data: bytes::Bytes,
782785
_salt: &[u8; 32],
786+
upload_permit: ConnectionPermit,
783787
) -> Result<bool> {
784788
if self.dry_run {
785789
return Ok(true);
@@ -801,6 +805,7 @@ impl Client for RemoteClient {
801805
let url = Url::parse(&format!("{}/shard/{key}", self.endpoint))?;
802806

803807
let response: UploadShardResponse = RetryWrapper::new(api_tag)
808+
.with_connection_permit(upload_permit)
804809
.run_and_extract_json(move || {
805810
client
806811
.request(method.clone(), url.clone())

cas_client/src/retry_wrapper.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,9 @@ impl RetryWrapper {
205205
let (make_request, process_fn, try_count, self_) = retry_info.as_ref();
206206

207207
if let Some(p) = &self_.connection_permit {
208-
p.lock().await.as_mut().map(|p| p.transfer_starting());
208+
if let Some(p) = p.lock().await.as_mut() {
209+
p.transfer_starting()
210+
}
209211
}
210212

211213
let resp_result = make_request().await;

data/src/file_upload_session.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,9 @@ impl FileUploadSession {
349349
async move {
350350
let n_bytes_transmitted = session
351351
.client
352-
.upload_xorb(&cas_prefix, cas_object, Some(completion_tracker))
352+
.upload_xorb_with_permit(&cas_prefix, cas_object, Some(completion_tracker), upload_permit)
353353
.await?;
354354

355-
drop(upload_permit);
356-
357355
// Register that the xorb has been uploaded.
358356
session.completion_tracker.register_xorb_upload_completion(xorb_hash).await;
359357

data/src/shard_interface.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,10 +281,9 @@ impl SessionShardInterface {
281281
}
282282

283283
// Upload the shard.
284-
client.upload_shard(&shard_prefix, &si.shard_hash, false, data, &salt).await?;
285-
286-
// Done with the upload, drop the permit.
287-
drop(upload_permit);
284+
client
285+
.upload_shard_with_permit(&shard_prefix, &si.shard_hash, false, data, &salt, upload_permit)
286+
.await?;
288287

289288
info!("Shard {shard_prefix}/{:?} upload + sync completed successfully.", &si.shard_hash);
290289

utils/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,8 @@ xet_threadpool = { path = "../xet_threadpool" }
2828
[target.'cfg(target_family = "wasm")'.dependencies]
2929
web-time = { workspace = true }
3030

31+
[dev-dependencies]
32+
rand = { workspace = true }
33+
3134
[features]
3235
strict = []

0 commit comments

Comments
 (0)