@@ -28,8 +28,8 @@ use linera_base::{
2828 abi:: Abi ,
2929 crypto:: { signer, AccountPublicKey , CryptoHash , Signer , ValidatorPublicKey } ,
3030 data_types:: {
31- Amount , ApplicationPermissions , ArithmeticError , Blob , BlobContent , BlockHeight , Epoch ,
32- Round , Timestamp ,
31+ Amount , ApplicationPermissions , ArithmeticError , Blob , BlobContent , BlockHeight ,
32+ ChainDescription , Epoch , Round , Timestamp ,
3333 } ,
3434 ensure,
3535 identifiers:: {
@@ -260,33 +260,28 @@ impl<Env: Environment> Client<Env> {
260260 async fn fetch_chain_info (
261261 & self ,
262262 chain_id : ChainId ,
263- validators : & [ RemoteNode < impl ValidatorNode > ] ,
264- ) -> Result < Box < ChainInfo > , LocalNodeError > {
263+ validators : & [ RemoteNode < Env :: ValidatorNode > ] ,
264+ ) -> Result < Box < ChainInfo > , ChainClientError > {
265265 match self . local_node . chain_info ( chain_id) . await {
266266 Ok ( info) => Ok ( info) ,
267267 Err ( LocalNodeError :: BlobsNotFound ( blob_ids) ) => {
268+ // Make sure the admin chain is up to date.
269+ self . synchronize_chain_state ( self . admin_id ) . await ?;
268270 // If the chain is missing then the error is a WorkerError
269271 // and so a BlobsNotFound
270- // TODO(#2351): make sure the blobs are legitimate!
271- let blobs = RemoteNode :: download_blobs (
272- & blob_ids,
273- validators,
274- self . options . blob_download_timeout ,
275- )
276- . await
277- . ok_or ( LocalNodeError :: BlobsNotFound ( blob_ids) ) ?;
278- self . local_node . storage_client ( ) . write_blobs ( & blobs) . await ?;
279- self . local_node . chain_info ( chain_id) . await
272+ self . update_local_node_with_blobs_from ( blob_ids, validators)
273+ . await ?;
274+ Ok ( self . local_node . chain_info ( chain_id) . await ?)
280275 }
281- err => err,
276+ Err ( err) => Err ( err. into ( ) ) ,
282277 }
283278 }
284279
285280 /// Downloads and processes all certificates up to (excluding) the specified height.
286281 #[ instrument( level = "trace" , skip( self , validators) ) ]
287282 async fn download_certificates (
288283 & self ,
289- validators : & [ RemoteNode < impl ValidatorNode > ] ,
284+ validators : & [ RemoteNode < Env :: ValidatorNode > ] ,
290285 chain_id : ChainId ,
291286 target_next_block_height : BlockHeight ,
292287 ) -> Result < Box < ChainInfo > , ChainClientError > {
@@ -325,7 +320,7 @@ impl<Env: Environment> Client<Env> {
325320 #[ instrument( level = "trace" , skip_all) ]
326321 async fn download_certificates_from (
327322 & self ,
328- remote_node : & RemoteNode < impl ValidatorNode > ,
323+ remote_node : & RemoteNode < Env :: ValidatorNode > ,
329324 chain_id : ChainId ,
330325 stop : BlockHeight ,
331326 ) -> Result < Option < Box < ChainInfo > > , ChainClientError > {
@@ -381,11 +376,19 @@ impl<Env: Environment> Client<Env> {
381376 let mut result = self . handle_certificate ( certificate. clone ( ) ) . await ;
382377
383378 if let Err ( LocalNodeError :: BlobsNotFound ( blob_ids) ) = & result {
384- if let Some ( blobs) = remote_node. try_download_blobs ( blob_ids) . await {
385- // TODO(#2351): Don't store downloaded blobs without certificate.
386- let _ = self . local_node . store_blobs ( & blobs) . await ;
387- result = self . handle_certificate ( certificate. clone ( ) ) . await ;
388- }
379+ future:: try_join_all ( blob_ids. iter ( ) . map ( |blob_id| async move {
380+ let blob_certificate =
381+ remote_node. download_certificate_for_blob ( * blob_id) . await ?;
382+ self . receive_sender_certificate (
383+ blob_certificate,
384+ ReceiveCertificateMode :: NeedsCheck ,
385+ None ,
386+ )
387+ . await ?;
388+ Result :: < ( ) , ChainClientError > :: Ok ( ( ) )
389+ } ) )
390+ . await ?;
391+ result = self . handle_certificate ( certificate. clone ( ) ) . await ;
389392 }
390393
391394 info = Some ( result?. info ) ;
@@ -450,10 +453,10 @@ impl<Env: Environment> Client<Env> {
450453
451454 /// Ensures that the client has the `ChainDescription` blob corresponding to this
452455 /// client's `ChainId`.
453- pub async fn ensure_has_chain_description (
456+ pub async fn get_chain_description (
454457 & self ,
455458 chain_id : ChainId ,
456- ) -> Result < Blob , ChainClientError > {
459+ ) -> Result < ChainDescription , ChainClientError > {
457460 let chain_desc_id = BlobId :: new ( chain_id. 0 , BlobType :: ChainDescription ) ;
458461 let blob = self
459462 . local_node
@@ -462,17 +465,17 @@ impl<Env: Environment> Client<Env> {
462465 . await ?;
463466 if let Some ( blob) = blob {
464467 // We have the blob - return it.
465- return Ok ( blob) ;
468+ return Ok ( bcs :: from_bytes ( blob. bytes ( ) ) ? ) ;
466469 } ;
467470 // Recover history from the current validators, according to the admin chain.
468- // TODO(#2351): make sure that the blob is legitimately created!
471+ self . synchronize_chain_state ( self . admin_id ) . await ? ;
469472 let nodes = self . validator_nodes ( ) . await ?;
470- let blob =
471- RemoteNode :: download_blob ( & nodes , chain_desc_id, self . options . blob_download_timeout )
472- . await
473- . ok_or ( LocalNodeError :: BlobsNotFound ( vec ! [ chain_desc_id ] ) ) ? ;
474- self . local_node . storage_client ( ) . write_blob ( & blob ) . await ? ;
475- Ok ( blob)
473+ let blob = self
474+ . update_local_node_with_blobs_from ( vec ! [ chain_desc_id] , & nodes )
475+ . await ?
476+ . pop ( )
477+ . unwrap ( ) ; // Returns exactly as many blobs as passed-in IDs.
478+ Ok ( bcs :: from_bytes ( blob. bytes ( ) ) ? )
476479 }
477480
478481 /// Updates the latest block and next block height and round information from the chain info.
@@ -792,13 +795,6 @@ impl<Env: Environment> Client<Env> {
792795 let remote_max_heights = Self :: max_height_per_chain ( & remote_log) ;
793796
794797 // Obtain the next block height we need in the local node, for each chain.
795- // But first, ensure we have the chain descriptions!
796- future:: try_join_all (
797- remote_max_heights
798- . keys ( )
799- . map ( |chain| self . ensure_has_chain_description ( * chain) ) ,
800- )
801- . await ?;
802798 let local_next_heights = self
803799 . local_node
804800 . next_block_heights ( remote_max_heights. keys ( ) , chain_worker_limit)
@@ -1065,8 +1061,11 @@ impl<Env: Environment> Client<Env> {
10651061 }
10661062 }
10671063 if let LocalNodeError :: BlobsNotFound ( blob_ids) = & err {
1068- self . update_local_node_with_blobs_from ( blob_ids. clone ( ) , remote_node)
1069- . await ?;
1064+ self . update_local_node_with_blobs_from (
1065+ blob_ids. clone ( ) ,
1066+ & [ remote_node. clone ( ) ] ,
1067+ )
1068+ . await ?;
10701069 // We found the missing blobs: retry.
10711070 if let Err ( new_err) = self
10721071 . local_node
@@ -1120,17 +1119,40 @@ impl<Env: Environment> Client<Env> {
11201119 async fn update_local_node_with_blobs_from (
11211120 & self ,
11221121 blob_ids : Vec < BlobId > ,
1123- remote_node : & RemoteNode < Env :: ValidatorNode > ,
1124- ) -> Result < ( ) , ChainClientError > {
1122+ remote_nodes : & [ RemoteNode < Env :: ValidatorNode > ] ,
1123+ ) -> Result < Vec < Blob > , ChainClientError > {
1124+ let timeout = self . options . blob_download_timeout ;
11251125 future:: try_join_all ( blob_ids. into_iter ( ) . map ( |blob_id| async move {
1126- let certificate = remote_node. download_certificate_for_blob ( blob_id) . await ?;
1127- // This will download all ancestors of the certificate and process all of them locally.
1128- self . receive_sender_certificate ( certificate, ReceiveCertificateMode :: NeedsCheck , None )
1129- . await
1126+ let mut stream = remote_nodes
1127+ . iter ( )
1128+ . zip ( 0 ..)
1129+ . map ( |( remote_node, i) | async move {
1130+ linera_base:: time:: timer:: sleep ( timeout * i * i) . await ;
1131+ let certificate = remote_node. download_certificate_for_blob ( blob_id) . await ?;
1132+ // This will download all ancestors of the certificate and process all of them locally.
1133+ self . receive_sender_certificate (
1134+ certificate,
1135+ ReceiveCertificateMode :: NeedsCheck ,
1136+ Some ( vec ! [ remote_node. clone( ) ] ) ,
1137+ )
1138+ . await ?;
1139+ let blob = self
1140+ . local_node
1141+ . storage_client ( )
1142+ . read_blob ( blob_id)
1143+ . await ?
1144+ . ok_or_else ( || LocalNodeError :: BlobsNotFound ( vec ! [ blob_id] ) ) ?;
1145+ Result :: < _ , ChainClientError > :: Ok ( blob)
1146+ } )
1147+ . collect :: < FuturesUnordered < _ > > ( ) ;
1148+ while let Some ( maybe_blob) = stream. next ( ) . await {
1149+ if let Ok ( blob) = maybe_blob {
1150+ return Ok ( blob) ;
1151+ }
1152+ }
1153+ Err ( LocalNodeError :: BlobsNotFound ( vec ! [ blob_id] ) . into ( ) )
11301154 } ) )
1131- . await ?;
1132-
1133- Ok ( ( ) )
1155+ . await
11341156 }
11351157
11361158 /// Downloads and processes confirmed block certificates that use the given blobs.
@@ -1674,6 +1696,11 @@ impl<Env: Environment> ChainClient<Env> {
16741696 Ok ( response. info )
16751697 }
16761698
1699+ /// Returns the chain's description. Fetches it from the validators if necessary.
1700+ pub async fn get_chain_description ( & self ) -> Result < ChainDescription , ChainClientError > {
1701+ self . client . get_chain_description ( self . chain_id ) . await
1702+ }
1703+
16771704 /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
16781705 /// local chain.
16791706 #[ instrument( level = "trace" ) ]
@@ -3003,7 +3030,8 @@ impl<Env: Environment> ChainClient<Env> {
30033030 ownership : ChainOwnership ,
30043031 application_permissions : ApplicationPermissions ,
30053032 balance : Amount ,
3006- ) -> Result < ClientOutcome < ( ChainId , ConfirmedBlockCertificate ) > , ChainClientError > {
3033+ ) -> Result < ClientOutcome < ( ChainDescription , ConfirmedBlockCertificate ) > , ChainClientError >
3034+ {
30073035 loop {
30083036 let config = OpenChainConfig {
30093037 ownership : ownership. clone ( ) ,
@@ -3018,21 +3046,22 @@ impl<Env: Environment> ChainClient<Env> {
30183046 return Ok ( ClientOutcome :: WaitForTimeout ( timeout) ) ;
30193047 }
30203048 } ;
3021- // The first message of the only operation created the new chain.
3022- let chain_blob_id = certificate
3049+ // The only operation, i.e. the last transaction, created the new chain.
3050+ let chain_blob = certificate
30233051 . block ( )
3024- . created_blob_ids ( )
3025- . into_iter ( )
3026- . next ( )
3052+ . body
3053+ . blobs
3054+ . last ( )
3055+ . and_then ( |blobs| blobs. last ( ) )
30273056 . ok_or_else ( || ChainClientError :: InternalError ( "Failed to create a new chain" ) ) ?;
3028- let chain_id = ChainId ( chain_blob_id . hash ) ;
3057+ let description = bcs :: from_bytes :: < ChainDescription > ( chain_blob . bytes ( ) ) ? ;
30293058 // Add the new chain to the list of tracked chains
3030- self . client . track_chain ( chain_id ) ;
3059+ self . client . track_chain ( description . id ( ) ) ;
30313060 self . client
30323061 . local_node
30333062 . retry_pending_cross_chain_requests ( self . chain_id )
30343063 . await ?;
3035- return Ok ( ClientOutcome :: Committed ( ( chain_id , certificate) ) ) ;
3064+ return Ok ( ClientOutcome :: Committed ( ( description , certificate) ) ) ;
30363065 }
30373066 }
30383067
@@ -3433,9 +3462,9 @@ impl<Env: Environment> ChainClient<Env> {
34333462 & self ,
34343463 chain_id : ChainId ,
34353464 local_node : & mut LocalNodeClient < Env :: Storage > ,
3436- ) -> Option < BlockHeight > {
3437- let info = self . local_chain_info ( chain_id, local_node) . await ? ;
3438- Some ( info. next_block_height )
3465+ ) -> BlockHeight {
3466+ let maybe_info = self . local_chain_info ( chain_id, local_node) . await ;
3467+ maybe_info . map_or ( BlockHeight :: ZERO , |info| info. next_block_height )
34393468 }
34403469
34413470 #[ instrument( level = "trace" , skip( remote_node, local_node, notification) ) ]
@@ -3447,14 +3476,7 @@ impl<Env: Environment> ChainClient<Env> {
34473476 ) {
34483477 match notification. reason {
34493478 Reason :: NewIncomingBundle { origin, height } => {
3450- if let Err ( error) = self . client . ensure_has_chain_description ( origin) . await {
3451- error ! (
3452- chain_id = %self . chain_id,
3453- "NewIncomingBundle: could not find blob for sender's chain: {error}"
3454- ) ;
3455- return ;
3456- }
3457- if self . local_next_block_height ( origin, & mut local_node) . await > Some ( height) {
3479+ if self . local_next_block_height ( origin, & mut local_node) . await > height {
34583480 debug ! (
34593481 chain_id = %self . chain_id,
34603482 "Accepting redundant notification for new message"
@@ -3471,7 +3493,7 @@ impl<Env: Environment> ChainClient<Env> {
34713493 ) ;
34723494 return ;
34733495 }
3474- if self . local_next_block_height ( origin, & mut local_node) . await <= Some ( height) {
3496+ if self . local_next_block_height ( origin, & mut local_node) . await <= height {
34753497 error ! (
34763498 chain_id = %self . chain_id,
34773499 "NewIncomingBundle: Fail to synchronize new message after notification"
@@ -3483,7 +3505,7 @@ impl<Env: Environment> ChainClient<Env> {
34833505 if self
34843506 . local_next_block_height ( chain_id, & mut local_node)
34853507 . await
3486- > Some ( height)
3508+ > height
34873509 {
34883510 debug ! (
34893511 chain_id = %self . chain_id,
@@ -3505,7 +3527,7 @@ impl<Env: Environment> ChainClient<Env> {
35053527 let local_height = self
35063528 . local_next_block_height ( chain_id, & mut local_node)
35073529 . await ;
3508- if local_height <= Some ( height) {
3530+ if local_height <= height {
35093531 error ! ( "NewBlock: Fail to synchronize new block after notification" ) ;
35103532 }
35113533 }
0 commit comments