Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 45 additions & 126 deletions app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ alloy_sol_types::sol!(
"../solidity/out/ValidatorManager.sol/ValidatorManager.json"
);

use crate::state::{assemble_value_from_parts, decode_value, State};
use crate::sync_handler::{get_decided_value_for_sync, validate_payload};
use crate::payload::validate_execution_payload;
use crate::state::{decode_value, State};
use crate::sync_handler::get_decided_value_for_sync;

pub async fn initialize_state_from_genesis(state: &mut State, engine: &Engine) -> eyre::Result<()> {
// Get the genesis block from the execution engine
Expand Down Expand Up @@ -396,54 +397,21 @@ pub async fn on_started_round(
);

for parts in &pending_parts {
match state.validate_proposal_parts(parts) {
Ok(()) => {
// Validate execution payload with the execution engine before storing it as undecided proposal
let (value, data) = assemble_value_from_parts(parts.clone());

let validity = state
.validate_execution_payload(
&data,
parts.height,
parts.round,
engine,
&emerald_config.retry_config,
)
.await?;

if validity == Validity::Invalid {
warn!(
height = %parts.height,
round = %parts.round,
"Pending proposal has invalid execution payload, rejecting"
);
continue;
}
// Validate and store the pending proposal
let result = state
.process_complete_proposal_parts(parts, engine, &emerald_config.retry_config)
.await?;

state.store.store_undecided_proposal(value.clone()).await?;
if result.is_some() {
info!(
height = %parts.height,
round = %parts.round,
proposer = %parts.proposer,
"Moved valid pending proposal to undecided after validation"
);
}

state
.store
.store_undecided_block_data(value.height, value.round, value.value.id(), data)
.await?;
info!(
height = %parts.height,
round = %parts.round,
proposer = %parts.proposer,
"Moved valid pending proposal to undecided after validation"
);
}
Err(error) => {
// Validation failed, log error
error!(
height = %parts.height,
round = %parts.round,
proposer = %parts.proposer,
error = ?error,
"Removed invalid pending proposal"
);
}
} // Remove the parts from pending
// Remove the parts from pending regardless of validation outcome
state
.store
.remove_pending_proposal_parts(parts.clone())
Expand Down Expand Up @@ -596,15 +564,20 @@ pub async fn on_received_proposal_part(
"Received proposal part"
);

// Try to reassemble the proposal from received parts. If present,
// validate it with the execution engine and mark invalid when
// parsing or validation fails. Keep the outer `Option` and send it
// back to the caller (consensus) regardless.
let proposed_value = state
.received_proposal_part(from, part, engine, &emerald_config.retry_config)
.await?;
// Try to reassemble the proposal from received parts
let parts = state.reassemble_proposal(from, part).await?;

if let Some(proposed_value) = proposed_value.clone() {
// If we have complete parts, validate and store the proposal
let proposed_value = match parts {
Some(parts) => {
state
.process_complete_proposal_parts(&parts, engine, &emerald_config.retry_config)
.await?
}
None => None,
};

if let Some(ref proposed_value) = proposed_value {
debug!("✅ Received complete proposal: {:?}", proposed_value);
}

Expand Down Expand Up @@ -680,36 +653,16 @@ pub async fn on_decided(
.block_hash;
assert_eq!(latest_block_hash, parent_block_hash);

// Get validation status from cache or call newPayload
let validity = if let Some(cached) = state.validated_cache_mut().get(&block_hash) {
cached
} else {
// Collect hashes from blob transactions
let block: Block = execution_payload.clone().try_into_block().map_err(|e| {
eyre::eyre!(
"Failed to convert decided ExecutionPayloadV3 to Block at height {}: {}",
height,
e
)
})?;
let versioned_hashes: Vec<BlockHash> =
block.body.blob_versioned_hashes_iter().copied().collect();

// Ask the EL to validate the execution payload
let payload_status = engine
.notify_new_block(execution_payload, versioned_hashes)
.await?;

let validity = if payload_status.status.is_valid() {
Validity::Valid
} else {
Validity::Invalid
};

// TODO: insert validation outcome into cache also when calling notify_new_block_with_retry in validate_payload
state.validated_cache_mut().insert(block_hash, validity);
validity
};
// Validate the execution payload (uses cache internally)
let validity = validate_execution_payload(
state.validated_cache_mut(),
&block_bytes,
height,
round,
engine,
&emerald_config.retry_config,
)
.await?;

if validity == Validity::Invalid {
return Err(eyre!("Block validation failed for hash: {}", block_hash));
Expand Down Expand Up @@ -808,38 +761,16 @@ pub async fn on_process_synced_value(
info!(%height, %round, "🟢🟢 Processing synced value");

let value = decode_value(value_bytes);

// Extract execution payload from the synced value for validation
let block_bytes = value.extensions.clone();
let execution_payload = ExecutionPayloadV3::from_ssz_bytes(&block_bytes).map_err(|e| {
eyre::eyre!(
"Failed to decode synced ExecutionPayloadV3 at height {}: {:?}",
height,
e
)
})?;
let new_block_hash = execution_payload.payload_inner.payload_inner.block_hash;

// Collect hashes from blob transactions
let block: Block = execution_payload.clone().try_into_block().map_err(|e| {
eyre::eyre!(
"Failed to convert synced ExecutionPayloadV3 to Block at height {}: {}",
height,
e
)
})?;
let versioned_hashes: Vec<BlockHash> =
block.body.blob_versioned_hashes_iter().copied().collect();

// Validate the synced block
let validity = validate_payload(
let validity = validate_execution_payload(
state.validated_cache_mut(),
engine,
&execution_payload,
&versioned_hashes,
&emerald_config.retry_config,
&block_bytes,
height,
round,
engine,
&emerald_config.retry_config,
)
.await?;

Expand All @@ -861,10 +792,7 @@ pub async fn on_process_synced_value(
return Ok(());
}

debug!(
"💡 Sync block validated at height {} with hash: {}",
height, new_block_hash
);
debug!(%height, "💡 Sync block validated");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also add back the hash ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really as we no longer have the block hash. We could get it by extracting the execution payload from the block_bytes and then use execution_payload.payload_inner.payload_inner.block_hash, but I don't see the benefit of logging the block hash in a debug message.

let proposed_value: ProposedValue<EmeraldContext> = ProposedValue {
height,
round,
Expand All @@ -875,16 +803,7 @@ pub async fn on_process_synced_value(
};

if let Err(e) = state
.store
.store_undecided_block_data(height, round, proposed_value.value.id(), block_bytes)
.await
{
error!(%height, %round, error = %e, "Failed to store synced block data");
}
// Store the synced value and block data
if let Err(e) = state
.store
.store_undecided_proposal(proposed_value.clone())
.store_undecided_value(&proposed_value, block_bytes)
.await
{
error!(%height, %round, error = %e, "Failed to store synced value");
Expand Down
1 change: 1 addition & 0 deletions app/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod app;
mod metrics;
pub mod node;
mod payload;
pub mod state;
mod store;
mod streaming;
Expand Down
157 changes: 157 additions & 0 deletions app/src/payload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
//! Execution payload utilities for validation, caching, and manipulation.

use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use bytes::Bytes;
use caches::lru::AdaptiveCache;
use caches::Cache;
use color_eyre::eyre::{self, eyre};
use malachitebft_app_channel::app::types::core::{Round, Validity};
use malachitebft_eth_engine::engine::Engine;
use malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1;
use malachitebft_eth_types::{Block, BlockHash, Height, RetryConfig};
use ssz::Decode;
use tracing::{debug, error, warn};

/// Cache for tracking recently validated execution payloads to avoid redundant validation.
/// Stores both the block hash and its validity result (Valid or Invalid).
pub struct ValidatedPayloadCache {
cache: AdaptiveCache<BlockHash, Validity>,
}

impl ValidatedPayloadCache {
pub fn new(max_size: usize) -> Self {
Self {
cache: AdaptiveCache::new(max_size)
.expect("Failed to create AdaptiveCache: invalid cache size"),
}
}

/// Check if a block hash has been validated and return its cached validity
pub fn get(&mut self, block_hash: &BlockHash) -> Option<Validity> {
self.cache.get(block_hash).copied()
}

/// Insert a block hash and its validity result into the cache
pub fn insert(&mut self, block_hash: BlockHash, validity: Validity) {
self.cache.put(block_hash, validity);
}
}

/// Validates execution payload bytes with the execution engine.
/// Decodes the payload, extracts versioned hashes, and validates.
/// Uses cache to avoid duplicate validation calls.
///
/// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid,
/// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures.
pub async fn validate_execution_payload(
cache: &mut ValidatedPayloadCache,
data: &Bytes,
height: Height,
round: Round,
engine: &Engine,
retry_config: &RetryConfig,
) -> eyre::Result<Validity> {
// Decode execution payload
let execution_payload = match ExecutionPayloadV3::from_ssz_bytes(data) {
Ok(payload) => payload,
Err(e) => {
warn!(
height = %height,
round = %round,
error = ?e,
"Proposal has invalid ExecutionPayloadV3 encoding"
);
return Ok(Validity::Invalid);
}
};

let block_hash = execution_payload.payload_inner.payload_inner.block_hash;

// Check if we've already validated this block
if let Some(cached_validity) = cache.get(&block_hash) {
debug!(
%height, %round, %block_hash, validity = ?cached_validity,
"Skipping duplicate newPayload call, returning cached result"
);
return Ok(cached_validity);
}

// Extract versioned hashes for blob transactions
let block: Block = match execution_payload.clone().try_into_block() {
Ok(block) => block,
Err(e) => {
warn!(
height = %height,
round = %round,
error = ?e,
"Failed to convert ExecutionPayloadV3 to Block"
);
return Ok(Validity::Invalid);
}
};
let versioned_hashes: Vec<BlockHash> =
block.body.blob_versioned_hashes_iter().copied().collect();

// Validate with execution engine
let payload_status = engine
.notify_new_block_with_retry(execution_payload, versioned_hashes, retry_config)
.await
.map_err(|e| {
eyre!(
"Execution client stuck in SYNCING for {:?} at height {}: {}",
retry_config.max_elapsed_time,
height,
e
)
})?;

let validity = if payload_status.status.is_valid() {
Validity::Valid
} else {
// INVALID or ACCEPTED - both are treated as invalid
// INVALID: malicious block
// ACCEPTED: Non-canonical payload - should not happen with instant finality
error!(%height, %round, "Block validation failed: {}", payload_status.status);
Validity::Invalid
};

cache.insert(block_hash, validity);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we call validate_execution_payload inside on_decided. do we need this caching even inside on_decided ?

I would prefer to manage cache outside this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is just an optimization that would avoid caching the validation result for the proposer. We could add a param to indicate if the validation result should be cached, but I think it just adds complexity for very little benefit.

Ok(validity)
}

/// Extracts a block header from an ExecutionPayloadV3 by removing transactions and withdrawals.
///
/// Returns an ExecutionPayloadV3 with empty transactions and withdrawals vectors,
/// containing only the block header fields.
pub fn extract_block_header(payload: &ExecutionPayloadV3) -> ExecutionPayloadV3 {
ExecutionPayloadV3 {
payload_inner: ExecutionPayloadV2 {
payload_inner: ExecutionPayloadV1 {
transactions: vec![],
..payload.payload_inner.payload_inner.clone()
},
withdrawals: vec![],
},
..payload.clone()
}
}

/// Reconstructs a complete ExecutionPayloadV3 from a block header and payload body.
///
/// Takes a header (ExecutionPayloadV3 with empty transactions/withdrawals) and combines it
/// with the transactions and withdrawals from an ExecutionPayloadBodyV1 to create a full payload.
pub fn reconstruct_execution_payload(
header: ExecutionPayloadV3,
body: ExecutionPayloadBodyV1,
) -> ExecutionPayloadV3 {
ExecutionPayloadV3 {
payload_inner: ExecutionPayloadV2 {
payload_inner: ExecutionPayloadV1 {
transactions: body.transactions,
..header.payload_inner.payload_inner
},
withdrawals: body.withdrawals.unwrap_or_default(),
},
..header
}
}
Loading