Skip to content

Commit a350cc7

Browse files
committed
Add debug info to connection manager queues
1 parent 27fa965 commit a350cc7

File tree

3 files changed

+50
-47
lines changed

3 files changed

+50
-47
lines changed

nativelink-scheduler/src/grpc_scheduler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl GrpcScheduler {
216216
// Not in the cache, lookup the capabilities with the upstream.
217217
let channel = self
218218
.connection_manager
219-
.connection()
219+
.connection("get_known_properties".into())
220220
.await
221221
.err_tip(|| "in get_platform_property_manager()")?;
222222
let capabilities_result = CapabilitiesClient::new(channel)
@@ -274,7 +274,7 @@ impl GrpcScheduler {
274274
.perform_request(request, |request| async move {
275275
let channel = self
276276
.connection_manager
277-
.connection()
277+
.connection(format!("add_action: {:?}", request.action_digest))
278278
.await
279279
.err_tip(|| "in add_action()")?;
280280
ExecutionClient::new(channel)
@@ -309,7 +309,7 @@ impl GrpcScheduler {
309309
.perform_request(request, |request| async move {
310310
let channel = self
311311
.connection_manager
312-
.connection()
312+
.connection(format!("filter_operations: {}", request.name))
313313
.await
314314
.err_tip(|| "in find_by_client_operation_id()")?;
315315
ExecutionClient::new(channel)

nativelink-store/src/grpc_store.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl GrpcStore {
149149
self.perform_request(request, |request| async move {
150150
let channel = self
151151
.connection_manager
152-
.connection()
152+
.connection("find_missing_blobs".into())
153153
.await
154154
.err_tip(|| "in find_missing_blobs")?;
155155
ContentAddressableStorageClient::new(channel)
@@ -174,7 +174,7 @@ impl GrpcStore {
174174
self.perform_request(request, |request| async move {
175175
let channel = self
176176
.connection_manager
177-
.connection()
177+
.connection("batch_update_blobs".into())
178178
.await
179179
.err_tip(|| "in batch_update_blobs")?;
180180
ContentAddressableStorageClient::new(channel)
@@ -199,7 +199,7 @@ impl GrpcStore {
199199
self.perform_request(request, |request| async move {
200200
let channel = self
201201
.connection_manager
202-
.connection()
202+
.connection("batch_read_blobs".into())
203203
.await
204204
.err_tip(|| "in batch_read_blobs")?;
205205
ContentAddressableStorageClient::new(channel)
@@ -224,7 +224,7 @@ impl GrpcStore {
224224
self.perform_request(request, |request| async move {
225225
let channel = self
226226
.connection_manager
227-
.connection()
227+
.connection(format!("get_tree: {:?}", request.root_digest))
228228
.await
229229
.err_tip(|| "in get_tree")?;
230230
ContentAddressableStorageClient::new(channel)
@@ -251,7 +251,7 @@ impl GrpcStore {
251251
) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
252252
let channel = self
253253
.connection_manager
254-
.connection()
254+
.connection(format!("read_internal: {}", request.resource_name))
255255
.await
256256
.err_tip(|| "in read_internal")?;
257257
let mut response = ByteStreamClient::new(channel)
@@ -329,34 +329,36 @@ impl GrpcStore {
329329
"GrpcStore::write: requesting connection from pool",
330330
);
331331
let conn_start = std::time::Instant::now();
332-
let rpc_fut = self.connection_manager.connection().and_then(|channel| {
333-
let conn_elapsed = conn_start.elapsed();
334-
let instance_for_rpc = instance_name.clone();
335-
let conn_elapsed_ms =
336-
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
337-
trace!(
338-
instance_name = %instance_for_rpc,
339-
conn_elapsed_ms,
340-
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
341-
);
342-
let rpc_start = std::time::Instant::now();
343-
let local_state_for_rpc = local_state.clone();
344-
async move {
345-
let res = ByteStreamClient::new(channel)
346-
.write(WriteStateWrapper::new(local_state_for_rpc))
347-
.await
348-
.err_tip(|| "in GrpcStore::write");
349-
let rpc_elapsed_ms =
350-
u64::try_from(rpc_start.elapsed().as_millis()).unwrap_or(u64::MAX);
332+
let rpc_fut = self.connection_manager.connection("write".into()).and_then(
333+
|channel| {
334+
let conn_elapsed = conn_start.elapsed();
335+
let instance_for_rpc = instance_name.clone();
336+
let conn_elapsed_ms =
337+
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
351338
trace!(
352339
instance_name = %instance_for_rpc,
353-
rpc_elapsed_ms,
354-
success = res.is_ok(),
355-
"GrpcStore::write: ByteStream.Write RPC returned",
340+
conn_elapsed_ms,
341+
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
356342
);
357-
res
358-
}
359-
});
343+
let rpc_start = std::time::Instant::now();
344+
let local_state_for_rpc = local_state.clone();
345+
async move {
346+
let res = ByteStreamClient::new(channel)
347+
.write(WriteStateWrapper::new(local_state_for_rpc))
348+
.await
349+
.err_tip(|| "in GrpcStore::write");
350+
let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis())
351+
.unwrap_or(u64::MAX);
352+
trace!(
353+
instance_name = %instance_for_rpc,
354+
rpc_elapsed_ms,
355+
success = res.is_ok(),
356+
"GrpcStore::write: ByteStream.Write RPC returned",
357+
);
358+
res
359+
}
360+
},
361+
);
360362

361363
let result = if rpc_timeout > Duration::ZERO {
362364
match tokio::time::timeout(rpc_timeout, rpc_fut).await {
@@ -448,7 +450,7 @@ impl GrpcStore {
448450
self.perform_request(request, |request| async move {
449451
let channel = self
450452
.connection_manager
451-
.connection()
453+
.connection(format!("query_write_status: {}", request.resource_name))
452454
.await
453455
.err_tip(|| "in query_write_status")?;
454456
ByteStreamClient::new(channel)
@@ -468,7 +470,7 @@ impl GrpcStore {
468470
self.perform_request(request, |request| async move {
469471
let channel = self
470472
.connection_manager
471-
.connection()
473+
.connection(format!("get_action_result: {:?}", request.action_digest))
472474
.await
473475
.err_tip(|| "in get_action_result")?;
474476
ActionCacheClient::new(channel)
@@ -488,7 +490,7 @@ impl GrpcStore {
488490
self.perform_request(request, |request| async move {
489491
let channel = self
490492
.connection_manager
491-
.connection()
493+
.connection(format!("update_action_result: {:?}", request.action_digest))
492494
.await
493495
.err_tip(|| "in update_action_result")?;
494496
ActionCacheClient::new(channel)

nativelink-util/src/connection_manager.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::retry::{self, Retrier, RetryResult};
3434
#[derive(Debug)]
3535
pub struct ConnectionManager {
3636
// The channel to request connections from the worker.
37-
worker_tx: mpsc::Sender<oneshot::Sender<Connection>>,
37+
worker_tx: mpsc::Sender<(String, oneshot::Sender<Connection>)>,
3838
}
3939

4040
/// The index into `ConnectionManagerWorker::endpoints`.
@@ -101,8 +101,8 @@ struct ConnectionManagerWorker {
101101
connecting_channels: FuturesUnordered<Pin<Box<dyn Future<Output = IndexedChannel> + Send>>>,
102102
/// Connected channels that are available for use.
103103
available_channels: VecDeque<EstablishedChannel>,
104-
/// Requests for a Channel when available.
105-
waiting_connections: VecDeque<oneshot::Sender<Connection>>,
104+
/// Requests for a Channel when available, queued as (reason, request) pairs.
105+
waiting_connections: VecDeque<(String, oneshot::Sender<Connection>)>,
106106
/// The retry configuration for connecting to an Endpoint, on failure will
107107
/// restart the retrier after a 1 second delay.
108108
retrier: Retrier,
@@ -165,10 +165,10 @@ impl ConnectionManager {
165165
/// Get a Connection that can be used as a `tonic::Channel`, except it
166166
/// performs some additional counting to reconnect on error and restrict
167167
/// the number of concurrent connections.
168-
pub async fn connection(&self) -> Result<Connection, Error> {
168+
pub async fn connection(&self, reason: String) -> Result<Connection, Error> {
169169
let (tx, rx) = oneshot::channel();
170170
self.worker_tx
171-
.send(tx)
171+
.send((reason, tx))
172172
.await
173173
.map_err(|err| make_err!(Code::Unavailable, "Requesting a new connection: {err:?}"))?;
174174
rx.await
@@ -180,7 +180,7 @@ impl ConnectionManagerWorker {
180180
async fn service_requests(
181181
mut self,
182182
connections_per_endpoint: usize,
183-
mut worker_rx: mpsc::Receiver<oneshot::Sender<Connection>>,
183+
mut worker_rx: mpsc::Receiver<(String, oneshot::Sender<Connection>)>,
184184
mut connection_rx: mpsc::UnboundedReceiver<ConnectionRequest>,
185185
) {
186186
// Make the initial set of connections, connection failures will be
@@ -199,12 +199,12 @@ impl ConnectionManagerWorker {
199199
loop {
200200
tokio::select! {
201201
request = worker_rx.recv() => {
202-
let Some(request) = request else {
202+
let Some((reason, request)) = request else {
203203
// The ConnectionManager was dropped, shut down the
204204
// worker.
205205
break;
206206
};
207-
self.handle_worker(request);
207+
self.handle_worker(reason, request);
208208
}
209209
maybe_request = connection_rx.recv() => {
210210
if let Some(request) = maybe_request {
@@ -308,7 +308,7 @@ impl ConnectionManagerWorker {
308308
}
309309

310310
// This must never be made async, otherwise the select may cancel it.
311-
fn handle_worker(&mut self, tx: oneshot::Sender<Connection>) {
311+
fn handle_worker(&mut self, reason: String, tx: oneshot::Sender<Connection>) {
312312
if let Some(channel) = (self.available_connections > 0)
313313
.then_some(())
314314
.and_then(|()| self.available_channels.pop_front())
@@ -319,9 +319,10 @@ impl ConnectionManagerWorker {
319319
available_connections = self.available_connections,
320320
available_channels = self.available_channels.len(),
321321
waiting_connections = self.waiting_connections.len(),
322+
reason,
322323
"ConnectionManager: no connection available, request queued",
323324
);
324-
self.waiting_connections.push_back(tx);
325+
self.waiting_connections.push_back((reason, tx));
325326
}
326327
}
327328

@@ -342,7 +343,7 @@ impl ConnectionManagerWorker {
342343
&& !self.available_channels.is_empty()
343344
{
344345
if let Some(channel) = self.available_channels.pop_front() {
345-
if let Some(tx) = self.waiting_connections.pop_front() {
346+
if let Some((_reason, tx)) = self.waiting_connections.pop_front() {
346347
self.provide_channel(channel, tx);
347348
} else {
348349
// This should never happen, but better than an unwrap.

0 commit comments

Comments
 (0)