diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 936ac080326f..55e0dcd7ca1e 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -339,7 +339,7 @@ impl Client { } }; for certificate in certificates { - last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info); + last_info = Some(self.handle_certificate(certificate).await?.info); } // Now download the rest in batches from the remote node. while next_height < stop { @@ -361,6 +361,24 @@ impl Client { Ok(last_info) } + async fn download_blobs( + &self, + remote_node: &RemoteNode, + blob_ids: impl IntoIterator, + ) -> Result<(), ChainClientError> { + self.local_node + .store_blobs( + &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move { + remote_node.try_download_blob(blob_id).await.unwrap() + })) + .buffer_unordered(self.options.max_joined_tasks) + .collect::>() + .await, + ) + .await + .map_err(Into::into) + } + /// Tries to process all the certificates, requesting any missing blobs from the given node. /// Returns the chain info of the last successfully processed certificate. #[instrument(level = "trace", skip_all)] @@ -370,34 +388,47 @@ impl Client { certificates: Vec, ) -> Result>, ChainClientError> { let mut info = None; - for certificate in certificates { - let certificate = Box::new(certificate); - let mut result = self.handle_certificate(certificate.clone()).await; + let required_blob_ids: Vec<_> = certificates + .iter() + .flat_map(|certificate| certificate.value().required_blob_ids()) + .collect(); - if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result { - let blobs = - futures::stream::iter(blob_ids.iter().copied().map(|blob_id| async move { - remote_node.try_download_blob(blob_id).await.unwrap() - })) - .buffer_unordered(self.options.max_joined_tasks) - .collect::>() - .await; - self.local_node.store_blobs(&blobs).await?; - result = self.handle_certificate(certificate.clone()).await; + match self + .local_node + .read_blob_states_from_storage(&required_blob_ids) + .await + { + Err(LocalNodeError::BlobsNotFound(blob_ids)) => { + self.download_blobs(remote_node, blob_ids).await?; } + x => { + x?; + } + } - info = Some(result?.info); + for certificate in certificates { + info = Some( + match self.handle_certificate(certificate.clone()).await { + Err(LocalNodeError::BlobsNotFound(blob_ids)) => { + self.download_blobs(remote_node, blob_ids).await?; + self.handle_certificate(certificate).await? + } + x => x?, + } + .info, + ); } + // Done with all certificates. Ok(info) } async fn handle_certificate( &self, - certificate: Box>, + certificate: GenericCertificate, ) -> Result { self.local_node - .handle_certificate(*certificate, &self.notifier) + .handle_certificate(certificate, &self.notifier) .await } @@ -489,7 +520,7 @@ impl Client { &self, certificate: Box>, ) -> Result<(), LocalNodeError> { - let info = self.handle_certificate(certificate).await?.info; + let info = self.handle_certificate(*certificate).await?.info; self.update_from_info(&info); Ok(()) } @@ -707,8 +738,6 @@ impl Client { mode: ReceiveCertificateMode, nodes: Option>>, ) -> Result<(), ChainClientError> { - let certificate = Box::new(certificate); - // Verify the certificate before doing any expensive networking. let (max_epoch, committees) = self.admin_committees().await?; if let ReceiveCertificateMode::NeedsCheck = mode { @@ -1145,7 +1174,7 @@ impl Client { }; if let Some(timeout) = remote_info.manager.timeout { - self.handle_certificate(Box::new(*timeout)).await?; + self.handle_certificate(*timeout).await?; } let mut proposals = Vec::new(); if let Some(proposal) = remote_info.manager.requested_signed_proposal {