Skip to content

Commit b32352e

Browse files
authored
Leave gaps between sender blocks. (#4130)
## Motivation It's unnecessary to fully download all chains that sent a message to our own chain(s). ## Proposal Only download the relevant blocks. ## Test Plan A new client test verifies that we are leaving gaps. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - Closes #3969. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent fe0f68d commit b32352e

File tree

3 files changed

+136
-27
lines changed

3 files changed

+136
-27
lines changed

linera-core/src/client/mod.rs

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -768,8 +768,8 @@ impl<Env: Environment> Client<Env> {
768768
Ok(())
769769
}
770770

771-
/// Downloads and processes all confirmed block certificates that sent any message to this
772-
/// chain, including their ancestors.
771+
/// Downloads and preprocesses all confirmed block certificates that sent any message to this
772+
/// chain.
773773
#[instrument(level = "trace", skip(self))]
774774
async fn synchronize_received_certificates_from_validator(
775775
&self,
@@ -792,12 +792,12 @@ impl<Env: Environment> Client<Env> {
792792
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
793793
let info = remote_node.handle_chain_info_query(query).await?;
794794
let remote_log = info.requested_received_log;
795-
let remote_max_heights = Self::max_height_per_chain(&remote_log);
795+
let remote_heights = Self::heights_per_chain(&remote_log);
796796

797797
// Obtain the next block height we need in the local node, for each chain.
798798
let local_next_heights = self
799799
.local_node
800-
.next_block_heights(remote_max_heights.keys(), chain_worker_limit)
800+
.next_outbox_heights(remote_heights.keys(), chain_worker_limit, chain_id)
801801
.await?;
802802

803803
// We keep track of the height we've successfully downloaded and checked, per chain.
@@ -806,15 +806,17 @@ impl<Env: Environment> Client<Env> {
806806
// put all their sent messages into the inbox.
807807
let mut other_sender_chains = Vec::new();
808808

809-
let certificate_hashes = future::try_join_all(remote_max_heights.into_iter().filter_map(
810-
|(sender_chain_id, remote_height)| {
811-
let local_next = *local_next_heights.get(&sender_chain_id)?;
809+
let certificate_hashes = future::try_join_all(remote_heights.into_iter().filter_map(
810+
|(sender_chain_id, remote_heights)| {
811+
let height0 = *remote_heights.first()?;
812+
let local_next = (*local_next_heights.get(&sender_chain_id)?).max(height0);
812813
if let Ok(height) = local_next.try_sub_one() {
813814
downloaded_heights.insert(sender_chain_id, height);
814815
}
815816

816-
let Some(diff) = remote_height.0.checked_sub(local_next.0) else {
817-
// Our highest, locally-known block is higher than any block height
817+
let height1 = *remote_heights.last()?;
818+
let Some(diff) = height1.0.checked_sub(local_next.0) else {
819+
// Our highest, locally executed block is higher than any block height
818820
// from the current batch. Skip this batch, but remember to wait for
819821
// the messages to be delivered to the inboxes.
820822
other_sender_chains.push(sender_chain_id);
@@ -823,20 +825,61 @@ impl<Env: Environment> Client<Env> {
823825

824826
// Find the hashes of the blocks we need.
825827
let range = BlockHeightRange::multi(local_next, diff.saturating_add(1));
826-
Some(remote_node.fetch_sent_certificate_hashes(sender_chain_id, range))
828+
Some(async move {
829+
let hashes = remote_node
830+
.fetch_sent_certificate_hashes(sender_chain_id, range)
831+
.await?;
832+
Ok::<_, ChainClientError>(
833+
remote_heights
834+
.into_iter()
835+
.map(move |h| hashes[(h.0 - local_next.0) as usize]),
836+
)
837+
})
827838
},
828839
))
829840
.await?
830841
.into_iter()
831842
.flatten()
832-
.collect();
843+
.collect::<Vec<_>>();
844+
845+
let local_certificates =
846+
future::try_join_all(certificate_hashes.iter().map(|hash| async move {
847+
match self.storage_client().read_certificate(*hash).await {
848+
Ok(certificate) => Ok(Some(certificate)),
849+
Err(ViewError::NotFound(_)) => Ok(None),
850+
Err(error) => Err(error),
851+
}
852+
}))
853+
.await?
854+
.into_iter()
855+
.flatten()
856+
.collect::<Vec<_>>();
857+
let local_certificate_hashes = local_certificates
858+
.iter()
859+
.map(|cert| cert.hash())
860+
.collect::<HashSet<_>>();
833861

834862
// Download the block certificates.
835863
let remote_certificates = remote_node
836-
.download_certificates(certificate_hashes)
864+
.download_certificates(
865+
certificate_hashes
866+
.into_iter()
867+
.filter(|hash| !local_certificate_hashes.contains(hash))
868+
.collect(),
869+
)
837870
.await?;
838871
let mut certificates_by_height_by_chain = BTreeMap::new();
839872

873+
for confirmed_block_certificate in local_certificates {
874+
let block_header = &confirmed_block_certificate.inner().block().header;
875+
let sender_chain_id = block_header.chain_id;
876+
let height = block_header.height;
877+
certificates_by_height_by_chain
878+
.entry(sender_chain_id)
879+
.or_insert_with(BTreeMap::new)
880+
.insert(height, confirmed_block_certificate);
881+
}
882+
840883
// Check the signatures and keep only the ones that are valid.
841884
for confirmed_block_certificate in remote_certificates {
842885
let block_header = &confirmed_block_certificate.inner().block().header;
@@ -863,7 +906,7 @@ impl<Env: Environment> Client<Env> {
863906
certificates_by_height_by_chain
864907
.entry(sender_chain_id)
865908
.or_insert_with(BTreeMap::new)
866-
.insert(height, confirmed_block_certificate.clone());
909+
.insert(height, confirmed_block_certificate);
867910
}
868911
}
869912
}
@@ -881,10 +924,9 @@ impl<Env: Environment> Client<Env> {
881924
}
882925

883926
for (sender_chain_id, certs) in &mut certificates_by_height_by_chain {
884-
if !certs
927+
if certs
885928
.values()
886-
.last()
887-
.is_some_and(|cert| cert.block().recipients().contains(&chain_id))
929+
.any(|cert| !cert.block().recipients().contains(&chain_id))
888930
{
889931
warn!(
890932
"Skipping received certificates from chain {sender_chain_id:.8}:
@@ -931,15 +973,17 @@ impl<Env: Environment> Client<Env> {
931973
}
932974

933975
/// Given a set of chain ID-block height pairs, returns a map that assigns to each chain ID
934-
/// the highest height seen.
935-
fn max_height_per_chain(remote_log: &[ChainAndHeight]) -> BTreeMap<ChainId, BlockHeight> {
976+
/// the set of heights. The returned map contains no empty values.
977+
fn heights_per_chain(
978+
remote_log: &[ChainAndHeight],
979+
) -> BTreeMap<ChainId, BTreeSet<BlockHeight>> {
936980
remote_log.iter().fold(
937-
BTreeMap::<ChainId, BlockHeight>::new(),
981+
BTreeMap::<ChainId, BTreeSet<_>>::new(),
938982
|mut chain_to_info, entry| {
939983
chain_to_info
940984
.entry(entry.chain_id)
941-
.and_modify(|h| *h = entry.height.max(*h))
942-
.or_insert(entry.height);
985+
.or_default()
986+
.insert(entry.height);
943987
chain_to_info
944988
},
945989
)

linera-core/src/local_node.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -311,23 +311,23 @@ where
311311
}
312312

313313
/// Given a list of chain IDs, returns a map that assigns to each of them the next block
314-
/// height, i.e. the lowest block height that we have not processed in the local node yet.
314+
/// height to schedule, i.e. the lowest block height for which we haven't added the messages
315+
/// to `receiver_id` to the outbox yet.
315316
///
316317
/// It makes at most `chain_worker_limit` requests to the local node in parallel.
317-
pub async fn next_block_heights(
318+
pub async fn next_outbox_heights(
318319
&self,
319320
chain_ids: impl IntoIterator<Item = &ChainId>,
320321
chain_worker_limit: usize,
322+
receiver_id: ChainId,
321323
) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
322324
let futures = chain_ids
323325
.into_iter()
324326
.map(|chain_id| async move {
325327
let chain = self.chain_state_view(*chain_id).await?;
326328
let mut next_height = chain.tip_state.get().next_block_height;
327-
// TODO(#3969): This is not great for performance, but the whole function will
328-
// probably go away with #3969 anyway.
329-
while chain.preprocessed_blocks.contains_key(&next_height).await? {
330-
next_height.try_add_assign_one()?;
329+
if let Some(outbox) = chain.outboxes.try_load_entry(&receiver_id).await? {
330+
next_height = next_height.max(*outbox.next_height_to_schedule.get());
331331
}
332332
Ok::<_, LocalNodeError>((*chain_id, next_height))
333333
})

linera-core/src/unit_tests/client_tests.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use linera_execution::{
2828
ExecutionError, Message, MessageKind, Operation, QueryOutcome, ResourceControlPolicy,
2929
SystemMessage, SystemQuery, SystemResponse,
3030
};
31+
use linera_storage::Storage;
3132
use rand::Rng;
3233
use test_case::test_case;
3334
use test_helpers::{
@@ -1289,6 +1290,70 @@ where
12891290
Ok(())
12901291
}
12911292

1293+
#[test_case(MemoryStorageBuilder::default(); "memory")]
1294+
#[cfg_attr(feature = "storage-service", test_case(ServiceStorageBuilder::new().await; "storage_service"))]
1295+
#[test_log::test(tokio::test)]
1296+
async fn test_sparse_sender_chain<B>(storage_builder: B) -> anyhow::Result<()>
1297+
where
1298+
B: StorageBuilder,
1299+
{
1300+
let signer = InMemorySigner::new(None);
1301+
let mut builder = TestBuilder::new(storage_builder, 4, 1, signer).await?;
1302+
let sender = builder.add_root_chain(1, Amount::from_tokens(4)).await?;
1303+
let receiver = builder.add_root_chain(2, Amount::ZERO).await?;
1304+
let receiver_id = receiver.chain_id();
1305+
1306+
let cert0 = sender
1307+
.transfer_to_account(
1308+
AccountOwner::CHAIN,
1309+
Amount::ONE,
1310+
Account::chain(receiver_id),
1311+
)
1312+
.await
1313+
.unwrap()
1314+
.unwrap();
1315+
let cert1 = sender
1316+
.burn(AccountOwner::CHAIN, Amount::ONE)
1317+
.await
1318+
.unwrap()
1319+
.unwrap();
1320+
let cert2 = sender
1321+
.transfer_to_account(
1322+
AccountOwner::CHAIN,
1323+
Amount::ONE,
1324+
Account::chain(receiver_id),
1325+
)
1326+
.await
1327+
.unwrap()
1328+
.unwrap();
1329+
1330+
receiver.synchronize_from_validators().await?;
1331+
receiver.process_inbox().await?;
1332+
1333+
// The first and last blocks sent something to the receiver. The middle one didn't.
1334+
// So the sender chain should have a gap.
1335+
assert!(
1336+
receiver
1337+
.storage_client()
1338+
.contains_certificate(cert0.hash())
1339+
.await?
1340+
);
1341+
assert!(
1342+
!receiver
1343+
.storage_client()
1344+
.contains_certificate(cert1.hash())
1345+
.await?
1346+
);
1347+
assert!(
1348+
receiver
1349+
.storage_client()
1350+
.contains_certificate(cert2.hash())
1351+
.await?
1352+
);
1353+
1354+
Ok(())
1355+
}
1356+
12921357
#[test_case(MemoryStorageBuilder::default(); "memory")]
12931358
#[cfg_attr(feature = "storage-service", test_case(ServiceStorageBuilder::new().await; "storage_service"))]
12941359
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]

0 commit comments

Comments
 (0)