Skip to content

Commit 6c34d51

Browse files
afckclaude
andauthored
[testnet] Cache execution state when staging blocks. (#5367) (#5388)
Backport of #5367. ## Motivation When we discard a transaction during block staging, we restart block execution from the start, which is wasteful and causes long delays. ## Proposal Cache the execution state after each transaction and restore it if the next one fails. Conflicts resolved for backport: - `chain.rs`: Preserved testnet's `previous_message_blocks_view`/`previous_event_blocks_view` params while taking the PR's `&mut ProposedBlock` and `BundleExecutionPolicy` changes - `chain_worker/state.rs`: Fixed `handle_block_proposal` to call `self.chain.execute_block()` directly (returns `BlockExecutionOutcome`) instead of `self.execute_block()` (returns `Block`) - `client/mod.rs`: Applied `chain_client` changes to the unified `client/mod.rs`, added `max_block_limit_errors` field to `ChainClientOptions` - `wasm_worker_tests.rs`: Adapted the new `test_memory_auto_retry_vs_abort_consistency` test to testnet's `TestEnvironment` API (no `execute_proposal()`/`executing_worker()` methods; `sync Bytecode::load_from_file`) ## Test Plan Tests have been added. ## Release Plan - Release SDK. ## Links - PR to main: #5367 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist) --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 42f7353 commit 6c34d51

File tree

15 files changed

+872
-188
lines changed

15 files changed

+872
-188
lines changed

CLI.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ Client implementation and command-line tool for the Linera blockchain
131131
* `--max-pending-message-bundles <MAX_PENDING_MESSAGE_BUNDLES>` — The maximum number of incoming message bundles to include in a block proposal
132132

133133
Default value: `10`
134+
* `--max-block-limit-errors <MAX_BLOCK_LIMIT_ERRORS>` — Maximum number of message bundles to discard from a block proposal due to block limit errors before discarding all remaining bundles.
135+
136+
Discarded bundles can be retried in the next block.
137+
138+
Default value: `3`
134139
* `--max-new-events-per-block <MAX_NEW_EVENTS_PER_BLOCK>` — The maximum number of new stream events to include in a block proposal
135140

136141
Default value: `10`

linera-chain/src/block_tracker.rs

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ pub struct BlockExecutionTracker<'resources, 'blobs> {
5959

6060
// Blobs published in the block.
6161
published_blobs: BTreeMap<BlobId, &'blobs Blob>,
62-
63-
// We expect the number of outcomes to be equal to the number of transactions in the block.
64-
expected_outcomes_count: usize,
6562
}
6663

6764
impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
@@ -97,7 +94,6 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
9794
operation_results: Vec::new(),
9895
transaction_index: 0,
9996
published_blobs,
100-
expected_outcomes_count: proposal.transactions.len(),
10197
})
10298
}
10399

@@ -410,17 +406,65 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
410406
self.resource_controller
411407
}
412408

409+
/// Creates a checkpoint of the tracker's mutable state.
410+
///
411+
/// This captures all state that could be modified during transaction execution,
412+
/// allowing restoration if execution fails.
413+
pub fn create_checkpoint(&self) -> TrackerCheckpoint {
414+
TrackerCheckpoint {
415+
resource_tracker: self.resource_controller.tracker,
416+
next_application_index: self.next_application_index,
417+
next_chain_index: self.next_chain_index,
418+
transaction_index: self.transaction_index,
419+
oracle_responses_len: self.oracle_responses.len(),
420+
events_len: self.events.len(),
421+
blobs_len: self.blobs.len(),
422+
messages_len: self.messages.len(),
423+
operation_results_len: self.operation_results.len(),
424+
}
425+
}
426+
427+
/// Restores the tracker's mutable state from a checkpoint.
428+
///
429+
/// This reverts all state to what it was when the checkpoint was saved,
430+
/// as if the failed transaction execution never happened.
431+
pub fn restore_checkpoint(&mut self, checkpoint: TrackerCheckpoint) {
432+
// Destructure to ensure all fields are handled (compiler will warn on new fields).
433+
let TrackerCheckpoint {
434+
resource_tracker,
435+
next_application_index,
436+
next_chain_index,
437+
transaction_index,
438+
oracle_responses_len,
439+
events_len,
440+
blobs_len,
441+
messages_len,
442+
operation_results_len,
443+
} = checkpoint;
444+
445+
self.resource_controller.tracker = resource_tracker;
446+
self.next_application_index = next_application_index;
447+
self.next_chain_index = next_chain_index;
448+
self.transaction_index = transaction_index;
449+
self.oracle_responses.truncate(oracle_responses_len);
450+
self.events.truncate(events_len);
451+
self.blobs.truncate(blobs_len);
452+
self.messages.truncate(messages_len);
453+
self.operation_results.truncate(operation_results_len);
454+
}
455+
413456
/// Finalizes the execution and returns the collected results.
414457
///
415458
/// This method should be called after all transactions have been processed.
459+
/// The `expected_outcomes_count` should be the number of transactions in the final block.
416460
/// Panics if the number of lists of oracle responses, outgoing messages,
417461
/// events, or blobs does not match the expected counts.
418-
pub fn finalize(self) -> FinalizeExecutionResult {
462+
pub fn finalize(self, expected_outcomes_count: usize) -> FinalizeExecutionResult {
419463
// Asserts that the number of outcomes matches the expected count.
420-
assert_eq!(self.oracle_responses.len(), self.expected_outcomes_count);
421-
assert_eq!(self.messages.len(), self.expected_outcomes_count);
422-
assert_eq!(self.events.len(), self.expected_outcomes_count);
423-
assert_eq!(self.blobs.len(), self.expected_outcomes_count);
464+
assert_eq!(self.oracle_responses.len(), expected_outcomes_count);
465+
assert_eq!(self.messages.len(), expected_outcomes_count);
466+
assert_eq!(self.events.len(), expected_outcomes_count);
467+
assert_eq!(self.blobs.len(), expected_outcomes_count);
424468

425469
#[cfg(with_metrics)]
426470
crate::chain::metrics::track_block_metrics(&self.resource_controller.tracker);
@@ -446,3 +490,17 @@ pub(crate) type FinalizeExecutionResult = (
446490
Vec<OperationResult>,
447491
ResourceTracker,
448492
);
493+
494+
/// Checkpoint of the tracker's mutable state for restoration on failure.
495+
#[derive(Clone)]
496+
pub struct TrackerCheckpoint {
497+
pub(crate) resource_tracker: ResourceTracker,
498+
pub(crate) next_application_index: u32,
499+
pub(crate) next_chain_index: u32,
500+
pub(crate) transaction_index: u32,
501+
pub(crate) oracle_responses_len: usize,
502+
pub(crate) events_len: usize,
503+
pub(crate) blobs_len: usize,
504+
pub(crate) messages_len: usize,
505+
pub(crate) operation_results_len: usize,
506+
}

linera-chain/src/chain.rs

Lines changed: 149 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ use linera_views::{
3333
views::{ClonableView, RootView, View},
3434
};
3535
use serde::{Deserialize, Serialize};
36-
use tracing::instrument;
36+
use tracing::{info, instrument};
3737

3838
use crate::{
3939
block::{Block, ConfirmedBlock},
4040
block_tracker::BlockExecutionTracker,
4141
data_types::{
42-
BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageAction, MessageBundle,
43-
ProposedBlock, Transaction,
42+
BlockExecutionOutcome, BundleExecutionPolicy, ChainAndHeight, IncomingBundle,
43+
MessageAction, MessageBundle, ProposedBlock, Transaction,
4444
},
4545
inbox::{Cursor, InboxError, InboxStateView},
4646
manager::ChainManager,
@@ -715,8 +715,7 @@ where
715715
optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
716716
}
717717

718-
/// Executes a block: first the incoming messages, then the main operation.
719-
/// Does not update chain state other than the execution state.
718+
/// Executes a block with a specified policy for handling bundle failures.
720719
#[instrument(skip_all, fields(
721720
chain_id = %block.chain_id,
722721
block_height = %block.height
@@ -727,17 +726,27 @@ where
727726
confirmed_log: &LogView<C, CryptoHash>,
728727
previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
729728
previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
730-
block: &ProposedBlock,
729+
block: &mut ProposedBlock,
731730
local_time: Timestamp,
732731
round: Option<u32>,
733732
published_blobs: &[Blob],
734733
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
734+
exec_policy: BundleExecutionPolicy,
735735
) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
736+
// AutoRetry is incompatible with replaying oracle responses because discarding or
737+
// rejecting bundles would change which transactions execute.
738+
if !matches!(exec_policy, BundleExecutionPolicy::Abort) {
739+
assert!(
740+
replaying_oracle_responses.is_none(),
741+
"Cannot use AutoRetry policy when replaying oracle responses"
742+
);
743+
}
744+
736745
#[cfg(with_metrics)]
737746
let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
738747
chain.system.timestamp.set(block.timestamp);
739748

740-
let policy = chain
749+
let committee_policy = chain
741750
.system
742751
.current_committee()
743752
.ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
@@ -746,7 +755,7 @@ where
746755
.clone();
747756

748757
let mut resource_controller = ResourceController::new(
749-
Arc::new(policy),
758+
Arc::new(committee_policy),
750759
ResourceTracker::default(),
751760
block.authenticated_signer,
752761
);
@@ -760,8 +769,6 @@ where
760769
chain.system.used_blobs.insert(&blob_id)?;
761770
}
762771

763-
// Execute each incoming bundle as a transaction, then each operation.
764-
// Collect messages, events and oracle responses, each as one list per transaction.
765772
let mut block_execution_tracker = BlockExecutionTracker::new(
766773
&mut resource_controller,
767774
published_blobs
@@ -773,12 +780,103 @@ where
773780
block,
774781
)?;
775782

776-
for transaction in block.transaction_refs() {
777-
block_execution_tracker
778-
.execute_transaction(transaction, round, chain)
779-
.await?;
783+
// Extract max_failures from exec_policy.
784+
let max_failures = match exec_policy {
785+
BundleExecutionPolicy::Abort => 0,
786+
BundleExecutionPolicy::AutoRetry { max_failures } => max_failures,
787+
};
788+
let auto_retry = !matches!(exec_policy, BundleExecutionPolicy::Abort);
789+
let mut failure_count = 0u32;
790+
791+
let mut i = 0;
792+
while i < block.transactions.len() {
793+
let transaction = &mut block.transactions[i];
794+
let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
795+
796+
// Checkpoint before bundle transactions if using auto-retry.
797+
let checkpoint = if auto_retry && is_bundle {
798+
Some((
799+
chain.clone_unchecked()?,
800+
block_execution_tracker.create_checkpoint(),
801+
))
802+
} else {
803+
None
804+
};
805+
806+
let result = block_execution_tracker
807+
.execute_transaction(&*transaction, round, chain)
808+
.await;
809+
810+
// If the transaction executed successfully, we move on to the next one.
811+
// On transient errors (e.g. missing blobs) we fail, so it can be retried after
812+
// syncing. In auto-retry mode, we can discard or reject message bundles that failed
813+
// with non-transient errors.
814+
let (error, context, incoming_bundle, saved_chain, saved_tracker) =
815+
match (result, transaction, checkpoint) {
816+
(Ok(()), _, _) => {
817+
i += 1;
818+
continue;
819+
}
820+
(
821+
Err(ChainError::ExecutionError(error, context)),
822+
Transaction::ReceiveMessages(incoming_bundle),
823+
Some((saved_chain, saved_tracker)),
824+
) if !error.is_transient_error() => {
825+
(error, context, incoming_bundle, saved_chain, saved_tracker)
826+
}
827+
(Err(e), _, _) => return Err(e),
828+
};
829+
830+
// Restore checkpoint.
831+
*chain = saved_chain;
832+
block_execution_tracker.restore_checkpoint(saved_tracker);
833+
834+
if error.is_limit_error() && i > 0 {
835+
failure_count += 1;
836+
// If we've exceeded max failures, discard all remaining message bundles.
837+
let maybe_sender = if failure_count > max_failures {
838+
info!(
839+
failure_count,
840+
max_failures,
841+
"Exceeded max bundle failures, discarding all remaining message bundles"
842+
);
843+
None
844+
} else {
845+
// Not the first - discard it and same-sender subsequent bundles.
846+
info!(
847+
%error,
848+
index = i,
849+
origin = %incoming_bundle.origin,
850+
"Message bundle exceeded block limits and will be discarded for \
851+
retry in a later block"
852+
);
853+
Some(incoming_bundle.origin)
854+
};
855+
Self::discard_remaining_bundles(block, i, maybe_sender);
856+
// Continue without incrementing i (next transaction is now at i).
857+
} else if incoming_bundle.bundle.is_protected()
858+
|| incoming_bundle.action == MessageAction::Reject
859+
{
860+
// Protected bundles cannot be rejected. Failed rejected bundles fail the block.
861+
return Err(ChainError::ExecutionError(error, context));
862+
} else {
863+
// Reject the bundle: either a non-limit error, or the first bundle
864+
// exceeded limits (and is inherently too large for any block).
865+
info!(
866+
%error,
867+
index = i,
868+
origin = %incoming_bundle.origin,
869+
"Message bundle failed to execute and will be rejected"
870+
);
871+
incoming_bundle.action = MessageAction::Reject;
872+
// Retry the transaction as rejected (don't increment i).
873+
}
780874
}
781875

876+
// This can only happen if all transactions were incoming bundles that all got discarded
877+
// due to resource limit errors. This is unlikely in practice but theoretically possible.
878+
ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
879+
782880
let recipients = block_execution_tracker.recipients();
783881
let heights = previous_message_blocks_view.multi_get(&recipients).await?;
784882
let mut recipient_heights = Vec::new();
@@ -826,7 +924,7 @@ where
826924
};
827925

828926
let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
829-
block_execution_tracker.finalize();
927+
block_execution_tracker.finalize(block.transactions.len());
830928

831929
Ok((
832930
BlockExecutionOutcome {
@@ -843,20 +941,48 @@ where
843941
))
844942
}
845943

846-
/// Executes a block: first the incoming messages, then the main operation.
847-
/// Does not update chain state other than the execution state.
944+
/// Discards all bundles from the given origin (or all if `None`), starting at the given index.
945+
fn discard_remaining_bundles(
946+
block: &mut ProposedBlock,
947+
mut index: usize,
948+
maybe_origin: Option<ChainId>,
949+
) {
950+
while index < block.transactions.len() {
951+
if matches!(
952+
&block.transactions[index],
953+
Transaction::ReceiveMessages(bundle)
954+
if maybe_origin.is_none_or(|origin| bundle.origin == origin)
955+
) {
956+
block.transactions.remove(index);
957+
} else {
958+
index += 1;
959+
}
960+
}
961+
}
962+
963+
/// Executes a block with a specified policy for handling bundle failures.
964+
///
965+
/// This method supports automatic retry with checkpointing when bundles fail:
966+
/// - For limit errors (block too large, fuel exceeded, etc.): the bundle is discarded
967+
/// so it can be retried in a later block, unless it's the first transaction
968+
/// (which gets rejected as inherently too large).
969+
/// - For non-limit errors: the bundle is rejected (triggering bounced messages).
970+
/// - After `max_failures` failed bundles, all remaining message bundles are discarded.
971+
///
972+
/// The block may be modified to reflect the actual executed transactions.
848973
#[instrument(skip_all, fields(
849974
chain_id = %self.chain_id(),
850975
block_height = %block.height
851976
))]
852977
pub async fn execute_block(
853978
&mut self,
854-
block: &ProposedBlock,
979+
mut block: ProposedBlock,
855980
local_time: Timestamp,
856981
round: Option<u32>,
857982
published_blobs: &[Blob],
858983
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
859-
) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
984+
policy: BundleExecutionPolicy,
985+
) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
860986
assert_eq!(
861987
block.chain_id,
862988
self.execution_state.context().extra().chain_id()
@@ -899,7 +1025,7 @@ where
8991025
});
9001026
Self::check_app_permissions(
9011027
self.execution_state.system.application_permissions.get(),
902-
block,
1028+
&block,
9031029
mandatory_apps_need_accepted_message,
9041030
)?;
9051031

@@ -908,13 +1034,15 @@ where
9081034
&self.confirmed_log,
9091035
&self.previous_message_blocks,
9101036
&self.previous_event_blocks,
911-
block,
1037+
&mut block,
9121038
local_time,
9131039
round,
9141040
published_blobs,
9151041
replaying_oracle_responses,
1042+
policy,
9161043
)
9171044
.await
1045+
.map(|(outcome, tracker)| (block, outcome, tracker))
9181046
}
9191047

9201048
/// Applies an execution outcome to the chain, updating the outboxes, state hash and chain

0 commit comments

Comments
 (0)