Skip to content

Commit 4111bcb

Browse files
jimmygchen and eserilev authored
Use scoped rayon pool for backfill chain segment processing (#7924)
- Part of #7866
- Continuation of #7921

In the above PR (#7921), we enabled rayon for batch KZG verification in chain segment processing. However, using the global rayon thread pool for backfill is likely to create resource contention with higher-priority beacon processor work.

This PR introduces a dedicated low-priority rayon thread pool `LOW_PRIORITY_RAYON_POOL` and uses it for processing backfill chain segments. This prevents backfill KZG verification from using the global rayon thread pool and competing with high-priority beacon processor tasks for CPU resources.

However, this PR by itself doesn't prevent CPU oversubscription, because other tasks could still fill up the global rayon thread pool, and having an extra thread pool could make things worse. To address this, we need the beacon processor to coordinate total CPU allocation across all tasks, which is covered in:

- #7789

Co-Authored-By: Jimmy Chen <[email protected]>
Co-Authored-By: Eitan Seri-Levi <[email protected]>
Co-Authored-By: Eitan Seri-Levi <[email protected]>
1 parent 51321da commit 4111bcb

File tree

11 files changed

+230
-110
lines changed

11 files changed

+230
-110
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ logging = { workspace = true }
1212
metrics = { workspace = true }
1313
num_cpus = { workspace = true }
1414
parking_lot = { workspace = true }
15+
rayon = { workspace = true }
1516
serde = { workspace = true }
1617
slot_clock = { workspace = true }
1718
strum = { workspace = true }

beacon_node/beacon_processor/src/lib.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
3939
//! task.
4040
41+
use crate::rayon_manager::RayonManager;
4142
use crate::work_reprocessing_queue::{
4243
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
4344
};
@@ -47,6 +48,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4748
use logging::TimeLatch;
4849
use logging::crit;
4950
use parking_lot::Mutex;
51+
use rayon::ThreadPool;
5052
pub use scheduler::work_reprocessing_queue;
5153
use serde::{Deserialize, Serialize};
5254
use slot_clock::SlotClock;
@@ -74,6 +76,7 @@ use work_reprocessing_queue::{
7476
};
7577

7678
mod metrics;
79+
pub mod rayon_manager;
7780
pub mod scheduler;
7881

7982
/// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -603,7 +606,7 @@ pub enum Work<E: EthSpec> {
603606
process_fn: BlockingFn,
604607
},
605608
ChainSegment(AsyncFn),
606-
ChainSegmentBackfill(AsyncFn),
609+
ChainSegmentBackfill(BlockingFn),
607610
Status(BlockingFn),
608611
BlocksByRangeRequest(AsyncFn),
609612
BlocksByRootsRequest(AsyncFn),
@@ -807,6 +810,7 @@ pub struct BeaconProcessor<E: EthSpec> {
807810
pub network_globals: Arc<NetworkGlobals<E>>,
808811
pub executor: TaskExecutor,
809812
pub current_workers: usize,
813+
pub rayon_manager: RayonManager,
810814
pub config: BeaconProcessorConfig,
811815
}
812816

@@ -1603,7 +1607,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
16031607
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
16041608
task_spawner.spawn_async(work)
16051609
}
1606-
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
1610+
Work::ChainSegmentBackfill(process_fn) => {
1611+
if self.config.enable_backfill_rate_limiting {
1612+
task_spawner.spawn_blocking_with_rayon(
1613+
self.rayon_manager.low_priority_threadpool.clone(),
1614+
process_fn,
1615+
)
1616+
} else {
1617+
// use the global rayon thread pool if backfill rate limiting is disabled.
1618+
task_spawner.spawn_blocking(process_fn)
1619+
}
1620+
}
16071621
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
16081622
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
16091623
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
@@ -1665,6 +1679,22 @@ impl TaskSpawner {
16651679
WORKER_TASK_NAME,
16661680
)
16671681
}
1682+
1683+
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
1684+
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
1685+
where
1686+
F: FnOnce() + Send + 'static,
1687+
{
1688+
self.executor.spawn_blocking(
1689+
move || {
1690+
thread_pool.install(|| {
1691+
task();
1692+
});
1693+
drop(self.send_idle_on_drop)
1694+
},
1695+
WORKER_TASK_NAME,
1696+
)
1697+
}
16681698
}
16691699

16701700
/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
beacon_node/beacon_processor/src/rayon_manager.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use rayon::{ThreadPool, ThreadPoolBuilder};
2+
use std::sync::Arc;
3+
4+
const DEFAULT_LOW_PRIORITY_DIVISOR: usize = 4;
5+
const MINIMUM_LOW_PRIORITY_THREAD_COUNT: usize = 1;
6+
7+
pub struct RayonManager {
8+
/// Smaller rayon thread pool for lower-priority, compute-intensive tasks.
9+
/// By default ~25% of CPUs or a minimum of 1 thread.
10+
pub low_priority_threadpool: Arc<ThreadPool>,
11+
}
12+
13+
impl Default for RayonManager {
14+
fn default() -> Self {
15+
let low_prio_threads =
16+
(num_cpus::get() / DEFAULT_LOW_PRIORITY_DIVISOR).max(MINIMUM_LOW_PRIORITY_THREAD_COUNT);
17+
let low_priority_threadpool = Arc::new(
18+
ThreadPoolBuilder::new()
19+
.num_threads(low_prio_threads)
20+
.build()
21+
.expect("failed to build low-priority rayon pool"),
22+
);
23+
Self {
24+
low_priority_threadpool,
25+
}
26+
}
27+
}

beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ pub struct IgnoredRpcBlock {
173173
}
174174

175175
/// A backfill batch work that has been queued for processing later.
176-
pub struct QueuedBackfillBatch(pub AsyncFn);
176+
pub struct QueuedBackfillBatch(pub BlockingFn);
177177

178178
pub struct QueuedColumnReconstruction {
179179
pub block_root: Hash256,
@@ -1084,7 +1084,7 @@ mod tests {
10841084
// Now queue a backfill sync batch.
10851085
work_reprocessing_tx
10861086
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
1087-
Box::pin(async {}),
1087+
Box::new(|| {}),
10881088
)))
10891089
.unwrap();
10901090
tokio::task::yield_now().await;

beacon_node/client/src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use beacon_chain::{
1717
store::{HotColdDB, ItemStore, StoreConfig},
1818
};
1919
use beacon_chain::{Kzg, LightClientProducerEvent};
20+
use beacon_processor::rayon_manager::RayonManager;
2021
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
2122
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
2223
use environment::RuntimeContext;
@@ -680,6 +681,7 @@ where
680681
executor: beacon_processor_context.executor.clone(),
681682
current_workers: 0,
682683
config: beacon_processor_config,
684+
rayon_manager: RayonManager::default(),
683685
}
684686
.spawn_manager(
685687
beacon_processor_channels.beacon_processor_rx,

beacon_node/http_api/src/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use beacon_chain::{
55
};
66
use beacon_processor::{
77
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
8+
rayon_manager::RayonManager,
89
};
910
use directory::DEFAULT_ROOT_DIR;
1011
use eth2::{BeaconNodeHttpClient, Timeouts};
@@ -247,6 +248,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
247248
executor: test_runtime.task_executor.clone(),
248249
current_workers: 0,
249250
config: beacon_processor_config,
251+
rayon_manager: RayonManager::default(),
250252
}
251253
.spawn_manager(
252254
beacon_processor_rx,

beacon_node/lighthouse_tracing/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
2626
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
2727
pub const SPAN_PROCESS_RPC_CUSTODY_COLUMNS: &str = "process_rpc_custody_columns";
2828
pub const SPAN_PROCESS_CHAIN_SEGMENT: &str = "process_chain_segment";
29+
pub const SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL: &str = "process_chain_segment_backfill";
2930

3031
/// Fork choice root spans
3132
pub const SPAN_RECOMPUTE_HEAD: &str = "recompute_head_at_slot";
@@ -61,6 +62,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
6162
SPAN_PROCESS_RPC_BLOBS,
6263
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,
6364
SPAN_PROCESS_CHAIN_SEGMENT,
65+
SPAN_PROCESS_CHAIN_SEGMENT_BACKFILL,
6466
SPAN_HANDLE_BLOCKS_BY_RANGE_REQUEST,
6567
SPAN_HANDLE_BLOBS_BY_RANGE_REQUEST,
6668
SPAN_HANDLE_DATA_COLUMNS_BY_RANGE_REQUEST,

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
66
use beacon_chain::fetch_blobs::{
77
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
88
};
9-
use beacon_chain::{
10-
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
11-
};
9+
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
1210
use beacon_processor::{
1311
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
1412
WorkEvent as BeaconWorkEvent,
@@ -500,33 +498,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
500498
process_id: ChainSegmentProcessId,
501499
blocks: Vec<RpcBlock<T::EthSpec>>,
502500
) -> Result<(), Error<T::EthSpec>> {
503-
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
504501
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
505-
506502
let processor = self.clone();
507-
let process_fn = async move {
508-
let notify_execution_layer = if processor
509-
.network_globals
510-
.sync_state
511-
.read()
512-
.is_syncing_finalized()
513-
{
514-
NotifyExecutionLayer::No
515-
} else {
516-
NotifyExecutionLayer::Yes
517-
};
518-
processor
519-
.process_chain_segment(process_id, blocks, notify_execution_layer)
520-
.await;
521-
};
522-
let process_fn = Box::pin(process_fn);
523503

524504
// Back-sync batches are dispatched with a different `Work` variant so
525505
// they can be rate-limited.
526-
let work = if is_backfill {
527-
Work::ChainSegmentBackfill(process_fn)
528-
} else {
529-
Work::ChainSegment(process_fn)
506+
let work = match process_id {
507+
ChainSegmentProcessId::RangeBatchId(_, _) => {
508+
let process_fn = async move {
509+
processor.process_chain_segment(process_id, blocks).await;
510+
};
511+
Work::ChainSegment(Box::pin(process_fn))
512+
}
513+
ChainSegmentProcessId::BackSyncBatchId(_) => {
514+
let process_fn =
515+
move || processor.process_chain_segment_backfill(process_id, blocks);
516+
Work::ChainSegmentBackfill(Box::new(process_fn))
517+
}
530518
};
531519

532520
self.try_send(BeaconWorkEvent {

0 commit comments

Comments
 (0)