Skip to content

Commit 4f9e497

Browse files
authored
Relax the inbox check. (#4791)
## Motivation Currently a block proposal is always rejected (by a validator, or even by the local node) if any of the incoming messages included on this chain up to and including the proposal itself have not been received from the sender chain yet. For example, if: - block A0 on chain A has included a message from block B0 on chain B, - and now a proposal for block A1 is made that includes a message from block C0 on chain C, - and a validator has block C0 but not B0, it would currently reject that proposal. That is unnecessarily restrictive, because block A0 has already been signed by a quorum of validators, and it causes more round-trips and download work for clients. ## Proposal Relax the inbox check, so that only incoming messages in the proposal itself have to be received yet. The validator in the above example would accept the proposal. In other words, don't reject a new block proposal just because you have an old, already confirmed block in that chain with an incoming message whose sender block you have not seen yet. ## Test Plan CI; some tests have been updated. ## Release Plan - These changes should be backported to `testnet_conway`, then - be released in a new SDK, - be released in a validator hotfix, ideally at the same time. ## Links - Closes #413. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 1641f06 commit 4f9e497

File tree

6 files changed

+55
-157
lines changed

6 files changed

+55
-157
lines changed

linera-chain/src/chain.rs

Lines changed: 11 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::{
77
sync::Arc,
88
};
99

10-
use futures::stream::{self, StreamExt, TryStreamExt};
1110
use linera_base::{
1211
crypto::{CryptoHash, ValidatorPublicKey},
1312
data_types::{
@@ -31,7 +30,6 @@ use linera_views::{
3130
reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
3231
register_view::RegisterView,
3332
set_view::SetView,
34-
store::ReadableKeyValueStore as _,
3533
views::{ClonableView, CryptoHashView, RootView, View},
3634
};
3735
use serde::{Deserialize, Serialize};
@@ -517,58 +515,6 @@ where
517515
Ok(())
518516
}
519517

520-
/// Verifies that this chain is up-to-date and all the messages executed ahead of time
521-
/// have been properly received by now.
522-
#[instrument(target = "telemetry_only", skip_all, fields(
523-
chain_id = %self.chain_id()
524-
))]
525-
pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
526-
let chain_id = self.chain_id();
527-
let pairs = self.inboxes.try_load_all_entries().await?;
528-
let max_stream_queries = self.context().store().max_stream_queries();
529-
let stream = stream::iter(pairs)
530-
.map(|(origin, inbox)| async move {
531-
if let Some(bundle) = inbox.removed_bundles.front().await? {
532-
return Err(ChainError::MissingCrossChainUpdate {
533-
chain_id,
534-
origin,
535-
height: bundle.height,
536-
});
537-
}
538-
Ok::<(), ChainError>(())
539-
})
540-
.buffer_unordered(max_stream_queries);
541-
stream.try_collect::<Vec<_>>().await?;
542-
Ok(())
543-
}
544-
545-
/// Collects all missing sender blocks from removed bundles across all inboxes.
546-
/// Returns a map of origin chain IDs to their respective missing block heights.
547-
#[instrument(target = "telemetry_only", skip_all, fields(
548-
chain_id = %self.chain_id()
549-
))]
550-
pub async fn collect_missing_sender_blocks(
551-
&self,
552-
) -> Result<BTreeMap<ChainId, Vec<BlockHeight>>, ChainError> {
553-
let pairs = self.inboxes.try_load_all_entries().await?;
554-
let max_stream_queries = self.context().store().max_stream_queries();
555-
let stream = stream::iter(pairs)
556-
.map(|(origin, inbox)| async move {
557-
let mut missing_heights = Vec::new();
558-
let bundles = inbox.removed_bundles.elements().await?;
559-
for bundle in bundles {
560-
missing_heights.push(bundle.height);
561-
}
562-
Ok::<(ChainId, Vec<BlockHeight>), ChainError>((origin, missing_heights))
563-
})
564-
.buffer_unordered(max_stream_queries);
565-
let results: Vec<(ChainId, Vec<BlockHeight>)> = stream.try_collect().await?;
566-
Ok(results
567-
.into_iter()
568-
.filter(|(_, heights)| !heights.is_empty())
569-
.collect())
570-
}
571-
572518
pub async fn next_block_height_to_receive(
573519
&self,
574520
origin: &ChainId,
@@ -715,6 +661,7 @@ where
715661
pub async fn remove_bundles_from_inboxes(
716662
&mut self,
717663
timestamp: Timestamp,
664+
must_be_present: bool,
718665
incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
719666
) -> Result<(), ChainError> {
720667
let chain_id = self.chain_id();
@@ -749,6 +696,16 @@ where
749696
.remove_bundle(bundle)
750697
.await
751698
.map_err(|error| (chain_id, origin, error))?;
699+
if must_be_present {
700+
ensure!(
701+
was_present,
702+
ChainError::MissingCrossChainUpdate {
703+
chain_id,
704+
origin,
705+
height: bundle.height,
706+
}
707+
);
708+
}
752709
if was_present && !bundle.is_skippable() {
753710
removed_unskippable.insert(BundleInInbox::new(origin, bundle));
754711
}

linera-core/src/chain_worker/state.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,11 @@ where
839839
let local_time = self.storage.clock().current_time();
840840
let chain = &mut self.chain;
841841
chain
842-
.remove_bundles_from_inboxes(block.header.timestamp, block.body.incoming_bundles())
842+
.remove_bundles_from_inboxes(
843+
block.header.timestamp,
844+
false,
845+
block.body.incoming_bundles(),
846+
)
843847
.await?;
844848
let oracle_responses = Some(block.body.oracle_responses.clone());
845849
let (proposed_block, outcome) = block.clone().into_proposal();
@@ -1310,7 +1314,7 @@ where
13101314
let local_time = self.storage.clock().current_time();
13111315

13121316
self.chain
1313-
.remove_bundles_from_inboxes(block.timestamp, block.incoming_bundles())
1317+
.remove_bundles_from_inboxes(block.timestamp, true, block.incoming_bundles())
13141318
.await?;
13151319
let outcome = if let Some(outcome) = outcome {
13161320
outcome.clone()
@@ -1329,8 +1333,6 @@ where
13291333
.tip_state
13301334
.get_mut()
13311335
.update_counters(&block.transactions, &outcome.messages)?;
1332-
// Verify that the resulting chain would have no unconfirmed incoming messages.
1333-
chain.validate_incoming_bundles().await?;
13341336
// Don't save the changes since the block is not confirmed yet.
13351337
chain.rollback();
13361338

linera-core/src/client/mod.rs

Lines changed: 1 addition & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,68 +1028,6 @@ impl<Env: Environment> Client<Env> {
10281028
Ok(remote_log)
10291029
}
10301030

1031-
/// Downloads only the specific sender blocks needed for missing cross-chain messages.
1032-
/// This is a targeted alternative to `find_received_certificates` that only downloads
1033-
/// the exact sender blocks we're missing, rather than searching through all received
1034-
/// certificates.
1035-
async fn download_missing_sender_blocks(
1036-
&self,
1037-
receiver_chain_id: ChainId,
1038-
missing_blocks: BTreeMap<ChainId, Vec<BlockHeight>>,
1039-
) -> Result<(), ChainClientError> {
1040-
if missing_blocks.is_empty() {
1041-
return Ok(());
1042-
}
1043-
1044-
let (_, committee) = self.admin_committee().await?;
1045-
let nodes = self.make_nodes(&committee)?;
1046-
1047-
// Download certificates for each sender chain at the specific heights.
1048-
stream::iter(missing_blocks.into_iter())
1049-
.map(|(sender_chain_id, heights)| {
1050-
let height = heights.into_iter().max();
1051-
let mut shuffled_nodes = nodes.clone();
1052-
shuffled_nodes.shuffle(&mut rand::thread_rng());
1053-
async move {
1054-
let Some(height) = height else {
1055-
return Ok(());
1056-
};
1057-
// Try to download from any node.
1058-
for node in &shuffled_nodes {
1059-
if let Err(err) = self
1060-
.download_sender_block_with_sending_ancestors(
1061-
receiver_chain_id,
1062-
sender_chain_id,
1063-
height,
1064-
node,
1065-
)
1066-
.await
1067-
{
1068-
tracing::debug!(
1069-
%height,
1070-
%receiver_chain_id,
1071-
%sender_chain_id,
1072-
%err,
1073-
validator = %node.public_key,
1074-
"Failed to fetch sender block",
1075-
);
1076-
} else {
1077-
return Ok::<_, ChainClientError>(());
1078-
}
1079-
}
1080-
// If all nodes fail, return an error.
1081-
Err(ChainClientError::CannotDownloadMissingSenderBlock {
1082-
chain_id: sender_chain_id,
1083-
height,
1084-
})
1085-
}
1086-
})
1087-
.buffer_unordered(self.options.max_joined_tasks)
1088-
.try_collect::<Vec<_>>()
1089-
.await?;
1090-
Ok(())
1091-
}
1092-
10931031
/// Downloads a specific sender block and recursively downloads any earlier blocks
10941032
/// that also sent a message to our chain, based on `previous_message_blocks`.
10951033
///
@@ -2177,7 +2115,7 @@ impl<Env: Environment> ChainClient<Env> {
21772115
}
21782116

21792117
/// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
2180-
/// its current height and are not missing any received messages from the inbox.
2118+
/// its current height.
21812119
#[instrument(level = "trace")]
21822120
pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
21832121
#[cfg(with_metrics)]
@@ -2203,16 +2141,6 @@ impl<Env: Environment> ChainClient<Env> {
22032141
.await?;
22042142
}
22052143

2206-
// Check if we're missing any sender blocks for cross-chain messages.
2207-
let missing_blocks = self
2208-
.chain_state_view()
2209-
.await?
2210-
.collect_missing_sender_blocks()
2211-
.await?;
2212-
// Download any sender blocks we're missing.
2213-
self.client
2214-
.download_missing_sender_blocks(self.chain_id, missing_blocks)
2215-
.await?;
22162144
self.client.update_from_info(&info);
22172145
Ok(info)
22182146
}

linera-core/src/unit_tests/client_tests.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,19 +2845,15 @@ where
28452845
)
28462846
.await?;
28472847

2848-
// Test that prepare_chain downloads only the sender blocks for acknowledged messages.
2849-
// The new client has the inbox state showing that a message from sender was processed,
2850-
// but needs to download the sender block. The message from sender2 hasn't been
2851-
// processed yet, so its block should NOT be downloaded.
2848+
// Test that prepare_chain does not download any sender chain blocks.
28522849
let info = receiver2.prepare_chain().await?;
28532850
assert_eq!(info.next_block_height, BlockHeight::from(1));
28542851

2855-
// Verify that sender's block WAS downloaded.
28562852
let local_node = &receiver2.client.local_node;
28572853
let sender_info = local_node.chain_info(sender.chain_id()).await?;
28582854
assert_eq!(
28592855
sender_info.next_block_height,
2860-
BlockHeight::from(1),
2856+
BlockHeight::ZERO,
28612857
"prepare_chain should download acknowledged sender blocks"
28622858
);
28632859

linera-core/src/unit_tests/worker_tests.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use linera_chain::{
3636
CertificateKind, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate,
3737
GenericCertificate, Timeout, ValidatedBlock,
3838
},
39-
ChainError, ChainExecutionContext,
39+
ChainError, ChainExecutionContext, ChainStateView,
4040
};
4141
use linera_execution::{
4242
committee::Committee,
@@ -47,11 +47,12 @@ use linera_execution::{
4747
test_utils::{
4848
dummy_chain_description, ExpectedCall, RegisterMockApplication, SystemExecutionState,
4949
},
50-
ExecutionError, Message, MessageKind, OutgoingMessage, Query, QueryContext, QueryOutcome,
51-
QueryResponse, SystemQuery, SystemResponse,
50+
ExecutionError, ExecutionRuntimeContext, Message, MessageKind, OutgoingMessage, Query,
51+
QueryContext, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
5252
};
5353
use linera_storage::{DbStorage, Storage, TestClock};
5454
use linera_views::{
55+
context::Context,
5556
memory::MemoryDatabase,
5657
random::generate_test_namespace,
5758
store::TestKeyValueDatabase as _,
@@ -474,6 +475,18 @@ where
474475
}
475476
}
476477

478+
/// Asserts that there are no "removed" bundles in the inbox, that have been included as
479+
/// incoming in a block but not received from the sender chain yet.
480+
async fn assert_no_removed_bundles<C>(chain: &ChainStateView<C>)
481+
where
482+
C: Context + Clone + Send + Sync + 'static,
483+
C::Extra: ExecutionRuntimeContext,
484+
{
485+
for (_, inbox) in chain.inboxes.try_load_all_entries().await.unwrap() {
486+
assert_eq!(inbox.removed_bundles.front().await.unwrap(), None);
487+
}
488+
}
489+
477490
fn direct_outgoing_message(
478491
recipient: ChainId,
479492
kind: MessageKind,
@@ -2478,7 +2491,7 @@ where
24782491
{
24792492
let chain = env.worker().chain_state_view(chain_2).await?;
24802493
assert!(chain.is_active());
2481-
chain.validate_incoming_bundles().await?;
2494+
assert_no_removed_bundles(&chain).await;
24822495
}
24832496

24842497
// Process the bounced message and try to use the refund.
@@ -2519,7 +2532,7 @@ where
25192532
{
25202533
let chain = env.worker.chain_state_view(chain_1).await?;
25212534
assert!(chain.is_active());
2522-
chain.validate_incoming_bundles().await?;
2535+
assert_no_removed_bundles(&chain).await;
25232536
}
25242537
Ok(())
25252538
}
@@ -2575,7 +2588,7 @@ where
25752588
{
25762589
let admin_chain = env.worker().chain_state_view(admin_id).await?;
25772590
assert!(admin_chain.is_active());
2578-
admin_chain.validate_incoming_bundles().await?;
2591+
assert_no_removed_bundles(&admin_chain).await;
25792592
assert_eq!(
25802593
BlockHeight::from(1),
25812594
admin_chain.tip_state.get().next_block_height
@@ -2656,7 +2669,7 @@ where
26562669
*user_chain.execution_state.system.admin_id.get(),
26572670
Some(admin_id)
26582671
);
2659-
user_chain.validate_incoming_bundles().await?;
2672+
assert_no_removed_bundles(&user_chain).await;
26602673
matches!(
26612674
&user_chain
26622675
.inboxes
@@ -2739,7 +2752,7 @@ where
27392752
Some(admin_id)
27402753
);
27412754
assert_eq!(user_chain.execution_state.system.committees.get().len(), 2);
2742-
user_chain.validate_incoming_bundles().await?;
2755+
assert_no_removed_bundles(&user_chain).await;
27432756
Ok(())
27442757
}
27452758
}
@@ -3033,10 +3046,15 @@ where
30333046
// The admin chain has an anticipated message.
30343047
let admin_chain = env.worker().chain_state_view(admin_id).await?;
30353048
assert!(admin_chain.is_active());
3036-
assert_matches!(
3037-
admin_chain.validate_incoming_bundles().await,
3038-
Err(ChainError::MissingCrossChainUpdate { .. })
3039-
);
3049+
assert!(admin_chain
3050+
.inboxes
3051+
.try_load_entry(&user_id)
3052+
.await?
3053+
.unwrap()
3054+
.removed_bundles
3055+
.front()
3056+
.await?
3057+
.is_some());
30403058
}
30413059

30423060
// Try again to execute the transfer from the user chain to the admin chain.
@@ -3049,7 +3067,7 @@ where
30493067
// The admin chain has no more anticipated messages.
30503068
let admin_chain = env.worker().chain_state_view(admin_id).await?;
30513069
assert!(admin_chain.is_active());
3052-
admin_chain.validate_incoming_bundles().await?;
3070+
assert_no_removed_bundles(&admin_chain).await;
30533071
}
30543072

30553073
// Let's make a certificate for a block creating another epoch.

linera-service/tests/local_net_tests.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,13 +1028,12 @@ async fn test_update_validator_sender_gaps(config: LocalNetConfig) -> Result<()>
10281028
sender_client
10291029
.transfer(Amount::from_tokens(2), sender_chain, sender_chain)
10301030
.await?;
1031+
receiver_client.process_inbox(receiver_chain).await?;
10311032
// transfer some more to create a gap in the chain from the recipient's perspective
10321033
sender_client
10331034
.transfer(Amount::from_tokens(3), sender_chain, receiver_chain)
10341035
.await?;
10351036

1036-
receiver_client.process_inbox(receiver_chain).await?;
1037-
10381037
// Restart the stopped validator and stop another one.
10391038
net.restart_validator(UNAWARE_VALIDATOR_INDEX).await?;
10401039
net.stop_validator(STOPPED_VALIDATOR_INDEX).await?;
@@ -1057,11 +1056,9 @@ async fn test_update_validator_sender_gaps(config: LocalNetConfig) -> Result<()>
10571056
BlockHeight::ZERO
10581057
);
10591058

1060-
// Try to send tokens from receiver to sender. Receiver should have a gap in the
1061-
// sender chain at this point.
1062-
receiver_client
1063-
.transfer(Amount::from_tokens(4), receiver_chain, sender_chain)
1064-
.await?;
1059+
// Process the last sender block. The client has a gap in the sender chain and does
1060+
// not update the unaware validator about block 1.
1061+
receiver_client.process_inbox(receiver_chain).await?;
10651062

10661063
// Synchronize the validator
10671064
let validator_address = net.validator_address(UNAWARE_VALIDATOR_INDEX);

0 commit comments

Comments
 (0)