Skip to content

Commit ddca430

Browse files
committed
Add CAS speed check
1 parent 07e57d9 commit ddca430

File tree

4 files changed

+129
-0
lines changed

4 files changed

+129
-0
lines changed

BUILD.bazel

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,25 @@ rust_binary(
5959
],
6060
)
6161

62+
rust_binary(
63+
name = "cas_speed_check",
64+
srcs = [
65+
"src/bin/cas_speed_check.rs",
66+
],
67+
deps = [
68+
"//nativelink-error",
69+
"//nativelink-proto",
70+
"//nativelink-util",
71+
"@crates//:clap",
72+
"@crates//:hex",
73+
"@crates//:rand",
74+
"@crates//:sha2",
75+
"@crates//:tokio",
76+
"@crates//:tonic",
77+
"@crates//:tracing",
78+
],
79+
)
80+
6281
filegroup(
6382
name = "docs",
6483
srcs = [

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ nix = ["nativelink-worker/nix"]
3232
[dependencies]
3333
nativelink-config = { path = "nativelink-config" }
3434
nativelink-error = { path = "nativelink-error" }
35+
nativelink-proto = { path = "nativelink-proto" }
3536
nativelink-scheduler = { path = "nativelink-scheduler" }
3637
nativelink-service = { path = "nativelink-service" }
3738
nativelink-store = { path = "nativelink-store" }
@@ -51,6 +52,7 @@ clap = { version = "4.5.35", features = [
5152
"usage",
5253
], default-features = false }
5354
futures = { version = "0.3.31", default-features = false }
55+
hex = { version = "0.4.3", default-features = false }
5456
hyper = { version = "1.6.0", default-features = false }
5557
hyper-util = { version = "0.1.11", default-features = false, features = [
5658
"tracing",
@@ -62,6 +64,7 @@ rand = { version = "0.9.0", default-features = false, features = [
6264
rustls-pki-types = { version = "1.13.1", features = [
6365
"std",
6466
], default-features = false }
67+
sha2 = { version = "0.10.8", default-features = false }
6568
tokio = { version = "1.44.1", features = [
6669
"fs",
6770
"io-util",

src/bin/cas_speed_check.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
4+
use clap::Parser;
5+
use nativelink_error::{Error, ResultExt};
6+
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
7+
use nativelink_proto::build::bazel::remote::execution::v2::{
8+
Digest, FindMissingBlobsRequest, digest_function,
9+
};
10+
use nativelink_util::spawn;
11+
use nativelink_util::telemetry::init_tracing;
12+
use nativelink_util::tls_utils::endpoint_from;
13+
use rand::{Rng, RngCore};
14+
use sha2::{Digest as _, Sha256};
15+
use tokio::sync::Mutex;
16+
use tokio::time::Instant;
17+
use tonic::Request;
18+
use tonic::transport::ClientTlsConfig;
19+
20+
#[derive(Parser, Debug)]
21+
#[command(version, about)]
22+
struct Args {
23+
#[arg(short, long)]
24+
endpoint: String,
25+
26+
#[arg(short, long)]
27+
nativelink_key: Option<String>,
28+
}
29+
30+
fn main() -> Result<(), Box<dyn core::error::Error>> {
31+
let args = Args::parse();
32+
#[expect(
33+
clippy::disallowed_methods,
34+
reason = "It's the top-level, so we need the function"
35+
)]
36+
tokio::runtime::Builder::new_multi_thread()
37+
.enable_all()
38+
.build()
39+
.unwrap()
40+
.block_on(async {
41+
init_tracing()?;
42+
let timings = Arc::new(Mutex::new(Vec::new()));
43+
let spawns: Vec<_> = (0..200)
44+
.map(|_| {
45+
let local_timings = timings.clone();
46+
let local_endpoint = args.endpoint.clone();
47+
let local_api_key = args.nativelink_key.clone();
48+
spawn!("CAS requester", async move {
49+
let tls_config = ClientTlsConfig::new().with_enabled_roots();
50+
let endpoint = endpoint_from(&local_endpoint, Some(tls_config))?;
51+
let channel = endpoint.connect().await.unwrap();
52+
53+
let mut client = ContentAddressableStorageClient::new(channel);
54+
55+
for _ in 0..100 {
56+
let raw_data: String = rand::rng()
57+
.sample_iter::<char, _>(rand::distr::StandardUniform)
58+
.take(300)
59+
.collect();
60+
let hashed = Sha256::digest(raw_data.as_bytes());
61+
let rand_hash = hex::encode(hashed);
62+
let digest = Digest {
63+
hash: rand_hash,
64+
size_bytes: i64::from(rand::rng().next_u32()),
65+
};
66+
67+
let mut request = Request::new(FindMissingBlobsRequest {
68+
instance_name: "".into(),
69+
blob_digests: vec![digest.clone()],
70+
digest_function: digest_function::Value::Sha256.into(),
71+
});
72+
if let Some(ref api_key) = local_api_key {
73+
request
74+
.metadata_mut()
75+
.insert("x-nativelink-api-key", api_key.parse().unwrap());
76+
}
77+
let start = Instant::now();
78+
client
79+
.find_missing_blobs(request)
80+
.await
81+
.err_tip(|| "in find_missing_blobs")?
82+
.into_inner();
83+
let duration = Instant::now().checked_duration_since(start).unwrap();
84+
85+
// info!("response duration={duration:?} res={:?}", res);
86+
local_timings.lock().await.push(duration);
87+
}
88+
Ok::<(), Error>(())
89+
})
90+
})
91+
.collect();
92+
for thread in spawns {
93+
let res = thread.await;
94+
res.err_tip(|| "with spawn")??;
95+
}
96+
let avg = Duration::from_secs_f64({
97+
let locked = timings.lock().await;
98+
locked.iter().map(|d| d.as_secs_f64()).sum::<f64>() / locked.len() as f64
99+
});
100+
println!("avg: {avg:?}");
101+
Ok::<(), Error>(())
102+
})?;
103+
Ok(())
104+
}

0 commit comments

Comments
 (0)