Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ use store::{
};
use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::ColumnIndex;
Expand Down Expand Up @@ -2838,12 +2838,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
let filter_chain_segment = debug_span!("filter_chain_segment");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if there's a parent span here, might need to keep the debug_span!

let filtered_chain_segment_future = self.spawn_blocking_handle(
move || {
let _guard = filter_chain_segment.enter();
chain.filter_chain_segment(chain_segment)
},
move || chain.filter_chain_segment(chain_segment),
"filter_chain_segment",
);
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
Expand Down Expand Up @@ -2874,12 +2870,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
std::mem::swap(&mut blocks, &mut filtered_chain_segment);

let chain = self.clone();
let current_span = Span::current();
let signature_verification_future = self.spawn_blocking_handle(
move || {
let _guard = current_span.enter();
signature_verify_chain_segment(blocks, &chain)
},
move || signature_verify_chain_segment(blocks, &chain),
"signature_verify_chain_segment",
);

Expand Down Expand Up @@ -2969,12 +2961,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
let chain = self.clone();
let span = Span::current();
self.task_executor
.clone()
.spawn_blocking_handle(
move || {
let _guard = span.enter();
let slot = block.slot();
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();

Expand Down Expand Up @@ -3272,11 +3262,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let data_availability_checker = self.data_availability_checker.clone();

let current_span = Span::current();
let result = self
.task_executor
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
let _guard = current_span.enter();
data_availability_checker.reconstruct_data_columns(&block_root)
})
.await
Expand Down Expand Up @@ -3741,13 +3729,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(das) record custody column available timestamp

let block_root = {
// Capture the current span before moving into the blocking task
let current_span = tracing::Span::current();
let chain = self.clone();
self.spawn_blocking_handle(
move || {
// Enter the captured span in the blocking thread
let _guard = current_span.enter();
chain.import_block(
block,
block_root,
Expand Down Expand Up @@ -4483,15 +4467,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
//
// Load the parent state from disk.
let chain = self.clone();
let span = Span::current();
let (state, state_root_opt) = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "load_state_for_block_production").entered();
chain.load_state_for_block_production(slot)
},
move || chain.load_state_for_block_production(slot),
"load_state_for_block_production",
)
.ok_or(BlockProductionError::ShuttingDown)?
Expand Down Expand Up @@ -5047,13 +5026,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.graffiti_calculator
.get_graffiti(validator_graffiti)
.await;
let span = Span::current();
let mut partial_beacon_block = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "produce_partial_beacon_block").entered();
chain.produce_partial_beacon_block(
state,
state_root_opt,
Expand Down Expand Up @@ -5089,14 +5065,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match block_contents_type {
BlockProposalContentsType::Full(block_contents) => {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block")
.entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
Some(block_contents),
Expand All @@ -5113,14 +5085,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
BlockProposalContentsType::Blinded(block_contents) => {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block")
.entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
Some(block_contents),
Expand All @@ -5138,13 +5106,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
} else {
let chain = self.clone();
let span = Span::current();
let beacon_block_response = self
.task_executor
.spawn_blocking_handle(
move || {
let _guard =
debug_span!(parent: span, "complete_partial_beacon_block").entered();
chain.complete_partial_beacon_block(
partial_beacon_block,
None,
Expand Down
12 changes: 1 addition & 11 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use fork_choice::{
ResetPayloadStatuses,
};
use itertools::process_results;
use lighthouse_tracing::SPAN_RECOMPUTE_HEAD;
use logging::crit;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock;
Expand All @@ -58,7 +57,6 @@ use store::{
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator,
};
use task_executor::{JoinHandle, ShutdownReason};
use tracing::info_span;
use tracing::{debug, error, info, instrument, warn};
use types::*;

Expand Down Expand Up @@ -513,21 +511,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// situation can be rectified. We avoid returning an error here so that calling functions
/// can't abort block import because an error is returned here.
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
let span = info_span!(
SPAN_RECOMPUTE_HEAD,
slot = %current_slot
);

metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);

let chain = self.clone();
match self
.spawn_blocking_handle(
move || {
let _guard = span.enter();
chain.recompute_head_at_slot_internal(current_slot)
},
move || chain.recompute_head_at_slot_internal(current_slot),
"recompute_head_internal",
)
.await
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/beacon_chain/src/fetch_blobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use mockall_double::double;
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::sync::Arc;
use tracing::{Span, debug, instrument, warn};
use tracing::{debug, instrument, warn};
use types::blob_sidecar::BlobSidecarError;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
Expand Down Expand Up @@ -345,12 +345,10 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
let spec = chain_adapter.spec().clone();
let chain_adapter_cloned = chain_adapter.clone();
let custody_columns_indices = custody_columns_indices.to_vec();
let current_span = Span::current();
chain_adapter
.executor()
.spawn_blocking_handle(
move || {
let _guard = current_span.enter();
let mut timer = metrics::start_timer_vec(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
Expand Down
12 changes: 3 additions & 9 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{Span, debug, debug_span, error, info, instrument, warn};
use tracing::{Span, debug, error, info, instrument, warn};
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource,
Expand Down Expand Up @@ -143,12 +143,8 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
let slot = block.message().slot();
let sender_clone = network_tx.clone();

let build_sidecar_task_handle = spawn_build_data_sidecar_task(
chain.clone(),
block.clone(),
unverified_blobs,
current_span.clone(),
)?;
let build_sidecar_task_handle =
spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?;

// Gossip verify the block and blobs/data columns separately.
let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain);
Expand Down Expand Up @@ -360,7 +356,6 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
proofs_and_blobs: UnverifiedBlobs<T>,
current_span: Span,
) -> Result<impl Future<Output = BuildDataSidecarTaskResult<T>>, Rejection> {
chain
.clone()
Expand All @@ -370,7 +365,6 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
let Some((kzg_proofs, blobs)) = proofs_and_blobs else {
return Ok((vec![], vec![]));
};
let _guard = debug_span!(parent: current_span, "build_data_sidecars").entered();

let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
if !peer_das_enabled {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";

/// Fork choice root spans
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot_internal";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can delete this


/// RPC methods root spans
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";
Expand Down
12 changes: 10 additions & 2 deletions common/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::channel::mpsc::Sender;
use futures::prelude::*;
use std::sync::{Arc, Weak};
use tokio::runtime::{Handle, Runtime};
use tracing::debug;
use tracing::{Span, debug};

use crate::rayon_pool_provider::RayonPoolProvider;
pub use crate::rayon_pool_provider::RayonPoolType;
Expand Down Expand Up @@ -243,9 +243,11 @@ impl TaskExecutor {
F: FnOnce() + Send + 'static,
{
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
let span = Span::current();
self.spawn_blocking(
move || {
thread_pool.install(|| {
let _guard = span.enter();
task();
});
},
Expand All @@ -265,8 +267,10 @@ impl TaskExecutor {
{
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
let (tx, rx) = tokio::sync::oneshot::channel();
let span = Span::current();

thread_pool.spawn(move || {
let _guard = span.enter();
let result = task();
let _ = tx.send(result);
});
Expand Down Expand Up @@ -338,8 +342,12 @@ impl TaskExecutor {
let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]);
metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]);

let span = Span::current();
let join_handle = if let Some(handle) = self.handle() {
handle.spawn_blocking(task)
handle.spawn_blocking(move || {
let _guard = span.enter();
task()
})
} else {
debug!("Couldn't spawn task. Runtime shutting down");
return None;
Expand Down
Loading