From e400a111afdbc6eb45cf412da9b4a8ad2043789a Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 10:06:30 +0100 Subject: [PATCH 01/11] add process_complete_proposal_parts --- app/src/app.rs | 82 ++++++++++------------------- app/src/state.rs | 133 +++++++++++++++++++++++++---------------------- 2 files changed, 99 insertions(+), 116 deletions(-) diff --git a/app/src/app.rs b/app/src/app.rs index 5ed04ac0..9d3244d9 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -27,7 +27,7 @@ alloy_sol_types::sol!( "../solidity/out/ValidatorManager.sol/ValidatorManager.json" ); -use crate::state::{assemble_value_from_parts, decode_value, State}; +use crate::state::{decode_value, State}; use crate::sync_handler::{get_decided_value_for_sync, validate_payload}; pub async fn initialize_state_from_genesis(state: &mut State, engine: &Engine) -> eyre::Result<()> { @@ -396,54 +396,21 @@ pub async fn on_started_round( ); for parts in &pending_parts { - match state.validate_proposal_parts(parts) { - Ok(()) => { - // Validate execution payload with the execution engine before storing it as undecided proposal - let (value, data) = assemble_value_from_parts(parts.clone()); - - let validity = state - .validate_execution_payload( - &data, - parts.height, - parts.round, - engine, - &emerald_config.retry_config, - ) - .await?; - - if validity == Validity::Invalid { - warn!( - height = %parts.height, - round = %parts.round, - "Pending proposal has invalid execution payload, rejecting" - ); - continue; - } + // Validate and store the pending proposal + let result = state + .process_complete_proposal_parts(parts, engine, &emerald_config.retry_config) + .await?; - state.store.store_undecided_proposal(value.clone()).await?; + if result.is_some() { + info!( + height = %parts.height, + round = %parts.round, + proposer = %parts.proposer, + "Moved valid pending proposal to undecided after validation" + ); + } - state - .store - .store_undecided_block_data(value.height, value.round, value.value.id(), data) - .await?; - info!( - height = %parts.height, - round = %parts.round, - proposer = %parts.proposer, - "Moved valid pending proposal to undecided after validation" - ); - } - Err(error) => { - // Validation failed, log error - error!( - height = %parts.height, - round = %parts.round, - proposer = %parts.proposer, - error = ?error, - "Removed invalid pending proposal" - ); - } - } // Remove the parts from pending + // Remove the parts from pending regardless of validation outcome state .store .remove_pending_proposal_parts(parts.clone()) @@ -596,15 +563,20 @@ pub async fn on_received_proposal_part( "Received proposal part" ); - // Try to reassemble the proposal from received parts. If present, - // validate it with the execution engine and mark invalid when - // parsing or validation fails. Keep the outer `Option` and send it - // back to the caller (consensus) regardless. - let proposed_value = state - .received_proposal_part(from, part, engine, &emerald_config.retry_config) - .await?; + // Try to reassemble the proposal from received parts + let parts = state.reassemble_proposal(from, part).await?; + + // If we have complete parts, validate and store the proposal + let proposed_value = match parts { + Some(parts) => { + state + .process_complete_proposal_parts(&parts, engine, &emerald_config.retry_config) + .await? + } + None => None, + }; - if let Some(proposed_value) = proposed_value.clone() { + if let Some(ref proposed_value) = proposed_value { debug!("✅ Received complete proposal: {:?}", proposed_value); } diff --git a/app/src/state.rs b/app/src/state.rs index cf76ddc3..6e8100c3 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -370,7 +370,7 @@ impl State { /// Validates execution payload with the execution engine /// Returns Ok(Validity) - Invalid if decoding fails or payload is invalid - pub async fn validate_execution_payload( + async fn validate_execution_payload( &mut self, data: &Bytes, height: Height, @@ -421,15 +421,78 @@ impl State { .await } - /// Processes and adds a new proposal to the state if it's valid - /// Returns Some(ProposedValue) if the proposal was accepted, None otherwise - pub async fn received_proposal_part( + /// Processes complete proposal parts: validates, stores, and returns the proposed value. + /// + /// Returns `Ok(Some(ProposedValue))` if the proposal is valid and stored, + /// `Ok(None)` if validation fails, or an error for storage/engine failures. + pub async fn process_complete_proposal_parts( &mut self, - from: PeerId, - part: StreamMessage, + parts: &ProposalParts, engine: &Engine, retry_config: &RetryConfig, ) -> eyre::Result>> { + // Validate proposal (proposer + signature) + if let Err(error) = self.validate_proposal_parts(parts) { + error!( + height = %parts.height, + round = %parts.round, + proposer = %parts.proposer, + error = ?error, + "Rejecting invalid proposal" + ); + return Ok(None); + } + + // Assemble the proposal from its parts + let (value, data) = assemble_value_from_parts(parts.clone()); + + // Log first 32 bytes of proposal data and total size + if data.len() >= 32 { + info!( + "Proposal data[0..32]: {}, total_size: {} bytes, id: {:x}", + hex::encode(&data[..32]), + data.len(), + value.value.id().as_u64() + ); + } + + // Validate the execution payload with the execution engine + let validity = self + .validate_execution_payload(&data, value.height, value.round, engine, retry_config) + .await?; + + if validity == Validity::Invalid { + warn!( + height = %parts.height, + round = %parts.round, + "Proposal has invalid execution payload, rejecting" + ); + return Ok(None); + } + + // Store as undecided + info!(%value.height, %value.round, %value.proposer, "Storing validated proposal as undecided"); + self.store_undecided_block_data(value.height, value.round, value.value.id(), data) + .await?; + self.store.store_undecided_proposal(value.clone()).await?; + + Ok(Some(value)) + } + + /// Reassembles proposal parts from streamed messages. + /// + /// Handles height filtering: + /// - Outdated proposals (height < current) are dropped + /// - Future proposals (height > current) are stored as pending + /// - Current height proposals are returned for validation + /// + /// Returns `Some(ProposalParts)` when a complete proposal is ready for validation, + /// `None` if the proposal is incomplete, outdated, or stored for later. + pub async fn reassemble_proposal( + &mut self, + from: PeerId, + part: StreamMessage, + ) -> eyre::Result> { let sequence = part.sequence; // Check if we have a full proposal @@ -458,62 +521,10 @@ impl State { return Ok(None); } - // For current height, validate proposal (proposer + signature) - match self.validate_proposal_parts(&parts) { - Ok(()) => { - // Validation passed - assemble and store as undecided - // Re-assemble the proposal from its parts - let (value, data) = assemble_value_from_parts(parts); - - // Log first 32 bytes of proposal data and total size - if data.len() >= 32 { - info!( - "Proposal data[0..32]: {}, total_size: {} bytes, id: {:x}", - hex::encode(&data[..32]), - data.len(), - value.value.id().as_u64() - ); - } - - // Validate the execution payload with the execution engine - let validity = self - .validate_execution_payload( - &data, - value.height, - value.round, - engine, - retry_config, - ) - .await?; - - if validity == Validity::Invalid { - warn!( - height = %self.consensus_height, - round = %self.consensus_round, - "Received proposal with invalid execution payload, ignoring" - ); - return Ok(None); - } - info!(%value.height, %value.round, %value.proposer, "Storing validated proposal as undecided"); - self.store_undecided_block_data(value.height, value.round, value.value.id(), data) - .await?; - self.store.store_undecided_proposal(value.clone()).await?; - - Ok(Some(value)) - } - Err(error) => { - // Any validation error indicates invalid proposal - log and reject - error!( - height = %parts.height, - round = %parts.round, - proposer = %parts.proposer, - error = ?error, - "Rejecting invalid proposal" - ); - Ok(None) - } - } + // For current height, return parts for validation + Ok(Some(parts)) } + pub async fn store_undecided_block_data( &mut self, height: Height, From 688c25d90bf05e62d14478254949884cfac4ff90 Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 10:12:38 +0100 Subject: [PATCH 02/11] remove store_undecided_block_data wrapper --- app/src/state.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/app/src/state.rs b/app/src/state.rs index 6e8100c3..5b6ab0e2 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -472,7 +472,8 @@ impl State { // Store as undecided info!(%value.height, %value.round, %value.proposer, "Storing validated proposal as undecided"); - self.store_undecided_block_data(value.height, value.round, value.value.id(), data) + self.store + .store_undecided_block_data(value.height, value.round, value.value.id(), data) .await?; self.store.store_undecided_proposal(value.clone()).await?; @@ -525,19 +526,6 @@ impl State { Ok(Some(parts)) } - pub async fn store_undecided_block_data( - &mut self, - height: Height, - round: Round, - value_id: ValueId, - data: Bytes, - ) -> eyre::Result<()> { - self.store - .store_undecided_block_data(height, round, value_id, data) - .await - .map_err(|e| eyre::Report::new(e)) - } - /// Retrieves a decided block data at the given height pub async fn get_block_data( &self, @@ -743,7 +731,8 @@ impl State { // Store the block data at the proposal's height/round, // which will be passed to the execution client (EL) on commit. // WARN: THE ORDER OF THE FOLLOWING TWO OPERATIONS IS IMPORTANT. - self.store_undecided_block_data(height, round, proposal.value.id(), data.clone()) + self.store + .store_undecided_block_data(height, round, proposal.value.id(), data.clone()) .await?; // Insert the new proposal into the undecided proposals. From 64661577eb41b6e1310798e04d8fc30f69fa01ba Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 10:30:32 +0100 Subject: [PATCH 03/11] use validate_execution_payload in on_process_synced_value --- app/src/app.rs | 45 ++++++++++----------------------------------- app/src/state.rs | 7 ++++--- 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/app/src/app.rs b/app/src/app.rs index 9d3244d9..11360e4e 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -28,7 +28,7 @@ alloy_sol_types::sol!( ); use crate::state::{decode_value, State}; -use crate::sync_handler::{get_decided_value_for_sync, validate_payload}; +use crate::sync_handler::get_decided_value_for_sync; pub async fn initialize_state_from_genesis(state: &mut State, engine: &Engine) -> eyre::Result<()> { // Get the genesis block from the execution engine @@ -811,40 +811,18 @@ pub async fn on_process_synced_value( info!(%height, %round, "🟢🟢 Processing synced value"); let value = decode_value(value_bytes); - - // Extract execution payload from the synced value for validation let block_bytes = value.extensions.clone(); - let execution_payload = ExecutionPayloadV3::from_ssz_bytes(&block_bytes).map_err(|e| { - eyre::eyre!( - "Failed to decode synced ExecutionPayloadV3 at height {}: {:?}", - height, - e - ) - })?; - let new_block_hash = execution_payload.payload_inner.payload_inner.block_hash; - // Collect hashes from blob transactions - let block: Block = execution_payload.clone().try_into_block().map_err(|e| { - eyre::eyre!( - "Failed to convert synced ExecutionPayloadV3 to Block at height {}: {}", + // Validate the synced block + let validity = state + .validate_execution_payload( + &block_bytes, height, - e + round, + engine, + &emerald_config.retry_config, ) - })?; - let versioned_hashes: Vec = - block.body.blob_versioned_hashes_iter().copied().collect(); - - // Validate the synced block - let validity = validate_payload( - state.validated_cache_mut(), - engine, - &execution_payload, - &versioned_hashes, - &emerald_config.retry_config, - height, - round, - ) - .await?; + .await?; if validity == Validity::Invalid { // Reject invalid blocks - don't store or reply with them @@ -864,10 +842,7 @@ pub async fn on_process_synced_value( return Ok(()); } - debug!( - "💡 Sync block validated at height {} with hash: {}", - height, new_block_hash - ); + debug!(%height, "💡 Sync block validated"); let proposed_value: ProposedValue = ProposedValue { height, round, diff --git a/app/src/state.rs b/app/src/state.rs index 5b6ab0e2..e66dc837 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -368,9 +368,10 @@ impl State { Ok(()) } - /// Validates execution payload with the execution engine - /// Returns Ok(Validity) - Invalid if decoding fails or payload is invalid - async fn validate_execution_payload( + /// Validates execution payload with the execution engine. + /// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid, + /// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures. + pub async fn validate_execution_payload( &mut self, data: &Bytes, height: Height, From d315459a4976caf4d388daa5897fbda78025049a Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 10:50:01 +0100 Subject: [PATCH 04/11] use validate_execution_payload in on_decided --- app/src/app.rs | 40 ++++++++++------------------------------ app/src/sync_handler.rs | 13 ++++++++----- 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/app/src/app.rs b/app/src/app.rs index 11360e4e..dc1af341 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -693,36 +693,16 @@ pub async fn on_decided( ); } - // Get validation status from cache or call newPayload - let validity = if let Some(cached) = state.validated_cache_mut().get(&block_hash) { - cached - } else { - // Collect hashes from blob transactions - let block: Block = execution_payload.clone().try_into_block().map_err(|e| { - eyre::eyre!( - "Failed to convert decided ExecutionPayloadV3 to Block at height {}: {}", - height, - e - ) - })?; - let versioned_hashes: Vec = - block.body.blob_versioned_hashes_iter().copied().collect(); - - // Ask the EL to validate the execution payload - let payload_status = engine - .notify_new_block(execution_payload, versioned_hashes) - .await?; - - let validity = if payload_status.status.is_valid() { - Validity::Valid - } else { - Validity::Invalid - }; - - // TODO: insert validation outcome into cache also when calling notify_new_block_with_retry in validate_payload - state.validated_cache_mut().insert(block_hash, validity); - validity - }; + // Validate the execution payload (uses cache internally) + let validity = state + .validate_execution_payload( + &block_bytes, + height, + round, + engine, + &emerald_config.retry_config, + ) + .await?; if validity == Validity::Invalid { return Err(eyre!("Block validation failed for hash: {}", block_hash)); diff --git a/app/src/sync_handler.rs b/app/src/sync_handler.rs index 08b85e27..2b53d546 100644 --- a/app/src/sync_handler.rs +++ b/app/src/sync_handler.rs @@ -54,15 +54,18 @@ pub async fn validate_payload( ) })?; - if payload_status.status.is_valid() { - Ok(Validity::Valid) + let validity = if payload_status.status.is_valid() { + Validity::Valid } else { // INVALID or ACCEPTED - both are treated as invalid // INVALID: malicious block // ACCEPTED: Non-canonical payload - should not happen with instant finality - error!(%height, %round, "🔴 Synced block validation failed: {}", payload_status.status); - Ok(Validity::Invalid) - } + error!(%height, %round, "🔴 Block validation failed: {}", payload_status.status); + Validity::Invalid + }; + + cache.insert(block_hash, validity); + Ok(validity) } /// Retrieves a decided value for sync at the given height. From b16574e4561126128579ba2bb125b4f8b45e7857 Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 11:06:24 +0100 Subject: [PATCH 05/11] add payload.rs mod with exec payload utilities --- app/src/app.rs | 37 ++++---- app/src/lib.rs | 1 + app/src/payload.rs | 187 ++++++++++++++++++++++++++++++++++++++++ app/src/state.rs | 143 +++--------------------------- app/src/sync_handler.rs | 62 +------------ 5 files changed, 225 insertions(+), 205 deletions(-) create mode 100644 app/src/payload.rs diff --git a/app/src/app.rs b/app/src/app.rs index dc1af341..82d363fd 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -27,6 +27,7 @@ alloy_sol_types::sol!( "../solidity/out/ValidatorManager.sol/ValidatorManager.json" ); +use crate::payload::validate_execution_payload; use crate::state::{decode_value, State}; use crate::sync_handler::get_decided_value_for_sync; @@ -694,15 +695,15 @@ pub async fn on_decided( } // Validate the execution payload (uses cache internally) - let validity = state - .validate_execution_payload( - &block_bytes, - height, - round, - engine, - &emerald_config.retry_config, - ) - .await?; + let validity = validate_execution_payload( + state.validated_cache_mut(), + &block_bytes, + height, + round, + engine, + &emerald_config.retry_config, + ) + .await?; if validity == Validity::Invalid { return Err(eyre!("Block validation failed for hash: {}", block_hash)); @@ -794,15 +795,15 @@ pub async fn on_process_synced_value( let block_bytes = value.extensions.clone(); // Validate the synced block - let validity = state - .validate_execution_payload( - &block_bytes, - height, - round, - engine, - &emerald_config.retry_config, - ) - .await?; + let validity = validate_execution_payload( + state.validated_cache_mut(), + &block_bytes, + height, + round, + engine, + &emerald_config.retry_config, + ) + .await?; if validity == Validity::Invalid { // Reject invalid blocks - don't store or reply with them diff --git a/app/src/lib.rs b/app/src/lib.rs index 38d2f64e..0cbaa713 100644 --- a/app/src/lib.rs +++ b/app/src/lib.rs @@ -1,6 +1,7 @@ pub mod app; mod metrics; pub mod node; +mod payload; pub mod state; mod store; mod streaming; diff --git a/app/src/payload.rs b/app/src/payload.rs new file mode 100644 index 00000000..cef4a18a --- /dev/null +++ b/app/src/payload.rs @@ -0,0 +1,187 @@ +//! Execution payload utilities for validation, caching, and manipulation. + +use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; +use bytes::Bytes; +use caches::lru::AdaptiveCache; +use caches::Cache; +use color_eyre::eyre::{self, eyre}; +use malachitebft_app_channel::app::types::core::Validity; +use malachitebft_eth_engine::engine::Engine; +use malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1; +use malachitebft_eth_types::{Block, BlockHash, Height, RetryConfig}; +use malachitebft_app_channel::app::types::core::Round; +use ssz::Decode; +use tracing::{debug, error, warn}; + +/// Cache for tracking recently validated execution payloads to avoid redundant validation. +/// Stores both the block hash and its validity result (Valid or Invalid). +pub struct ValidatedPayloadCache { + cache: AdaptiveCache, +} + +impl ValidatedPayloadCache { + pub fn new(max_size: usize) -> Self { + Self { + cache: AdaptiveCache::new(max_size) + .expect("Failed to create AdaptiveCache: invalid cache size"), + } + } + + /// Check if a block hash has been validated and return its cached validity + pub fn get(&mut self, block_hash: &BlockHash) -> Option { + self.cache.get(block_hash).copied() + } + + /// Insert a block hash and its validity result into the cache + pub fn insert(&mut self, block_hash: BlockHash, validity: Validity) { + self.cache.put(block_hash, validity); + } +} + +/// Validates an already-decoded execution payload with the execution engine. +/// Uses cache to avoid duplicate validation calls. +/// +/// Returns `Ok(Validity::Valid)` if valid, `Ok(Validity::Invalid)` if invalid, +/// or `Err` for engine communication failures. +pub async fn validate_payload( + cache: &mut ValidatedPayloadCache, + engine: &Engine, + execution_payload: &ExecutionPayloadV3, + versioned_hashes: &[BlockHash], + retry_config: &RetryConfig, + height: Height, + round: Round, +) -> eyre::Result { + let block_hash = execution_payload.payload_inner.payload_inner.block_hash; + + // Check if we've already called newPayload for this block + if let Some(cached_validity) = cache.get(&block_hash) { + debug!( + %height, %round, %block_hash, validity = ?cached_validity, + "Skipping duplicate newPayload call, returning cached result" + ); + return Ok(cached_validity); + } + + let payload_status = engine + .notify_new_block_with_retry( + execution_payload.clone(), + versioned_hashes.to_vec(), + retry_config, + ) + .await + .map_err(|e| { + eyre!( + "Execution client stuck in SYNCING for {:?} at height {}: {}", + retry_config.max_elapsed_time, + height, + e + ) + })?; + + let validity = if payload_status.status.is_valid() { + Validity::Valid + } else { + // INVALID or ACCEPTED - both are treated as invalid + // INVALID: malicious block + // ACCEPTED: Non-canonical payload - should not happen with instant finality + error!(%height, %round, "Block validation failed: {}", payload_status.status); + Validity::Invalid + }; + + cache.insert(block_hash, validity); + Ok(validity) +} + +/// Validates execution payload bytes with the execution engine. +/// Decodes the payload, extracts versioned hashes, and validates. +/// +/// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid, +/// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures. +pub async fn validate_execution_payload( + cache: &mut ValidatedPayloadCache, + data: &Bytes, + height: Height, + round: Round, + engine: &Engine, + retry_config: &RetryConfig, +) -> eyre::Result { + // Decode execution payload + let execution_payload = match ExecutionPayloadV3::from_ssz_bytes(data) { + Ok(payload) => payload, + Err(e) => { + warn!( + height = %height, + round = %round, + error = ?e, + "Proposal has invalid ExecutionPayloadV3 encoding" + ); + return Ok(Validity::Invalid); + } + }; + + // Extract versioned hashes for blob transactions + let block: Block = match execution_payload.clone().try_into_block() { + Ok(block) => block, + Err(e) => { + warn!( + height = %height, + round = %round, + error = ?e, + "Failed to convert ExecutionPayloadV3 to Block" + ); + return Ok(Validity::Invalid); + } + }; + let versioned_hashes: Vec = + block.body.blob_versioned_hashes_iter().copied().collect(); + + // Validate with execution engine + validate_payload( + cache, + engine, + &execution_payload, + &versioned_hashes, + retry_config, + height, + round, + ) + .await +} + +/// Extracts a block header from an ExecutionPayloadV3 by removing transactions and withdrawals. +/// +/// Returns an ExecutionPayloadV3 with empty transactions and withdrawals vectors, +/// containing only the block header fields. +pub fn extract_block_header(payload: &ExecutionPayloadV3) -> ExecutionPayloadV3 { + ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + transactions: vec![], + ..payload.payload_inner.payload_inner.clone() + }, + withdrawals: vec![], + }, + ..payload.clone() + } +} + +/// Reconstructs a complete ExecutionPayloadV3 from a block header and payload body. +/// +/// Takes a header (ExecutionPayloadV3 with empty transactions/withdrawals) and combines it +/// with the transactions and withdrawals from an ExecutionPayloadBodyV1 to create a full payload. +pub fn reconstruct_execution_payload( + header: ExecutionPayloadV3, + body: ExecutionPayloadBodyV1, +) -> ExecutionPayloadV3 { + ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + transactions: body.transactions, + ..header.payload_inner.payload_inner + }, + withdrawals: body.withdrawals.unwrap_or_default(), + }, + ..header + } +} diff --git a/app/src/state.rs b/app/src/state.rs index e66dc837..907850b6 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -6,8 +6,6 @@ use std::fmt; use alloy_genesis::ChainConfig; use alloy_rpc_types_engine::ExecutionPayloadV3; use bytes::Bytes; -use caches::lru::AdaptiveCache; -use caches::Cache; use color_eyre::eyre; use malachitebft_app_channel::app::streaming::{StreamContent, StreamId, StreamMessage}; use malachitebft_app_channel::app::types::codec::Codec; @@ -19,8 +17,8 @@ use malachitebft_eth_engine::json_structures::ExecutionBlock; use malachitebft_eth_types::codec::proto::ProtobufCodec; use malachitebft_eth_types::secp256k1::K256Provider; use malachitebft_eth_types::{ - Address, Block, BlockHash, BlockTimestamp, EmeraldContext, Genesis, Height, ProposalData, - ProposalFin, ProposalInit, ProposalPart, RetryConfig, ValidatorSet, Value, ValueId, + Address, BlockTimestamp, EmeraldContext, Genesis, Height, ProposalData, ProposalFin, + ProposalInit, ProposalPart, RetryConfig, ValidatorSet, Value, ValueId, }; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -30,35 +28,10 @@ use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; use crate::metrics::Metrics; +use crate::payload::{extract_block_header, validate_execution_payload, ValidatedPayloadCache}; use crate::store::Store; use crate::streaming::{PartStreamsMap, ProposalParts}; -/// Cache for tracking recently validated execution payloads to avoid redundant validation. -/// Stores both the block hash and its validity result (Valid or Invalid). -pub struct ValidatedPayloadCache { - cache: AdaptiveCache, -} - -impl ValidatedPayloadCache { - pub fn new(max_size: usize) -> Self { - Self { - cache: AdaptiveCache::new(max_size) - .expect("Failed to create AdaptiveCache: invalid cache size"), - } - } - - /// Check if a block hash has been validated and return its cached validity - pub fn get(&mut self, block_hash: &BlockHash) -> Option { - self.cache.get(block_hash).copied() - } - - /// Insert a block hash and its validity result into the cache - pub fn insert(&mut self, block_hash: BlockHash, validity: Validity) { - self.cache.put(block_hash, validity); - } -} -use crate::sync_handler::validate_payload; - pub struct StateMetrics { pub txs_count: u64, pub chain_bytes: u64, @@ -368,60 +341,6 @@ impl State { Ok(()) } - /// Validates execution payload with the execution engine. - /// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid, - /// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures. - pub async fn validate_execution_payload( - &mut self, - data: &Bytes, - height: Height, - round: Round, - engine: &Engine, - retry_config: &RetryConfig, - ) -> eyre::Result { - // Decode execution payload - let execution_payload = match ExecutionPayloadV3::from_ssz_bytes(data) { - Ok(payload) => payload, - Err(e) => { - warn!( - height = %height, - round = %round, - error = ?e, - "Proposal has invalid ExecutionPayloadV3 encoding" - ); - return Ok(Validity::Invalid); - } - }; - - // Extract versioned hashes for blob transactions - let block: Block = match execution_payload.clone().try_into_block() { - Ok(block) => block, - Err(e) => { - warn!( - height = %height, - round = %round, - error = ?e, - "Failed to convert ExecutionPayloadV3 to Block" - ); - return Ok(Validity::Invalid); - } - }; - let versioned_hashes: Vec = - block.body.blob_versioned_hashes_iter().copied().collect(); - - // Validate with execution engine - validate_payload( - &mut self.validated_payload_cache, - engine, - &execution_payload, - &versioned_hashes, - retry_config, - height, - round, - ) - .await - } - /// Processes complete proposal parts: validates, stores, and returns the proposed value. /// /// Returns `Ok(Some(ProposedValue))` if the proposal is valid and stored, @@ -458,9 +377,15 @@ impl State { } // Validate the execution payload with the execution engine - let validity = self - .validate_execution_payload(&data, value.height, value.round, engine, retry_config) - .await?; + let validity = validate_execution_payload( + &mut self.validated_payload_cache, + &data, + value.height, + value.round, + engine, + retry_config, + ) + .await?; if validity == Validity::Invalid { warn!( @@ -881,45 +806,5 @@ pub fn decode_value(bytes: Bytes) -> Value { ProtobufCodec.decode(bytes).unwrap() } -/// Extracts a block header from an ExecutionPayloadV3 by removing transactions and withdrawals. -/// -/// Returns an ExecutionPayloadV3 with empty transactions and withdrawals vectors, -/// containing only the block header fields. -pub fn extract_block_header( - payload: &alloy_rpc_types_engine::ExecutionPayloadV3, -) -> alloy_rpc_types_engine::ExecutionPayloadV3 { - use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; - - ExecutionPayloadV3 { - payload_inner: ExecutionPayloadV2 { - payload_inner: ExecutionPayloadV1 { - transactions: vec![], - ..payload.payload_inner.payload_inner.clone() - }, - withdrawals: vec![], - }, - ..payload.clone() - } -} - -/// Reconstructs a complete ExecutionPayloadV3 from a block header and payload body. -/// -/// Takes a header (ExecutionPayloadV3 with empty transactions/withdrawals) and combines it -/// with the transactions and withdrawals from an ExecutionPayloadBodyV1 to create a full payload. -pub fn reconstruct_execution_payload( - header: alloy_rpc_types_engine::ExecutionPayloadV3, - body: malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1, -) -> alloy_rpc_types_engine::ExecutionPayloadV3 { - use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; - - ExecutionPayloadV3 { - payload_inner: ExecutionPayloadV2 { - payload_inner: ExecutionPayloadV1 { - transactions: body.transactions, - ..header.payload_inner.payload_inner - }, - withdrawals: body.withdrawals.unwrap_or_default(), - }, - ..header - } -} +// Re-export payload utilities for backwards compatibility +pub use crate::payload::reconstruct_execution_payload; diff --git a/app/src/sync_handler.rs b/app/src/sync_handler.rs index 2b53d546..0328b030 100644 --- a/app/src/sync_handler.rs +++ b/app/src/sync_handler.rs @@ -1,73 +1,19 @@ -//! Sync handler functions for processing synced payloads +//! Sync handler functions for retrieving decided values for sync. use alloy_rpc_types_engine::ExecutionPayloadV3; use bytes::Bytes; use color_eyre::eyre::{self, eyre}; use malachitebft_app_channel::app::types::codec::Codec; -use malachitebft_app_channel::app::types::core::{Round, Validity}; use malachitebft_app_channel::app::types::sync::RawDecidedValue; use malachitebft_eth_engine::engine::Engine; use malachitebft_eth_types::codec::proto::ProtobufCodec; -use malachitebft_eth_types::{BlockHash, EmeraldContext, Height, RetryConfig, Value}; +use malachitebft_eth_types::{EmeraldContext, Height, Value}; use ssz::{Decode, Encode}; -use tracing::{debug, error, info}; +use tracing::{error, info}; -use crate::state::{reconstruct_execution_payload, ValidatedPayloadCache}; +use crate::payload::reconstruct_execution_payload; use crate::store::Store; -/// Generic function to validate execution payload with retry mechanism for SYNCING status. -/// Returns the validity of the payload or an error if timeout is exceeded. -/// Uses cache to avoid duplicate validation -pub async fn validate_payload( - cache: &mut ValidatedPayloadCache, - engine: &Engine, - execution_payload: &ExecutionPayloadV3, - versioned_hashes: &[BlockHash], - retry_config: &RetryConfig, - height: Height, - round: Round, -) -> eyre::Result { - let block_hash = execution_payload.payload_inner.payload_inner.block_hash; - - // Check if we've already called newPayload for this block - if let Some(cached_validity) = cache.get(&block_hash) { - debug!( - %height, %round, %block_hash, validity = ?cached_validity, - "Skipping duplicate newPayload call, returning cached result" - ); - return Ok(cached_validity); - } - - let payload_status = engine - .notify_new_block_with_retry( - execution_payload.clone(), - versioned_hashes.to_vec(), - retry_config, - ) - .await - .map_err(|e| { - eyre!( - "Execution client stuck in SYNCING for {:?} at height {}: {}", - retry_config.max_elapsed_time, - height, - e - ) - })?; - - let validity = if payload_status.status.is_valid() { - Validity::Valid - } else { - // INVALID or ACCEPTED - both are treated as invalid - // INVALID: malicious block - // ACCEPTED: Non-canonical payload - should not happen with instant finality - error!(%height, %round, "🔴 Block validation failed: {}", payload_status.status); - Validity::Invalid - }; - - cache.insert(block_hash, validity); - Ok(validity) -} - /// Retrieves a decided value for sync at the given height. /// If the value is pruned from storage, reconstructs it from the block header and execution layer. pub async fn get_decided_value_for_sync( From 730490251e17460d4641cdd7b7ec5b9ef758c5ba Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 11:26:46 +0100 Subject: [PATCH 06/11] add store_undecided_value --- app/src/app.rs | 14 +------------- app/src/state.rs | 33 +++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/app/src/app.rs b/app/src/app.rs index ebd259fd..39a2e2c0 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -802,19 +802,7 @@ pub async fn on_process_synced_value( validity: Validity::Valid, }; - if let Err(e) = state - .store - .store_undecided_block_data(height, round, proposed_value.value.id(), block_bytes) - .await - { - error!(%height, %round, error = %e, "Failed to store synced block data"); - } - // Store the synced value and block data - if let Err(e) = state - .store - .store_undecided_proposal(proposed_value.clone()) - .await - { + if let Err(e) = state.store_undecided_value(&proposed_value, block_bytes).await { error!(%height, %round, error = %e, "Failed to store synced value"); } diff --git a/app/src/state.rs b/app/src/state.rs index 4a3626ae..ed940141 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -402,10 +402,7 @@ impl State { // Store as undecided info!(%value.height, %value.round, %value.proposer, "Storing validated proposal as undecided"); - self.store - .store_undecided_block_data(value.height, value.round, value.value.id(), data) - .await?; - self.store.store_undecided_proposal(value.clone()).await?; + self.store_undecided_value(&value, data).await?; Ok(Some(value)) } @@ -470,6 +467,22 @@ impl State { .flatten() } + /// Stores an undecided proposal along with its block data. + /// + /// WARN: The order of the two storage operations is important. + /// TODO: Add more context on why the order is important. + pub async fn store_undecided_value( + &self, + value: &ProposedValue, + data: Bytes, + ) -> eyre::Result<()> { + self.store + .store_undecided_block_data(value.height, value.round, value.value.id(), data) + .await?; + self.store.store_undecided_proposal(value.clone()).await?; + Ok(()) + } + /// Commits a value with the given certificate, updating internal state /// and moving to the next height pub async fn commit( @@ -658,17 +671,9 @@ impl State { value, validity: Validity::Valid, // Our proposals are de facto valid }; - // Store the block data at the proposal's height/round, - // which will be passed to the execution client (EL) on commit. - // WARN: THE ORDER OF THE FOLLOWING TWO OPERATIONS IS IMPORTANT. - self.store - .store_undecided_block_data(height, round, proposal.value.id(), data.clone()) - .await?; - // Insert the new proposal into the undecided proposals. - self.store - .store_undecided_proposal(proposal.clone()) - .await?; + // Store the proposal and its block data + self.store_undecided_value(&proposal, data).await?; Ok(LocallyProposedValue::new( proposal.height, From 401560f08fc5c7d149b74484afe9e6558904347c Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 11:34:48 +0100 Subject: [PATCH 07/11] remove validate_payload --- app/src/payload.rs | 105 +++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 65 deletions(-) diff --git a/app/src/payload.rs b/app/src/payload.rs index cef4a18a..a653b551 100644 --- a/app/src/payload.rs +++ b/app/src/payload.rs @@ -38,63 +38,9 @@ impl ValidatedPayloadCache { } } -/// Validates an already-decoded execution payload with the execution engine. -/// Uses cache to avoid duplicate validation calls. -/// -/// Returns `Ok(Validity::Valid)` if valid, `Ok(Validity::Invalid)` if invalid, -/// or `Err` for engine communication failures. -pub async fn validate_payload( - cache: &mut ValidatedPayloadCache, - engine: &Engine, - execution_payload: &ExecutionPayloadV3, - versioned_hashes: &[BlockHash], - retry_config: &RetryConfig, - height: Height, - round: Round, -) -> eyre::Result { - let block_hash = execution_payload.payload_inner.payload_inner.block_hash; - - // Check if we've already called newPayload for this block - if let Some(cached_validity) = cache.get(&block_hash) { - debug!( - %height, %round, %block_hash, validity = ?cached_validity, - "Skipping duplicate newPayload call, returning cached result" - ); - return Ok(cached_validity); - } - - let payload_status = engine - .notify_new_block_with_retry( - execution_payload.clone(), - versioned_hashes.to_vec(), - retry_config, - ) - .await - .map_err(|e| { - eyre!( - "Execution client stuck in SYNCING for {:?} at height {}: {}", - retry_config.max_elapsed_time, - height, - e - ) - })?; - - let validity = if payload_status.status.is_valid() { - Validity::Valid - } else { - // INVALID or ACCEPTED - both are treated as invalid - // INVALID: malicious block - // ACCEPTED: Non-canonical payload - should not happen with instant finality - error!(%height, %round, "Block validation failed: {}", payload_status.status); - Validity::Invalid - }; - - cache.insert(block_hash, validity); - Ok(validity) -} - /// Validates execution payload bytes with the execution engine. /// Decodes the payload, extracts versioned hashes, and validates. +/// Uses cache to avoid duplicate validation calls. /// /// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid, /// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures. @@ -120,6 +66,17 @@ pub async fn validate_execution_payload( } }; + let block_hash = execution_payload.payload_inner.payload_inner.block_hash; + + // Check if we've already validated this block + if let Some(cached_validity) = cache.get(&block_hash) { + debug!( + %height, %round, %block_hash, validity = ?cached_validity, + "Skipping duplicate newPayload call, returning cached result" + ); + return Ok(cached_validity); + } + // Extract versioned hashes for blob transactions let block: Block = match execution_payload.clone().try_into_block() { Ok(block) => block, @@ -137,16 +94,34 @@ pub async fn validate_execution_payload( block.body.blob_versioned_hashes_iter().copied().collect(); // Validate with execution engine - validate_payload( - cache, - engine, - &execution_payload, - &versioned_hashes, - retry_config, - height, - round, - ) - .await + let payload_status = engine + .notify_new_block_with_retry( + execution_payload, + versioned_hashes, + retry_config, + ) + .await + .map_err(|e| { + eyre!( + "Execution client stuck in SYNCING for {:?} at height {}: {}", + retry_config.max_elapsed_time, + height, + e + ) + })?; + + let validity = if payload_status.status.is_valid() { + Validity::Valid + } else { + // INVALID or ACCEPTED - both are treated as invalid + // INVALID: malicious block + // ACCEPTED: Non-canonical payload - should not happen with instant finality + error!(%height, %round, "Block validation failed: {}", payload_status.status); + Validity::Invalid + }; + + cache.insert(block_hash, validity); + Ok(validity) } /// Extracts a block header from an ExecutionPayloadV3 by removing transactions and withdrawals. From abd0db30e0c62956886d26d3132ab52ece553f5e Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 11:53:17 +0100 Subject: [PATCH 08/11] cargo fmt --- app/src/app.rs | 5 ++++- app/src/payload.rs | 8 ++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/app/src/app.rs b/app/src/app.rs index 39a2e2c0..9051f6af 100644 --- a/app/src/app.rs +++ b/app/src/app.rs @@ -802,7 +802,10 @@ pub async fn on_process_synced_value( validity: Validity::Valid, }; - if let Err(e) = state.store_undecided_value(&proposed_value, block_bytes).await { + if let Err(e) = state + .store_undecided_value(&proposed_value, block_bytes) + .await + { error!(%height, %round, error = %e, "Failed to store synced value"); } diff --git a/app/src/payload.rs b/app/src/payload.rs index a653b551..ed5302d3 100644 --- a/app/src/payload.rs +++ b/app/src/payload.rs @@ -5,11 +5,11 @@ use bytes::Bytes; use caches::lru::AdaptiveCache; use caches::Cache; use color_eyre::eyre::{self, eyre}; +use malachitebft_app_channel::app::types::core::Round; use malachitebft_app_channel::app::types::core::Validity; use malachitebft_eth_engine::engine::Engine; use malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1; use malachitebft_eth_types::{Block, BlockHash, Height, RetryConfig}; -use malachitebft_app_channel::app::types::core::Round; use ssz::Decode; use tracing::{debug, error, warn}; @@ -95,11 +95,7 @@ pub async fn validate_execution_payload( // Validate with execution engine let payload_status = engine - .notify_new_block_with_retry( - execution_payload, - versioned_hashes, - retry_config, - ) + .notify_new_block_with_retry(execution_payload, versioned_hashes, retry_config) .await .map_err(|e| { eyre!( From 15b7174032fcde8412c87e607842357f646f8de0 Mon Sep 17 00:00:00 2001 From: mpoke Date: Wed, 28 Jan 2026 12:31:26 +0100 Subject: [PATCH 09/11] fix linter --- app/src/payload.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/src/payload.rs b/app/src/payload.rs index ed5302d3..9595c401 100644 --- a/app/src/payload.rs +++ b/app/src/payload.rs @@ -5,8 +5,7 @@ use bytes::Bytes; use caches::lru::AdaptiveCache; use caches::Cache; use color_eyre::eyre::{self, eyre}; -use malachitebft_app_channel::app::types::core::Round; -use malachitebft_app_channel::app::types::core::Validity; +use malachitebft_app_channel::app::types::core::{Round, Validity}; use malachitebft_eth_engine::engine::Engine; use malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1; use malachitebft_eth_types::{Block, BlockHash, Height, RetryConfig}; From 3ae88fb756cb9855fd798569a6ae70c1ccc35aae Mon Sep 17 00:00:00 2001 From: mpoke Date: Thu, 5 Feb 2026 08:57:52 +0100 Subject: [PATCH 10/11] apply review suggestions --- app/src/state.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/app/src/state.rs b/app/src/state.rs index ed940141..01b02fc6 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -371,14 +371,12 @@ impl State { let (value, data) = assemble_value_from_parts(parts.clone()); // Log first 32 bytes of proposal data and total size - if data.len() >= 32 { - info!( - "Proposal data[0..32]: {}, total_size: {} bytes, id: {:x}", - hex::encode(&data[..32]), - data.len(), - value.value.id().as_u64() - ); - } + info!( + data = %hex::encode(&data[..data.len().min(32)]), + total_size = %data.len(), + id = %value.value.id().as_u64(), + "Proposal data" + ); // Validate the execution payload with the execution engine let validity = validate_execution_payload( @@ -869,6 +867,3 @@ pub fn assemble_value_from_parts(parts: ProposalParts) -> (ProposedValue Value { ProtobufCodec.decode(bytes).unwrap() } - -// Re-export payload utilities for backwards compatibility -pub use crate::payload::reconstruct_execution_payload; From a1f785acd8847efc67a019a3148a2818e6f4c72b Mon Sep 17 00:00:00 2001 From: mpoke Date: Thu, 5 Feb 2026 09:37:16 +0100 Subject: [PATCH 11/11] add context to warning --- app/src/state.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/src/state.rs b/app/src/state.rs index 01b02fc6..09828f2b 100644 --- a/app/src/state.rs +++ b/app/src/state.rs @@ -468,7 +468,10 @@ impl State { /// Stores an undecided proposal along with its block data. /// /// WARN: The order of the two storage operations is important. - /// TODO: Add more context on why the order is important. + /// Block data must be stored before the proposal metadata to prevent crashes from + /// leaving a proposal that references non-existent block data. If a crash occurs + /// between the operations, orphaned block data is safe, but a dangling proposal + /// reference would cause retrieval failures. pub async fn store_undecided_value( &self, value: &ProposedValue,