Skip to content

Commit 145f116

Browse files
authored
refactor(app): Extract payload validation utilities and reduce code duplication (#199)
* add process_complete_proposal_parts * remove store_undecided_block_data wrapper * use validate_execution_payload in on_process_synced_value * use validate_execution_payload in on_decided * add payload.rs mod with exec payload utilities * add store_undecided_value * remove validate_payload * cargo fmt * fix linter * apply review suggestions * add context to warning
1 parent 7ebefe3 commit 145f116

File tree

5 files changed

+293
-378
lines changed

5 files changed

+293
-378
lines changed

app/src/app.rs

Lines changed: 45 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ alloy_sol_types::sol!(
2727
"../solidity/out/ValidatorManager.sol/ValidatorManager.json"
2828
);
2929

30-
use crate::state::{assemble_value_from_parts, decode_value, State};
31-
use crate::sync_handler::{get_decided_value_for_sync, validate_payload};
30+
use crate::payload::validate_execution_payload;
31+
use crate::state::{decode_value, State};
32+
use crate::sync_handler::get_decided_value_for_sync;
3233

3334
pub async fn initialize_state_from_genesis(state: &mut State, engine: &Engine) -> eyre::Result<()> {
3435
// Get the genesis block from the execution engine
@@ -396,54 +397,21 @@ pub async fn on_started_round(
396397
);
397398

398399
for parts in &pending_parts {
399-
match state.validate_proposal_parts(parts) {
400-
Ok(()) => {
401-
// Validate execution payload with the execution engine before storing it as undecided proposal
402-
let (value, data) = assemble_value_from_parts(parts.clone());
403-
404-
let validity = state
405-
.validate_execution_payload(
406-
&data,
407-
parts.height,
408-
parts.round,
409-
engine,
410-
&emerald_config.retry_config,
411-
)
412-
.await?;
413-
414-
if validity == Validity::Invalid {
415-
warn!(
416-
height = %parts.height,
417-
round = %parts.round,
418-
"Pending proposal has invalid execution payload, rejecting"
419-
);
420-
continue;
421-
}
400+
// Validate and store the pending proposal
401+
let result = state
402+
.process_complete_proposal_parts(parts, engine, &emerald_config.retry_config)
403+
.await?;
422404

423-
state.store.store_undecided_proposal(value.clone()).await?;
405+
if result.is_some() {
406+
info!(
407+
height = %parts.height,
408+
round = %parts.round,
409+
proposer = %parts.proposer,
410+
"Moved valid pending proposal to undecided after validation"
411+
);
412+
}
424413

425-
state
426-
.store
427-
.store_undecided_block_data(value.height, value.round, value.value.id(), data)
428-
.await?;
429-
info!(
430-
height = %parts.height,
431-
round = %parts.round,
432-
proposer = %parts.proposer,
433-
"Moved valid pending proposal to undecided after validation"
434-
);
435-
}
436-
Err(error) => {
437-
// Validation failed, log error
438-
error!(
439-
height = %parts.height,
440-
round = %parts.round,
441-
proposer = %parts.proposer,
442-
error = ?error,
443-
"Removed invalid pending proposal"
444-
);
445-
}
446-
} // Remove the parts from pending
414+
// Remove the parts from pending regardless of validation outcome
447415
state
448416
.store
449417
.remove_pending_proposal_parts(parts.clone())
@@ -596,15 +564,20 @@ pub async fn on_received_proposal_part(
596564
"Received proposal part"
597565
);
598566

599-
// Try to reassemble the proposal from received parts. If present,
600-
// validate it with the execution engine and mark invalid when
601-
// parsing or validation fails. Keep the outer `Option` and send it
602-
// back to the caller (consensus) regardless.
603-
let proposed_value = state
604-
.received_proposal_part(from, part, engine, &emerald_config.retry_config)
605-
.await?;
567+
// Try to reassemble the proposal from received parts
568+
let parts = state.reassemble_proposal(from, part).await?;
606569

607-
if let Some(proposed_value) = proposed_value.clone() {
570+
// If we have complete parts, validate and store the proposal
571+
let proposed_value = match parts {
572+
Some(parts) => {
573+
state
574+
.process_complete_proposal_parts(&parts, engine, &emerald_config.retry_config)
575+
.await?
576+
}
577+
None => None,
578+
};
579+
580+
if let Some(ref proposed_value) = proposed_value {
608581
debug!("✅ Received complete proposal: {:?}", proposed_value);
609582
}
610583

@@ -680,36 +653,16 @@ pub async fn on_decided(
680653
.block_hash;
681654
assert_eq!(latest_block_hash, parent_block_hash);
682655

683-
// Get validation status from cache or call newPayload
684-
let validity = if let Some(cached) = state.validated_cache_mut().get(&block_hash) {
685-
cached
686-
} else {
687-
// Collect hashes from blob transactions
688-
let block: Block = execution_payload.clone().try_into_block().map_err(|e| {
689-
eyre::eyre!(
690-
"Failed to convert decided ExecutionPayloadV3 to Block at height {}: {}",
691-
height,
692-
e
693-
)
694-
})?;
695-
let versioned_hashes: Vec<BlockHash> =
696-
block.body.blob_versioned_hashes_iter().copied().collect();
697-
698-
// Ask the EL to validate the execution payload
699-
let payload_status = engine
700-
.notify_new_block(execution_payload, versioned_hashes)
701-
.await?;
702-
703-
let validity = if payload_status.status.is_valid() {
704-
Validity::Valid
705-
} else {
706-
Validity::Invalid
707-
};
708-
709-
// TODO: insert validation outcome into cache also when calling notify_new_block_with_retry in validate_payload
710-
state.validated_cache_mut().insert(block_hash, validity);
711-
validity
712-
};
656+
// Validate the execution payload (uses cache internally)
657+
let validity = validate_execution_payload(
658+
state.validated_cache_mut(),
659+
&block_bytes,
660+
height,
661+
round,
662+
engine,
663+
&emerald_config.retry_config,
664+
)
665+
.await?;
713666

714667
if validity == Validity::Invalid {
715668
return Err(eyre!("Block validation failed for hash: {}", block_hash));
@@ -808,38 +761,16 @@ pub async fn on_process_synced_value(
808761
info!(%height, %round, "🟢🟢 Processing synced value");
809762

810763
let value = decode_value(value_bytes);
811-
812-
// Extract execution payload from the synced value for validation
813764
let block_bytes = value.extensions.clone();
814-
let execution_payload = ExecutionPayloadV3::from_ssz_bytes(&block_bytes).map_err(|e| {
815-
eyre::eyre!(
816-
"Failed to decode synced ExecutionPayloadV3 at height {}: {:?}",
817-
height,
818-
e
819-
)
820-
})?;
821-
let new_block_hash = execution_payload.payload_inner.payload_inner.block_hash;
822-
823-
// Collect hashes from blob transactions
824-
let block: Block = execution_payload.clone().try_into_block().map_err(|e| {
825-
eyre::eyre!(
826-
"Failed to convert synced ExecutionPayloadV3 to Block at height {}: {}",
827-
height,
828-
e
829-
)
830-
})?;
831-
let versioned_hashes: Vec<BlockHash> =
832-
block.body.blob_versioned_hashes_iter().copied().collect();
833765

834766
// Validate the synced block
835-
let validity = validate_payload(
767+
let validity = validate_execution_payload(
836768
state.validated_cache_mut(),
837-
engine,
838-
&execution_payload,
839-
&versioned_hashes,
840-
&emerald_config.retry_config,
769+
&block_bytes,
841770
height,
842771
round,
772+
engine,
773+
&emerald_config.retry_config,
843774
)
844775
.await?;
845776

@@ -861,10 +792,7 @@ pub async fn on_process_synced_value(
861792
return Ok(());
862793
}
863794

864-
debug!(
865-
"💡 Sync block validated at height {} with hash: {}",
866-
height, new_block_hash
867-
);
795+
debug!(%height, "💡 Sync block validated");
868796
let proposed_value: ProposedValue<EmeraldContext> = ProposedValue {
869797
height,
870798
round,
@@ -875,16 +803,7 @@ pub async fn on_process_synced_value(
875803
};
876804

877805
if let Err(e) = state
878-
.store
879-
.store_undecided_block_data(height, round, proposed_value.value.id(), block_bytes)
880-
.await
881-
{
882-
error!(%height, %round, error = %e, "Failed to store synced block data");
883-
}
884-
// Store the synced value and block data
885-
if let Err(e) = state
886-
.store
887-
.store_undecided_proposal(proposed_value.clone())
806+
.store_undecided_value(&proposed_value, block_bytes)
888807
.await
889808
{
890809
error!(%height, %round, error = %e, "Failed to store synced value");

app/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod app;
22
mod metrics;
33
pub mod node;
4+
mod payload;
45
pub mod state;
56
mod store;
67
mod streaming;

app/src/payload.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
//! Execution payload utilities for validation, caching, and manipulation.
2+
3+
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
4+
use bytes::Bytes;
5+
use caches::lru::AdaptiveCache;
6+
use caches::Cache;
7+
use color_eyre::eyre::{self, eyre};
8+
use malachitebft_app_channel::app::types::core::{Round, Validity};
9+
use malachitebft_eth_engine::engine::Engine;
10+
use malachitebft_eth_engine::json_structures::ExecutionPayloadBodyV1;
11+
use malachitebft_eth_types::{Block, BlockHash, Height, RetryConfig};
12+
use ssz::Decode;
13+
use tracing::{debug, error, warn};
14+
15+
/// Cache for tracking recently validated execution payloads to avoid redundant validation.
16+
/// Stores both the block hash and its validity result (Valid or Invalid).
17+
pub struct ValidatedPayloadCache {
18+
cache: AdaptiveCache<BlockHash, Validity>,
19+
}
20+
21+
impl ValidatedPayloadCache {
22+
pub fn new(max_size: usize) -> Self {
23+
Self {
24+
cache: AdaptiveCache::new(max_size)
25+
.expect("Failed to create AdaptiveCache: invalid cache size"),
26+
}
27+
}
28+
29+
/// Check if a block hash has been validated and return its cached validity
30+
pub fn get(&mut self, block_hash: &BlockHash) -> Option<Validity> {
31+
self.cache.get(block_hash).copied()
32+
}
33+
34+
/// Insert a block hash and its validity result into the cache
35+
pub fn insert(&mut self, block_hash: BlockHash, validity: Validity) {
36+
self.cache.put(block_hash, validity);
37+
}
38+
}
39+
40+
/// Validates execution payload bytes with the execution engine.
41+
/// Decodes the payload, extracts versioned hashes, and validates.
42+
/// Uses cache to avoid duplicate validation calls.
43+
///
44+
/// Returns `Ok(Validity::Invalid)` if decoding fails or payload is invalid,
45+
/// `Ok(Validity::Valid)` if valid, or `Err` for engine communication failures.
46+
pub async fn validate_execution_payload(
47+
cache: &mut ValidatedPayloadCache,
48+
data: &Bytes,
49+
height: Height,
50+
round: Round,
51+
engine: &Engine,
52+
retry_config: &RetryConfig,
53+
) -> eyre::Result<Validity> {
54+
// Decode execution payload
55+
let execution_payload = match ExecutionPayloadV3::from_ssz_bytes(data) {
56+
Ok(payload) => payload,
57+
Err(e) => {
58+
warn!(
59+
height = %height,
60+
round = %round,
61+
error = ?e,
62+
"Proposal has invalid ExecutionPayloadV3 encoding"
63+
);
64+
return Ok(Validity::Invalid);
65+
}
66+
};
67+
68+
let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
69+
70+
// Check if we've already validated this block
71+
if let Some(cached_validity) = cache.get(&block_hash) {
72+
debug!(
73+
%height, %round, %block_hash, validity = ?cached_validity,
74+
"Skipping duplicate newPayload call, returning cached result"
75+
);
76+
return Ok(cached_validity);
77+
}
78+
79+
// Extract versioned hashes for blob transactions
80+
let block: Block = match execution_payload.clone().try_into_block() {
81+
Ok(block) => block,
82+
Err(e) => {
83+
warn!(
84+
height = %height,
85+
round = %round,
86+
error = ?e,
87+
"Failed to convert ExecutionPayloadV3 to Block"
88+
);
89+
return Ok(Validity::Invalid);
90+
}
91+
};
92+
let versioned_hashes: Vec<BlockHash> =
93+
block.body.blob_versioned_hashes_iter().copied().collect();
94+
95+
// Validate with execution engine
96+
let payload_status = engine
97+
.notify_new_block_with_retry(execution_payload, versioned_hashes, retry_config)
98+
.await
99+
.map_err(|e| {
100+
eyre!(
101+
"Execution client stuck in SYNCING for {:?} at height {}: {}",
102+
retry_config.max_elapsed_time,
103+
height,
104+
e
105+
)
106+
})?;
107+
108+
let validity = if payload_status.status.is_valid() {
109+
Validity::Valid
110+
} else {
111+
// INVALID or ACCEPTED - both are treated as invalid
112+
// INVALID: malicious block
113+
// ACCEPTED: Non-canonical payload - should not happen with instant finality
114+
error!(%height, %round, "Block validation failed: {}", payload_status.status);
115+
Validity::Invalid
116+
};
117+
118+
cache.insert(block_hash, validity);
119+
Ok(validity)
120+
}
121+
122+
/// Extracts a block header from an ExecutionPayloadV3 by removing transactions and withdrawals.
123+
///
124+
/// Returns an ExecutionPayloadV3 with empty transactions and withdrawals vectors,
125+
/// containing only the block header fields.
126+
pub fn extract_block_header(payload: &ExecutionPayloadV3) -> ExecutionPayloadV3 {
127+
ExecutionPayloadV3 {
128+
payload_inner: ExecutionPayloadV2 {
129+
payload_inner: ExecutionPayloadV1 {
130+
transactions: vec![],
131+
..payload.payload_inner.payload_inner.clone()
132+
},
133+
withdrawals: vec![],
134+
},
135+
..payload.clone()
136+
}
137+
}
138+
139+
/// Reconstructs a complete ExecutionPayloadV3 from a block header and payload body.
140+
///
141+
/// Takes a header (ExecutionPayloadV3 with empty transactions/withdrawals) and combines it
142+
/// with the transactions and withdrawals from an ExecutionPayloadBodyV1 to create a full payload.
143+
pub fn reconstruct_execution_payload(
144+
header: ExecutionPayloadV3,
145+
body: ExecutionPayloadBodyV1,
146+
) -> ExecutionPayloadV3 {
147+
ExecutionPayloadV3 {
148+
payload_inner: ExecutionPayloadV2 {
149+
payload_inner: ExecutionPayloadV1 {
150+
transactions: body.transactions,
151+
..header.payload_inner.payload_inner
152+
},
153+
withdrawals: body.withdrawals.unwrap_or_default(),
154+
},
155+
..header
156+
}
157+
}

0 commit comments

Comments
 (0)