diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 4bfa0bc84a0..25d035c13f3 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -399,82 +399,90 @@ impl Blockchain { // Replace the VM's store with the caching version vm.db.store = caching_store.clone(); - let (execution_result, merkleization_result, warmer_duration) = std::thread::scope(|s| { - let vm_type = vm.vm_type; - let warm_handle = std::thread::Builder::new() - .name("block_executor_warmer".to_string()) - .spawn_scoped(s, move || { - // Warming uses the same caching store, sharing cached state with execution - let start = Instant::now(); - let _ = LEVM::warm_block(block, caching_store, vm_type); - start.elapsed() - }) - .expect("Failed to spawn block_executor warmer thread"); - let max_queue_length_ref = &mut max_queue_length; - let (tx, rx) = channel(); - let execution_handle = std::thread::Builder::new() - .name("block_executor_execution".to_string()) - .spawn_scoped(s, move || -> Result<_, ChainError> { - let (execution_result, bal) = - vm.execute_block_pipeline(block, tx, queue_length_ref)?; - - // Validate execution went alright - validate_gas_used(execution_result.block_gas_used, &block.header)?; - validate_receipts_root(&block.header, &execution_result.receipts)?; - validate_requests_hash( - &block.header, - &chain_config, - &execution_result.requests, - )?; - if let Some(bal) = &bal { - validate_block_access_list_hash( + let (execution_result, merkleization_result, warmer_duration) = + std::thread::scope(|s| -> Result<_, ChainError> { + let vm_type = vm.vm_type; + let warm_handle = std::thread::Builder::new() + .name("block_executor_warmer".to_string()) + .spawn_scoped(s, move || { + // Warming uses the same caching store, sharing cached state with execution + let start = Instant::now(); + let _ = LEVM::warm_block(block, caching_store, vm_type); + start.elapsed() + }) + .map_err(|e| { + ChainError::Custom(format!("Failed to spawn warmer thread: {e}")) + })?; + let max_queue_length_ref = &mut max_queue_length; + let (tx, rx) = channel(); + let execution_handle = std::thread::Builder::new() + .name("block_executor_execution".to_string()) + .spawn_scoped(s, move || -> Result<_, ChainError> { + let (execution_result, bal) = + vm.execute_block_pipeline(block, tx, queue_length_ref)?; + + // Validate execution went alright + validate_gas_used(execution_result.block_gas_used, &block.header)?; + validate_receipts_root(&block.header, &execution_result.receipts)?; + validate_requests_hash( &block.header, &chain_config, - bal, - block.body.transactions.len(), + &execution_result.requests, )?; - } + if let Some(bal) = &bal { + validate_block_access_list_hash( + &block.header, + &chain_config, + bal, + block.body.transactions.len(), + )?; + } - let exec_end_instant = Instant::now(); - Ok((execution_result, exec_end_instant)) - }) - .expect("Failed to spawn block_executor exec thread"); - let parent_header_ref = &parent_header; // Avoid moving to thread - let merkleize_handle = std::thread::Builder::new() - .name("block_executor_merkleizer".to_string()) - .spawn_scoped(s, move || -> Result<_, StoreError> { - let (account_updates_list, accumulated_updates) = self.handle_merkleization( - s, - rx, - parent_header_ref, - queue_length_ref, - max_queue_length_ref, - )?; - let merkle_end_instant = Instant::now(); - Ok(( - account_updates_list, - accumulated_updates, - merkle_end_instant, - )) - }) - .expect("Failed to spawn block_executor merkleizer thread"); - let warmer_duration = warm_handle - .join() - .inspect_err(|e| warn!("Warming thread error: {e:?}")) - .ok() - .unwrap_or(Duration::ZERO); - ( - execution_handle.join().unwrap_or_else(|_| { - Err(ChainError::Custom("execution thread panicked".to_string())) - }), - merkleize_handle.join().unwrap_or_else(|_| { - Err(StoreError::Custom( - "merklization thread panicked".to_string(), - )) - }), - warmer_duration, - ) - }); + let exec_end_instant = Instant::now(); + Ok((execution_result, exec_end_instant)) + }) + .map_err(|e| { + ChainError::Custom(format!("Failed to spawn execution thread: {e}")) + })?; + let parent_header_ref = &parent_header; // Avoid moving to thread + let merkleize_handle = std::thread::Builder::new() + .name("block_executor_merkleizer".to_string()) + .spawn_scoped(s, move || -> Result<_, StoreError> { + let (account_updates_list, accumulated_updates) = self + .handle_merkleization( + s, + rx, + parent_header_ref, + queue_length_ref, + max_queue_length_ref, + )?; + let merkle_end_instant = Instant::now(); + Ok(( + account_updates_list, + accumulated_updates, + merkle_end_instant, + )) + }) + .map_err(|e| { + ChainError::Custom(format!("Failed to spawn merkleizer thread: {e}")) + })?; + let warmer_duration = warm_handle + .join() + .inspect_err(|e| warn!("Warming thread error: {e:?}")) + .ok() + .unwrap_or(Duration::ZERO); + Ok(( + execution_handle.join().unwrap_or_else(|_| { + Err(ChainError::Custom("execution thread panicked".to_string())) + }), + merkleize_handle.join().unwrap_or_else(|_| { + Err(StoreError::Custom( + "merklization thread panicked".to_string(), + )) + }), + warmer_duration, + )) + })?; let (account_updates_list, accumulated_updates, merkle_end_instant) = merkleization_result?; let (execution_result, exec_end_instant) = execution_result?; diff --git a/crates/blockchain/dev/block_producer.rs b/crates/blockchain/dev/block_producer.rs index 328190aa668..a733f26169f 100644 --- a/crates/blockchain/dev/block_producer.rs +++ b/crates/blockchain/dev/block_producer.rs @@ -55,9 +55,12 @@ pub async fn start_block_producer( continue; } }; - let payload_id = fork_choice_response - .payload_id - .expect("Failed to produce block: payload_id is None in ForkChoiceResponse"); + let Some(payload_id) = fork_choice_response.payload_id else { + tracing::error!("Failed to produce block: payload_id is None in ForkChoiceResponse"); + sleep(Duration::from_millis(300)).await; + tries += 1; + continue; + }; // Wait to retrieve the payload. // Note that this makes getPayload failures result in skipped blocks. diff --git a/crates/common/types/block_execution_witness.rs b/crates/common/types/block_execution_witness.rs index 1bb371f8956..28b4f1a913a 100644 --- a/crates/common/types/block_execution_witness.rs +++ b/crates/common/types/block_execution_witness.rs @@ -249,19 +249,12 @@ impl GuestProgramState { if update.removed { // Remove account from trie - self.state_trie - .remove(hashed_address) - .expect("failed to remove from trie"); + self.state_trie.remove(hashed_address)?; } else { // Add or update AccountState in the trie // Fetch current state or create a new state to be inserted - let mut account_state = match self - .state_trie - .get(hashed_address) - .expect("failed to get account state from trie") - { - Some(encoded_state) => AccountState::decode(&encoded_state) - .expect("failed to decode account state"), + let mut account_state = match self.state_trie.get(hashed_address)? { + Some(encoded_state) => AccountState::decode(&encoded_state)?, None => AccountState::default(), }; if update.removed_storage { @@ -291,15 +284,11 @@ impl GuestProgramState { .partition(|(_k, v)| v.is_zero()); for (hashed_key, storage_value) in inserts { - storage_trie - .insert(hashed_key, storage_value.encode_to_vec()) - .expect("failed to insert in trie"); + storage_trie.insert(hashed_key, storage_value.encode_to_vec())?; } for (hashed_key, _) in deletes { - storage_trie - .remove(&hashed_key) - .expect("failed to remove key"); + storage_trie.remove(&hashed_key)?; } let storage_root = storage_trie.hash_no_commit(); @@ -307,8 +296,7 @@ impl GuestProgramState { } self.state_trie - .insert(hashed_address.clone(), account_state.encode_to_vec()) - .expect("failed to insert into storage"); + .insert(hashed_address.clone(), account_state.encode_to_vec())?; } } Ok(()) diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 2f1d8261f04..54458608310 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -102,12 +102,14 @@ pub enum NetworkError { DiscoveryServerError(#[from] DiscoveryServerError), #[error("Failed to start Tx Broadcaster: {0}")] TxBroadcasterError(#[from] TxBroadcasterError), + #[error("Failed to bind UDP socket: {0}")] + UdpSocketError(std::io::Error), } pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result<(), NetworkError> { let udp_socket = UdpSocket::bind(context.local_node.udp_addr()) .await - .expect("Failed to bind udp socket"); + .map_err(NetworkError::UdpSocketError)?; DiscoveryServer::spawn( context.storage.clone(), diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 4ad64c9ac0d..82acf153159 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -221,8 +221,10 @@ pub enum SyncError { NoBlockHeaders, #[error("Peer handler error: {0}")] PeerHandler(#[from] PeerHandlerError), - #[error("Corrupt Path")] - CorruptPath, + #[error("Parent not found in healing queue. Parent: {0}, path: {1}")] + HealingQueueInconsistency(String, String), + #[error("Filesystem error: {0}")] + FileSystem(String), #[error("Sorted Trie Generation Error: {0}")] TrieGenerationError(#[from] TrieGenerationError), #[error("Failed to get account temp db directory: {0}")] @@ -255,7 +257,7 @@ impl SyncError { | SyncError::DifferentStateRoots(_, _, _) | SyncError::NoBlockHeaders | SyncError::PeerHandler(_) - | SyncError::CorruptPath + | SyncError::HealingQueueInconsistency(_, _) | SyncError::TrieGenerationError(_) | SyncError::AccountTempDBDirNotFound(_) | SyncError::StorageTempDBDirNotFound(_) @@ -264,7 +266,8 @@ impl SyncError { | SyncError::NoLatestCanonical | SyncError::PeerTableError(_) | SyncError::MissingFullsyncBatch - | SyncError::Snap(_) => false, + | SyncError::Snap(_) + | SyncError::FileSystem(_) => false, SyncError::Chain(_) | SyncError::Store(_) | SyncError::Send(_) diff --git a/crates/networking/p2p/sync/healing/state.rs b/crates/networking/p2p/sync/healing/state.rs index 3c6e0766440..576960cc143 100644 --- a/crates/networking/p2p/sync/healing/state.rs +++ b/crates/networking/p2p/sync/healing/state.rs @@ -284,13 +284,12 @@ async fn heal_state_trie( let to_write = std::mem::take(&mut nodes_to_write); let store = store.clone(); // NOTE: we keep only a single task in the background to avoid out of order deletes - if !db_joinset.is_empty() { - db_joinset - .join_next() - .await - .expect("we just checked joinset is not empty")?; + if !db_joinset.is_empty() + && let Some(result) = db_joinset.join_next().await + { + result??; } - db_joinset.spawn_blocking(move || { + db_joinset.spawn_blocking(move || -> Result<(), SyncError> { let mut encoded_to_write = BTreeMap::new(); for (path, node) in to_write { for i in 0..path.len() { @@ -298,20 +297,20 @@ async fn heal_state_trie( } encoded_to_write.insert(path, node.encode_to_vec()); } - let trie_db = store - .open_direct_state_trie(*EMPTY_TRIE_HASH) - .expect("Store should open"); + let trie_db = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?; let db = trie_db.db(); // PERF: use put_batch_no_alloc (note that it needs to remove nodes too) - db.put_batch(encoded_to_write.into_iter().collect()) - .expect("The put batch on the store failed"); + db.put_batch(encoded_to_write.into_iter().collect())?; + Ok(()) }); } // End loop if we have no more paths to fetch nor nodes to heal and no inflight tasks if is_done { debug!("Nothing more to heal found"); - db_joinset.join_all().await; + for result in db_joinset.join_all().await { + result?; + } break; } @@ -323,7 +322,9 @@ async fn heal_state_trie( if is_stale && nodes_to_heal.is_empty() && inflight_tasks == 0 { debug!("Finished inflight tasks"); - db_joinset.join_all().await; + for result in db_joinset.join_all().await { + result?; + } break; } } @@ -357,7 +358,7 @@ fn heal_state_batch( &path.parent_path, healing_queue, nodes_to_write, - ); + )?; } else { let entry = HealingQueueEntry { node: node.clone(), @@ -376,16 +377,16 @@ fn commit_node( parent_path: &Nibbles, healing_queue: &mut StateHealingQueue, nodes_to_write: &mut Vec<(Nibbles, Node)>, -) { +) -> Result<(), SyncError> { nodes_to_write.push((path.clone(), node)); if parent_path == path { - return; // Case where we're saving the root + return Ok(()); // Case where we're saving the root } - let mut healing_queue_entry = healing_queue.remove(parent_path).unwrap_or_else(|| { - panic!("The parent should exist. Parent: {parent_path:?}, path: {path:?}") - }); + let mut healing_queue_entry = healing_queue.remove(parent_path).ok_or_else(|| { + SyncError::HealingQueueInconsistency(format!("{parent_path:?}"), format!("{path:?}")) + })?; healing_queue_entry.pending_children_count -= 1; if healing_queue_entry.pending_children_count == 0 { @@ -395,10 +396,11 @@ fn commit_node( &healing_queue_entry.parent_path, healing_queue, nodes_to_write, - ); + )?; } else { healing_queue.insert(parent_path.clone(), healing_queue_entry); } + Ok(()) } /// Returns the partial paths to the node's children if they are not already part of the trie state diff --git a/crates/networking/p2p/sync/healing/storage.rs b/crates/networking/p2p/sync/healing/storage.rs index 4882ea01c7e..dc9e42c709f 100644 --- a/crates/networking/p2p/sync/healing/storage.rs +++ b/crates/networking/p2p/sync/healing/storage.rs @@ -34,7 +34,7 @@ use tokio::{ sync::mpsc::{Sender, error::TrySendError}, task::yield_now, }; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; /// This struct stores the metadata we need when we request a node #[derive(Debug, Clone)] @@ -449,7 +449,19 @@ async fn zip_requeue_node_responses_score_peer( } if request.requests.len() < nodes_size { - panic!("The node responded with more data than us!"); + warn!( + peer = ?request.peer_id, + requested = request.requests.len(), + received = nodes_size, + "Peer responded with more trie nodes than requested" + ); + *failed_downloads += 1; + peer_handler + .peer_table + .record_failure(&request.peer_id) + .await?; + download_queue.extend(request.requests); + return Ok(None); } if let Ok(nodes) = request diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 5d47db58f89..78376a716b9 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -280,7 +280,11 @@ pub async fn snap_sync( let account_storages_snapshots_dir = get_account_storages_snapshots_dir(datadir); let code_hashes_snapshot_dir = get_code_hashes_snapshots_dir(datadir); - std::fs::create_dir_all(&code_hashes_snapshot_dir).map_err(|_| SyncError::CorruptPath)?; + std::fs::create_dir_all(&code_hashes_snapshot_dir).map_err(|e| { + SyncError::FileSystem(format!( + "Failed to create {code_hashes_snapshot_dir:?}: {e}" + )) + })?; // Create collector to store code hashes in files let mut code_hash_collector: CodeHashCollector = @@ -499,7 +503,8 @@ pub async fn snap_sync( for entry in std::fs::read_dir(&code_hashes_dir) .map_err(|_| SyncError::CodeHashesSnapshotsDirNotFound)? { - let entry = entry.map_err(|_| SyncError::CorruptPath)?; + let entry = + entry.map_err(|e| SyncError::FileSystem(format!("Failed to read dir entry: {e}")))?; let snapshot_contents = std::fs::read(entry.path()) .map_err(|err| SyncError::SnapshotReadError(entry.path(), err))?; let code_hashes: Vec = RLPDecode::decode(&snapshot_contents) diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 2632dcb1fcc..6eec9a989bf 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1539,8 +1539,8 @@ impl Store { let file = std::fs::File::open(genesis_path) .map_err(|error| StoreError::Custom(format!("Failed to open genesis file: {error}")))?; let reader = std::io::BufReader::new(file); - let genesis: Genesis = - serde_json::from_reader(reader).expect("Failed to deserialize genesis file"); + let genesis: Genesis = serde_json::from_reader(reader) + .map_err(|e| StoreError::Custom(format!("Failed to deserialize genesis file: {e}")))?; let mut store = Self::new(store_path, engine_type)?; store.add_initial_state(genesis).await?; Ok(store)