Skip to content

Commit d9481a4

Browse files
Tweyma2bd
authored andcommitted
linera_core::client: batch downloading of missing blobs (#4755)
Currently, when we synchronize a chain, even though we receive the certificates in a batch to `process_certificates`, we handle them one by one at the local node level, and if a certificate is missing blobs we stop, download the blobs for that certificate, then retry, making the download of the blobs sequential. This makes startup time for the client linear in the number of certificates-with-blobs present in its initial chains (notably, the admin chain). We already have an ahead-of-time indicator of which blobs will be required by the certificates, so there's no need to download them one at a time. If we don't have some blobs marked as required by the certificate batch, try to download them (concurrently) before proceeding to process the batch. ~Thereafter, `BlobsNotFound` when processing the batch is a hard error.~ `required_blob_ids()` is conservative, so we still need to download blobs and retry if we get a `BlobsNotFound` error thereafter. CI. - These changes should be backported to the latest `testnet` branch, then - be released in a new SDK.
1 parent dbc497c commit d9481a4

File tree

1 file changed

+50
-21
lines changed

1 file changed

+50
-21
lines changed

linera-core/src/client/mod.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl<Env: Environment> Client<Env> {
338338
}
339339
};
340340
for certificate in certificates {
341-
last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
341+
last_info = Some(self.handle_certificate(certificate).await?.info);
342342
}
343343
// Now download the rest in batches from the remote node.
344344
while next_height < stop {
@@ -360,6 +360,24 @@ impl<Env: Environment> Client<Env> {
360360
Ok(last_info)
361361
}
362362

363+
async fn download_blobs(
364+
&self,
365+
remote_node: &RemoteNode<impl ValidatorNode>,
366+
blob_ids: impl IntoIterator<Item = BlobId>,
367+
) -> Result<(), ChainClientError> {
368+
self.local_node
369+
.store_blobs(
370+
&futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
371+
remote_node.try_download_blob(blob_id).await.unwrap()
372+
}))
373+
.buffer_unordered(self.options.max_joined_tasks)
374+
.collect::<Vec<_>>()
375+
.await,
376+
)
377+
.await
378+
.map_err(Into::into)
379+
}
380+
363381
/// Tries to process all the certificates, requesting any missing blobs from the given node.
364382
/// Returns the chain info of the last successfully processed certificate.
365383
#[instrument(level = "trace", skip_all)]
@@ -369,34 +387,47 @@ impl<Env: Environment> Client<Env> {
369387
certificates: Vec<ConfirmedBlockCertificate>,
370388
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
371389
let mut info = None;
372-
for certificate in certificates {
373-
let certificate = Box::new(certificate);
374-
let mut result = self.handle_certificate(certificate.clone()).await;
390+
let required_blob_ids: Vec<_> = certificates
391+
.iter()
392+
.flat_map(|certificate| certificate.value().required_blob_ids())
393+
.collect();
375394

376-
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
377-
let blobs =
378-
futures::stream::iter(blob_ids.iter().copied().map(|blob_id| async move {
379-
remote_node.try_download_blob(blob_id).await.unwrap()
380-
}))
381-
.buffer_unordered(self.options.max_joined_tasks)
382-
.collect::<Vec<_>>()
383-
.await;
384-
self.local_node.store_blobs(&blobs).await?;
385-
result = self.handle_certificate(certificate.clone()).await;
395+
match self
396+
.local_node
397+
.read_blob_states_from_storage(&required_blob_ids)
398+
.await
399+
{
400+
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
401+
self.download_blobs(remote_node, blob_ids).await?;
386402
}
403+
x => {
404+
x?;
405+
}
406+
}
387407

388-
info = Some(result?.info);
408+
for certificate in certificates {
409+
info = Some(
410+
match self.handle_certificate(certificate.clone()).await {
411+
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
412+
self.download_blobs(remote_node, blob_ids).await?;
413+
self.handle_certificate(certificate).await?
414+
}
415+
x => x?,
416+
}
417+
.info,
418+
);
389419
}
420+
390421
// Done with all certificates.
391422
Ok(info)
392423
}
393424

394425
async fn handle_certificate<T: ProcessableCertificate>(
395426
&self,
396-
certificate: Box<GenericCertificate<T>>,
427+
certificate: GenericCertificate<T>,
397428
) -> Result<ChainInfoResponse, LocalNodeError> {
398429
self.local_node
399-
.handle_certificate(*certificate, &self.notifier)
430+
.handle_certificate(certificate, &self.notifier)
400431
.await
401432
}
402433

@@ -488,7 +519,7 @@ impl<Env: Environment> Client<Env> {
488519
&self,
489520
certificate: Box<GenericCertificate<T>>,
490521
) -> Result<(), LocalNodeError> {
491-
let info = self.handle_certificate(certificate).await?.info;
522+
let info = self.handle_certificate(*certificate).await?.info;
492523
self.update_from_info(&info);
493524
Ok(())
494525
}
@@ -671,8 +702,6 @@ impl<Env: Environment> Client<Env> {
671702
mode: ReceiveCertificateMode,
672703
nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
673704
) -> Result<(), ChainClientError> {
674-
let certificate = Box::new(certificate);
675-
676705
// Verify the certificate before doing any expensive networking.
677706
let (max_epoch, committees) = self.admin_committees().await?;
678707
if let ReceiveCertificateMode::NeedsCheck = mode {
@@ -1053,7 +1082,7 @@ impl<Env: Environment> Client<Env> {
10531082
};
10541083

10551084
if let Some(timeout) = remote_info.manager.timeout {
1056-
self.handle_certificate(Box::new(*timeout)).await?;
1085+
self.handle_certificate(*timeout).await?;
10571086
}
10581087
let mut proposals = Vec::new();
10591088
if let Some(proposal) = remote_info.manager.requested_signed_proposal {

0 commit comments

Comments
 (0)