Skip to content

Commit 54d9efa

Browse files
authored
Extra logs for P2P model sharing process (#165)
This PR updates some logs to use the `info` level, mostly around the P2P model sharing process, to improve observability. It also includes two small changes that simplify the logic without altering the core functionality:

* We no longer differentiate between errors during the blob request and those occurring mid-download. If a peer fails 5 times at any point in the process, it will be removed and not used again.
* The retry window for selecting an available peer to request a blob ticket has been reduced from 8 minutes to 4. If no peers become available within that time, the process will be canceled.
2 parents 609fdd9 + a5462e9 commit 54d9efa

File tree

3 files changed

+73
-91
lines changed

3 files changed

+73
-91
lines changed

shared/client/src/client.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ const DOWNLOAD_RETRY_BACKOFF_BASE: Duration = Duration::from_secs(2);
4949
const DOWNLOAD_RETRY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
5050
const OPPROTUNISTIC_WITNESS_INTERVAL: Duration = Duration::from_millis(500);
5151
const CHECK_CONNECTION_INTERVAL: Duration = Duration::from_secs(10);
52-
const MAX_ERRORS_PER_PEER: u8 = 3;
53-
const MAX_RETRIES_PER_PEER: u8 = 5;
52+
const MAX_ERRORS_PER_PEER: u8 = 5;
5453

5554
impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'static>
5655
Client<T, A, B>
@@ -103,6 +102,9 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
103102
let max_concurrent_parameter_requests =
104103
init_config.max_concurrent_parameter_requests;
105104

105+
let mut current_downloaded_parameters = 0_u16;
106+
let mut total_parameters = None;
107+
106108
let mut run = RunManager::<T, A>::new(RunInitConfigAndIO {
107109
init_config,
108110

@@ -122,7 +124,6 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
122124
let mut sharable_model = SharableModel::empty();
123125
let peer_manager = Arc::new(PeerManagerHandle::new(
124126
MAX_ERRORS_PER_PEER,
125-
MAX_RETRIES_PER_PEER,
126127
param_requests_cancel_token.clone(),
127128
));
128129

@@ -266,22 +267,28 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
266267
let _ = trace_span!("NetworkEvent::DownloadComplete", hash = %hash).entered();
267268
metrics.record_download_completed(hash, from);
268269
if retried_downloads.remove(hash).await.is_some() {
269-
debug!("Successfully downloaded previously failed blob {}", hex::encode(hash));
270+
info!("Successfully downloaded previously failed blob {}", hex::encode(hash));
270271
}
271272
match download_data {
272273
TransmittableDownload::DistroResult(distro_result) => {
273274
debug!("Download complete: step {} batch id {}", distro_result.step, distro_result.batch_id);
274275
run.apply_distro_result(hash, distro_result, None);
275276
},
276277
TransmittableDownload::ModelParameter(parameter) => {
277-
debug!("Download complete: parameter {}", parameter.name()?);
278+
current_downloaded_parameters += 1;
279+
info!("Download complete: parameter {}", parameter.name()?);
280+
if let Some(total_parameters) = total_parameters {
281+
info!("Downloaded parameters total: {}/{}", current_downloaded_parameters, total_parameters);
282+
} else {
283+
error!("Total parameters not set");
284+
}
278285
sharable_model.add_parameter(parameter).await?;
279286
if sharable_model.is_download_complete() {
280287
sharable_model.send_init_parameters()?;
281288
}
282289
},
283290
TransmittableDownload::ModelConfig(config) => {
284-
debug!("Download complete: model config");
291+
info!("Download complete: model config");
285292
sharable_model.add_config(config)?;
286293
sharable_model.send_config()?;
287294
},
@@ -299,11 +306,12 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
299306
// We often get an error after some time in the iroh-blobs side so we use the base backoff to retry faster.
300307
let backoff_duration = DOWNLOAD_RETRY_BACKOFF_BASE;
301308
let retry_time = Some(std::time::Instant::now() + backoff_duration);
302-
peer_manager.report_blob_ticket_download_error(dl.blob_ticket.node_addr().node_id);
309+
peer_manager.report_blob_ticket_request_error(dl.blob_ticket.node_addr().node_id, Some(dl.blob_ticket.clone()));
303310

304311
info!(
305-
"Model Sharing download failed {} time/s (will retry in {:?}): {}",
312+
"Model Sharing download failed {} time/s with provider node {} (will retry in {:?}): {}",
306313
retries + 1,
314+
dl.blob_ticket.node_addr().node_id,
307315
backoff_duration,
308316
dl.error
309317
);
@@ -365,7 +373,7 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
365373
}
366374
},
367375
Ok(ticket) => {
368-
info!(parameter = parameter_name, "Sending requested model parameter blob ticket");
376+
info!(parameter = parameter_name, hash = %ticket.hash(), "Sending requested model parameter blob ticket");
369377
if let Err(e) = protocol_req_tx.send(Ok(ticket)) {
370378
warn!("Could not send model parameter {parameter_name} blob ticket. Error: {e:?}");
371379
};
@@ -381,7 +389,7 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
381389
}
382390
},
383391
Ok(config_ticket) => {
384-
info!("Sending requested model config blob ticket");
392+
info!(hash = %config_ticket.hash(), "Sending requested model config blob ticket");
385393
if let Err(e) = protocol_req_tx.send(Ok(config_ticket)) {
386394
warn!("Could not send model config blob ticket. Error: {e:?}");
387395
}
@@ -470,20 +478,21 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
470478
for (hash, ticket, tag, download_type) in pending_retries {
471479
let retries = retried_downloads.update_time(hash).await;
472480

473-
debug!("Retrying download for blob {} (attempt {})", hash, retries);
474-
475481
metrics.record_download_retry(hash);
476482
// We check the type of the failed download and send it to the appropriate channel to retry it
477483
match download_type {
478484
DownloadType::DistroResult(_) => {
485+
info!("Retrying download for distro result, (attempt {})", retries);
479486
let _ = tx_request_download.send((ticket, tag));
480487
},
481488
DownloadType::ModelSharing(inner) => {
482489
match inner {
483490
ModelRequestType::Parameter(parameter) => {
491+
info!("Retrying download for model parameter: {parameter}, (attempt {})", retries);
484492
let _ = tx_params_download.send(vec![(ticket, ModelRequestType::Parameter(parameter.clone()))]);
485493
},
486494
ModelRequestType::Config => {
495+
info!("Retrying download for model config, (attempt {})", retries);
487496
let _ = tx_config_download.send(ticket);
488497
}
489498
}
@@ -521,6 +530,7 @@ impl<T: NodeIdentity, A: AuthenticatableIdentity + 'static, B: Backend<T> + 'sta
521530
sharable_model.update_config(config_string, tokenizer)?;
522531
}
523532
Some((param_names, tx_params_response)) = rx_parameters_req.recv() => {
533+
total_parameters = Some(param_names.len());
524534
sharable_model.initialize_parameters(&param_names, tx_params_response);
525535

526536
let tx_params_download = tx_params_download.clone();

shared/network/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ where
399399
self.download_manager
400400
.add(ticket, tag, rx, download_type.clone());
401401

402-
debug!(name: "blob_download_start", hash = ticket_hash.fmt_short(), "started downloading blob {}", ticket_hash);
402+
info!(name: "blob_download_start", hash = ticket_hash.fmt_short(), "started downloading blob {}", ticket_hash);
403403

404404
let blobs_client = self.blobs.client().clone();
405405
tokio::spawn(async move {
@@ -804,7 +804,7 @@ pub async fn blob_ticket_param_request_task(
804804
peer_manager: Arc<PeerManagerHandle>,
805805
cancellation_token: CancellationToken,
806806
) {
807-
let max_attempts = 1000u16;
807+
let max_attempts = 500u16;
808808
let mut attempts = 0u16;
809809

810810
while attempts < max_attempts {
@@ -815,7 +815,7 @@ pub async fn blob_ticket_param_request_task(
815815
continue;
816816
};
817817

818-
debug!(type = ?&model_request_type, peer = %peer_id, "Requesting model");
818+
info!(type = ?&model_request_type, peer = %peer_id, "Requesting model");
819819
let result = timeout(
820820
Duration::from_secs(MODEL_REQUEST_TIMEOUT_SECS),
821821
request_model_blob_ticket(router.clone(), peer_id, &model_request_type),
@@ -835,7 +835,7 @@ pub async fn blob_ticket_param_request_task(
835835
}
836836
Ok(Err(e)) | Err(e) => {
837837
// Failed - report error and potentially try next peer
838-
peer_manager.report_blob_ticket_request_error(peer_id);
838+
peer_manager.report_blob_ticket_request_error(peer_id, None);
839839

840840
warn!("Request failed for peer {peer_id}: {e}. Trying next peer");
841841
attempts += 1;

0 commit comments

Comments
 (0)