Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ pub static BEACON_PROCESSOR_GET_BLOCK_ROOTS_TIME: LazyLock<Result<HistogramVec>>
&["source"],
)
});
/// Counts the per-(block_root, index) outcomes of serving RPC
/// `data_columns_by_root` requests, labelled by `outcome` (e.g. column sent,
/// unknown block root, block without data, column missing from the store).
pub static BEACON_RPC_COLUMNS_BY_ROOT_RESPONSE_OUTCOME: LazyLock<Result<IntCounterVec>> =
    LazyLock::new(|| {
        try_create_int_counter_vec(
            "beacon_rpc_columns_by_root_response_outcome",
            // Fixed grammar: "in an RPC", not "in a RPC".
            "Total number of outcomes per requested (block_root, index) in an RPC data_columns_by_root response",
            &["outcome"],
        )
    });

/*
* Gossip processor
Expand Down
148 changes: 128 additions & 20 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped};
use itertools::{Itertools, process_results};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
Expand All @@ -19,7 +19,7 @@ use lighthouse_tracing::{
};
use methods::LightClientUpdatesByRangeRequest;
use slot_clock::SlotClock;
use std::collections::{HashMap, hash_map::Entry};
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::sync::Arc;
use tokio_stream::StreamExt;
use tracing::{Span, debug, error, field, instrument, warn};
Expand Down Expand Up @@ -411,29 +411,114 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
inbound_request_id: InboundRequestId,
request: DataColumnsByRootRequest<T::EthSpec>,
) -> Result<(), (RpcErrorResponse, &'static str)> {
let mut send_data_column_count = 0;
// Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
let available_columns = self.chain.custody_columns_for_epoch(None);
// block known, block has data, we custody the requested index, and the column is found in the store
let mut sent_data_columns = 0;
// requested a block root not found in our caches or the store
let mut requested_unknown_block_root = 0;
// requested a known block, but it has 0 blobs
let mut requested_block_without_data = 0;
// requested a known block with data that we custody, but the columns are missing from the store
let mut requested_missing_columns = 0;
// requested known block, columns not found in store, but block is not imported yet
let mut requested_pending_block = 0;
// requested columns not found in store, and their epoch is outside the PeerDAS da_window
let mut requested_outside_da_check = 0;
// requested column index we don't custody at the block's epoch
let mut requested_we_dont_custody = 0;
let mut requested_indices_we_dont_custody = HashSet::new();

for data_column_ids_by_root in request.data_column_ids.as_slice() {
let indices_to_retrieve = data_column_ids_by_root
.columns
let block_root = data_column_ids_by_root.block_root;

// For debugging purposes check if the block of this data column request is known and
// has data.
let (block_slot, block_has_data, block_imported) = if let Some(block) =
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
} {
(block.slot(), block.has_data(), false)
} else if let Some(block) = self
.chain
.get_blinded_block(&block_root)
.map_err(|_| (RpcErrorResponse::ServerError, "Error fetching column block"))?
{
(block.slot(), block.has_data(), true)
} else {
// Peer requested columns for an unknown block
requested_unknown_block_root += 1;
// Okay to not even try to fetch data_columns and continue to the next block_root.
// We remove the block from the processing cache AFTER completing import. So there is
// never an instant where the block is known, not pruned and missing from both the
// caches and the store. Therefore, it's race condition safe to check the caches
// first, and then check the store.
continue;
};

if !block_has_data {
requested_block_without_data += 1;
// If the block has zero KZG commitments there will never exist data columns for it.
continue;
}

let block_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch());
// TODO(peerdas): Is it correct to lookup the epoch of the block?
// Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
let columns_we_custody = self.chain.custody_columns_for_epoch(Some(block_epoch));

let requested_columns = data_column_ids_by_root.columns.as_ref();
let requested_columns_we_custody = requested_columns
.iter()
.copied()
.filter(|c| available_columns.contains(c))
.filter(|index| {
if columns_we_custody.contains(index) {
true
} else {
// Peer requested a column we don't custody
requested_we_dont_custody += 1;
requested_indices_we_dont_custody.insert(*index);
false
}
})
.collect::<Vec<_>>();
match self.chain.get_data_columns_checking_all_caches(
data_column_ids_by_root.block_root,
&indices_to_retrieve,
) {
match self
.chain
.get_data_columns_checking_all_caches(block_root, &requested_columns_we_custody)
{
Ok(data_columns) => {
send_data_column_count += data_columns.len();
for data_column in data_columns {
self.send_response(
peer_id,
inbound_request_id,
Response::DataColumnsByRoot(Some(data_column)),
);
for index in requested_columns_we_custody {
if let Some(data_column) = data_columns.iter().find(|c| c.index == index) {
self.send_response(
peer_id,
inbound_request_id,
Response::DataColumnsByRoot(Some(data_column.clone())),
);
sent_data_columns += 1;
} else {
// Peer requested a column we custody, but we don't have it in our
// caches or our database
if block_imported {
if self
.chain
.data_availability_checker
.data_columns_required_for_epoch(block_epoch)
{
// If the block is imported (check done **before** attempting to
// read columns from the store) the columns should be there.
requested_missing_columns += 1;
// TODO(review): record the block root and column index here — useful for debugging this case.
} else {
// Unless it's outside the da_window of PeerDAS
requested_outside_da_check += 1;
}
} else {
// We should custody this column but the block is not imported yet. We
// may or may not have the columns at this point. We are allowed to
// return empty, as we have not sent any signal that we have
// imported the columns
requested_pending_block += 1;
}
}
}
}
Err(e) => {
Expand All @@ -452,10 +537,33 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
debug!(
%peer_id,
request = ?request.data_column_ids,
returned = send_data_column_count,
returned = sent_data_columns,
requested_unknown_block_root,
requested_block_without_data,
requested_missing_columns,
requested_pending_block,
requested_outside_da_check,
requested_we_dont_custody,
?requested_indices_we_dont_custody,
"Received DataColumnsByRoot Request"
);

for (outcome, count) in [
("sent_data_columns", sent_data_columns),
("requested_unknown_block_root", requested_unknown_block_root),
("requested_block_without_data", requested_block_without_data),
("requested_missing_columns", requested_missing_columns),
("requested_pending_block", requested_pending_block),
("requested_outside_da_check", requested_outside_da_check),
("requested_we_dont_custody", requested_we_dont_custody),
] {
metrics::inc_counter_vec_by(
&metrics::BEACON_RPC_COLUMNS_BY_ROOT_RESPONSE_OUTCOME,
&[outcome],
count,
);
}

Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions consensus/types/src/signed_beacon_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> SignedBeaconBlock<E, Payload>
.unwrap_or(0)
}

/// Returns `true` when this block commits to at least one blob, i.e. data
/// (blobs / data columns) is expected to exist for it.
pub fn has_data(&self) -> bool {
    let expected_blobs = self.num_expected_blobs();
    expected_blobs > 0
}

/// Used for displaying commitments in logs.
pub fn commitments_formatted(&self) -> String {
let Ok(commitments) = self.message().body().blob_kzg_commitments() else {
Expand Down
Loading