Merged

Changes from 5 commits
144 changes: 76 additions & 68 deletions crates/blockchain/blockchain.rs
@@ -364,74 +364,82 @@ impl Blockchain {
// Replace the VM's store with the caching version
vm.db.store = caching_store.clone();

let (execution_result, merkleization_result, warmer_duration) = std::thread::scope(|s| {
let vm_type = vm.vm_type;
let warm_handle = std::thread::Builder::new()
.name("block_executor_warmer".to_string())
.spawn_scoped(s, move || {
// Warming uses the same caching store, sharing cached state with execution
let start = Instant::now();
let _ = LEVM::warm_block(block, caching_store, vm_type);
start.elapsed()
})
.expect("Failed to spawn block_executor warmer thread");
let max_queue_length_ref = &mut max_queue_length;
let (tx, rx) = channel();
let execution_handle = std::thread::Builder::new()
.name("block_executor_execution".to_string())
.spawn_scoped(s, move || -> Result<_, ChainError> {
let execution_result =
vm.execute_block_pipeline(block, tx, queue_length_ref)?;

// Validate execution went alright
validate_gas_used(execution_result.block_gas_used, &block.header)?;
validate_receipts_root(&block.header, &execution_result.receipts)?;
validate_requests_hash(
&block.header,
&chain_config,
&execution_result.requests,
)?;

let exec_end_instant = Instant::now();
Ok((execution_result, exec_end_instant))
})
.expect("Failed to spawn block_executor exec thread");
let parent_header_ref = &parent_header; // Avoid moving to thread
let merkleize_handle = std::thread::Builder::new()
.name("block_executor_merkleizer".to_string())
.spawn_scoped(s, move || -> Result<_, StoreError> {
let (account_updates_list, accumulated_updates) = self.handle_merkleization(
s,
rx,
parent_header_ref,
queue_length_ref,
max_queue_length_ref,
)?;
let merkle_end_instant = Instant::now();
Ok((
account_updates_list,
accumulated_updates,
merkle_end_instant,
))
})
.expect("Failed to spawn block_executor merkleizer thread");
let warmer_duration = warm_handle
.join()
.inspect_err(|e| warn!("Warming thread error: {e:?}"))
.ok()
.unwrap_or(Duration::ZERO);
(
execution_handle.join().unwrap_or_else(|_| {
Err(ChainError::Custom("execution thread panicked".to_string()))
}),
merkleize_handle.join().unwrap_or_else(|_| {
Err(StoreError::Custom(
"merklization thread panicked".to_string(),
))
}),
warmer_duration,
)
});
let (execution_result, merkleization_result, warmer_duration) =
std::thread::scope(|s| -> Result<_, ChainError> {
let vm_type = vm.vm_type;
let warm_handle = std::thread::Builder::new()
.name("block_executor_warmer".to_string())
.spawn_scoped(s, move || {
// Warming uses the same caching store, sharing cached state with execution
let start = Instant::now();
let _ = LEVM::warm_block(block, caching_store, vm_type);
start.elapsed()
})
.map_err(|e| {
ChainError::Custom(format!("Failed to spawn warmer thread: {e}"))
})?;
let max_queue_length_ref = &mut max_queue_length;
let (tx, rx) = channel();
let execution_handle = std::thread::Builder::new()
.name("block_executor_execution".to_string())
.spawn_scoped(s, move || -> Result<_, ChainError> {
let execution_result =
vm.execute_block_pipeline(block, tx, queue_length_ref)?;

// Validate execution went alright
validate_gas_used(execution_result.block_gas_used, &block.header)?;
validate_receipts_root(&block.header, &execution_result.receipts)?;
validate_requests_hash(
&block.header,
&chain_config,
&execution_result.requests,
)?;

let exec_end_instant = Instant::now();
Ok((execution_result, exec_end_instant))
})
.map_err(|e| {
ChainError::Custom(format!("Failed to spawn execution thread: {e}"))
})?;
let parent_header_ref = &parent_header; // Avoid moving to thread
let merkleize_handle = std::thread::Builder::new()
.name("block_executor_merkleizer".to_string())
.spawn_scoped(s, move || -> Result<_, StoreError> {
let (account_updates_list, accumulated_updates) = self
.handle_merkleization(
s,
rx,
parent_header_ref,
queue_length_ref,
max_queue_length_ref,
)?;
let merkle_end_instant = Instant::now();
Ok((
account_updates_list,
accumulated_updates,
merkle_end_instant,
))
})
.map_err(|e| {
ChainError::Custom(format!("Failed to spawn merkleizer thread: {e}"))
})?;
let warmer_duration = warm_handle
.join()
.inspect_err(|e| warn!("Warming thread error: {e:?}"))
.ok()
.unwrap_or(Duration::ZERO);
Ok((
execution_handle.join().unwrap_or_else(|_| {
Err(ChainError::Custom("execution thread panicked".to_string()))
}),
merkleize_handle.join().unwrap_or_else(|_| {
Err(StoreError::Custom(
"merklization thread panicked".to_string(),
))
}),
warmer_duration,
))
})?;
let (account_updates_list, accumulated_updates, merkle_end_instant) = merkleization_result?;
let (execution_result, exec_end_instant) = execution_result?;

9 changes: 6 additions & 3 deletions crates/blockchain/dev/block_producer.rs
@@ -55,9 +55,12 @@ pub async fn start_block_producer(
continue;
}
};
let payload_id = fork_choice_response
.payload_id
.expect("Failed to produce block: payload_id is None in ForkChoiceResponse");
let Some(payload_id) = fork_choice_response.payload_id else {
tracing::error!("Failed to produce block: payload_id is None in ForkChoiceResponse");
sleep(Duration::from_millis(300)).await;
tries += 1;
continue;
};

// Wait to retrieve the payload.
// Note that this makes getPayload failures result in skipped blocks.
24 changes: 6 additions & 18 deletions crates/common/types/block_execution_witness.rs
@@ -249,19 +249,12 @@ impl GuestProgramState {

if update.removed {
// Remove account from trie
self.state_trie
.remove(hashed_address)
.expect("failed to remove from trie");
self.state_trie.remove(hashed_address)?;
} else {
// Add or update AccountState in the trie
// Fetch current state or create a new state to be inserted
let mut account_state = match self
.state_trie
.get(hashed_address)
.expect("failed to get account state from trie")
{
Some(encoded_state) => AccountState::decode(&encoded_state)
.expect("failed to decode account state"),
let mut account_state = match self.state_trie.get(hashed_address)? {
Some(encoded_state) => AccountState::decode(&encoded_state)?,
None => AccountState::default(),
};
if update.removed_storage {
@@ -291,24 +284,19 @@ impl GuestProgramState {
.partition(|(_k, v)| v.is_zero());

for (hashed_key, storage_value) in inserts {
storage_trie
.insert(hashed_key, storage_value.encode_to_vec())
.expect("failed to insert in trie");
storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
}

for (hashed_key, _) in deletes {
storage_trie
.remove(&hashed_key)
.expect("failed to remove key");
storage_trie.remove(&hashed_key)?;
}

let storage_root = storage_trie.hash_no_commit();
account_state.storage_root = storage_root;
}

self.state_trie
.insert(hashed_address.clone(), account_state.encode_to_vec())
.expect("failed to insert into storage");
.insert(hashed_address.clone(), account_state.encode_to_vec())?;
}
}
Ok(())
6 changes: 3 additions & 3 deletions crates/networking/p2p/network.rs
@@ -102,12 +102,12 @@ pub enum NetworkError {
DiscoveryServerError(#[from] DiscoveryServerError),
#[error("Failed to start Tx Broadcaster: {0}")]
TxBroadcasterError(#[from] TxBroadcasterError),
#[error("Failed to bind UDP socket: {0}")]
UdpSocketError(#[from] std::io::Error),
Contributor:

nit: Adding a blanket From<std::io::Error> to NetworkError means any io::Error from any source in a function returning Result<_, NetworkError> will silently become UdpSocketError. Currently start_network only has the one UDP bind site, but this could be misleading if the function grows. A scoped .map_err() at the call site would be more precise:

let udp_socket = UdpSocket::bind(context.local_node.udp_addr())
    .await
    .map_err(|e| NetworkError::UdpSocketError(e))?;

and drop the #[from] on the variant.
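
For reference, a minimal sketch of the variant with the blanket conversion dropped, as suggested above — only the #[from] attribute is removed; the variant name and display message are unchanged from the diff:

#[error("Failed to bind UDP socket: {0}")]
UdpSocketError(std::io::Error),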

Contributor Author:

Done in cbdbce5

}

pub async fn start_network(context: P2PContext, bootnodes: Vec<Node>) -> Result<(), NetworkError> {
let udp_socket = UdpSocket::bind(context.local_node.udp_addr())
.await
.expect("Failed to bind udp socket");
let udp_socket = UdpSocket::bind(context.local_node.udp_addr()).await?;

DiscoveryServer::spawn(
context.storage.clone(),
40 changes: 20 additions & 20 deletions crates/networking/p2p/sync/healing/state.rs
@@ -284,34 +284,33 @@ async fn heal_state_trie(
let to_write = std::mem::take(&mut nodes_to_write);
let store = store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
if !db_joinset.is_empty() {
db_joinset
.join_next()
.await
.expect("we just checked joinset is not empty")?;
if !db_joinset.is_empty()
&& let Some(result) = db_joinset.join_next().await
{
result??;
}
db_joinset.spawn_blocking(move || {
db_joinset.spawn_blocking(move || -> Result<(), SyncError> {
let mut encoded_to_write = BTreeMap::new();
for (path, node) in to_write {
for i in 0..path.len() {
encoded_to_write.insert(path.slice(0, i), vec![]);
}
encoded_to_write.insert(path, node.encode_to_vec());
}
let trie_db = store
.open_direct_state_trie(*EMPTY_TRIE_HASH)
.expect("Store should open");
let trie_db = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
let db = trie_db.db();
// PERF: use put_batch_no_alloc (note that it needs to remove nodes too)
db.put_batch(encoded_to_write.into_iter().collect())
.expect("The put batch on the store failed");
db.put_batch(encoded_to_write.into_iter().collect())?;
Ok(())
});
}

// End loop if we have no more paths to fetch nor nodes to heal and no inflight tasks
if is_done {
debug!("Nothing more to heal found");
db_joinset.join_all().await;
for result in db_joinset.join_all().await {
result?;
}
break;
}

@@ -323,7 +322,9 @@

if is_stale && nodes_to_heal.is_empty() && inflight_tasks == 0 {
debug!("Finished inflight tasks");
db_joinset.join_all().await;
for result in db_joinset.join_all().await {
result?;
}
break;
}
}
@@ -357,7 +358,7 @@ fn heal_state_batch(
&path.parent_path,
healing_queue,
nodes_to_write,
);
)?;
} else {
let entry = HealingQueueEntry {
node: node.clone(),
@@ -376,16 +377,14 @@ fn commit_node(
parent_path: &Nibbles,
healing_queue: &mut StateHealingQueue,
nodes_to_write: &mut Vec<(Nibbles, Node)>,
) {
) -> Result<(), SyncError> {
nodes_to_write.push((path.clone(), node));

if parent_path == path {
return; // Case where we're saving the root
return Ok(()); // Case where we're saving the root
}

let mut healing_queue_entry = healing_queue.remove(parent_path).unwrap_or_else(|| {
panic!("The parent should exist. Parent: {parent_path:?}, path: {path:?}")
});
let mut healing_queue_entry = healing_queue.remove(parent_path).ok_or(SyncError::CorruptPath)?;

healing_queue_entry.pending_children_count -= 1;
if healing_queue_entry.pending_children_count == 0 {
@@ -395,10 +394,11 @@
&healing_queue_entry.parent_path,
healing_queue,
nodes_to_write,
);
)?;
} else {
healing_queue.insert(parent_path.clone(), healing_queue_entry);
}
Ok(())
}

/// Returns the partial paths to the node's children if they are not already part of the trie state
16 changes: 14 additions & 2 deletions crates/networking/p2p/sync/healing/storage.rs
@@ -34,7 +34,7 @@ use tokio::{
sync::mpsc::{Sender, error::TrySendError},
task::yield_now,
};
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

/// This struct stores the metadata we need when we request a node
#[derive(Debug, Clone)]
@@ -449,7 +449,19 @@ async fn zip_requeue_node_responses_score_peer(
}

if request.requests.len() < nodes_size {
panic!("The node responded with more data than us!");
warn!(
peer = ?request.peer_id,
requested = request.requests.len(),
received = nodes_size,
"Peer responded with more trie nodes than requested"
);
*failed_downloads += 1;
peer_handler
.peer_table
.record_failure(&request.peer_id)
.await?;
download_queue.extend(request.requests);
return Ok(None);
Comment on lines 451 to +464

Drops extra peer data

In zip_requeue_node_responses_score_peer, when nodes_size > request.requests.len() you treat it as a peer failure and requeue the original requests, but you silently drop the extra trie nodes that were already received (trie_nodes.nodes), even though they may contain useful nodes for other pending requests. If this situation can occur due to a peer bug/misbehavior (or message framing issues), this code will now reliably discard those bytes and redo requests, potentially causing repeated download loops.

If you want to ignore the response, consider explicitly documenting that extra nodes are discarded by design (and ensure the peer is disconnected/penalized enough to prevent repeated churn), or alternatively keep the first request.requests.len() nodes and process those while penalizing the peer for the overflow.
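
For illustration only, a hypothetical sketch of the "keep the first request.requests.len() nodes" alternative mentioned above — not what this PR does (the PR requeues the original requests instead) — reusing the names that appear in the diff:

// Keep only as many nodes as were requested and process those,
// still counting the overflow as a peer failure.
trie_nodes.nodes.truncate(request.requests.len());
*failed_downloads += 1;
peer_handler
    .peer_table
    .record_failure(&request.peer_id)
    .await?;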


Contributor Author:

The behavior goes against the spec; a node exhibiting this behavior is malfunctioning.

The peer is in fact penalized.

}

if let Ok(nodes) = request
4 changes: 2 additions & 2 deletions crates/storage/store.rs
@@ -1539,8 +1539,8 @@ impl Store {
let file = std::fs::File::open(genesis_path)
.map_err(|error| StoreError::Custom(format!("Failed to open genesis file: {error}")))?;
let reader = std::io::BufReader::new(file);
let genesis: Genesis =
serde_json::from_reader(reader).expect("Failed to deserialize genesis file");
let genesis: Genesis = serde_json::from_reader(reader)
.map_err(|e| StoreError::Custom(format!("Failed to deserialize genesis file: {e}")))?;
let mut store = Self::new(store_path, engine_type)?;
store.add_initial_state(genesis).await?;
Ok(store)