Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ rust_binary(
],
)

rust_binary(
name = "cas_speed_check",
srcs = [
"src/bin/cas_speed_check.rs",
],
deps = [
"//nativelink-error",
"//nativelink-proto",
"//nativelink-util",
"@crates//:clap",
"@crates//:hex",
"@crates//:rand",
"@crates//:sha2",
"@crates//:tokio",
"@crates//:tonic",
"@crates//:tracing",
],
)

filegroup(
name = "docs",
srcs = [
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ nix = ["nativelink-worker/nix"]
[dependencies]
nativelink-config = { path = "nativelink-config" }
nativelink-error = { path = "nativelink-error" }
nativelink-proto = { path = "nativelink-proto" }
nativelink-scheduler = { path = "nativelink-scheduler" }
nativelink-service = { path = "nativelink-service" }
nativelink-store = { path = "nativelink-store" }
Expand All @@ -51,6 +52,7 @@ clap = { version = "4.5.35", features = [
"usage",
], default-features = false }
futures = { version = "0.3.31", default-features = false }
hex = { version = "0.4.3", default-features = false }
hyper = { version = "1.6.0", default-features = false }
hyper-util = { version = "0.1.11", default-features = false, features = [
"tracing",
Expand All @@ -62,6 +64,7 @@ rand = { version = "0.9.0", default-features = false, features = [
rustls-pki-types = { version = "1.13.1", features = [
"std",
], default-features = false }
sha2 = { version = "0.10.8", default-features = false }
tokio = { version = "1.44.1", features = [
"fs",
"io-util",
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl GrpcScheduler {
// Not in the cache, lookup the capabilities with the upstream.
let channel = self
.connection_manager
.connection()
.connection("get_known_properties".into())
.await
.err_tip(|| "in get_platform_property_manager()")?;
let capabilities_result = CapabilitiesClient::new(channel)
Expand Down Expand Up @@ -274,7 +274,7 @@ impl GrpcScheduler {
.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("add_action: {:?}", request.action_digest))
.await
.err_tip(|| "in add_action()")?;
ExecutionClient::new(channel)
Expand Down Expand Up @@ -309,7 +309,7 @@ impl GrpcScheduler {
.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("filter_operations: {}", request.name))
.await
.err_tip(|| "in find_by_client_operation_id()")?;
ExecutionClient::new(channel)
Expand Down
2 changes: 1 addition & 1 deletion nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl ActionCache for AcServer {

#[instrument(
err,
ret(level = Level::INFO),
ret(level = Level::TRACE),
level = Level::ERROR,
skip_all,
fields(request = ?grpc_request.get_ref())
Expand Down
68 changes: 35 additions & 33 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("find_missing_blobs: {:?}", request.blob_digests))
.await
.err_tip(|| "in find_missing_blobs")?;
ContentAddressableStorageClient::new(channel)
Expand All @@ -174,7 +174,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection("batch_update_blobs".into())
.await
.err_tip(|| "in batch_update_blobs")?;
ContentAddressableStorageClient::new(channel)
Expand All @@ -199,7 +199,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection("batch_read_blobs".into())
.await
.err_tip(|| "in batch_read_blobs")?;
ContentAddressableStorageClient::new(channel)
Expand All @@ -224,7 +224,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("get_tree: {:?}", request.root_digest))
.await
.err_tip(|| "in get_tree")?;
ContentAddressableStorageClient::new(channel)
Expand All @@ -251,7 +251,7 @@ impl GrpcStore {
) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
let channel = self
.connection_manager
.connection()
.connection(format!("read_internal: {}", request.resource_name))
.await
.err_tip(|| "in read_internal")?;
let mut response = ByteStreamClient::new(channel)
Expand Down Expand Up @@ -329,34 +329,36 @@ impl GrpcStore {
"GrpcStore::write: requesting connection from pool",
);
let conn_start = std::time::Instant::now();
let rpc_fut = self.connection_manager.connection().and_then(|channel| {
let conn_elapsed = conn_start.elapsed();
let instance_for_rpc = instance_name.clone();
let conn_elapsed_ms =
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
trace!(
instance_name = %instance_for_rpc,
conn_elapsed_ms,
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
);
let rpc_start = std::time::Instant::now();
let local_state_for_rpc = local_state.clone();
async move {
let res = ByteStreamClient::new(channel)
.write(WriteStateWrapper::new(local_state_for_rpc))
.await
.err_tip(|| "in GrpcStore::write");
let rpc_elapsed_ms =
u64::try_from(rpc_start.elapsed().as_millis()).unwrap_or(u64::MAX);
let rpc_fut = self.connection_manager.connection("write".into()).and_then(
|channel| {
let conn_elapsed = conn_start.elapsed();
let instance_for_rpc = instance_name.clone();
let conn_elapsed_ms =
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
trace!(
instance_name = %instance_for_rpc,
rpc_elapsed_ms,
success = res.is_ok(),
"GrpcStore::write: ByteStream.Write RPC returned",
conn_elapsed_ms,
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
);
res
}
});
let rpc_start = std::time::Instant::now();
let local_state_for_rpc = local_state.clone();
async move {
let res = ByteStreamClient::new(channel)
.write(WriteStateWrapper::new(local_state_for_rpc))
.await
.err_tip(|| "in GrpcStore::write");
let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis())
.unwrap_or(u64::MAX);
trace!(
instance_name = %instance_for_rpc,
rpc_elapsed_ms,
success = res.is_ok(),
"GrpcStore::write: ByteStream.Write RPC returned",
);
res
}
},
);

let result = if rpc_timeout > Duration::ZERO {
match tokio::time::timeout(rpc_timeout, rpc_fut).await {
Expand Down Expand Up @@ -448,7 +450,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("query_write_status: {}", request.resource_name))
.await
.err_tip(|| "in query_write_status")?;
ByteStreamClient::new(channel)
Expand All @@ -468,7 +470,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("get_action_result: {:?}", request.action_digest))
.await
.err_tip(|| "in get_action_result")?;
ActionCacheClient::new(channel)
Expand All @@ -488,7 +490,7 @@ impl GrpcStore {
self.perform_request(request, |request| async move {
let channel = self
.connection_manager
.connection()
.connection(format!("update_action_result: {:?}", request.action_digest))
.await
.err_tip(|| "in update_action_result")?;
ActionCacheClient::new(channel)
Expand Down
27 changes: 15 additions & 12 deletions nativelink-util/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::retry::{self, Retrier, RetryResult};
#[derive(Debug)]
pub struct ConnectionManager {
// The channel to request connections from the worker.
worker_tx: mpsc::Sender<oneshot::Sender<Connection>>,
worker_tx: mpsc::Sender<(String, oneshot::Sender<Connection>)>,
}

/// The index into `ConnectionManagerWorker::endpoints`.
Expand Down Expand Up @@ -101,8 +101,8 @@ struct ConnectionManagerWorker {
connecting_channels: FuturesUnordered<Pin<Box<dyn Future<Output = IndexedChannel> + Send>>>,
/// Connected channels that are available for use.
available_channels: VecDeque<EstablishedChannel>,
/// Requests for a Channel when available.
waiting_connections: VecDeque<oneshot::Sender<Connection>>,
/// Requests for a Channel when available - (reason, request)
waiting_connections: VecDeque<(String, oneshot::Sender<Connection>)>,
/// The retry configuration for connecting to an Endpoint, on failure will
/// restart the retrier after a 1 second delay.
retrier: Retrier,
Expand Down Expand Up @@ -136,7 +136,7 @@ impl ConnectionManager {
.collect();

if max_concurrent_requests == 0 {
max_concurrent_requests = usize::MAX;
max_concurrent_requests = 100;
}
if connections_per_endpoint == 0 {
connections_per_endpoint = 1;
Expand Down Expand Up @@ -165,10 +165,10 @@ impl ConnectionManager {
/// Get a Connection that can be used as a `tonic::Channel`, except it
/// performs some additional counting to reconnect on error and restrict
/// the number of concurrent connections.
pub async fn connection(&self) -> Result<Connection, Error> {
pub async fn connection(&self, reason: String) -> Result<Connection, Error> {
let (tx, rx) = oneshot::channel();
self.worker_tx
.send(tx)
.send((reason, tx))
.await
.map_err(|err| make_err!(Code::Unavailable, "Requesting a new connection: {err:?}"))?;
rx.await
Expand All @@ -180,7 +180,7 @@ impl ConnectionManagerWorker {
async fn service_requests(
mut self,
connections_per_endpoint: usize,
mut worker_rx: mpsc::Receiver<oneshot::Sender<Connection>>,
mut worker_rx: mpsc::Receiver<(String, oneshot::Sender<Connection>)>,
mut connection_rx: mpsc::UnboundedReceiver<ConnectionRequest>,
) {
// Make the initial set of connections, connection failures will be
Expand All @@ -199,12 +199,12 @@ impl ConnectionManagerWorker {
loop {
tokio::select! {
request = worker_rx.recv() => {
let Some(request) = request else {
let Some((reason, request)) = request else {
// The ConnectionManager was dropped, shut down the
// worker.
break;
};
self.handle_worker(request);
self.handle_worker(reason, request);
}
maybe_request = connection_rx.recv() => {
if let Some(request) = maybe_request {
Expand Down Expand Up @@ -308,20 +308,22 @@ impl ConnectionManagerWorker {
}

// This must never be made async otherwise the select may cancel it.
fn handle_worker(&mut self, tx: oneshot::Sender<Connection>) {
fn handle_worker(&mut self, reason: String, tx: oneshot::Sender<Connection>) {
if let Some(channel) = (self.available_connections > 0)
.then_some(())
.and_then(|()| self.available_channels.pop_front())
{
debug!(reason, "ConnectionManager: request running");
self.provide_channel(channel, tx);
} else {
debug!(
available_connections = self.available_connections,
available_channels = self.available_channels.len(),
waiting_connections = self.waiting_connections.len(),
reason,
"ConnectionManager: no connection available, request queued",
);
self.waiting_connections.push_back(tx);
self.waiting_connections.push_back((reason, tx));
}
}

Expand All @@ -342,7 +344,8 @@ impl ConnectionManagerWorker {
&& !self.available_channels.is_empty()
{
if let Some(channel) = self.available_channels.pop_front() {
if let Some(tx) = self.waiting_connections.pop_front() {
if let Some((reason, tx)) = self.waiting_connections.pop_front() {
debug!(reason, "ConnectionManager: channel available, running");
self.provide_channel(channel, tx);
} else {
// This should never happen, but better than an unwrap.
Expand Down
Loading
Loading