Skip to content

Commit ed661c5

Browse files
authored
Improve syncing on incoming message (#4373)
## Motivation When inspecting the performance of our web demos (namely hosted-fungible) we noticed that an incoming transfer triggers 7 network requests while only 3 (last ones) are about accepting an incoming transfer (proposing a block that includes the incoming message). After closer research, it was discovered that we make: - `HandleChainInfoQuery` - first request to learn about the sender chain tip - `HandleChainInfoQuery` - second time we request via `fetch_sent_certificate_hashes` with a specific `BlockRange` (after checking the `info.requested_received_log`) - `DownloadCertificates` - to download the certififcates (by hash) - `DownloadCertificates` - unnecessary query with empty payload (this was a bug). - and finally three requests to propose the block. ## Proposal This PR proposes an improvement of the situation – decreasing # of requests from 7 to 5: - `HandleChainInfoQuery` – to learn about the missing certificates for block heights that we might be missing - `DownloadCertificatesByHeights` – new endpoint that we use to download certificates at specific block heights - and finally three requests to propose a block. I don't think we can get it any lower with the way `ChainClient` is currently structured. This also improves all calls to `query_certificates_from` which are now making 1 network query instead of two. ## Test Plan CI ## Release Plan - Nothing to do / These changes follow the usual release cycle. **OR** - These changes should be backported to the latest `devnet` branch, then - be released in a new SDK, - be released in a validator hotfix. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 8cdabb4 commit ed661c5

File tree

17 files changed

+376
-165
lines changed

17 files changed

+376
-165
lines changed

linera-core/src/chain_worker/state.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,18 +1294,11 @@ where
12941294

12951295
info.requested_pending_message_bundles = messages;
12961296
}
1297-
if let Some(range) = query.request_sent_certificate_hashes_in_range {
1298-
let start: usize = range.start.try_into()?;
1299-
let end = match range.limit {
1300-
None => chain.confirmed_log.count(),
1301-
Some(limit) => start
1302-
.checked_add(usize::try_from(limit).map_err(|_| ArithmeticError::Overflow)?)
1303-
.ok_or(ArithmeticError::Overflow)?
1304-
.min(chain.confirmed_log.count()),
1305-
};
1306-
let keys = chain.confirmed_log.read(start..end).await?;
1307-
info.requested_sent_certificate_hashes = keys;
1297+
let mut hashes = Vec::new();
1298+
for height in query.request_sent_certificate_hashes_by_heights {
1299+
hashes.extend(chain.block_hashes(height..=height).await?);
13081300
}
1301+
info.requested_sent_certificate_hashes = hashes;
13091302
if let Some(start) = query.request_received_log_excluding_first_n {
13101303
let start = usize::try_from(start).map_err(|_| ArithmeticError::Overflow)?;
13111304
info.requested_received_log = chain.received_log.read(start..).await?;

linera-core/src/client/mod.rs

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
6969
use tracing::{debug, error, info, instrument, warn, Instrument as _};
7070

7171
use crate::{
72-
data_types::{
73-
BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout,
74-
},
72+
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
7573
environment::Environment,
7674
local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
7775
node::{
@@ -786,7 +784,7 @@ impl<Env: Environment> Client<Env> {
786784
// put all their sent messages into the inbox.
787785
let mut other_sender_chains = Vec::new();
788786

789-
let certificate_hashes = future::try_join_all(remote_heights.into_iter().filter_map(
787+
let certificates = future::try_join_all(remote_heights.into_iter().filter_map(
790788
|(sender_chain_id, remote_heights)| {
791789
let local_next = *local_next_heights.get(&sender_chain_id)?;
792790
if let Ok(height) = local_next.try_sub_one() {
@@ -803,20 +801,11 @@ impl<Env: Environment> Client<Env> {
803801
other_sender_chains.push(sender_chain_id);
804802
return None;
805803
};
806-
let height0 = *remote_heights.first()?;
807-
let height1 = *remote_heights.last()?;
808-
809-
// Find the hashes of the blocks we need.
810-
let range = BlockHeightRange::multi(height0, height1.0 + 1 - height0.0);
811804
Some(async move {
812-
let hashes = remote_node
813-
.fetch_sent_certificate_hashes(sender_chain_id, range)
805+
let certificates = remote_node
806+
.download_certificates_by_heights(sender_chain_id, remote_heights)
814807
.await?;
815-
Ok::<_, ChainClientError>(
816-
remote_heights
817-
.into_iter()
818-
.filter_map(move |h| hashes.get((h.0 - height0.0) as usize).copied()),
819-
)
808+
Ok::<Vec<_>, ChainClientError>(certificates)
820809
})
821810
},
822811
))
@@ -825,41 +814,10 @@ impl<Env: Environment> Client<Env> {
825814
.flatten()
826815
.collect::<Vec<_>>();
827816

828-
let local_certificates = self
829-
.storage_client()
830-
.read_certificates(certificate_hashes.clone())
831-
.await?
832-
.into_iter()
833-
.flatten()
834-
.collect::<Vec<_>>();
835-
let local_certificate_hashes = local_certificates
836-
.iter()
837-
.map(|cert| cert.hash())
838-
.collect::<HashSet<_>>();
839-
840-
// Download the block certificates.
841-
let remote_certificates = remote_node
842-
.download_certificates(
843-
certificate_hashes
844-
.into_iter()
845-
.filter(|hash| !local_certificate_hashes.contains(hash))
846-
.collect(),
847-
)
848-
.await?;
849817
let mut certificates_by_height_by_chain = BTreeMap::new();
850818

851-
for confirmed_block_certificate in local_certificates {
852-
let block_header = &confirmed_block_certificate.inner().block().header;
853-
let sender_chain_id = block_header.chain_id;
854-
let height = block_header.height;
855-
certificates_by_height_by_chain
856-
.entry(sender_chain_id)
857-
.or_insert_with(BTreeMap::new)
858-
.insert(height, confirmed_block_certificate);
859-
}
860-
861819
// Check the signatures and keep only the ones that are valid.
862-
for confirmed_block_certificate in remote_certificates {
820+
for confirmed_block_certificate in certificates {
863821
let block_header = &confirmed_block_certificate.inner().block().header;
864822
let sender_chain_id = block_header.chain_id;
865823
let height = block_header.height;

linera-core/src/data_types.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ pub struct ChainInfoQuery {
7676
#[debug(skip_if = Not::not)]
7777
pub request_pending_message_bundles: bool,
7878
/// Query a range of certificate hashes sent from the chain.
79+
// dev: this field is left and unused to maintain backwards compatibility
80+
// after hotfixing Testnet Conway.
7981
#[debug(skip_if = Option::is_none)]
8082
pub request_sent_certificate_hashes_in_range: Option<BlockHeightRange>,
8183
/// Query new certificate sender chain IDs and block heights received from the chain.
@@ -132,8 +134,8 @@ impl ChainInfoQuery {
132134
self
133135
}
134136

135-
pub fn with_sent_certificate_hashes_in_range(mut self, range: BlockHeightRange) -> Self {
136-
self.request_sent_certificate_hashes_in_range = Some(range);
137+
pub fn with_sent_certificate_hashes_by_heights(mut self, heights: Vec<BlockHeight>) -> Self {
138+
self.request_sent_certificate_hashes_by_heights = heights;
137139
self
138140
}
139141

@@ -314,6 +316,13 @@ impl ChainInfoResponse {
314316

315317
impl BcsSignable<'_> for ChainInfo {}
316318

319+
/// Request for downloading certificates by heights.
320+
#[derive(Debug, Clone)]
321+
pub struct CertificatesByHeightRequest {
322+
pub chain_id: ChainId,
323+
pub heights: Vec<BlockHeight>,
324+
}
325+
317326
/// The outcome of trying to commit a list of operations to the chain.
318327
#[derive(Debug)]
319328
pub enum ClientOutcome<T> {

linera-core/src/node.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ pub trait ValidatorNode {
145145
hashes: Vec<CryptoHash>,
146146
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
147147

148+
/// Requests a batch of certificates from a specific chain by heights.
149+
async fn download_certificates_by_heights(
150+
&self,
151+
chain_id: ChainId,
152+
heights: Vec<BlockHeight>,
153+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
154+
148155
/// Returns the hash of the `Certificate` that last used a blob.
149156
async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
150157

linera-core/src/remote_node.rs

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{collections::HashSet, time::Duration};
4+
use std::{
5+
collections::{HashSet, VecDeque},
6+
time::Duration,
7+
};
58

69
use custom_debug_derive::Debug;
710
use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt};
811
use linera_base::{
9-
crypto::{CryptoHash, ValidatorPublicKey},
12+
crypto::ValidatorPublicKey,
1013
data_types::{Blob, BlockHeight},
1114
ensure,
1215
identifiers::{BlobId, ChainId},
@@ -22,7 +25,7 @@ use rand::seq::SliceRandom as _;
2225
use tracing::{instrument, warn};
2326

2427
use crate::{
25-
data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
28+
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
2629
node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
2730
};
2831

@@ -169,25 +172,11 @@ impl<N: ValidatorNode> RemoteNode<N> {
169172
limit: u64,
170173
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
171174
tracing::debug!(name = ?self.public_key, ?chain_id, ?start, ?limit, "Querying certificates");
172-
let range = BlockHeightRange {
173-
start,
174-
limit: Some(limit),
175-
};
176-
let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range);
177-
let info = self.handle_chain_info_query(query).await?;
178-
self.node
179-
.download_certificates(info.requested_sent_certificate_hashes)
180-
.await?
181-
.into_iter()
182-
.map(|c| {
183-
ensure!(
184-
c.inner().chain_id() == chain_id,
185-
NodeError::UnexpectedCertificateValue
186-
);
187-
ConfirmedBlockCertificate::try_from(c)
188-
.map_err(|_| NodeError::InvalidChainInfoResponse)
189-
})
190-
.collect()
175+
let heights = (start.0..start.0 + limit)
176+
.map(BlockHeight)
177+
.collect::<Vec<_>>();
178+
self.download_certificates_by_heights(chain_id, heights)
179+
.await
191180
}
192181

193182
#[instrument(level = "trace")]
@@ -246,51 +235,46 @@ impl<N: ValidatorNode> RemoteNode<N> {
246235
}
247236
}
248237

249-
/// Returns the list of certificate hashes on the given chain in the given range of heights.
250-
/// Returns an error if the number of hashes does not match the size of the range.
238+
/// Downloads a list of certificates from the given chain.
251239
#[instrument(level = "trace")]
252-
pub(crate) async fn fetch_sent_certificate_hashes(
240+
pub async fn download_certificates_by_heights(
253241
&self,
254242
chain_id: ChainId,
255-
range: BlockHeightRange,
256-
) -> Result<Vec<CryptoHash>, NodeError> {
257-
let query =
258-
ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range.clone());
259-
let response = self.handle_chain_info_query(query).await?;
260-
let hashes = response.requested_sent_certificate_hashes;
243+
heights: Vec<BlockHeight>,
244+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
245+
let mut expected_heights = VecDeque::from(heights.clone());
246+
let certificates = self
247+
.node
248+
.download_certificates_by_heights(chain_id, heights)
249+
.await?;
261250

262-
if range
263-
.limit
264-
.is_some_and(|limit| hashes.len() as u64 != limit)
265-
{
266-
warn!(
267-
?range,
268-
received_num = hashes.len(),
269-
"Validator sent invalid number of certificate hashes."
270-
);
271-
return Err(NodeError::InvalidChainInfoResponse);
251+
if certificates.len() > expected_heights.len() {
252+
return Err(NodeError::TooManyCertificatesReturned {
253+
chain_id,
254+
remote_node: Box::new(self.public_key),
255+
});
272256
}
273-
Ok(hashes)
274-
}
275257

276-
#[instrument(level = "trace")]
277-
pub async fn download_certificates(
278-
&self,
279-
hashes: Vec<CryptoHash>,
280-
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
281-
if hashes.is_empty() {
282-
return Ok(Vec::new());
258+
for certificate in &certificates {
259+
ensure!(
260+
certificate.inner().chain_id() == chain_id,
261+
NodeError::UnexpectedCertificateValue
262+
);
263+
if let Some(expected_height) = expected_heights.pop_front() {
264+
ensure!(
265+
expected_height == certificate.inner().height(),
266+
NodeError::UnexpectedCertificateValue
267+
);
268+
} else {
269+
return Err(NodeError::UnexpectedCertificateValue);
270+
}
283271
}
284-
let certificates = self.node.download_certificates(hashes.clone()).await?;
285-
let returned = certificates
286-
.iter()
287-
.map(ConfirmedBlockCertificate::hash)
288-
.collect();
272+
289273
ensure!(
290-
returned == hashes,
291-
NodeError::UnexpectedCertificates {
292-
returned,
293-
requested: hashes
274+
expected_heights.is_empty(),
275+
NodeError::MissingCertificatesByHeights {
276+
chain_id,
277+
heights: expected_heights.into_iter().collect(),
294278
}
295279
);
296280
Ok(certificates)

linera-core/src/unit_tests/test_utils.rs

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,17 @@ where
238238
.await
239239
}
240240

241+
async fn download_certificates_by_heights(
242+
&self,
243+
chain_id: ChainId,
244+
heights: Vec<BlockHeight>,
245+
) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
246+
self.spawn_and_receive(move |validator, sender| {
247+
validator.do_download_certificates_by_heights(chain_id, heights, sender)
248+
})
249+
.await
250+
}
251+
241252
async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
242253
self.spawn_and_receive(move |validator, sender| {
243254
validator.do_blob_last_used_by(blob_id, sender)
@@ -596,6 +607,51 @@ where
596607
sender.send(certificates)
597608
}
598609

610+
async fn do_download_certificates_by_heights(
611+
self,
612+
chain_id: ChainId,
613+
heights: Vec<BlockHeight>,
614+
sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
615+
) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
616+
// First, use do_handle_chain_info_query to get the certificate hashes
617+
let (query_sender, query_receiver) = oneshot::channel();
618+
let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
619+
620+
let self_clone = self.clone();
621+
self.do_handle_chain_info_query(query, query_sender)
622+
.await
623+
.expect("Failed to handle chain info query");
624+
625+
// Get the response from the chain info query
626+
let chain_info_response = query_receiver.await.map_err(|_| {
627+
Err(NodeError::ClientIoError {
628+
error: "Failed to receive chain info response".to_string(),
629+
})
630+
})?;
631+
632+
let hashes = match chain_info_response {
633+
Ok(response) => response.info.requested_sent_certificate_hashes,
634+
Err(e) => {
635+
return sender.send(Err(e));
636+
}
637+
};
638+
639+
// Now use do_download_certificates to get the actual certificates
640+
let (cert_sender, cert_receiver) = oneshot::channel();
641+
self_clone
642+
.do_download_certificates(hashes, cert_sender)
643+
.await?;
644+
645+
// Forward the result to the original sender
646+
let result = cert_receiver.await.map_err(|_| {
647+
Err(NodeError::ClientIoError {
648+
error: "Failed to receive certificates".to_string(),
649+
})
650+
})?;
651+
652+
sender.send(result)
653+
}
654+
599655
async fn do_blob_last_used_by(
600656
self,
601657
blob_id: BlobId,
@@ -1020,11 +1076,8 @@ where
10201076
block_height: BlockHeight,
10211077
target_count: usize,
10221078
) -> Option<ConfirmedBlockCertificate> {
1023-
let query =
1024-
ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(BlockHeightRange {
1025-
start: block_height,
1026-
limit: Some(1),
1027-
});
1079+
let query = ChainInfoQuery::new(chain_id)
1080+
.with_sent_certificate_hashes_by_heights(vec![block_height]);
10281081
let mut count = 0;
10291082
let mut certificate = None;
10301083
for validator in self.validator_clients.clone() {

0 commit comments

Comments
 (0)