Skip to content

Commit dc237ac

Browse files
authored
Add missing retry for incoming message on existing block proposals (#4758)
## Motivation

Fix a liveness issue.

## Proposal

* Move 2 functions from `ChainClient` to `Client`.
* Add the missing retry for incoming messages on existing block proposals.

## Test Plan

CI

## Release Plan

- These changes should be backported to the latest `devnet` branch, then
- be released in a new SDK.
1 parent 900f6b3 commit dc237ac

File tree

1 file changed

+174
-141
lines changed

1 file changed

+174
-141
lines changed

linera-core/src/client/mod.rs

Lines changed: 174 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,144 @@ impl<Env: Environment> Client<Env> {
999999
))
10001000
}
10011001

1002+
/// Downloads only the specific sender blocks needed for missing cross-chain messages.
1003+
/// This is a targeted alternative to `find_received_certificates` that only downloads
1004+
/// the exact sender blocks we're missing, rather than searching through all received
1005+
/// certificates.
1006+
async fn download_missing_sender_blocks(
1007+
&self,
1008+
receiver_chain_id: ChainId,
1009+
missing_blocks: BTreeMap<ChainId, Vec<BlockHeight>>,
1010+
) -> Result<(), ChainClientError> {
1011+
if missing_blocks.is_empty() {
1012+
return Ok(());
1013+
}
1014+
1015+
let (_, committee) = self.admin_committee().await?;
1016+
let nodes = self.make_nodes(&committee)?;
1017+
1018+
// Download certificates for each sender chain at the specific heights.
1019+
stream::iter(missing_blocks.into_iter())
1020+
.map(|(sender_chain_id, heights)| {
1021+
let height = heights.into_iter().max();
1022+
let mut shuffled_nodes = nodes.clone();
1023+
shuffled_nodes.shuffle(&mut rand::thread_rng());
1024+
async move {
1025+
let Some(height) = height else {
1026+
return Ok(());
1027+
};
1028+
// Try to download from any node.
1029+
for node in &shuffled_nodes {
1030+
if let Err(err) = self
1031+
.download_sender_block_with_sending_ancestors(
1032+
receiver_chain_id,
1033+
sender_chain_id,
1034+
height,
1035+
node,
1036+
)
1037+
.await
1038+
{
1039+
tracing::debug!(
1040+
%height,
1041+
%receiver_chain_id,
1042+
%sender_chain_id,
1043+
%err,
1044+
validator = %node.public_key,
1045+
"Failed to fetch sender block",
1046+
);
1047+
} else {
1048+
return Ok::<_, ChainClientError>(());
1049+
}
1050+
}
1051+
// If all nodes fail, return an error.
1052+
Err(ChainClientError::CannotDownloadMissingSenderBlock {
1053+
chain_id: sender_chain_id,
1054+
height,
1055+
})
1056+
}
1057+
})
1058+
.buffer_unordered(self.options.max_joined_tasks)
1059+
.try_collect::<Vec<_>>()
1060+
.await?;
1061+
Ok(())
1062+
}
1063+
1064+
/// Downloads a specific sender block and recursively downloads any earlier blocks
1065+
/// that also sent a message to our chain, based on `previous_message_blocks`.
1066+
///
1067+
/// This ensures that we have all the sender blocks needed to preprocess the target block
1068+
/// and put the messages to our chain into the outbox.
1069+
async fn download_sender_block_with_sending_ancestors(
1070+
&self,
1071+
receiver_chain_id: ChainId,
1072+
sender_chain_id: ChainId,
1073+
height: BlockHeight,
1074+
remote_node: &RemoteNode<Env::ValidatorNode>,
1075+
) -> Result<(), ChainClientError> {
1076+
let next_outbox_height = self
1077+
.local_node
1078+
.next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1079+
.await?
1080+
.get(&sender_chain_id)
1081+
.copied()
1082+
.unwrap_or(BlockHeight::ZERO);
1083+
let (max_epoch, committees) = self.admin_committees().await?;
1084+
1085+
// Recursively collect all certificates we need, following
1086+
// the chain of previous_message_blocks back to next_outbox_height.
1087+
let mut certificates = BTreeMap::new();
1088+
let mut current_height = height;
1089+
1090+
// Stop if we've reached the height we've already processed.
1091+
while current_height >= next_outbox_height {
1092+
// Download the certificate for this height.
1093+
let downloaded = remote_node
1094+
.download_certificates_by_heights(sender_chain_id, vec![current_height])
1095+
.await?;
1096+
let Some(certificate) = downloaded.into_iter().next() else {
1097+
return Err(ChainClientError::CannotDownloadMissingSenderBlock {
1098+
chain_id: sender_chain_id,
1099+
height: current_height,
1100+
});
1101+
};
1102+
1103+
// Validate the certificate.
1104+
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1105+
.into_result()?;
1106+
1107+
// Check if there's a previous message block to our chain.
1108+
let block = certificate.block();
1109+
let next_height = block
1110+
.body
1111+
.previous_message_blocks
1112+
.get(&receiver_chain_id)
1113+
.map(|(_prev_hash, prev_height)| *prev_height);
1114+
1115+
// Store this certificate.
1116+
certificates.insert(current_height, certificate);
1117+
1118+
if let Some(prev_height) = next_height {
1119+
// Continue with the previous block.
1120+
current_height = prev_height;
1121+
} else {
1122+
// No more dependencies.
1123+
break;
1124+
}
1125+
}
1126+
1127+
// Process certificates in ascending block height order (BTreeMap keeps them sorted).
1128+
for certificate in certificates.into_values() {
1129+
self.receive_sender_certificate(
1130+
certificate,
1131+
ReceiveCertificateMode::AlreadyChecked,
1132+
Some(vec![remote_node.clone()]),
1133+
)
1134+
.await?;
1135+
}
1136+
1137+
Ok(())
1138+
}
1139+
10021140
#[instrument(
10031141
level = "trace", skip_all,
10041142
fields(certificate_hash = ?incoming_certificate.hash()),
@@ -1197,6 +1335,32 @@ impl<Env: Environment> Client<Env> {
11971335
}
11981336
}
11991337
}
1338+
if let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1339+
if let ChainError::MissingCrossChainUpdate {
1340+
chain_id,
1341+
origin,
1342+
height,
1343+
} = &**chain_err
1344+
{
1345+
self.download_sender_block_with_sending_ancestors(
1346+
*chain_id,
1347+
*origin,
1348+
*height,
1349+
remote_node,
1350+
)
1351+
.await?;
1352+
// Retry
1353+
if let Err(new_err) = self
1354+
.local_node
1355+
.handle_block_proposal(proposal.clone())
1356+
.await
1357+
{
1358+
err = new_err;
1359+
} else {
1360+
continue;
1361+
}
1362+
}
1363+
}
12001364

12011365
debug!(
12021366
"Skipping proposal from {owner} and validator {} at height {}: {err}",
@@ -2024,7 +2188,9 @@ impl<Env: Environment> ChainClient<Env> {
20242188
.collect_missing_sender_blocks()
20252189
.await?;
20262190
// Download any sender blocks we're missing.
2027-
self.download_missing_sender_blocks(missing_blocks).await?;
2191+
self.client
2192+
.download_missing_sender_blocks(self.chain_id, missing_blocks)
2193+
.await?;
20282194
self.client.update_from_info(&info);
20292195
Ok(info)
20302196
}
@@ -2359,145 +2525,6 @@ impl<Env: Environment> ChainClient<Env> {
23592525
Ok(has_more)
23602526
}
23612527

2362-
/// Downloads only the specific sender blocks needed for missing cross-chain messages.
2363-
/// This is a targeted alternative to `find_received_certificates` that only downloads
2364-
/// the exact sender blocks we're missing, rather than searching through all received
2365-
/// certificates.
2366-
#[instrument(level = "trace")]
2367-
async fn download_missing_sender_blocks(
2368-
&self,
2369-
missing_blocks: BTreeMap<ChainId, Vec<BlockHeight>>,
2370-
) -> Result<(), ChainClientError> {
2371-
if missing_blocks.is_empty() {
2372-
return Ok(());
2373-
}
2374-
2375-
let (_, committee) = self.admin_committee().await?;
2376-
let nodes = self.client.make_nodes(&committee)?;
2377-
2378-
// Download certificates for each sender chain at the specific heights.
2379-
stream::iter(missing_blocks.into_iter())
2380-
.map(|(sender_chain_id, heights)| {
2381-
let height = heights.into_iter().max();
2382-
let this = self.clone();
2383-
let mut shuffled_nodes = nodes.clone();
2384-
shuffled_nodes.shuffle(&mut rand::thread_rng());
2385-
async move {
2386-
let Some(height) = height else {
2387-
return Ok(());
2388-
};
2389-
// Try to download from any node.
2390-
for node in &shuffled_nodes {
2391-
if let Err(err) = this
2392-
.download_sender_block_with_sending_ancestors(
2393-
sender_chain_id,
2394-
height,
2395-
node,
2396-
)
2397-
.await
2398-
{
2399-
tracing::debug!(
2400-
%height,
2401-
%sender_chain_id,
2402-
%err,
2403-
validator = %node.public_key,
2404-
"Failed to fetch sender block",
2405-
);
2406-
} else {
2407-
return Ok::<_, ChainClientError>(());
2408-
}
2409-
}
2410-
// If all nodes fail, return an error.
2411-
Err(ChainClientError::CannotDownloadMissingSenderBlock {
2412-
chain_id: sender_chain_id,
2413-
height,
2414-
})
2415-
}
2416-
})
2417-
.buffer_unordered(self.options.max_joined_tasks)
2418-
.try_collect::<Vec<_>>()
2419-
.await?;
2420-
Ok(())
2421-
}
2422-
2423-
/// Downloads a specific sender block and recursively downloads any earlier blocks
2424-
/// that also sent a message to our chain, based on `previous_message_blocks`.
2425-
///
2426-
/// This ensures that we have all the sender blocks needed to preprocess the target block
2427-
/// and put the messages to our chain into the outbox.
2428-
#[instrument(level = "trace")]
2429-
async fn download_sender_block_with_sending_ancestors(
2430-
&self,
2431-
sender_chain_id: ChainId,
2432-
height: BlockHeight,
2433-
remote_node: &RemoteNode<Env::ValidatorNode>,
2434-
) -> Result<(), ChainClientError> {
2435-
let next_outbox_height = self
2436-
.client
2437-
.local_node
2438-
.next_outbox_heights(&[sender_chain_id], self.chain_id)
2439-
.await?
2440-
.get(&sender_chain_id)
2441-
.copied()
2442-
.unwrap_or(BlockHeight::ZERO);
2443-
let (max_epoch, committees) = self.client.admin_committees().await?;
2444-
2445-
// Recursively collect all certificates we need, following
2446-
// the chain of previous_message_blocks back to next_outbox_height.
2447-
let mut certificates = BTreeMap::new();
2448-
let mut current_height = height;
2449-
2450-
// Stop if we've reached the height we've already processed.
2451-
while current_height >= next_outbox_height {
2452-
// Download the certificate for this height.
2453-
let downloaded = remote_node
2454-
.download_certificates_by_heights(sender_chain_id, vec![current_height])
2455-
.await?;
2456-
let Some(certificate) = downloaded.into_iter().next() else {
2457-
return Err(ChainClientError::CannotDownloadMissingSenderBlock {
2458-
chain_id: sender_chain_id,
2459-
height: current_height,
2460-
});
2461-
};
2462-
2463-
// Validate the certificate.
2464-
Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
2465-
.into_result()?;
2466-
2467-
// Check if there's a previous message block to our chain.
2468-
let block = certificate.block();
2469-
let next_height = block
2470-
.body
2471-
.previous_message_blocks
2472-
.get(&self.chain_id)
2473-
.map(|(_prev_hash, prev_height)| *prev_height);
2474-
2475-
// Store this certificate.
2476-
certificates.insert(current_height, certificate);
2477-
2478-
if let Some(prev_height) = next_height {
2479-
// Continue with the previous block.
2480-
current_height = prev_height;
2481-
} else {
2482-
// No more dependencies.
2483-
break;
2484-
}
2485-
}
2486-
2487-
// Process certificates in ascending block height order (BTreeMap keeps them sorted).
2488-
for certificate in certificates.into_values() {
2489-
self.client
2490-
.receive_sender_certificate(
2491-
certificate,
2492-
ReceiveCertificateMode::AlreadyChecked,
2493-
Some(vec![remote_node.clone()]),
2494-
)
2495-
.await?;
2496-
}
2497-
2498-
Ok(())
2499-
}
2500-
25012528
/// Sends money.
25022529
#[instrument(level = "trace")]
25032530
pub async fn transfer(
@@ -3913,7 +3940,13 @@ impl<Env: Environment> ChainClient<Env> {
39133940
);
39143941
return Ok(());
39153942
}
3916-
self.download_sender_block_with_sending_ancestors(origin, height, &remote_node)
3943+
self.client
3944+
.download_sender_block_with_sending_ancestors(
3945+
self.chain_id,
3946+
origin,
3947+
height,
3948+
&remote_node,
3949+
)
39173950
.await?;
39183951
if self.local_next_height_to_receive(origin).await? <= height {
39193952
warn!(

0 commit comments

Comments
 (0)