Skip to content

Commit 1943507

Browse files
committed
Auto pass spans to blocking handles
1 parent 26575c5 commit 1943507

File tree

6 files changed

+20
-65
lines changed

6 files changed

+20
-65
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ use store::{
127127
};
128128
use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
129129
use tokio_stream::Stream;
130-
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
130+
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn};
131131
use tree_hash::TreeHash;
132132
use types::blob_sidecar::FixedBlobSidecarList;
133133
use types::data_column_sidecar::ColumnIndex;
@@ -2838,12 +2838,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28382838

28392839
// Filter uninteresting blocks from the chain segment in a blocking task.
28402840
let chain = self.clone();
2841-
let filter_chain_segment = debug_span!("filter_chain_segment");
28422841
let filtered_chain_segment_future = self.spawn_blocking_handle(
2843-
move || {
2844-
let _guard = filter_chain_segment.enter();
2845-
chain.filter_chain_segment(chain_segment)
2846-
},
2842+
move || chain.filter_chain_segment(chain_segment),
28472843
"filter_chain_segment",
28482844
);
28492845
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
@@ -2874,12 +2870,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28742870
std::mem::swap(&mut blocks, &mut filtered_chain_segment);
28752871

28762872
let chain = self.clone();
2877-
let current_span = Span::current();
28782873
let signature_verification_future = self.spawn_blocking_handle(
2879-
move || {
2880-
let _guard = current_span.enter();
2881-
signature_verify_chain_segment(blocks, &chain)
2882-
},
2874+
move || signature_verify_chain_segment(blocks, &chain),
28832875
"signature_verify_chain_segment",
28842876
);
28852877

@@ -2969,12 +2961,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29692961
block: Arc<SignedBeaconBlock<T::EthSpec>>,
29702962
) -> Result<GossipVerifiedBlock<T>, BlockError> {
29712963
let chain = self.clone();
2972-
let span = Span::current();
29732964
self.task_executor
29742965
.clone()
29752966
.spawn_blocking_handle(
29762967
move || {
2977-
let _guard = span.enter();
29782968
let slot = block.slot();
29792969
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();
29802970

@@ -3272,11 +3262,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32723262

32733263
let data_availability_checker = self.data_availability_checker.clone();
32743264

3275-
let current_span = Span::current();
32763265
let result = self
32773266
.task_executor
32783267
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
3279-
let _guard = current_span.enter();
32803268
data_availability_checker.reconstruct_data_columns(&block_root)
32813269
})
32823270
.await
@@ -3741,13 +3729,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37413729
// TODO(das) record custody column available timestamp
37423730

37433731
let block_root = {
3744-
// Capture the current span before moving into the blocking task
3745-
let current_span = tracing::Span::current();
37463732
let chain = self.clone();
37473733
self.spawn_blocking_handle(
37483734
move || {
3749-
// Enter the captured span in the blocking thread
3750-
let _guard = current_span.enter();
37513735
chain.import_block(
37523736
block,
37533737
block_root,
@@ -4483,15 +4467,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
44834467
//
44844468
// Load the parent state from disk.
44854469
let chain = self.clone();
4486-
let span = Span::current();
44874470
let (state, state_root_opt) = self
44884471
.task_executor
44894472
.spawn_blocking_handle(
4490-
move || {
4491-
let _guard =
4492-
debug_span!(parent: span, "load_state_for_block_production").entered();
4493-
chain.load_state_for_block_production(slot)
4494-
},
4473+
move || chain.load_state_for_block_production(slot),
44954474
"load_state_for_block_production",
44964475
)
44974476
.ok_or(BlockProductionError::ShuttingDown)?
@@ -5047,13 +5026,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
50475026
.graffiti_calculator
50485027
.get_graffiti(validator_graffiti)
50495028
.await;
5050-
let span = Span::current();
50515029
let mut partial_beacon_block = self
50525030
.task_executor
50535031
.spawn_blocking_handle(
50545032
move || {
5055-
let _guard =
5056-
debug_span!(parent: span, "produce_partial_beacon_block").entered();
50575033
chain.produce_partial_beacon_block(
50585034
state,
50595035
state_root_opt,
@@ -5089,14 +5065,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
50895065
match block_contents_type {
50905066
BlockProposalContentsType::Full(block_contents) => {
50915067
let chain = self.clone();
5092-
let span = Span::current();
50935068
let beacon_block_response = self
50945069
.task_executor
50955070
.spawn_blocking_handle(
50965071
move || {
5097-
let _guard =
5098-
debug_span!(parent: span, "complete_partial_beacon_block")
5099-
.entered();
51005072
chain.complete_partial_beacon_block(
51015073
partial_beacon_block,
51025074
Some(block_contents),
@@ -5113,14 +5085,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
51135085
}
51145086
BlockProposalContentsType::Blinded(block_contents) => {
51155087
let chain = self.clone();
5116-
let span = Span::current();
51175088
let beacon_block_response = self
51185089
.task_executor
51195090
.spawn_blocking_handle(
51205091
move || {
5121-
let _guard =
5122-
debug_span!(parent: span, "complete_partial_beacon_block")
5123-
.entered();
51245092
chain.complete_partial_beacon_block(
51255093
partial_beacon_block,
51265094
Some(block_contents),
@@ -5138,13 +5106,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
51385106
}
51395107
} else {
51405108
let chain = self.clone();
5141-
let span = Span::current();
51425109
let beacon_block_response = self
51435110
.task_executor
51445111
.spawn_blocking_handle(
51455112
move || {
5146-
let _guard =
5147-
debug_span!(parent: span, "complete_partial_beacon_block").entered();
51485113
chain.complete_partial_beacon_block(
51495114
partial_beacon_block,
51505115
None,

beacon_node/beacon_chain/src/canonical_head.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use fork_choice::{
4747
ResetPayloadStatuses,
4848
};
4949
use itertools::process_results;
50-
use lighthouse_tracing::SPAN_RECOMPUTE_HEAD;
5150
use logging::crit;
5251
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
5352
use slot_clock::SlotClock;
@@ -58,7 +57,6 @@ use store::{
5857
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator,
5958
};
6059
use task_executor::{JoinHandle, ShutdownReason};
61-
use tracing::info_span;
6260
use tracing::{debug, error, info, instrument, warn};
6361
use types::*;
6462

@@ -513,21 +511,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
513511
/// situation can be rectified. We avoid returning an error here so that calling functions
514512
/// can't abort block import because an error is returned here.
515513
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
516-
let span = info_span!(
517-
SPAN_RECOMPUTE_HEAD,
518-
slot = %current_slot
519-
);
520-
521514
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
522515
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
523516

524517
let chain = self.clone();
525518
match self
526519
.spawn_blocking_handle(
527-
move || {
528-
let _guard = span.enter();
529-
chain.recompute_head_at_slot_internal(current_slot)
530-
},
520+
move || chain.recompute_head_at_slot_internal(current_slot),
531521
"recompute_head_internal",
532522
)
533523
.await

beacon_node/beacon_chain/src/fetch_blobs/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use mockall_double::double;
3232
use ssz_types::FixedVector;
3333
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
3434
use std::sync::Arc;
35-
use tracing::{Span, debug, instrument, warn};
35+
use tracing::{debug, instrument, warn};
3636
use types::blob_sidecar::BlobSidecarError;
3737
use types::data_column_sidecar::DataColumnSidecarError;
3838
use types::{
@@ -345,12 +345,10 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
345345
let spec = chain_adapter.spec().clone();
346346
let chain_adapter_cloned = chain_adapter.clone();
347347
let custody_columns_indices = custody_columns_indices.to_vec();
348-
let current_span = Span::current();
349348
chain_adapter
350349
.executor()
351350
.spawn_blocking_handle(
352351
move || {
353-
let _guard = current_span.enter();
354352
let mut timer = metrics::start_timer_vec(
355353
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
356354
&[&blobs.len().to_string()],

beacon_node/http_api/src/publish_blocks.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525
use std::sync::atomic::{AtomicBool, Ordering};
2626
use std::time::Duration;
2727
use tokio::sync::mpsc::UnboundedSender;
28-
use tracing::{Span, debug, debug_span, error, info, instrument, warn};
28+
use tracing::{Span, debug, error, info, instrument, warn};
2929
use tree_hash::TreeHash;
3030
use types::{
3131
AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource,
@@ -143,12 +143,8 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
143143
let slot = block.message().slot();
144144
let sender_clone = network_tx.clone();
145145

146-
let build_sidecar_task_handle = spawn_build_data_sidecar_task(
147-
chain.clone(),
148-
block.clone(),
149-
unverified_blobs,
150-
current_span.clone(),
151-
)?;
146+
let build_sidecar_task_handle =
147+
spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?;
152148

153149
// Gossip verify the block and blobs/data columns separately.
154150
let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain);
@@ -360,7 +356,6 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
360356
chain: Arc<BeaconChain<T>>,
361357
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
362358
proofs_and_blobs: UnverifiedBlobs<T>,
363-
current_span: Span,
364359
) -> Result<impl Future<Output = BuildDataSidecarTaskResult<T>>, Rejection> {
365360
chain
366361
.clone()
@@ -370,7 +365,6 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
370365
let Some((kzg_proofs, blobs)) = proofs_and_blobs else {
371366
return Ok((vec![], vec![]));
372367
};
373-
let _guard = debug_span!(parent: current_span, "build_data_sidecars").entered();
374368

375369
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
376370
if !peer_das_enabled {

beacon_node/lighthouse_tracing/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
2929
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
3030

3131
/// Fork choice root spans
32-
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
32+
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot_internal";
3333

3434
/// RPC methods root spans
3535
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";

common/task_executor/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use futures::channel::mpsc::Sender;
66
use futures::prelude::*;
77
use std::sync::{Arc, Weak};
88
use tokio::runtime::{Handle, Runtime};
9-
use tracing::debug;
9+
use tracing::{Span, debug};
1010

1111
use crate::rayon_pool_provider::RayonPoolProvider;
1212
pub use crate::rayon_pool_provider::RayonPoolType;
@@ -243,9 +243,11 @@ impl TaskExecutor {
243243
F: FnOnce() + Send + 'static,
244244
{
245245
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
246+
let span = Span::current();
246247
self.spawn_blocking(
247248
move || {
248249
thread_pool.install(|| {
250+
let _guard = span.enter();
249251
task();
250252
});
251253
},
@@ -265,8 +267,10 @@ impl TaskExecutor {
265267
{
266268
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
267269
let (tx, rx) = tokio::sync::oneshot::channel();
270+
let span = Span::current();
268271

269272
thread_pool.spawn(move || {
273+
let _guard = span.enter();
270274
let result = task();
271275
let _ = tx.send(result);
272276
});
@@ -338,8 +342,12 @@ impl TaskExecutor {
338342
let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]);
339343
metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]);
340344

345+
let span = Span::current();
341346
let join_handle = if let Some(handle) = self.handle() {
342-
handle.spawn_blocking(task)
347+
handle.spawn_blocking(move || {
348+
let _guard = span.enter();
349+
task()
350+
})
343351
} else {
344352
debug!("Couldn't spawn task. Runtime shutting down");
345353
return None;

0 commit comments

Comments
 (0)