1 change: 1 addition & 0 deletions Cargo.lock


28 changes: 28 additions & 0 deletions core/src/replay_stage.rs
@@ -3690,6 +3690,34 @@ impl ReplayStage {
let _ = cluster_slots_update_sender.send(vec![bank_slot]);
}

// Verify and process block components (e.g., header, footer) before freezing
// Only verify blocks that were replayed from blockstore (not leader blocks)
if !is_leader_block {
Contributor:

Is there ever a case where I should pay attention to my own block_components?

For example, a standby sending out a block in my name, putting in a block component I don't agree with?

Contributor Author:

no reason to do this - if a validator wishes to modify block component dissemination, they should do so either in the block creation loop or in broadcast

could you clarify what you mean by "standby sending out block in my name?"

Contributor:

Let's say I run two validators A and B, where A is B's standby. Now I'm switching identity so A becomes the active one, but it's still processing blocks produced by B. In this case, do I need to process the block_components for the blocks owned by "me"?

if let Err(err) = bank
.block_component_processor
.read()
.unwrap()
.finish(migration_status)
Contributor:

nit: I don't like the name finish; it sounds like you are trying to finish migration. But you are really checking whether the block format is consistent with the migration status, right?

Contributor Author:

given a block, BlockComponentProcessor accomplishes two things:

  • returns an error, resulting in the block being marked dead, if the block violates an invariant; sample invariants include # of block headers == 1, # of block footers == 1, etc. a follow-up PR will enforce additional invariants, e.g., the clock checks

  • soon, applies state changes that certain block components opine on; e.g., updating account rewards, the clock sysvar, etc.

the finish() function reasons about the sufficient statistics gathered by the BlockComponentProcessor during accumulation in confirm_slot, and helps accomplish the two goals above. for now, all we do is enforce that both a block header and a block footer are present.

right now, the specific invariants we enforce depend on migration_status, which is why we need to pass migration_status in; that said, I think the name is reasonable, since we'll eventually throw migration_status away entirely.

Contributor:

It is okay for now, but it gets tricky when we apply state changes I guess.
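The invariant discussed in this thread (exactly one block header and exactly one block footer per block) can be sketched as follows. This is a hypothetical illustration with simplified stand-in types; it is not the PR's actual BlockComponentProcessor API.

```rust
// Hypothetical sketch of the invariant finish() enforces per the thread
// above: exactly one block header and one block footer per block.
// BlockComponent and FinishError here are simplified stand-ins.

#[derive(Debug, PartialEq)]
enum BlockComponent {
    Header,
    Footer,
    EntryBatch(Vec<u64>), // stand-in for entry data
}

#[derive(Debug, PartialEq)]
enum FinishError {
    HeaderCount(usize),
    FooterCount(usize),
}

fn finish(components: &[BlockComponent]) -> Result<(), FinishError> {
    // Count each component kind gathered during accumulation.
    let headers = components
        .iter()
        .filter(|c| matches!(c, BlockComponent::Header))
        .count();
    let footers = components
        .iter()
        .filter(|c| matches!(c, BlockComponent::Footer))
        .count();
    // A violated invariant surfaces as an error, so the caller can mark
    // the slot dead, mirroring the behavior described in the diff.
    if headers != 1 {
        return Err(FinishError::HeaderCount(headers));
    }
    if footers != 1 {
        return Err(FinishError::FooterCount(footers));
    }
    Ok(())
}
```

A block with one header, some entry batches, and one footer passes; any other header or footer count is rejected.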

{
warn!("Block component processing failed for slot {bank_slot}: {err:?}",);
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&BlockstoreProcessorError::BlockComponentProcessor(err),
rpc_subscriptions,
slot_status_notifier,
progress,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
&mut tbft_structs,
);
continue;
}
}

Comment on lines +3695 to +3720
Contributor (@carllin, Nov 10, 2025):

is there any reason not to move this check right after https://github.com/ksn6/alpenglow/blob/abb42d57cf609c64823b53ef39888e7aee001a0b/core/src/replay_stage.rs#L3606-L3639

That way all the failure checks run directly one after another, and we don't have to reason about whether the logic in between, like https://github.com/ksn6/alpenglow/blob/abb42d57cf609c64823b53ef39888e7aee001a0b/core/src/replay_stage.rs#L3675-L3691, is necessary

Could even group the failure checks into a single function check_block_failures

Contributor Author:

ashwin came up with this nice idea to entirely eliminate the finish() function; we pushed this a few days ago:

#594

bank.freeze();
datapoint_info!(
"bank_frozen",
146 changes: 122 additions & 24 deletions ledger/src/blockstore.rs
@@ -3928,14 +3928,13 @@ impl Blockstore {
.map(|x| x.0)
}

/// Returns the entry vector for the slot starting with `shred_start_index`, the number of
/// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds).
pub fn get_slot_entries_with_shred_info(
/// Helper function that contains the common logic for getting slot data with shred info
fn get_slot_data_with_shred_info_common(
&self,
slot: Slot,
start_index: u64,
allow_dead_slots: bool,
) -> Result<(Vec<Entry>, u64, bool)> {
) -> Result<(CompletedRanges, SlotMeta, u64)> {
let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;

// Check if the slot is dead *after* fetching completed ranges to avoid a race
@@ -3945,7 +3944,7 @@
if self.is_dead(slot) && !allow_dead_slots {
return Err(BlockstoreError::DeadSlot);
} else if completed_ranges.is_empty() {
return Ok((vec![], 0, false));
return Err(BlockstoreError::SlotUnavailable);
}

let slot_meta = slot_meta.unwrap();
@@ -3954,10 +3953,58 @@
.map(|&Range { end, .. }| u64::from(end) - start_index)
.unwrap_or(0);

let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;
Ok((completed_ranges, slot_meta, num_shreds))
}

/// Returns the entry vector for the slot starting with `shred_start_index`, the number of
/// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds).
pub fn get_slot_entries_with_shred_info(
&self,
slot: Slot,
start_index: u64,
allow_dead_slots: bool,
) -> Result<(Vec<Entry>, u64, bool)> {
let (completed_ranges, slot_meta, num_shreds) =
match self.get_slot_data_with_shred_info_common(slot, start_index, allow_dead_slots) {
Ok(data) => data,
Err(BlockstoreError::SlotUnavailable) => return Ok((vec![], 0, false)),
Err(e) => return Err(e),
};

let entries = self.get_slot_entries_in_block(slot, &completed_ranges, Some(&slot_meta))?;
Ok((entries, num_shreds, slot_meta.is_full()))
}

/// Returns the components vector for the slot starting with `shred_start_index`, the number of
/// shreds that comprise the components vector, and whether the slot is full (consumed all
/// shreds).
pub fn get_slot_components_with_shred_info(
&self,
slot: Slot,
start_index: u64,
allow_dead_slots: bool,
) -> Result<(Vec<BlockComponent>, Vec<Range<u32>>, bool)> {
let (completed_ranges, slot_meta, _) =
match self.get_slot_data_with_shred_info_common(slot, start_index, allow_dead_slots) {
Ok(data) => data,
Err(BlockstoreError::SlotUnavailable) => return Ok((vec![], vec![], false)),
Err(e) => return Err(e),
};

let components =
self.get_slot_components_in_block(slot, &completed_ranges, Some(&slot_meta))?;

// TODO(ksn): get rid of this once we remove BlockComponent::from_bytes_multiple
if completed_ranges.len() != components.len() {
return Err(BlockstoreError::BlockComponentMisalignment(
slot,
start_index,
));
}

Ok((components, completed_ranges, slot_meta.is_full()))
Contributor:

nit: Maybe we should do the zipping inside this function, less confusion to caller.

Contributor Author (@ksn6, Nov 10, 2025):

could you clarify what you mean by "the zipping inside this function?"

EDIT: nvm, I see what you mean - a lot of this code is going to become much cleaner after a bunch of small refactors. would rather we get this PR checked in asap to unblock a number of other things - I promise to make this change, though!
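The "zipping inside this function" suggestion can be sketched as follows: return paired (range, component) tuples from the function, so callers never juggle two parallel vectors that could drift out of alignment. The names and types here are illustrative stand-ins, not the PR's API.

```rust
use std::ops::Range;

// Simplified stand-in for the PR's BlockComponent type.
#[derive(Debug, Clone, PartialEq)]
struct BlockComponent(u8);

// Zip completed ranges with their components inside the function, performing
// the misalignment check once here instead of leaving it to each caller.
fn components_with_ranges(
    ranges: Vec<Range<u32>>,
    components: Vec<BlockComponent>,
) -> Result<Vec<(Range<u32>, BlockComponent)>, String> {
    if ranges.len() != components.len() {
        return Err(format!(
            "misaligned: {} ranges vs {} components",
            ranges.len(),
            components.len()
        ));
    }
    Ok(ranges.into_iter().zip(components).collect())
}
```

Callers then iterate one `Vec<(Range<u32>, BlockComponent)>` directly, which is the confusion-reducing shape the reviewer suggests.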

}

/// Gets accounts used in transactions in the slot range [starting_slot, ending_slot].
/// Additionally returns a bool indicating if the set may be incomplete.
/// Used by ledger-tool to create a minimized snapshot
@@ -4061,18 +4108,24 @@
.collect()
}

/// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
/// Helper function to process shreds in `completed_ranges` and apply a transformation
/// to the resulting block components.
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i..e_i), (s_i+1..e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i == s_i+1 < e_i+1
fn get_slot_entries_in_block(
fn process_slot_data_in_block<T, I, F>(
&self,
slot: Slot,
completed_ranges: CompletedRanges,
completed_ranges: &CompletedRanges,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
transform: F,
) -> Result<Vec<T>>
where
I: IntoIterator<Item = T>,
F: Fn(Vec<BlockComponent>) -> Result<I>,
{
debug_assert!(completed_ranges
.iter()
.tuple_windows()
@@ -4103,7 +4156,7 @@
})
});
completed_ranges
.into_iter()
.iter()
.map(|Range { start, end }| end - start)
.map(|num_shreds| {
shreds
@@ -4116,35 +4169,80 @@
)))
})
.and_then(|payload| {
// TODO(karthik): if Alpenglow flag is disabled, return an error on special
// EntryBatches.
BlockComponent::from_bytes_multiple(&payload)
.map(|cs| {
cs.into_iter()
.filter_map(|bc| bc.as_entry_batch_owned())
.flatten()
.collect_vec()
})
.map_err(|e| {
let components =
BlockComponent::from_bytes_multiple(&payload).map_err(|e| {
Contributor Author:

We can entirely remove from_bytes_multiple here and just go with from_bytes. This PR is getting large, though - will do this in a separate PR, as indicated in the TODO below.

BlockstoreError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom(format!(
"could not reconstruct entries: {e:?}"
"could not reconstruct block components: {e:?}"
)),
))
})
})?;

// Enforce that completed ranges have precisely one BlockComponent.
Contributor:

nit: can you clarify how the caller can ensure this is true? And it should be specified in the doc as well.

Contributor Author:

once this PR lands, I'll be modifying SIMD-0337 to ensure that this statement is true

in a follow-up PR, I'll be replacing the block component parsing with parsing a single component rather than a Vec<BlockComponent> and ensuring that the length is 1

// TODO(ksn): we can substantially clean up the parsing logic because of
// this assumption.
if components.len() != 1 {
return Err(BlockstoreError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom(format!(
Contributor:

nit: Is it worth introducing new error type rather than fitting into this one?

Contributor Author:

this code is going away in a follow-up PR; see #575 (comment)

"expected 1 component, but got {}",
components.len()
)),
)));
}

transform(components).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not transform block components: {e:?}"),
)))
})
})
})
.flatten_ok()
.collect()
}

/// Fetch the components corresponding to all of the shred indices in `completed_ranges`
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i..e_i), (s_i+1..e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i == s_i+1 < e_i+1
fn get_slot_components_in_block(
Contributor:

nit: maybe explain the difference from get_slot_entries_in_block?

Contributor Author:

could you clarify? the return type for getting slot components is Result<Vec<BlockComponent>>, while that of entries is Result<Vec<Entry>>; I think the comments elaborate a bit further on this

Contributor:

I mean, we now seem to have almost identical comments between the two functions; it would be good to mention the difference, but I'm fine either way.

&self,
slot: Slot,
completed_ranges: &CompletedRanges,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<BlockComponent>> {
self.process_slot_data_in_block(slot, completed_ranges, slot_meta, |cs| Ok(cs.into_iter()))
}

/// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i..e_i), (s_i+1..e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i == s_i+1 < e_i+1
fn get_slot_entries_in_block(
&self,
slot: Slot,
completed_ranges: &CompletedRanges,
Contributor:

Why are we changing CompletedRanges to &CompletedRanges here?

Contributor Author:

get_slot_components_with_shred_info returns an owned completed_ranges, but also needs to invoke get_slot_components_in_block, so get_slot_components_in_block must take in &CompletedRanges

get_slot_entries_in_block invokes get_slot_components_in_block, so we might as well pass in a ref rather than an owned version of CompletedRanges, since an owned version here is more restrictive unnecessarily

slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
self.process_slot_data_in_block(slot, completed_ranges, slot_meta, |cs| {
Ok(cs
.into_iter()
.filter_map(|bc| bc.as_entry_batch_owned())
.flatten())
})
}

pub fn get_entries_in_data_block(
&self,
slot: Slot,
range: Range<u32>,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_in_block(slot, vec![range], slot_meta)
self.get_slot_entries_in_block(slot, &vec![range], slot_meta)
}

/// Performs checks on the last fec set of a replayed slot, and returns the block_id.
2 changes: 2 additions & 0 deletions ledger/src/blockstore/error.rs
@@ -58,5 +58,7 @@ pub enum BlockstoreError {
LegacyShred(Slot, u64),
#[error("unable to read merkle root slot {0}, index {1}")]
MissingMerkleRoot(Slot, u64),
#[error("BlockComponent misalignment slot {0}, index {1}")]
BlockComponentMisalignment(Slot, u64),
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;
90 changes: 71 additions & 19 deletions ledger/src/blockstore_processor.rs
@@ -19,8 +19,12 @@ use {
},
solana_clock::{Slot, MAX_PROCESSING_AGE},
solana_cost_model::{cost_model::CostModel, transaction_cost::TransactionCost},
solana_entry::entry::{
self, create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers,
solana_entry::{
block_component::BlockComponent,
entry::{
self, create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus,
VerifyRecyclers,
},
},
solana_genesis_config::GenesisConfig,
solana_hash::Hash,
@@ -32,6 +36,7 @@ use {
bank::{Bank, PreCommitResult, TransactionBalancesSet},
bank_forks::{BankForks, SetRootError},
bank_utils,
block_component_processor::BlockComponentProcessorError,
commitment::VOTE_THRESHOLD_SIZE,
dependency_tracker::DependencyTracker,
installed_scheduler_pool::BankWithScheduler,
@@ -837,6 +842,9 @@ pub enum BlockstoreProcessorError {

#[error("user transactions found in vote only mode bank at slot {0}")]
UserTransactionsInVoteOnlyBank(Slot),

#[error("block component processor error: {0}")]
BlockComponentProcessor(#[from] BlockComponentProcessorError),
}

/// Callback for accessing bank state after each slot is confirmed while
@@ -1492,10 +1500,10 @@ pub fn confirm_slot(
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();

let slot_entries_load_result = {
let (slot_components, completed_ranges, slot_full) = {
let mut load_elapsed = Measure::start("load_elapsed");
let load_result = blockstore
.get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
.get_slot_components_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
.map_err(BlockstoreProcessorError::FailedToLoadEntries);
load_elapsed.stop();
if load_result.is_err() {
@@ -1506,21 +1514,65 @@
load_result
}?;

confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
migration_status,
)
// Process block components for Alpenglow slots. Note that we don't need to run migration checks
// for BlockMarkers here, despite BlockMarkers only being active post-Alpenglow. Here's why:
//
// Post-Alpenglow migration - validators that have Alpenglow enabled can parse BlockComponents.
// Things just work.
//
// Pre-Alpenglow migration, suppose a validator receives a BlockMarker:
//
// (1) validators *incapable* of processing BlockMarkers will mark the slot as dead on shred
// ingest in blockstore.
//
// (2) validators *capable* of processing BlockMarkers will store the BlockMarkers in shred
// ingest, run through this verifying code here, and then error out when finish() is invoked
// during replay, resulting in the slot being marked as dead.
let mut processor = bank.block_component_processor.write().unwrap();

// Find the index of the last EntryBatch in slot_components
let last_entry_batch_index = slot_components
.iter()
.rposition(|bc| matches!(bc, BlockComponent::EntryBatch(_)));

for (ix, (completed_range, component)) in completed_ranges
.iter()
.zip(slot_components.into_iter())
.enumerate()
{
let num_shreds = completed_range.end - completed_range.start;

match component {
BlockComponent::EntryBatch(entries) => {
let slot_full = slot_full && ix == last_entry_batch_index.unwrap();
Contributor:

Why don't we unwrap when last_entry_batch_index is defined?

Also: is it always safe to unwrap?

Contributor Author:

last_entry_batch_index exists if and only if an EntryBatch exists in slot_components; as such, we cannot always invoke .unwrap()

however, if we've entered the match statement in line 1548, last_entry_batch_index must exist, since we're currently processing an entry batch
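The reasoning in this thread can be sketched with `rposition`: the index of the last EntryBatch exists if and only if at least one EntryBatch is present, so the unwrap cannot fire while an EntryBatch is being processed. Types here are simplified stand-ins, not the PR's definitions.

```rust
// Simplified stand-in for the PR's BlockComponent type.
enum BlockComponent {
    EntryBatch(Vec<u64>),
    BlockMarker,
}

// rposition scans from the right and returns the index of the last match.
// The unwrap is guarded by the matches! check: it only runs when
// components[ix] is an EntryBatch, in which case rposition must be Some.
fn is_last_entry_batch(components: &[BlockComponent], ix: usize) -> bool {
    let last = components
        .iter()
        .rposition(|c| matches!(c, BlockComponent::EntryBatch(_)));
    matches!(components[ix], BlockComponent::EntryBatch(_)) && ix == last.unwrap()
}
```

For a non-EntryBatch index the `&&` short-circuits before the unwrap, which mirrors why the unwrap in the diff's EntryBatch match arm is safe.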


confirm_slot_entries(
bank,
replay_tx_thread_pool,
(entries, num_shreds as u64, slot_full),
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
migration_status,
)?;
}
BlockComponent::BlockMarker(marker) => {
// Skip verification for the genesis block
if let Some(parent_bank) = bank.parent() {
processor.on_marker(bank.clone_without_scheduler(), parent_bank, &marker)?;
}
progress.num_shreds += num_shreds as u64;
}
}
}

Ok(())
}

#[allow(clippy::too_many_arguments)]