diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs
index a2b5af8b086..fbbd179b5bf 100644
--- a/beacon_node/network/src/metrics.rs
+++ b/beacon_node/network/src/metrics.rs
@@ -93,6 +93,14 @@ pub static BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME: LazyLock<Result<HistogramVec>
         &["source"],
     )
 });
+pub static BEACON_RPC_COLUMNS_BY_ROOT_RESPONSE_OUTCOME: LazyLock<Result<IntCounterVec>> =
+    LazyLock::new(|| {
+        try_create_int_counter_vec(
+            "beacon_rpc_columns_by_root_response_outcome",
+            "Total number of outcomes per requested (block_root, index) in an RPC data_columns_by_root response",
+            &["outcome"],
+        )
+    });
 
 /*
  * Gossip processor
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index 9ddba86b81d..c76420bfb63 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -3,7 +3,7 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces
 use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::SyncMessage;
-use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
+use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped};
 use itertools::{Itertools, process_results};
 use lighthouse_network::rpc::methods::{
     BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
@@ -19,7 +19,7 @@ use lighthouse_tracing::{
 };
 use methods::LightClientUpdatesByRangeRequest;
 use slot_clock::SlotClock;
-use std::collections::{HashMap, hash_map::Entry};
+use std::collections::{HashMap, HashSet, hash_map::Entry};
 use std::sync::Arc;
 use tokio_stream::StreamExt;
 use tracing::{Span, debug, error, field, instrument, warn};
@@ -411,29 +411,114 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         inbound_request_id: InboundRequestId,
         request: DataColumnsByRootRequest,
     ) -> Result<(), (RpcErrorResponse, &'static str)> {
-        let mut send_data_column_count = 0;
-        // Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
-        let available_columns = self.chain.custody_columns_for_epoch(None);
+        // block known, block has data, we custody the requested index, and the column is found in the store
+        let mut sent_data_columns = 0;
+        // requested a block root not found in our caches or the store
+        let mut requested_unknown_block_root = 0;
+        // requested a known block, but it has 0 blobs
+        let mut requested_block_without_data = 0;
+        // requested a known block with data whose index we custody, but the columns are not in the store
+        let mut requested_missing_columns = 0;
+        // requested a known block, columns not found in the store, but the block is not imported yet
+        let mut requested_pending_block = 0;
+        // requested columns not found in the store, and their epoch is outside the PeerDAS da_window
+        let mut requested_outside_da_check = 0;
+        // requested a column index we don't custody at the block's epoch
+        let mut requested_we_dont_custody = 0;
+        let mut requested_indices_we_dont_custody = HashSet::new();
 
         for data_column_ids_by_root in request.data_column_ids.as_slice() {
-            let indices_to_retrieve = data_column_ids_by_root
-                .columns
+            let block_root = data_column_ids_by_root.block_root;
+
+            // For debugging purposes, check whether the block of this data column request is
+            // known and has data.
+            let (block_slot, block_has_data, block_imported) = if let Some(block) =
+                match self.chain.get_block_process_status(&block_root) {
+                    BlockProcessStatus::Unknown => None,
+                    BlockProcessStatus::NotValidated(block)
+                    | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
+                } {
+                (block.slot(), block.has_data(), false)
+            } else if let Some(block) = self
+                .chain
+                .get_blinded_block(&block_root)
+                .map_err(|_| (RpcErrorResponse::ServerError, "Error fetching column block"))?
+            {
+                (block.slot(), block.has_data(), true)
+            } else {
+                // Peer requested columns for an unknown block
+                requested_unknown_block_root += 1;
+                // Okay to not even try to fetch data_columns and continue to the next block_root.
+                // We remove the block from the processing cache AFTER completing import, so there is
+                // never an instant where the block is known, not pruned, and missing from both the
+                // caches and the store. Therefore, it is race-condition safe to check the caches
+                // first and then check the store.
+                continue;
+            };
+
+            if !block_has_data {
+                requested_block_without_data += 1;
+                // If the block has zero KZG commitments, data columns for it will never exist.
+                continue;
+            }
+
+            let block_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch());
+            // TODO(peerdas): Is it correct to look up the epoch of the block?
+            // Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
+            let columns_we_custody = self.chain.custody_columns_for_epoch(Some(block_epoch));
+
+            let requested_columns = data_column_ids_by_root.columns.as_ref();
+            let requested_columns_we_custody = requested_columns
                 .iter()
                 .copied()
-                .filter(|c| available_columns.contains(c))
+                .filter(|index| {
+                    if columns_we_custody.contains(index) {
+                        true
+                    } else {
+                        // Peer requested a column we don't custody
+                        requested_we_dont_custody += 1;
+                        requested_indices_we_dont_custody.insert(*index);
+                        false
+                    }
+                })
                 .collect::<Vec<_>>();
-            match self.chain.get_data_columns_checking_all_caches(
-                data_column_ids_by_root.block_root,
-                &indices_to_retrieve,
-            ) {
+            match self
+                .chain
+                .get_data_columns_checking_all_caches(block_root, &requested_columns_we_custody)
+            {
                 Ok(data_columns) => {
-                    send_data_column_count += data_columns.len();
-                    for data_column in data_columns {
-                        self.send_response(
-                            peer_id,
-                            inbound_request_id,
-                            Response::DataColumnsByRoot(Some(data_column)),
-                        );
+                    for index in requested_columns_we_custody {
+                        if let Some(data_column) = data_columns.iter().find(|c| c.index == index) {
+                            self.send_response(
+                                peer_id,
+                                inbound_request_id,
+                                Response::DataColumnsByRoot(Some(data_column.clone())),
+                            );
+                            sent_data_columns += 1;
+                        } else {
+                            // Peer requested a column we custody, but we don't have it in our
+                            // caches or our database.
+                            if block_imported {
+                                if self
+                                    .chain
+                                    .data_availability_checker
+                                    .data_columns_required_for_epoch(block_epoch)
+                                {
+                                    // If the block is imported (check done **before** attempting to
+                                    // read columns from the store), the columns should be there.
+                                    requested_missing_columns += 1;
+                                } else {
+                                    // Unless its epoch is outside the PeerDAS da_window.
+                                    requested_outside_da_check += 1;
+                                }
+                            } else {
+                                // We should custody this column but the block is not imported yet. We
+                                // may or may not have the columns at this point.
+                                // We are allowed to return empty, as we have not sent any signal
+                                // that we have imported the columns.
+                                requested_pending_block += 1;
+                            }
+                        }
                     }
                 }
                 Err(e) => {
@@ -452,10 +537,33 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         debug!(
             %peer_id,
             request = ?request.data_column_ids,
-            returned = send_data_column_count,
+            returned = sent_data_columns,
+            requested_unknown_block_root,
+            requested_block_without_data,
+            requested_missing_columns,
+            requested_pending_block,
+            requested_outside_da_check,
+            requested_we_dont_custody,
+            ?requested_indices_we_dont_custody,
             "Received DataColumnsByRoot Request"
         );
 
+        for (outcome, count) in [
+            ("sent_data_columns", sent_data_columns),
+            ("requested_unknown_block_root", requested_unknown_block_root),
+            ("requested_block_without_data", requested_block_without_data),
+            ("requested_missing_columns", requested_missing_columns),
+            ("requested_pending_block", requested_pending_block),
+            ("requested_outside_da_check", requested_outside_da_check),
+            ("requested_we_dont_custody", requested_we_dont_custody),
+        ] {
+            metrics::inc_counter_vec_by(
+                &metrics::BEACON_RPC_COLUMNS_BY_ROOT_RESPONSE_OUTCOME,
+                &[outcome],
+                count,
+            );
+        }
+
         Ok(())
     }
 
diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs
index 979b91e30d5..ce876405d3a 100644
--- a/consensus/types/src/signed_beacon_block.rs
+++ b/consensus/types/src/signed_beacon_block.rs
@@ -332,6 +332,10 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> SignedBeaconBlock<E, Payload>
             .unwrap_or(0)
     }
 
+    pub fn has_data(&self) -> bool {
+        self.num_expected_blobs() > 0
+    }
+
     /// Used for displaying commitments in logs.
     pub fn commitments_formatted(&self) -> String {
         let Ok(commitments) = self.message().body().blob_kzg_commitments() else {
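A minimal sketch (not part of the diff) of the outcome taxonomy the new `beacon_rpc_columns_by_root_response_outcome` metric records per requested `(block_root, index)` pair. The `ColumnsByRootOutcome` enum and `metric_label` helper below are hypothetical illustrations; the patch itself tracks plain integer counters and feeds them to `metrics::inc_counter_vec_by`.

```rust
/// Illustrative only: each requested (block_root, index) pair resolves to exactly
/// one of these outcomes, mirroring the counters in handle_data_columns_by_root.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnsByRootOutcome {
    /// Block known, block has data, index in custody, column found and sent.
    SentDataColumn,
    /// Requested block root is in neither the caches nor the store.
    UnknownBlockRoot,
    /// Block known but has zero KZG commitments, so columns can never exist.
    BlockWithoutData,
    /// Requested index is not in our custody set at the block's epoch.
    NotInCustody,
    /// Block imported and inside the DA window, yet the column is missing.
    MissingColumn,
    /// Column not found, but the block's epoch is outside the DA window.
    OutsideDaWindow,
    /// Block known but not yet imported; the column may simply not have arrived.
    PendingBlock,
}

/// Maps each outcome to the label written into the metric's `outcome` dimension.
fn metric_label(outcome: ColumnsByRootOutcome) -> &'static str {
    use ColumnsByRootOutcome::*;
    match outcome {
        SentDataColumn => "sent_data_columns",
        UnknownBlockRoot => "requested_unknown_block_root",
        BlockWithoutData => "requested_block_without_data",
        NotInCustody => "requested_we_dont_custody",
        MissingColumn => "requested_missing_columns",
        OutsideDaWindow => "requested_outside_da_check",
        PendingBlock => "requested_pending_block",
    }
}

fn main() {
    // Print every label the metric can take.
    let outcomes = [
        ColumnsByRootOutcome::SentDataColumn,
        ColumnsByRootOutcome::UnknownBlockRoot,
        ColumnsByRootOutcome::BlockWithoutData,
        ColumnsByRootOutcome::NotInCustody,
        ColumnsByRootOutcome::MissingColumn,
        ColumnsByRootOutcome::OutsideDaWindow,
        ColumnsByRootOutcome::PendingBlock,
    ];
    for outcome in outcomes {
        println!("{outcome:?} -> {}", metric_label(outcome));
    }
}
```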