Skip to content

Commit e53354b

Browse files
committed
Oversubscribe during column reconstruction
1 parent 9911f34 commit e53354b

File tree

1 file changed

+63
-14
lines changed
  • beacon_node/beacon_processor/src

1 file changed

+63
-14
lines changed

beacon_node/beacon_processor/src/lib.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
627627
}
628628
}
629629

630-
#[derive(IntoStaticStr, PartialEq, Eq, Debug)]
630+
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
631631
#[strum(serialize_all = "snake_case")]
632632
pub enum WorkType {
633633
GossipAttestation,
@@ -734,7 +734,7 @@ impl<E: EthSpec> Work<E> {
734734
/// Unifies all the messages processed by the `BeaconProcessor`.
735735
enum InboundEvent<E: EthSpec> {
736736
/// A worker has completed a task and is free.
737-
WorkerIdle,
737+
WorkerIdle(WorkType),
738738
/// There is new work to be done.
739739
WorkEvent(WorkEvent<E>),
740740
/// A work event that was queued for re-processing has become ready.
@@ -747,7 +747,7 @@ enum InboundEvent<E: EthSpec> {
747747
/// control (specifically in the ordering of event processing).
748748
struct InboundEvents<E: EthSpec> {
749749
/// Used by workers when they finish a task.
750-
idle_rx: mpsc::Receiver<()>,
750+
idle_rx: mpsc::Receiver<WorkType>,
751751
/// Used by upstream processes to send new work to the `BeaconProcessor`.
752752
event_rx: mpsc::Receiver<WorkEvent<E>>,
753753
/// Used internally for queuing work ready to be re-processed.
@@ -761,8 +761,8 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
761761
// Always check for idle workers before anything else. This allows us to ensure that a big
762762
// stream of new events doesn't suppress the processing of existing events.
763763
match self.idle_rx.poll_recv(cx) {
764-
Poll::Ready(Some(())) => {
765-
return Poll::Ready(Some(InboundEvent::WorkerIdle));
764+
Poll::Ready(Some(work_type)) => {
765+
return Poll::Ready(Some(InboundEvent::WorkerIdle(work_type)));
766766
}
767767
Poll::Ready(None) => {
768768
return Poll::Ready(None);
@@ -829,7 +829,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
829829
queue_lengths: BeaconProcessorQueueLengths,
830830
) -> Result<(), String> {
831831
// Used by workers to communicate that they have finished a task.
832-
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN);
832+
let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);
833833

834834
// Using LIFO queues for attestations since validator profits rely upon getting fresh
835835
// attestations into blocks. Additionally, later attestations contain more information than
@@ -931,8 +931,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
931931

932932
loop {
933933
let work_event = match inbound_events.next().await {
934-
Some(InboundEvent::WorkerIdle) => {
935-
self.current_workers = self.current_workers.saturating_sub(1);
934+
Some(InboundEvent::WorkerIdle(work_type)) => {
935+
let threads_freed = match work_type {
936+
WorkType::ColumnReconstruction => 4,
937+
_ => 1,
938+
};
939+
self.current_workers = self.current_workers.saturating_sub(threads_freed);
936940
None
937941
}
938942
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
@@ -1007,6 +1011,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
10071011
}
10081012

10091013
let can_spawn = self.current_workers < self.config.max_workers;
1014+
let can_spawn_extra_threads = self.current_workers < self.config.max_workers + 4;
10101015
let drop_during_sync = work_event
10111016
.as_ref()
10121017
.is_some_and(|event| event.drop_during_sync);
@@ -1245,7 +1250,30 @@ impl<E: EthSpec> BeaconProcessor<E> {
12451250

12461251
if let Some(work_event) = work_event {
12471252
let work_type = work_event.to_type();
1248-
self.spawn_worker(work_event, idle_tx);
1253+
let thread_count = match work_type {
1254+
WorkType::ColumnReconstruction => 4,
1255+
_ => 1,
1256+
};
1257+
self.spawn_worker(work_event, idle_tx, thread_count);
1258+
Some(work_type)
1259+
} else {
1260+
None
1261+
}
1262+
}
1263+
None if can_spawn_extra_threads => {
1264+
let work_event: Option<Work<E>> =
1265+
if let Some(item) = column_reconstruction_queue.pop() {
1266+
Some(item)
1267+
} else {
1268+
None
1269+
};
1270+
if let Some(work_event) = work_event {
1271+
let work_type = work_event.to_type();
1272+
let thread_count = match work_type {
1273+
WorkType::ColumnReconstruction => 4,
1274+
_ => 1,
1275+
};
1276+
self.spawn_worker(work_event, idle_tx, thread_count);
12491277
Some(work_type)
12501278
} else {
12511279
None
@@ -1293,7 +1321,20 @@ impl<E: EthSpec> BeaconProcessor<E> {
12931321
)
12941322
}
12951323
}
1296-
_ if can_spawn => self.spawn_worker(work, idle_tx),
1324+
_ if can_spawn => {
1325+
let thread_count = match work.to_type() {
1326+
WorkType::ColumnReconstruction => 4,
1327+
_ => 1,
1328+
};
1329+
self.spawn_worker(work, idle_tx, thread_count);
1330+
}
1331+
_ if can_spawn_extra_threads => {
1332+
let thread_count = match work.to_type() {
1333+
WorkType::ColumnReconstruction => 4,
1334+
_ => 1,
1335+
};
1336+
self.spawn_worker(work, idle_tx, thread_count)
1337+
}
12971338
Work::GossipAttestation { .. } => attestation_queue.push(work),
12981339
// Attestation batches are formed internally within the
12991340
// `BeaconProcessor`, they are not sent from external services.
@@ -1486,7 +1527,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
14861527
/// Spawns a blocking worker thread to process some `Work`.
14871528
///
14881529
/// Sends a message on `idle_tx` when the work is complete and the task is stopping.
1489-
fn spawn_worker(&mut self, work: Work<E>, idle_tx: mpsc::Sender<()>) {
1530+
fn spawn_worker(
1531+
&mut self,
1532+
work: Work<E>,
1533+
idle_tx: mpsc::Sender<WorkType>,
1534+
thread_count: usize,
1535+
) {
14901536
let work_id = work.str_id();
14911537
let worker_timer =
14921538
metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
@@ -1502,11 +1548,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
15021548
// As such, this instantiation should happen as early in the function as possible.
15031549
let send_idle_on_drop = SendOnDrop {
15041550
tx: idle_tx,
1551+
work_type: work.to_type(),
15051552
_worker_timer: worker_timer,
15061553
};
15071554

15081555
let worker_id = self.current_workers;
1509-
self.current_workers = self.current_workers.saturating_add(1);
1556+
self.current_workers = self.current_workers.saturating_add(thread_count);
15101557

15111558
let executor = self.executor.clone();
15121559

@@ -1655,14 +1702,16 @@ impl TaskSpawner {
16551702
///
16561703
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
16571704
pub struct SendOnDrop {
1658-
tx: mpsc::Sender<()>,
1705+
tx: mpsc::Sender<WorkType>,
1706+
work_type: WorkType,
16591707
// The field is unused, but it's here to ensure the timer is dropped once the task has finished.
16601708
_worker_timer: Option<metrics::HistogramTimer>,
16611709
}
16621710

16631711
impl Drop for SendOnDrop {
16641712
fn drop(&mut self) {
1665-
if let Err(e) = self.tx.try_send(()) {
1713+
let work_type = self.work_type.clone();
1714+
if let Err(e) = self.tx.try_send(work_type) {
16661715
warn!(
16671716
msg = "did not free worker, shutdown may be underway",
16681717
error = %e,

0 commit comments

Comments
 (0)