Skip to content

Commit 47a80e5

Browse files
committed
Use scoped rayon pool for chain segment backfill.
1 parent c41d118 commit 47a80e5

File tree

7 files changed

+173
-104
lines changed

7 files changed

+173
-104
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ logging = { workspace = true }
1212
metrics = { workspace = true }
1313
num_cpus = { workspace = true }
1414
parking_lot = { workspace = true }
15+
rayon = { workspace = true }
1516
serde = { workspace = true }
1617
slot_clock = { workspace = true }
1718
strum = { workspace = true }

beacon_node/beacon_processor/src/lib.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4747
use logging::TimeLatch;
4848
use logging::crit;
4949
use parking_lot::Mutex;
50+
use rayon::{ThreadPool, ThreadPoolBuilder};
5051
pub use scheduler::work_reprocessing_queue;
5152
use serde::{Deserialize, Serialize};
5253
use slot_clock::SlotClock;
@@ -55,7 +56,7 @@ use std::collections::{HashSet, VecDeque};
5556
use std::fmt;
5657
use std::future::Future;
5758
use std::pin::Pin;
58-
use std::sync::Arc;
59+
use std::sync::{Arc, LazyLock};
5960
use std::task::Context;
6061
use std::time::{Duration, Instant};
6162
use strum::IntoStaticStr;
@@ -100,6 +101,17 @@ const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;
100101
/// seems reasonable.
101102
const MIN_QUEUE_LEN: usize = 128;
102103

104+
/// Smaller rayon thread pool for lower-priority, compute-intensive tasks: ~25% of CPUs, min 2.
105+
pub static LOW_PRIORITY_RAYON_POOL: LazyLock<Arc<ThreadPool>> = LazyLock::new(|| {
106+
let n_threads = (num_cpus::get() / 4).max(2);
107+
Arc::new(
108+
ThreadPoolBuilder::new()
109+
.num_threads(n_threads)
110+
.build()
111+
.expect("failed to build low-priority rayon pool"),
112+
)
113+
});
114+
103115
/// Maximum number of queued items that will be stored before dropping them
104116
pub struct BeaconProcessorQueueLengths {
105117
aggregate_queue: usize,
@@ -603,7 +615,7 @@ pub enum Work<E: EthSpec> {
603615
process_fn: BlockingFn,
604616
},
605617
ChainSegment(AsyncFn),
606-
ChainSegmentBackfill(AsyncFn),
618+
ChainSegmentBackfill(BlockingFn),
607619
Status(BlockingFn),
608620
BlocksByRangeRequest(AsyncFn),
609621
BlocksByRootsRequest(AsyncFn),
@@ -833,6 +845,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
833845
) -> Result<(), String> {
834846
// Used by workers to communicate that they are finished a task.
835847
let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);
848+
// Initialise the rayon threadpool.
849+
let _ = LazyLock::force(&LOW_PRIORITY_RAYON_POOL);
836850

837851
// Using LIFO queues for attestations since validator profits rely upon getting fresh
838852
// attestations into blocks. Additionally, later attestations contain more information than
@@ -1605,7 +1619,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
16051619
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
16061620
task_spawner.spawn_async(work)
16071621
}
1608-
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
1622+
Work::ChainSegmentBackfill(process_fn) => {
1623+
task_spawner.spawn_blocking_with_rayon(LOW_PRIORITY_RAYON_POOL.clone(), process_fn)
1624+
}
16091625
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
16101626
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
16111627
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
@@ -1667,6 +1683,22 @@ impl TaskSpawner {
16671683
WORKER_TASK_NAME,
16681684
)
16691685
}
1686+
1687+
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
1688+
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
1689+
where
1690+
F: FnOnce() + Send + 'static,
1691+
{
1692+
self.executor.spawn_blocking(
1693+
move || {
1694+
thread_pool.install(|| {
1695+
task();
1696+
});
1697+
drop(self.send_idle_on_drop)
1698+
},
1699+
WORKER_TASK_NAME,
1700+
)
1701+
}
16701702
}
16711703

16721704
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged

beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pub struct IgnoredRpcBlock {
168168
}
169169

170170
/// A backfill batch work that has been queued for processing later.
171-
pub struct QueuedBackfillBatch(pub AsyncFn);
171+
pub struct QueuedBackfillBatch(pub BlockingFn);
172172

173173
pub struct QueuedColumnReconstruction {
174174
pub block_root: Hash256,
@@ -1042,7 +1042,7 @@ mod tests {
10421042
// Now queue a backfill sync batch.
10431043
work_reprocessing_tx
10441044
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
1045-
Box::pin(async {}),
1045+
Box::new(|| {}),
10461046
)))
10471047
.unwrap();
10481048
tokio::task::yield_now().await;

beacon_node/lighthouse_tracing/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
1919
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
2020
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
2121
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
22+
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
2223

2324
/// RPC methods root spans
2425
pub const SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST: &str = "handle_blocks_by_range_request";
@@ -48,6 +49,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
4849
SPAN_PROCESS_RPC_BLOBS,
4950
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
5051
SPAN_PROCESS_CHAIN_SEGMENT,
52+
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
5153
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
5254
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
5355
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
66
use beacon_chain::fetch_blobs::{
77
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
88
};
9-
use beacon_chain::{
10-
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
11-
};
9+
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
1210
use beacon_processor::{
1311
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
1412
WorkEvent as BeaconWorkEvent,
@@ -500,33 +498,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
500498
process_id: ChainSegmentProcessId,
501499
blocks: Vec<RpcBlock<T::EthSpec>>,
502500
) -> Result<(), Error<T::EthSpec>> {
503-
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
504501
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
505-
506502
let processor = self.clone();
507-
let process_fn = async move {
508-
let notify_execution_layer = if processor
509-
.network_globals
510-
.sync_state
511-
.read()
512-
.is_syncing_finalized()
513-
{
514-
NotifyExecutionLayer::No
515-
} else {
516-
NotifyExecutionLayer::Yes
517-
};
518-
processor
519-
.process_chain_segment(process_id, blocks, notify_execution_layer)
520-
.await;
521-
};
522-
let process_fn = Box::pin(process_fn);
523503

524504
// Back-sync batches are dispatched with a different `Work` variant so
525505
// they can be rate-limited.
526-
let work = if is_backfill {
527-
Work::ChainSegmentBackfill(process_fn)
528-
} else {
529-
Work::ChainSegment(process_fn)
506+
let work = match process_id {
507+
ChainSegmentProcessId::RangeBatchId(_, _) => {
508+
let process_fn = async move {
509+
processor.process_chain_segment(process_id, blocks).await;
510+
};
511+
Work::ChainSegment(Box::pin(process_fn))
512+
}
513+
ChainSegmentProcessId::BackSyncBatchId(_) => {
514+
let process_fn =
515+
move || processor.process_chain_segment_backfill(process_id, blocks);
516+
Work::ChainSegmentBackfill(Box::new(process_fn))
517+
}
530518
};
531519

532520
self.try_send(BeaconWorkEvent {

0 commit comments

Comments
 (0)