Skip to content

Commit b324eea

Browse files
Tweyma2bd
authored and committed
linera_core::client: batch downloading of missing blobs (#4755)
Currently, when we synchronize a chain, even though the certificates are passed to `process_certificates` in a batch, we handle them one by one at the local node level, and if a certificate is missing blobs we stop, download the blobs for that certificate, then retry, making the download of the blobs sequential. This makes startup time for the client linear in the number of certificates-with-blobs present in its initial chains (notably, the admin chain).

We already have an ahead-of-time indicator of which blobs will be required by the certificates, so there's no need to download them one at a time. If we don't have some blobs marked as required by the certificate batch, try to download them (concurrently) before proceeding to process the batch. ~~Thereafter, `BlobsNotFound` when processing the batch is a hard error.~~ `required_blob_ids()` is conservative, so we still need to download blobs and retry if we get a `BlobsNotFound` error thereafter.

CI.

- These changes should be backported to the latest `testnet` branch, then
- be released in a new SDK.
1 parent 0d1bdd1 commit b324eea

File tree

1 file changed

+50
-21
lines changed

1 file changed

+50
-21
lines changed

linera-core/src/client/mod.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ impl<Env: Environment> Client<Env> {
339339
}
340340
};
341341
for certificate in certificates {
342-
last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
342+
last_info = Some(self.handle_certificate(certificate).await?.info);
343343
}
344344
// Now download the rest in batches from the remote node.
345345
while next_height < stop {
@@ -361,6 +361,24 @@ impl<Env: Environment> Client<Env> {
361361
Ok(last_info)
362362
}
363363

364+
async fn download_blobs(
365+
&self,
366+
remote_node: &RemoteNode<impl ValidatorNode>,
367+
blob_ids: impl IntoIterator<Item = BlobId>,
368+
) -> Result<(), ChainClientError> {
369+
self.local_node
370+
.store_blobs(
371+
&futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
372+
remote_node.try_download_blob(blob_id).await.unwrap()
373+
}))
374+
.buffer_unordered(self.options.max_joined_tasks)
375+
.collect::<Vec<_>>()
376+
.await,
377+
)
378+
.await
379+
.map_err(Into::into)
380+
}
381+
364382
/// Tries to process all the certificates, requesting any missing blobs from the given node.
365383
/// Returns the chain info of the last successfully processed certificate.
366384
#[instrument(level = "trace", skip_all)]
@@ -370,34 +388,47 @@ impl<Env: Environment> Client<Env> {
370388
certificates: Vec<ConfirmedBlockCertificate>,
371389
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
372390
let mut info = None;
373-
for certificate in certificates {
374-
let certificate = Box::new(certificate);
375-
let mut result = self.handle_certificate(certificate.clone()).await;
391+
let required_blob_ids: Vec<_> = certificates
392+
.iter()
393+
.flat_map(|certificate| certificate.value().required_blob_ids())
394+
.collect();
376395

377-
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
378-
let blobs =
379-
futures::stream::iter(blob_ids.iter().copied().map(|blob_id| async move {
380-
remote_node.try_download_blob(blob_id).await.unwrap()
381-
}))
382-
.buffer_unordered(self.options.max_joined_tasks)
383-
.collect::<Vec<_>>()
384-
.await;
385-
self.local_node.store_blobs(&blobs).await?;
386-
result = self.handle_certificate(certificate.clone()).await;
396+
match self
397+
.local_node
398+
.read_blob_states_from_storage(&required_blob_ids)
399+
.await
400+
{
401+
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
402+
self.download_blobs(remote_node, blob_ids).await?;
387403
}
404+
x => {
405+
x?;
406+
}
407+
}
388408

389-
info = Some(result?.info);
409+
for certificate in certificates {
410+
info = Some(
411+
match self.handle_certificate(certificate.clone()).await {
412+
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
413+
self.download_blobs(remote_node, blob_ids).await?;
414+
self.handle_certificate(certificate).await?
415+
}
416+
x => x?,
417+
}
418+
.info,
419+
);
390420
}
421+
391422
// Done with all certificates.
392423
Ok(info)
393424
}
394425

395426
async fn handle_certificate<T: ProcessableCertificate>(
396427
&self,
397-
certificate: Box<GenericCertificate<T>>,
428+
certificate: GenericCertificate<T>,
398429
) -> Result<ChainInfoResponse, LocalNodeError> {
399430
self.local_node
400-
.handle_certificate(*certificate, &self.notifier)
431+
.handle_certificate(certificate, &self.notifier)
401432
.await
402433
}
403434

@@ -489,7 +520,7 @@ impl<Env: Environment> Client<Env> {
489520
&self,
490521
certificate: Box<GenericCertificate<T>>,
491522
) -> Result<(), LocalNodeError> {
492-
let info = self.handle_certificate(certificate).await?.info;
523+
let info = self.handle_certificate(*certificate).await?.info;
493524
self.update_from_info(&info);
494525
Ok(())
495526
}
@@ -707,8 +738,6 @@ impl<Env: Environment> Client<Env> {
707738
mode: ReceiveCertificateMode,
708739
nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
709740
) -> Result<(), ChainClientError> {
710-
let certificate = Box::new(certificate);
711-
712741
// Verify the certificate before doing any expensive networking.
713742
let (max_epoch, committees) = self.admin_committees().await?;
714743
if let ReceiveCertificateMode::NeedsCheck = mode {
@@ -1145,7 +1174,7 @@ impl<Env: Environment> Client<Env> {
11451174
};
11461175

11471176
if let Some(timeout) = remote_info.manager.timeout {
1148-
self.handle_certificate(Box::new(*timeout)).await?;
1177+
self.handle_certificate(*timeout).await?;
11491178
}
11501179
let mut proposals = Vec::new();
11511180
if let Some(proposal) = remote_info.manager.requested_signed_proposal {

0 commit comments

Comments
 (0)