Skip to content

Commit 247adc3

Browse files
authored
[testnet] Add missing retry for incoming message on existing block proposals (#4759)
## Motivation

Fix a liveness bug.

## Proposal

Backport of #4758.

## Test Plan

CI.
1 parent 1d3d838 commit 247adc3

File tree

1 file changed

+174
-141
lines changed

1 file changed

+174
-141
lines changed

linera-core/src/client/mod.rs

Lines changed: 174 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,144 @@ impl<Env: Environment> Client<Env> {
889889
))
890890
}
891891

892+
/// Downloads only the specific sender blocks needed for missing cross-chain messages.
893+
/// This is a targeted alternative to `find_received_certificates` that only downloads
894+
/// the exact sender blocks we're missing, rather than searching through all received
895+
/// certificates.
896+
async fn download_missing_sender_blocks(
897+
&self,
898+
receiver_chain_id: ChainId,
899+
missing_blocks: BTreeMap<ChainId, Vec<BlockHeight>>,
900+
) -> Result<(), ChainClientError> {
901+
if missing_blocks.is_empty() {
902+
return Ok(());
903+
}
904+
905+
let (_, committee) = self.admin_committee().await?;
906+
let nodes = self.make_nodes(&committee)?;
907+
908+
// Download certificates for each sender chain at the specific heights.
909+
stream::iter(missing_blocks.into_iter())
910+
.map(|(sender_chain_id, heights)| {
911+
let height = heights.into_iter().max();
912+
let mut shuffled_nodes = nodes.clone();
913+
shuffled_nodes.shuffle(&mut rand::thread_rng());
914+
async move {
915+
let Some(height) = height else {
916+
return Ok(());
917+
};
918+
// Try to download from any node.
919+
for node in &shuffled_nodes {
920+
if let Err(err) = self
921+
.download_sender_block_with_sending_ancestors(
922+
receiver_chain_id,
923+
sender_chain_id,
924+
height,
925+
node,
926+
)
927+
.await
928+
{
929+
tracing::debug!(
930+
%height,
931+
%receiver_chain_id,
932+
%sender_chain_id,
933+
%err,
934+
validator = %node.public_key,
935+
"Failed to fetch sender block",
936+
);
937+
} else {
938+
return Ok::<_, ChainClientError>(());
939+
}
940+
}
941+
// If all nodes fail, return an error.
942+
Err(ChainClientError::CannotDownloadMissingSenderBlock {
943+
chain_id: sender_chain_id,
944+
height,
945+
})
946+
}
947+
})
948+
.buffer_unordered(self.options.max_joined_tasks)
949+
.try_collect::<Vec<_>>()
950+
.await?;
951+
Ok(())
952+
}
953+
954+
/// Downloads a specific sender block and recursively downloads any earlier blocks
955+
/// that also sent a message to our chain, based on `previous_message_blocks`.
956+
///
957+
/// This ensures that we have all the sender blocks needed to preprocess the target block
958+
/// and put the messages to our chain into the outbox.
959+
async fn download_sender_block_with_sending_ancestors(
960+
&self,
961+
receiver_chain_id: ChainId,
962+
sender_chain_id: ChainId,
963+
height: BlockHeight,
964+
remote_node: &RemoteNode<Env::ValidatorNode>,
965+
) -> Result<(), ChainClientError> {
966+
let next_outbox_height = self
967+
.local_node
968+
.next_outbox_heights(&[sender_chain_id], receiver_chain_id)
969+
.await?
970+
.get(&sender_chain_id)
971+
.copied()
972+
.unwrap_or(BlockHeight::ZERO);
973+
let (max_epoch, committees) = self.admin_committees().await?;
974+
975+
// Recursively collect all certificates we need, following
976+
// the chain of previous_message_blocks back to next_outbox_height.
977+
let mut certificates = BTreeMap::new();
978+
let mut current_height = height;
979+
980+
// Stop if we've reached the height we've already processed.
981+
while current_height >= next_outbox_height {
982+
// Download the certificate for this height.
983+
let downloaded = remote_node
984+
.download_certificates_by_heights(sender_chain_id, vec![current_height])
985+
.await?;
986+
let Some(certificate) = downloaded.into_iter().next() else {
987+
return Err(ChainClientError::CannotDownloadMissingSenderBlock {
988+
chain_id: sender_chain_id,
989+
height: current_height,
990+
});
991+
};
992+
993+
// Validate the certificate.
994+
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
995+
.into_result()?;
996+
997+
// Check if there's a previous message block to our chain.
998+
let block = certificate.block();
999+
let next_height = block
1000+
.body
1001+
.previous_message_blocks
1002+
.get(&receiver_chain_id)
1003+
.map(|(_prev_hash, prev_height)| *prev_height);
1004+
1005+
// Store this certificate.
1006+
certificates.insert(current_height, certificate);
1007+
1008+
if let Some(prev_height) = next_height {
1009+
// Continue with the previous block.
1010+
current_height = prev_height;
1011+
} else {
1012+
// No more dependencies.
1013+
break;
1014+
}
1015+
}
1016+
1017+
// Process certificates in ascending block height order (BTreeMap keeps them sorted).
1018+
for certificate in certificates.into_values() {
1019+
self.receive_sender_certificate(
1020+
certificate,
1021+
ReceiveCertificateMode::AlreadyChecked,
1022+
Some(vec![remote_node.clone()]),
1023+
)
1024+
.await?;
1025+
}
1026+
1027+
Ok(())
1028+
}
1029+
8921030
#[instrument(
8931031
level = "trace", skip_all,
8941032
fields(certificate_hash = ?incoming_certificate.hash()),
@@ -1087,6 +1225,32 @@ impl<Env: Environment> Client<Env> {
10871225
}
10881226
}
10891227
}
1228+
if let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1229+
if let ChainError::MissingCrossChainUpdate {
1230+
chain_id,
1231+
origin,
1232+
height,
1233+
} = &**chain_err
1234+
{
1235+
self.download_sender_block_with_sending_ancestors(
1236+
*chain_id,
1237+
*origin,
1238+
*height,
1239+
remote_node,
1240+
)
1241+
.await?;
1242+
// Retry
1243+
if let Err(new_err) = self
1244+
.local_node
1245+
.handle_block_proposal(proposal.clone())
1246+
.await
1247+
{
1248+
err = new_err;
1249+
} else {
1250+
continue;
1251+
}
1252+
}
1253+
}
10901254

10911255
debug!(
10921256
"Skipping proposal from {owner} and validator {} at height {}: {err}",
@@ -1897,7 +2061,9 @@ impl<Env: Environment> ChainClient<Env> {
18972061
.collect_missing_sender_blocks()
18982062
.await?;
18992063
// Download any sender blocks we're missing.
1900-
self.download_missing_sender_blocks(missing_blocks).await?;
2064+
self.client
2065+
.download_missing_sender_blocks(self.chain_id, missing_blocks)
2066+
.await?;
19012067
self.client.update_from_info(&info);
19022068
Ok(info)
19032069
}
@@ -2232,145 +2398,6 @@ impl<Env: Environment> ChainClient<Env> {
22322398
Ok(has_more)
22332399
}
22342400

2235-
/// Downloads only the specific sender blocks needed for missing cross-chain messages.
2236-
/// This is a targeted alternative to `find_received_certificates` that only downloads
2237-
/// the exact sender blocks we're missing, rather than searching through all received
2238-
/// certificates.
///
/// For each sender chain only the highest missing height is requested; the call to
/// `download_sender_block_with_sending_ancestors` is what fetches earlier sending blocks.
/// Validators are tried in shuffled order and the first success ends the search for that
/// sender chain.
///
/// # Errors
///
/// Returns `ChainClientError::CannotDownloadMissingSenderBlock` when every validator
/// failed for some sender chain, and propagates errors from `admin_committee` and
/// `make_nodes`.
2239-
#[instrument(level = "trace")]
2240-
async fn download_missing_sender_blocks(
2241-
&self,
2242-
missing_blocks: BTreeMap<ChainId, Vec<BlockHeight>>,
2243-
) -> Result<(), ChainClientError> {
2244-
if missing_blocks.is_empty() {
2245-
return Ok(());
2246-
}
2247-
2248-
let (_, committee) = self.admin_committee().await?;
2249-
let nodes = self.client.make_nodes(&committee)?;
2250-
2251-
// Download certificates for each sender chain, starting from its highest missing height.
2252-
stream::iter(missing_blocks.into_iter())
2253-
.map(|(sender_chain_id, heights)| {
2254-
let height = heights.into_iter().max();
2255-
let this = self.clone();
2256-
let mut shuffled_nodes = nodes.clone();
2257-
shuffled_nodes.shuffle(&mut rand::thread_rng());
2258-
async move {
2259-
// An empty list of heights means there is nothing to download for this sender.
let Some(height) = height else {
2260-
return Ok(());
2261-
};
2262-
// Try to download from any node; the first success ends the search.
2263-
for node in &shuffled_nodes {
2264-
if let Err(err) = this
2265-
.download_sender_block_with_sending_ancestors(
2266-
sender_chain_id,
2267-
height,
2268-
node,
2269-
)
2270-
.await
2271-
{
2272-
tracing::debug!(
2273-
%height,
2274-
%sender_chain_id,
2275-
%err,
2276-
validator = %node.public_key,
2277-
"Failed to fetch sender block",
2278-
);
2279-
} else {
2280-
return Ok::<_, ChainClientError>(());
2281-
}
2282-
}
2283-
// If all nodes fail, return an error.
2284-
Err(ChainClientError::CannotDownloadMissingSenderBlock {
2285-
chain_id: sender_chain_id,
2286-
height,
2287-
})
2288-
}
2289-
})
2290-
.buffer_unordered(self.options.max_joined_tasks)
2291-
.try_collect::<Vec<_>>()
2292-
.await?;
2293-
Ok(())
2294-
}
2295-
2296-
/// Downloads a specific sender block and, walking backwards, any earlier blocks
2297-
/// that also sent a message to our chain, based on `previous_message_blocks`.
2298-
///
2299-
/// This ensures that we have all the sender blocks needed to preprocess the target block
2300-
/// and put the messages to our chain into the outbox.
///
/// The walk starts at `height` and ends once it drops below the next outbox height
/// reported by the local node, or at the first block without a `previous_message_blocks`
/// entry for our chain. Every certificate is checked against the admin committees before
/// it is used, and the collected certificates are then processed in ascending height
/// order.
///
/// # Errors
///
/// Returns `ChainClientError::CannotDownloadMissingSenderBlock` if `remote_node` returns
/// no certificate for a requested height. Errors from the local node, the remote node,
/// certificate checking and certificate processing are propagated.
2301-
#[instrument(level = "trace")]
2302-
async fn download_sender_block_with_sending_ancestors(
2303-
&self,
2304-
sender_chain_id: ChainId,
2305-
height: BlockHeight,
2306-
remote_node: &RemoteNode<Env::ValidatorNode>,
2307-
) -> Result<(), ChainClientError> {
2308-
let next_outbox_height = self
2309-
.client
2310-
.local_node
2311-
.next_outbox_heights(&[sender_chain_id], self.chain_id)
2312-
.await?
2313-
.get(&sender_chain_id)
2314-
.copied()
2315-
.unwrap_or(BlockHeight::ZERO);
2316-
let (max_epoch, committees) = self.client.admin_committees().await?;
2317-
2318-
// Collect all certificates we need, following
2319-
// the chain of previous_message_blocks back to next_outbox_height.
2320-
let mut certificates = BTreeMap::new();
2321-
let mut current_height = height;
2322-
2323-
// Stop if we've reached the height we've already processed.
2324-
while current_height >= next_outbox_height {
2325-
// Download the certificate for this height.
2326-
let downloaded = remote_node
2327-
.download_certificates_by_heights(sender_chain_id, vec![current_height])
2328-
.await?;
2329-
let Some(certificate) = downloaded.into_iter().next() else {
2330-
return Err(ChainClientError::CannotDownloadMissingSenderBlock {
2331-
chain_id: sender_chain_id,
2332-
height: current_height,
2333-
});
2334-
};
2335-
2336-
// Validate the certificate.
2337-
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
2338-
.into_result()?;
2339-
2340-
// Check if there's a previous message block to our chain.
2341-
let block = certificate.block();
2342-
let next_height = block
2343-
.body
2344-
.previous_message_blocks
2345-
.get(&self.chain_id)
2346-
.map(|(_prev_hash, prev_height)| *prev_height);
2347-
2348-
// Store this certificate, keyed by the height that was requested.
2349-
certificates.insert(current_height, certificate);
2350-
2351-
if let Some(prev_height) = next_height {
2352-
// Continue with the previous block.
2353-
current_height = prev_height;
2354-
} else {
2355-
// No more dependencies.
2356-
break;
2357-
}
2358-
}
2359-
2360-
// Process certificates in ascending block height order (BTreeMap keeps them sorted).
2361-
for certificate in certificates.into_values() {
2362-
self.client
2363-
.receive_sender_certificate(
2364-
certificate,
2365-
ReceiveCertificateMode::AlreadyChecked,
2366-
Some(vec![remote_node.clone()]),
2367-
)
2368-
.await?;
2369-
}
2370-
2371-
Ok(())
2372-
}
2373-
23742401
/// Sends money.
23752402
#[instrument(level = "trace")]
23762403
pub async fn transfer(
@@ -3785,7 +3812,13 @@ impl<Env: Environment> ChainClient<Env> {
37853812
);
37863813
return Ok(());
37873814
}
3788-
self.download_sender_block_with_sending_ancestors(origin, height, &remote_node)
3815+
self.client
3816+
.download_sender_block_with_sending_ancestors(
3817+
self.chain_id,
3818+
origin,
3819+
height,
3820+
&remote_node,
3821+
)
37893822
.await?;
37903823
if self.local_next_height_to_receive(origin).await? <= height {
37913824
warn!(

0 commit comments

Comments
 (0)