diff --git a/CLAUDE.md b/CLAUDE.md index df47a66d119..53a4433747b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -293,6 +293,7 @@ async fn process_block(&self, block: Block) -> Result<(), Error> { ## Build and Development Notes - Full builds and tests take 5+ minutes - use large timeouts (300s+) for any `cargo build`, `cargo test`, or `make` commands - Use `cargo check` for faster iteration during development and always run after code changes +- Use `cargo fmt --all && make lint-fix` to format code and fix linting issues once a task is complete - Prefer targeted package tests (`cargo test -p `) and individual tests over full test suite when debugging specific issues - Always understand the broader codebase patterns before making changes - Minimum Supported Rust Version (MSRV) is documented in `lighthouse/Cargo.toml` - ensure Rust version meets or exceeds this requirement \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2e525274783..6174c3df0f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -887,6 +887,7 @@ dependencies = [ "int_to_bytes", "itertools 0.10.5", "kzg", + "lighthouse_tracing", "lighthouse_version", "logging", "lru", @@ -5663,6 +5664,7 @@ dependencies = [ "futures", "initialized_validators", "lighthouse_network", + "lighthouse_tracing", "lighthouse_version", "logging", "malloc_utils", @@ -5745,6 +5747,10 @@ dependencies = [ "unused_port", ] +[[package]] +name = "lighthouse_tracing" +version = "0.1.0" + [[package]] name = "lighthouse_validator_store" version = "0.1.0" @@ -6412,6 +6418,7 @@ dependencies = [ "kzg", "libp2p-gossipsub", "lighthouse_network", + "lighthouse_tracing", "logging", "lru_cache", "matches", diff --git a/Cargo.toml b/Cargo.toml index a6ed9fc1b16..413e0faf891 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "beacon_node/http_api", "beacon_node/http_metrics", "beacon_node/lighthouse_network", + "beacon_node/lighthouse_tracing", "beacon_node/network", "beacon_node/operation_pool", "beacon_node/store", @@ -173,6 +174,7 @@ itertools = "0.10" kzg = { path = "crypto/kzg" } libsecp256k1 = "0.7" lighthouse_network = { path = "beacon_node/lighthouse_network" } +lighthouse_tracing = { path = "beacon_node/lighthouse_tracing" } lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } lighthouse_version = { path = "common/lighthouse_version" } lockfile = { path = "common/lockfile" } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 575bc8ad904..dca351cbac6 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -33,6 +33,7 @@ hex = { workspace = true } int_to_bytes = { workspace = true } itertools = { workspace = true } kzg = { workspace = true } +lighthouse_tracing = { workspace = true } lighthouse_version = { workspace = true } logging = { workspace = true } lru = { workspace = true } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 6056fac2b81..1d6e050f7e3 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -826,7 +826,7 @@ impl GossipVerifiedBlock { /// on the p2p network. /// /// Returns an error if the block is invalid, or if the block was unable to be verified. - #[instrument(name = "verify_gossip_block", skip_all)] + #[instrument(name = "verify_gossip_block", skip_all, fields(block_root = tracing::field::Empty))] pub fn new( block: Arc>, chain: &BeaconChain, @@ -1227,27 +1227,20 @@ impl SignatureVerifiedBlock { signature_verifier .include_all_signatures_except_proposal(block.as_ref(), &mut consensus_context)?; - let sig_verify_span = info_span!("signature_verify", result = "started").entered(); - let result = signature_verifier.verify(); + let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify()); match result { - Ok(_) => { - sig_verify_span.record("result", "ok"); - Ok(Self { - block: MaybeAvailableBlock::AvailabilityPending { - block_root: from.block_root, - block, - }, + Ok(_) => Ok(Self { + block: MaybeAvailableBlock::AvailabilityPending { block_root: from.block_root, - parent: Some(parent), - consensus_context, - }) - } - Err(_) => { - sig_verify_span.record("result", "fail"); - Err(BlockError::InvalidSignature( - InvalidSignature::BlockBodySignatures, - )) - } + block, + }, + block_root: from.block_root, + parent: Some(parent), + consensus_context, + }), + Err(_) => Err(BlockError::InvalidSignature( + InvalidSignature::BlockBodySignatures, + )), } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 019849e7243..3c1b4e8b165 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -9,6 +9,7 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use lighthouse_tracing::SPAN_PENDING_COMPONENTS; use lru::LruCache; use parking_lot::RwLock; use std::cmp::Ordering; @@ -288,7 +289,7 @@ impl PendingComponents { /// Returns an empty `PendingComponents` object with the given block root. pub fn empty(block_root: Hash256, max_len: usize) -> Self { - let span = debug_span!(parent: None, "pending_components", %block_root); + let span = debug_span!(parent: None, SPAN_PENDING_COMPONENTS, %block_root); let _guard = span.clone().entered(); Self { block_root, diff --git a/beacon_node/lighthouse_tracing/Cargo.toml b/beacon_node/lighthouse_tracing/Cargo.toml new file mode 100644 index 00000000000..cd71c202531 --- /dev/null +++ b/beacon_node/lighthouse_tracing/Cargo.toml @@ -0,0 +1,4 @@ +[package] +name = "lighthouse_tracing" +version = "0.1.0" +edition = { workspace = true } diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs new file mode 100644 index 00000000000..a69428d5bda --- /dev/null +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -0,0 +1,61 @@ +//! This module contains root span identifiers for key code paths in the beacon node. +//! +//! TODO: These span identifiers will be used to implement selective tracing export (to be implemented), +//! where only the listed root spans and their descendants will be exported to the tracing backend. + +/// Data Availability checker span identifiers +pub const SPAN_PENDING_COMPONENTS: &str = "pending_components"; + +/// Gossip methods root spans +pub const SPAN_PROCESS_GOSSIP_DATA_COLUMN: &str = "process_gossip_data_column"; +pub const SPAN_PROCESS_GOSSIP_BLOB: &str = "process_gossip_blob"; +pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block"; + +/// Sync methods root spans +pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain"; +pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request"; +pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request"; +pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; +pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; +pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns"; +pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment"; + +/// RPC methods root spans +pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request"; +pub const SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST: &str = "handle_blobs_by_range_request"; +pub const SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST: &str = "handle_data_columns_by_range_request"; +pub const SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST: &str = "handle_blocks_by_root_request"; +pub const SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST: &str = "handle_blobs_by_root_request"; +pub const SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST: &str = "handle_data_columns_by_root_request"; +pub const SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE: &str = "handle_light_client_updates_by_range"; +pub const SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP: &str = "handle_light_client_bootstrap"; +pub const SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = + "handle_light_client_optimistic_update"; +pub const SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE: &str = "handle_light_client_finality_update"; + +/// List of all root span names that are allowed to be exported to the tracing backend. +/// Only these spans and their descendants will be processed to reduce noise from +/// uninstrumented code paths. New root spans must be added to this list to be traced. +pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ + SPAN_SYNCING_CHAIN, + SPAN_PENDING_COMPONENTS, + SPAN_PROCESS_GOSSIP_DATA_COLUMN, + SPAN_PROCESS_GOSSIP_BLOB, + SPAN_PROCESS_GOSSIP_BLOCK, + SPAN_OUTGOING_RANGE_REQUEST, + SPAN_OUTGOING_CUSTODY_REQUEST, + SPAN_PROCESS_RPC_BLOCK, + SPAN_PROCESS_RPC_BLOBS, + SPAN_PROCESS_RPC_CUSTODY_COLUMNS, + SPAN_PROCESS_CHAIN_SEGMENT, + SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, + SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, + SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, + SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST, + SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST, + SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST, + SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE, + SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, + SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, + SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, +]; diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index dc251bd2d63..5615148648d 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -28,6 +28,7 @@ hex = { workspace = true } igd-next = { version = "0.16", features = ["aio_tokio"] } itertools = { workspace = true } lighthouse_network = { workspace = true } +lighthouse_tracing = { workspace = true } logging = { workspace = true } lru_cache = { workspace = true } metrics = { workspace = true } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7ccbab19deb..7d26b42c339 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -21,6 +21,9 @@ use beacon_chain::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use lighthouse_tracing::{ + SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_PROCESS_GOSSIP_DATA_COLUMN, +}; use logging::crit; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; @@ -602,7 +605,13 @@ impl NetworkBeaconProcessor { } } - #[instrument(skip_all, level = "trace", fields(slot = ?column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index), parent = None)] + #[instrument( + name = SPAN_PROCESS_GOSSIP_DATA_COLUMN, + parent = None, + level = "debug", + skip_all, + fields(slot = ?column_sidecar.slot(), block_root = ?column_sidecar.block_root(), index = column_sidecar.index), + )] pub async fn process_gossip_data_column_sidecar( self: &Arc, message_id: MessageId, @@ -760,7 +769,16 @@ impl NetworkBeaconProcessor { } #[allow(clippy::too_many_arguments)] - #[instrument(skip_all, level = "trace", fields(slot = ?blob_sidecar.slot(), block_root = ?blob_sidecar.block_root(), index = blob_sidecar.index), parent = None)] + #[instrument( + name = SPAN_PROCESS_GOSSIP_BLOB, + parent = None, + level = "debug", + skip_all, + fields( + slot = ?blob_sidecar.slot(), + block_root = ?blob_sidecar.block_root(), + index = blob_sidecar.index), + )] pub async fn process_gossip_blob( self: &Arc, message_id: MessageId, @@ -1098,7 +1116,13 @@ impl NetworkBeaconProcessor { /// /// Raises a log if there are errors. #[allow(clippy::too_many_arguments)] - #[instrument(skip_all, fields(block_root = tracing::field::Empty), parent = None)] + #[instrument( + name = SPAN_PROCESS_GOSSIP_BLOCK, + parent = None, + level = "debug", + skip_all, + fields(block_root = tracing::field::Empty), + )] pub async fn process_gossip_block( self: Arc, message_id: MessageId, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index bdfcc1d8a10..85e4f046410 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -10,14 +10,21 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; +use lighthouse_tracing::{ + SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST, + SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST, + SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST, + SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, + SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE, +}; use methods::LightClientUpdatesByRangeRequest; use slot_clock::SlotClock; use std::collections::{HashMap, hash_map::Entry}; use std::sync::Arc; use tokio_stream::StreamExt; -use tracing::{debug, error, instrument, warn}; +use tracing::{Span, debug, error, field, instrument, warn}; use types::blob_sidecar::BlobIdentifier; -use types::{Epoch, EthSpec, Hash256, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -155,13 +162,22 @@ impl NetworkBeaconProcessor { } /// Handle a `BlocksByRoot` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_BLOCKS_BY_ROOT_REQUEST, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub async fn handle_blocks_by_root_request( self: Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, request: BlocksByRootRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -246,13 +262,22 @@ impl NetworkBeaconProcessor { } /// Handle a `BlobsByRoot` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_BLOBS_BY_ROOT_REQUEST, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_blobs_by_root_request( self: Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, request: BlobsByRootRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -341,13 +366,36 @@ impl NetworkBeaconProcessor { } /// Handle a `DataColumnsByRoot` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_DATA_COLUMNS_BY_ROOT_REQUEST, + parent = None, + level = "debug", + skip_all, + fields( + peer_id = %peer_id, + client = tracing::field::Empty, + non_custody_indices = tracing::field::Empty, + ) + )] pub fn handle_data_columns_by_root_request( self: Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, request: DataColumnsByRootRequest, ) { + let requested_columns = request + .data_column_ids + .iter() + .flat_map(|id| id.columns.clone()) + .unique() + .collect::>(); + self.record_data_column_request_in_span( + &peer_id, + &requested_columns, + None, + Span::current(), + ); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -411,13 +459,22 @@ impl NetworkBeaconProcessor { Ok(()) } - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_LIGHT_CLIENT_UPDATES_BY_RANGE, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_light_client_updates_by_range( self: &Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, request: LightClientUpdatesByRangeRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -503,13 +560,22 @@ impl NetworkBeaconProcessor { } /// Handle a `LightClientBootstrap` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_LIGHT_CLIENT_BOOTSTRAP, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_light_client_bootstrap( self: &Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, request: LightClientBootstrapRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_single_item( peer_id, inbound_request_id, @@ -534,12 +600,21 @@ impl NetworkBeaconProcessor { } /// Handle a `LightClientOptimisticUpdate` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_LIGHT_CLIENT_OPTIMISTIC_UPDATE, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_light_client_optimistic_update( self: &Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_single_item( peer_id, inbound_request_id, @@ -559,12 +634,21 @@ impl NetworkBeaconProcessor { } /// Handle a `LightClientFinalityUpdate` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_LIGHT_CLIENT_FINALITY_UPDATE, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_light_client_finality_update( self: &Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_single_item( peer_id, inbound_request_id, @@ -584,13 +668,22 @@ impl NetworkBeaconProcessor { } /// Handle a `BlocksByRange` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST, + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub async fn handle_blocks_by_range_request( self: Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, req: BlocksByRangeRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -871,13 +964,22 @@ impl NetworkBeaconProcessor { } /// Handle a `BlobsByRange` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST, + parent = None, + skip_all, + level = "debug", + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] pub fn handle_blobs_by_range_request( self: Arc, peer_id: PeerId, inbound_request_id: InboundRequestId, req: BlobsByRangeRequest, ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -999,13 +1101,27 @@ impl NetworkBeaconProcessor { } /// Handle a `DataColumnsByRange` request from the peer. - #[instrument(skip_all, level = "debug")] + #[instrument( + name = SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST, + parent = None, + skip_all, + level = "debug", + fields(peer_id = %peer_id, non_custody_indices = tracing::field::Empty, client = tracing::field::Empty) + )] pub fn handle_data_columns_by_range_request( &self, peer_id: PeerId, inbound_request_id: InboundRequestId, req: DataColumnsByRangeRequest, ) { + let epoch = Slot::new(req.start_slot).epoch(T::EthSpec::slots_per_epoch()); + self.record_data_column_request_in_span( + &peer_id, + &req.columns, + Some(epoch), + Span::current(), + ); + self.terminate_response_stream( peer_id, inbound_request_id, @@ -1181,4 +1297,29 @@ impl NetworkBeaconProcessor { } } } + + fn record_data_column_request_in_span( + &self, + peer_id: &PeerId, + requested_indices: &[ColumnIndex], + epoch_opt: Option, + span: Span, + ) { + let non_custody_indices = { + let custody_columns = self + .chain + .data_availability_checker + .custody_context() + .custody_columns_for_epoch(epoch_opt, &self.chain.spec); + requested_indices + .iter() + .filter(|subnet_id| !custody_columns.contains(subnet_id)) + .collect::>() + }; + // This field is used to identify if peers are sending requests on columns we don't custody. + span.record("non_custody_indices", field::debug(non_custody_indices)); + + let client = self.network_globals.client(peer_id); + span.record("client", field::display(client.kind)); + } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 306a184627e..f24495cc54c 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -18,10 +18,14 @@ use beacon_processor::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; +use lighthouse_tracing::{ + SPAN_PROCESS_CHAIN_SEGMENT, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_BLOCK, + SPAN_PROCESS_RPC_CUSTODY_COLUMNS, +}; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; -use tracing::{Span, debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; @@ -97,7 +101,13 @@ impl NetworkBeaconProcessor { /// Attempt to process a block received from a direct RPC request. #[allow(clippy::too_many_arguments)] - #[instrument(skip_all, fields(?block_root), parent = None)] + #[instrument( + name = SPAN_PROCESS_RPC_BLOCK, + parent = None, + level = "debug", + skip_all, + fields(?block_root), + )] pub async fn process_rpc_block( self: Arc>, block_root: Hash256, @@ -244,7 +254,13 @@ impl NetworkBeaconProcessor { } /// Attempt to process a list of blobs received from a direct RPC request. - #[instrument(skip_all, fields(?block_root, outcome = tracing::field::Empty), parent = None)] + #[instrument( + name = SPAN_PROCESS_RPC_BLOBS, + parent = None, + level = "debug", + skip_all, + fields(?block_root), + )] pub async fn process_rpc_blobs( self: Arc>, block_root: Hash256, @@ -293,7 +309,6 @@ impl NetworkBeaconProcessor { match &result { Ok(AvailabilityProcessingStatus::Imported(hash)) => { - Span::current().record("outcome", "imported"); debug!( result = "imported block and blobs", %slot, @@ -303,7 +318,6 @@ impl NetworkBeaconProcessor { self.chain.recompute_head_at_current_slot().await; } Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - Span::current().record("outcome", "missing_components"); debug!( block_hash = %block_root, %slot, @@ -334,7 +348,13 @@ impl NetworkBeaconProcessor { }); } - #[instrument(skip_all, fields(?block_root), parent = None)] + #[instrument( + name = SPAN_PROCESS_RPC_CUSTODY_COLUMNS, + parent = None, + level = "debug", + skip_all, + fields(?block_root), + )] pub async fn process_rpc_custody_columns( self: Arc>, block_root: Hash256, @@ -420,7 +440,13 @@ impl NetworkBeaconProcessor { /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. - #[instrument(skip_all, fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len(), imported_blocks = tracing::field::Empty), parent = None)] + #[instrument( + name = SPAN_PROCESS_CHAIN_SEGMENT, + parent = None, + level = "debug", + skip_all, + fields(sync_type = ?sync_type, downloaded_blocks = downloaded_blocks.len()) + )] pub async fn process_chain_segment( &self, sync_type: ChainSegmentProcessId, @@ -439,7 +465,6 @@ impl NetworkBeaconProcessor { .await { (imported_blocks, Ok(_)) => { - Span::current().record("imported_blocks", imported_blocks); debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -454,7 +479,6 @@ impl NetworkBeaconProcessor { } } (imported_blocks, Err(e)) => { - Span::current().record("imported_blocks", imported_blocks); debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -490,7 +514,6 @@ impl NetworkBeaconProcessor { match self.process_backfill_blocks(downloaded_blocks) { (imported_blocks, Ok(_)) => { - Span::current().record("imported_blocks", imported_blocks); debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -532,7 +555,7 @@ impl NetworkBeaconProcessor { } /// Helper function to process blocks batches which only consumes the chain and blocks to process. - #[instrument(skip_all, fields(result = tracing::field::Empty))] + #[instrument(skip_all)] async fn process_blocks<'a>( &self, downloaded_blocks: impl Iterator>, @@ -545,7 +568,6 @@ impl NetworkBeaconProcessor { .await { ChainSegmentResult::Successful { imported_blocks } => { - Span::current().record("outcome", "success"); metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; @@ -556,7 +578,6 @@ impl NetworkBeaconProcessor { imported_blocks, error, } => { - Span::current().record("outcome", "failed"); metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); let r = self.handle_failed_chain_segment(error); if !imported_blocks.is_empty() { @@ -568,6 +589,7 @@ impl NetworkBeaconProcessor { } /// Helper function to process backfill block batches which only consumes the chain and blocks to process. + #[instrument(skip_all)] fn process_backfill_blocks( &self, downloaded_blocks: Vec>, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 23598cdd91f..ffc79c1550d 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -8,6 +8,7 @@ use lighthouse_network::{ }, }; use std::{collections::HashMap, sync::Arc}; +use tracing::Span; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, @@ -31,6 +32,8 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// Span to track the range request and all children range requests. + pub(crate) request_span: Span, } enum ByRangeRequest { @@ -81,6 +84,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) @@ -102,6 +106,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + request_span, } } @@ -471,6 +476,7 @@ mod tests { }; use rand::SeedableRng; use std::sync::Arc; + use tracing::Span; use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; fn components_id() -> ComponentsByRangeRequestId { @@ -526,7 +532,8 @@ mod tests { .collect::>>>(); let blocks_req_id = blocks_id(components_id()); - let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, None); + let mut info = + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -556,8 +563,12 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let blobs_req_id = blobs_id(components_id); - let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, Some(blobs_req_id), None); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + Some(blobs_req_id), + None, + Span::none(), + ); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -597,6 +608,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + Span::none(), ); // Send blocks and complete terminate response info.add_blocks( @@ -656,6 +668,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + Span::none(), ); let mut rng = XorShiftRng::from_seed([42; 16]); @@ -735,6 +748,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + Span::none(), ); // AND: All blocks are received successfully @@ -814,6 +828,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + Span::none(), ); // AND: All blocks are received @@ -895,6 +910,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + Span::none(), ); // AND: All blocks are received diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c4f58c1f6e1..1f4a14b4bff 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -29,6 +29,7 @@ use lighthouse_network::service::api_types::{ DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; +use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST; use parking_lot::RwLock; pub use requests::LookupVerifyError; use requests::{ @@ -45,7 +46,7 @@ use std::time::Duration; #[cfg(test)] use task_executor::TaskExecutor; use tokio::sync::mpsc; -use tracing::{debug, error, warn}; +use tracing::{Span, debug, debug_span, error, warn}; use types::blob_sidecar::FixedBlobSidecarList; use types::{ BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, @@ -55,6 +56,18 @@ use types::{ pub mod custody; mod requests; +macro_rules! new_range_request_span { + ($self:expr, $name:literal, $parent:expr, $peer_id:expr) => {{ + let client = $self.client_type(&$peer_id).kind; + debug_span!( + parent: $parent, + $name, + peer_id = %$peer_id, + client = %client + ) + }}; +} + /// Max retries for block components after which we fail the batch. pub const MAX_COLUMN_RETRIES: usize = 3; @@ -444,10 +457,16 @@ impl SyncNetworkContext { request: BlocksByRangeRequest, failed_columns: &HashSet, ) -> Result<(), String> { - let Some(requester) = self + let Some((requester, parent_request_span)) = self .components_by_range_requests - .keys() - .find_map(|r| if r.id == id { Some(r.requester) } else { None }) + .iter() + .find_map(|(key, value)| { + if key.id == id { + Some((key.requester, value.request_span.clone())) + } else { + None + } + }) else { return Err("request id not present".to_string()); }; @@ -485,6 +504,12 @@ impl SyncNetworkContext { columns, }, id, + new_range_request_span!( + self, + "outgoing_columns_by_range_retry", + parent_request_span.clone(), + peer_id + ), ) }) .collect::, _>>() @@ -511,6 +536,13 @@ impl SyncNetworkContext { peers: &HashSet, peers_to_deprioritize: &HashSet, ) -> Result { + let range_request_span = debug_span!( + parent: None, + SPAN_OUTGOING_RANGE_REQUEST, + range_req_id = %requester, + peers = peers.len() + ); + let _guard = range_request_span.clone().entered(); let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(block_peer) = peers @@ -561,7 +593,17 @@ impl SyncNetworkContext { requester, }; - let blocks_req_id = self.send_blocks_by_range_request(block_peer, request.clone(), id)?; + let blocks_req_id = self.send_blocks_by_range_request( + block_peer, + request.clone(), + id, + new_range_request_span!( + self, + "outgoing_blocks_by_range", + range_request_span.clone(), + block_peer + ), + )?; let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { Some(self.send_blobs_by_range_request( @@ -571,6 +613,12 @@ impl SyncNetworkContext { count: *request.count(), }, id, + new_range_request_span!( + self, + "outgoing_blobs_by_range", + range_request_span.clone(), + block_peer + ), )?) } else { None @@ -589,6 +637,12 @@ impl SyncNetworkContext { columns, }, id, + new_range_request_span!( + self, + "outgoing_columns_by_range", + range_request_span.clone(), + peer_id + ), ) }) .collect::, _>>() @@ -605,6 +659,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -833,6 +888,8 @@ impl SyncNetworkContext { // block and the peer must have it. true, BlocksByRootRequestItems::new(request), + // Not implemented + Span::none(), ); Ok(LookupRequestResult::RequestSent(id.req_id)) @@ -927,6 +984,8 @@ impl SyncNetworkContext { // have imported the block+blobs. true, BlobsByRootRequestItems::new(request), + // Not implemented + Span::none(), ); Ok(LookupRequestResult::RequestSent(id.req_id)) @@ -970,6 +1029,9 @@ impl SyncNetworkContext { peer_id, expect_max_responses, DataColumnsByRootRequestItems::new(request), + // Span is tracked in `self.custody_columns_by_root_requests` in the + // `ActiveCustodyRequest` struct. + Span::none(), ); Ok(LookupRequestResult::RequestSent(id)) @@ -1065,6 +1127,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRangeRequest, parent_request_id: ComponentsByRangeRequestId, + request_span: Span, ) -> Result { let id = BlocksByRangeRequestId { id: self.next_id(), @@ -1094,6 +1157,7 @@ impl SyncNetworkContext { // know if there are missed blocks. false, BlocksByRangeRequestItems::new(request), + request_span, ); Ok(id) } @@ -1103,6 +1167,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlobsByRangeRequest, parent_request_id: ComponentsByRangeRequestId, + request_span: Span, ) -> Result { let id = BlobsByRangeRequestId { id: self.next_id(), @@ -1136,6 +1201,7 @@ impl SyncNetworkContext { // know if there are missed blocks. false, BlobsByRangeRequestItems::new(request, max_blobs_per_block), + request_span, ); Ok(id) } @@ -1145,6 +1211,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRangeRequest, parent_request_id: ComponentsByRangeRequestId, + request_span: Span, ) -> Result<(DataColumnsByRangeRequestId, Vec), RpcRequestSendError> { let requested_columns = request.columns.clone(); let id = DataColumnsByRangeRequestId { @@ -1177,6 +1244,7 @@ impl SyncNetworkContext { // know if there are missed blocks. false, DataColumnsByRangeRequestItems::new(request), + request_span, ); Ok((id, requested_columns)) } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 023d98daa60..d973e83cea7 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -6,13 +6,14 @@ use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; +use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST; use lru_cache::LRUTimeCache; use parking_lot::RwLock; use rand::Rng; use std::collections::HashSet; use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; -use tracing::{debug, warn}; +use tracing::{Span, debug, debug_span, field, warn}; use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; @@ -34,7 +35,8 @@ pub struct ActiveCustodyRequest { failed_peers: LRUTimeCache, /// Set of peers that claim to have imported this block and their custody columns lookup_peers: Arc>>, - + /// Span for tracing the lifetime of this request. + span: Span, _phantom: PhantomData, } @@ -55,6 +57,8 @@ pub enum Error { struct ActiveBatchColumnsRequest { indices: Vec, + /// Span for tracing the lifetime of this request. + span: Span, } pub type CustodyRequestResult = @@ -67,6 +71,7 @@ impl ActiveCustodyRequest { column_indices: &[ColumnIndex], lookup_peers: Arc>>, ) -> Self { + let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root); Self { block_root, custody_id, @@ -78,6 +83,7 @@ impl ActiveCustodyRequest { active_batch_columns_requests: <_>::default(), failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), lookup_peers, + span, _phantom: PhantomData, } } @@ -106,6 +112,8 @@ impl ActiveCustodyRequest { return Ok(None); }; + let _guard = batch_request.span.clone().entered(); + match resp { Ok((data_columns, seen_timestamp)) => { debug!( @@ -163,6 +171,11 @@ impl ActiveCustodyRequest { "Custody column peer claims to not have some data" ); + batch_request.span.record( + "missing_column_indexes", + field::debug(missing_column_indexes), + ); + self.failed_peers.insert(peer_id); } } @@ -183,6 +196,11 @@ impl ActiveCustodyRequest { .on_download_error_and_mark_failure(req_id)?; } + batch_request.span.record( + "missing_column_indexes", + field::debug(&batch_request.indices), + ); + self.failed_peers.insert(peer_id); } }; @@ -194,6 +212,7 @@ impl ActiveCustodyRequest { &mut self, cx: &mut SyncNetworkContext, ) -> CustodyRequestResult { + let _guard = self.span.clone().entered(); if self.column_requests.values().all(|r| r.is_downloaded()) { // All requests have completed successfully. let mut peers = HashMap::>::new(); @@ -298,6 +317,9 @@ impl ActiveCustodyRequest { match request_result { LookupRequestResult::RequestSent(req_id) => { + let client = cx.network_globals().client(&peer_id).kind; + let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty); + let _guard = batch_columns_req_span.clone().entered(); for column_index in &indices { let column_request = self .column_requests @@ -308,8 +330,13 @@ impl ActiveCustodyRequest { column_request.on_download_start(req_id)?; } - self.active_batch_columns_requests - .insert(req_id, ActiveBatchColumnsRequest { indices }); + self.active_batch_columns_requests.insert( + req_id, + ActiveBatchColumnsRequest { + indices, + span: batch_columns_req_span, + }, + ); } LookupRequestResult::NoRequestNeeded(_) => unreachable!(), LookupRequestResult::Pending(_) => unreachable!(), diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index f42595fb690..3183c06d762 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -4,6 +4,7 @@ use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; +use tracing::Span; use types::{Hash256, Slot}; pub use blobs_by_range::BlobsByRangeRequestItems; @@ -50,6 +51,7 @@ struct ActiveRequest { peer_id: PeerId, // Error if the request terminates before receiving max expected responses expect_max_responses: bool, + span: Span, } enum State { @@ -66,13 +68,22 @@ impl ActiveRequests { } } - pub fn insert(&mut self, id: K, peer_id: PeerId, expect_max_responses: bool, items: T) { + pub fn insert( + &mut self, + id: K, + peer_id: PeerId, + expect_max_responses: bool, + items: T, + span: Span, + ) { + let _guard = span.clone().entered(); self.requests.insert( id, ActiveRequest { state: State::Active(items), peer_id, expect_max_responses, + span, }, ); } @@ -106,6 +117,7 @@ impl ActiveRequests { // `ActiveRequestItems` validates the item before appending to its internal state. RpcEvent::Response(item, seen_timestamp) => { let request = &mut entry.get_mut(); + let _guard = request.span.clone().entered(); match &mut request.state { State::Active(items) => { match items.add(item) { @@ -141,6 +153,7 @@ impl ActiveRequests { // After stream termination we must forget about this request, there will be no more // messages coming from the network let request = entry.remove(); + let _guard = request.span.clone().entered(); match request.state { // Received a stream termination in a valid sequence, consume items State::Active(mut items) => { @@ -162,7 +175,9 @@ impl ActiveRequests { RpcEvent::RPCError(e) => { // After an Error event from the network we must forget about this request as this // may be the last message for this request. - match entry.remove().state { + let request = entry.remove(); + let _guard = request.span.clone().entered(); + match request.state { // Received error while request is still active, propagate error. State::Active(_) => Some(Err(e.into())), // Received error after completing the request, ignore the error. This is okay diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index cdbb9f25883..96319f2efad 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -9,10 +9,11 @@ use beacon_chain::BeaconChainTypes; use beacon_chain::block_verification_types::RpcBlock; use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; +use lighthouse_tracing::SPAN_SYNCING_CHAIN; use logging::crit; use std::collections::{BTreeMap, HashSet, btree_map::Entry}; use strum::IntoStaticStr; -use tracing::{debug, warn}; +use tracing::{Span, debug, instrument, warn}; use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -111,6 +112,9 @@ pub struct SyncingChain { /// The current processing batch, if any. current_processing_batch: Option, + + /// The span to track the lifecycle of the syncing chain. + span: Span, } #[derive(PartialEq, Debug)] @@ -123,6 +127,13 @@ pub enum ChainSyncingState { impl SyncingChain { #[allow(clippy::too_many_arguments)] + #[instrument( + name = SPAN_SYNCING_CHAIN, + parent = None, + level="debug", + skip(id), + fields(chain_id = %id) + )] pub fn new( id: Id, start_epoch: Epoch, @@ -131,6 +142,7 @@ impl SyncingChain { peer_id: PeerId, chain_type: SyncingChainType, ) -> Self { + let span = Span::current(); SyncingChain { id, chain_type, @@ -145,6 +157,7 @@ impl SyncingChain { attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, + span, } } @@ -186,6 +199,8 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { + let _guard = self.span.clone().entered(); + debug!(peer = %peer_id, "Removing peer from chain"); self.peers.remove(peer_id); if self.peers.is_empty() { @@ -213,6 +228,7 @@ impl SyncingChain { request_id: Id, blocks: Vec>, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { None => { @@ -242,7 +258,14 @@ impl SyncingChain { let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; - debug!(epoch = %batch_id, blocks = received, batch_state = self.visualize_batch_state(), %awaiting_batches,"Batch downloaded"); + debug!( + epoch = %batch_id, + blocks = received, + batch_state = self.visualize_batch_state(), + %awaiting_batches, + %peer_id, + "Batch downloaded" + ); // pre-emptively request more blocks from peers whilst we process current blocks, self.request_batches(network)?; @@ -415,6 +438,7 @@ impl SyncingChain { batch_id: BatchId, result: &BatchProcessResult, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); // the first two cases are possible if the chain advances while waiting for a processing // result let batch_state = self.visualize_batch_state(); @@ -754,6 +778,7 @@ impl SyncingChain { } pub fn stop_syncing(&mut self) { + debug!(parent: &self.span, "Stopping syncing"); self.state = ChainSyncingState::Stopped; } @@ -767,6 +792,12 @@ impl SyncingChain { local_finalized_epoch: Epoch, optimistic_start_epoch: Epoch, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); + debug!( + ?local_finalized_epoch, + ?optimistic_start_epoch, + "Start syncing chain" + ); // to avoid dropping local progress, we advance the chain wrt its batch boundaries. This let align = |epoch| { // start_epoch + (number of batches in between)*length_of_batch @@ -804,6 +835,8 @@ impl SyncingChain { network: &mut SyncNetworkContext, peer_id: PeerId, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); + debug!(peer_id = %peer_id, "Adding peer to chain"); self.peers.insert(peer_id); self.request_batches(network) } @@ -819,6 +852,7 @@ impl SyncingChain { request_id: Id, err: RpcResponseError, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err { @@ -911,6 +945,8 @@ impl SyncingChain { network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); + debug!(batch_epoch = %batch_id, "Requesting batch"); let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); @@ -981,7 +1017,7 @@ impl SyncingChain { } /// Retries partial column requests within the batch by creating new requests for the failed columns. - pub fn retry_partial_batch( + fn retry_partial_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, @@ -989,6 +1025,8 @@ impl SyncingChain { failed_columns: HashSet, mut failed_peers: HashSet, ) -> ProcessingResult { + let _guard = self.span.clone().entered(); + debug!(%batch_id, %id, ?failed_columns, "Retrying partial batch"); if let Some(batch) = self.batches.get_mut(&batch_id) { failed_peers.extend(&batch.failed_peers()); let req = batch.to_blocks_by_range_request().0; @@ -1037,6 +1075,8 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, ) -> Result { + let _guard = self.span.clone().entered(); + debug!("Resuming chain"); // Request more batches if needed. self.request_batches(network)?; // If there is any batch ready for processing, send it. diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index c1a38f7936a..849d30bcf2d 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -53,6 +53,7 @@ environment = { workspace = true } eth2_network_config = { workspace = true } ethereum_hashing = { workspace = true } futures = { workspace = true } +lighthouse_tracing = { workspace = true } lighthouse_version = { workspace = true } logging = { workspace = true } metrics = { workspace = true }