Skip to content

Commit e3b1e5b

Browse files
authored
Don't execute sender chains (#3989)
## Motivation When the client verifies incoming messages to its own chains, it doesn't need the sender chain's execution state, so computing the block execution there is unnecessary work. Eventually we want to go even further and avoid even downloading some of the sender chain's blocks (see #3969). ## Proposal As a preparation for #3969, we introduce an `unexecuted_blocks` map to the chain state: It maps block heights that are higher than `next_block_height`, i.e. haven't been executed yet, to block hashes. We don't purge empty outboxes anymore. When updating the outboxes, we make sure that we haven't skipped any messages and compare to the next height to schedule: That allows us to update some outboxes even for loose blocks, ahead of execution. For now we still download all blocks of the sender chain. Skipping any will be done in a future PR. However, we now add them as loose blocks and don't execute them. ## Test Plan CI ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - Closes #3270. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 6976d9c commit e3b1e5b

File tree

12 files changed

+482
-63
lines changed

12 files changed

+482
-63
lines changed

linera-base/src/data_types.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::{
1313
hash::Hash,
1414
io, iter,
1515
num::ParseIntError,
16+
ops::{Bound, RangeBounds},
1617
path::Path,
1718
str::FromStr,
1819
};
@@ -486,6 +487,32 @@ impl TryFrom<BlockHeight> for usize {
486487
}
487488
}
488489

490+
/// Allows converting [`BlockHeight`] ranges to inclusive tuples of bounds.
491+
pub trait BlockHeightRangeBounds {
492+
/// Returns the range as a tuple of inclusive bounds.
493+
/// If the range is empty, returns `None`.
494+
fn to_inclusive(&self) -> Option<(BlockHeight, BlockHeight)>;
495+
}
496+
497+
impl<T: RangeBounds<BlockHeight>> BlockHeightRangeBounds for T {
498+
fn to_inclusive(&self) -> Option<(BlockHeight, BlockHeight)> {
499+
let start = match self.start_bound() {
500+
Bound::Included(height) => *height,
501+
Bound::Excluded(height) => height.try_add_one().ok()?,
502+
Bound::Unbounded => BlockHeight(0),
503+
};
504+
let end = match self.end_bound() {
505+
Bound::Included(height) => *height,
506+
Bound::Excluded(height) => height.try_sub_one().ok()?,
507+
Bound::Unbounded => BlockHeight::MAX,
508+
};
509+
if start > end {
510+
return None;
511+
}
512+
Some((start, end))
513+
}
514+
}
515+
489516
impl_wrapped_number!(Amount, u128);
490517
impl_wrapped_number!(BlockHeight, u64);
491518
impl_wrapped_number!(TimeDelta, u64);

linera-chain/src/chain.rs

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::{
55
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6+
ops::RangeBounds,
67
sync::Arc,
78
};
89

@@ -11,7 +12,7 @@ use linera_base::{
1112
crypto::{CryptoHash, ValidatorPublicKey},
1213
data_types::{
1314
Amount, ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
14-
Epoch, OracleResponse, Timestamp,
15+
BlockHeightRangeBounds as _, Epoch, OracleResponse, Timestamp,
1516
},
1617
ensure,
1718
identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, MessageId},
@@ -277,6 +278,9 @@ where
277278
/// Number of outgoing messages in flight for each block height.
278279
/// We use a `RegisterView` to prioritize speed for small maps.
279280
pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
281+
282+
/// Blocks that have been verified but not executed yet, and that may not be contiguous.
283+
pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
280284
}
281285

282286
/// Block-chaining state.
@@ -419,9 +423,6 @@ where
419423
self.outbox_counters.get_mut().remove(&update);
420424
}
421425
}
422-
if outbox.queue.count() == 0 {
423-
self.outboxes.remove_entry(target)?;
424-
}
425426
#[cfg(with_metrics)]
426427
metrics::NUM_OUTBOXES
427428
.with_label_values(&[])
@@ -942,6 +943,20 @@ where
942943
&block.body.messages,
943944
)?;
944945
self.confirmed_log.push(hash);
946+
self.preprocessed_blocks.remove(&block.header.height)?;
947+
Ok(())
948+
}
949+
950+
/// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
951+
pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
952+
let hash = block.inner().hash();
953+
let block = block.inner().inner();
954+
let height = block.header.height;
955+
if height < self.tip_state.get().next_block_height {
956+
return Ok(());
957+
}
958+
self.process_outgoing_messages(block).await?;
959+
self.preprocessed_blocks.insert(&height, hash)?;
945960
Ok(())
946961
}
947962

@@ -1067,6 +1082,37 @@ where
10671082
Ok(())
10681083
}
10691084

1085+
/// Returns the hashes of all blocks in the given range. Returns an error if we are missing
1086+
/// any of those blocks.
1087+
pub async fn block_hashes(
1088+
&self,
1089+
range: impl RangeBounds<BlockHeight>,
1090+
) -> Result<Vec<CryptoHash>, ChainError> {
1091+
let next_height = self.tip_state.get().next_block_height;
1092+
// If the range is not empty, it can always be represented as start..=end.
1093+
let Some((start, end)) = range.to_inclusive() else {
1094+
return Ok(Vec::new());
1095+
};
1096+
let mut hashes = if let Ok(last_height) = next_height.try_sub_one() {
1097+
let usize_start = usize::try_from(start)?;
1098+
let usize_end = usize::try_from(end.min(last_height))?;
1099+
self.confirmed_log.read(usize_start..=usize_end).await?
1100+
} else {
1101+
Vec::new()
1102+
};
1103+
for height in start.max(next_height).0..=end.0 {
1104+
hashes.push(
1105+
self.preprocessed_blocks
1106+
.get(&BlockHeight(height))
1107+
.await?
1108+
.ok_or_else(|| {
1109+
ChainError::InternalError("missing entry in preprocessed_blocks".into())
1110+
})?,
1111+
);
1112+
}
1113+
Ok(hashes)
1114+
}
1115+
10701116
/// Resets the chain manager for the next block height.
10711117
fn reset_chain_manager(
10721118
&mut self,
@@ -1094,12 +1140,33 @@ where
10941140
// application.
10951141
let recipients = block.recipients();
10961142
let block_height = block.header.height;
1143+
let next_height = self.tip_state.get().next_block_height;
10971144

10981145
// Update the outboxes.
10991146
let outbox_counters = self.outbox_counters.get_mut();
11001147
let targets = recipients.into_iter().collect::<Vec<_>>();
11011148
let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1102-
for mut outbox in outboxes {
1149+
for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1150+
if block_height > next_height {
1151+
// Find the hash of the block that was most recently added to the outbox.
1152+
let prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok() {
1153+
Some(height) if height < next_height => {
1154+
let index =
1155+
usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1156+
Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1157+
ChainError::InternalError("missing entry in confirmed_log".into())
1158+
})?)
1159+
}
1160+
Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1161+
|| ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1162+
)?),
1163+
None => None,
1164+
};
1165+
// Schedule only if no block is missing that sent something to the same recipient.
1166+
if prev_hash.as_ref() != block.body.previous_message_blocks.get(target) {
1167+
continue;
1168+
}
1169+
}
11031170
if outbox.schedule_message(block_height)? {
11041171
*outbox_counters.entry(block_height).or_default() += 1;
11051172
}

linera-core/src/chain_worker/actor.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ where
121121
callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
122122
},
123123

124+
/// Preprocess a block without executing it.
125+
PreprocessCertificate {
126+
certificate: ConfirmedBlockCertificate,
127+
#[debug(skip)]
128+
callback: oneshot::Sender<Result<NetworkActions, WorkerError>>,
129+
},
130+
124131
/// Process a cross-chain update.
125132
ProcessCrossChainUpdate {
126133
origin: ChainId,
@@ -392,6 +399,12 @@ where
392399
.await,
393400
)
394401
.is_ok(),
402+
ChainWorkerRequest::PreprocessCertificate {
403+
certificate,
404+
callback,
405+
} => callback
406+
.send(self.worker.preprocess_certificate(certificate).await)
407+
.is_ok(),
395408
ChainWorkerRequest::ProcessCrossChainUpdate {
396409
origin,
397410
bundles,
@@ -480,6 +493,9 @@ where
480493
ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
481494
callback.send(Err(error)).is_ok()
482495
}
496+
ChainWorkerRequest::PreprocessCertificate { callback, .. } => {
497+
callback.send(Err(error)).is_ok()
498+
}
483499
ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
484500
callback.send(Err(error)).is_ok()
485501
}

linera-core/src/chain_worker/state/attempted_changes.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,53 @@ where
414414
Ok((info, actions))
415415
}
416416

417+
/// Stores a block's blobs, and adds its messages to the outbox where possible.
418+
/// Does not execute the block.
419+
pub(super) async fn preprocess_certificate(
420+
&mut self,
421+
certificate: ConfirmedBlockCertificate,
422+
) -> Result<NetworkActions, WorkerError> {
423+
let block = certificate.block();
424+
// Check that the chain is active and ready for this confirmation.
425+
let tip = self.state.chain.tip_state.get().clone();
426+
if tip.next_block_height > block.header.height {
427+
// We already processed this block.
428+
return self.state.create_network_actions().await;
429+
}
430+
431+
let required_blob_ids = block.required_blob_ids();
432+
let created_blobs: BTreeMap<_, _> = block.iter_created_blobs().collect();
433+
let blobs_result = self
434+
.state
435+
.get_required_blobs(block.required_blob_ids(), &created_blobs)
436+
.await
437+
.map(|blobs| blobs.into_values().collect::<Vec<_>>());
438+
439+
if let Ok(blobs) = &blobs_result {
440+
self.state
441+
.storage
442+
.write_blobs_and_certificate(blobs, &certificate)
443+
.await?;
444+
}
445+
446+
// Update the blob state with last used certificate hash.
447+
let blob_state = certificate.value().to_blob_state(blobs_result.is_ok());
448+
let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
449+
self.state
450+
.storage
451+
.maybe_write_blob_states(&blob_ids, blob_state)
452+
.await?;
453+
blobs_result?;
454+
// Update the outboxes.
455+
self.state
456+
.chain
457+
.preprocess_block(certificate.value())
458+
.await?;
459+
// Persist chain.
460+
self.save().await?;
461+
self.state.create_network_actions().await
462+
}
463+
417464
/// Schedules a notification for when cross-chain messages are delivered up to the given
418465
/// `height`.
419466
#[instrument(level = "trace", skip(self, notify_when_messages_are_delivered))]

linera-core/src/chain_worker/state/mod.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use linera_chain::{
2727
};
2828
use linera_execution::{ExecutionStateView, Query, QueryOutcome, ServiceRuntimeEndpoint};
2929
use linera_storage::{Clock as _, Storage};
30-
use linera_views::{views::ClonableView, ViewError};
30+
use linera_views::views::ClonableView;
3131
use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock};
3232

3333
#[cfg(test)]
@@ -261,6 +261,18 @@ where
261261
.await
262262
}
263263

264+
/// Preprocesses a block without executing it.
265+
#[tracing::instrument(level = "debug", skip(self))]
266+
pub(super) async fn preprocess_certificate(
267+
&mut self,
268+
certificate: ConfirmedBlockCertificate,
269+
) -> Result<NetworkActions, WorkerError> {
270+
ChainWorkerStateWithAttemptedChanges::new(self)
271+
.await
272+
.preprocess_certificate(certificate)
273+
.await
274+
}
275+
264276
/// Updates the chain's inboxes, receiving messages from a cross-chain update.
265277
#[tracing::instrument(level = "debug", skip(self))]
266278
pub(super) async fn process_cross_chain_update(
@@ -458,22 +470,38 @@ where
458470
) -> Result<NetworkActions, WorkerError> {
459471
// Load all the certificates we will need, regardless of the medium.
460472
let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
461-
let heights_usize = heights
462-
.iter()
473+
let next_block_height = self.chain.tip_state.get().next_block_height;
474+
let log_heights = heights
475+
.range(..next_block_height)
463476
.copied()
464477
.map(usize::try_from)
465478
.collect::<Result<Vec<_>, _>>()?;
466-
let hashes = self
479+
let mut hashes = self
467480
.chain
468481
.confirmed_log
469-
.multi_get(heights_usize.clone())
482+
.multi_get(log_heights)
470483
.await?
471484
.into_iter()
472-
.zip(heights_usize)
485+
.zip(&heights)
473486
.map(|(maybe_hash, height)| {
474-
maybe_hash.ok_or_else(|| ViewError::not_found("confirmed log entry", height))
487+
maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
488+
height: *height,
489+
chain_id: self.chain_id(),
490+
})
475491
})
476492
.collect::<Result<Vec<_>, _>>()?;
493+
for height in heights.range(next_block_height..) {
494+
hashes.push(
495+
self.chain
496+
.preprocessed_blocks
497+
.get(height)
498+
.await?
499+
.ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
500+
height: *height,
501+
chain_id: self.chain_id(),
502+
})?,
503+
);
504+
}
477505
let certificates = self.storage.read_certificates(hashes).await?;
478506
let certificates = heights
479507
.into_iter()

0 commit comments

Comments
 (0)