diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 28ed0cca913..2fd8a2267df 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,20 +38,21 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. +pub use crate::scheduler::BeaconProcessorQueueLengths; +use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; -use logging::TimeLatch; use logging::crit; use parking_lot::Mutex; pub use scheduler::work_reprocessing_queue; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -63,10 +64,7 @@ use task_executor::{RayonPoolType, TaskExecutor}; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, trace, warn}; -use types::{ - BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, - SingleAttestation, Slot, SubnetId, -}; +use types::{EthSpec, Hash256, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId}; use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, @@ -90,124 +88,6 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; -/// Over-provision queues based on active validator count by some factor. The beacon chain has -/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning -/// slightly, we don't need to adjust the queues during the lifetime of a process. -const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; - -/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues -/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that -/// seems reasonable. -const MIN_QUEUE_LEN: usize = 128; - -/// Maximum number of queued items that will be stored before dropping them -pub struct BeaconProcessorQueueLengths { - aggregate_queue: usize, - attestation_queue: usize, - unknown_block_aggregate_queue: usize, - unknown_block_attestation_queue: usize, - sync_message_queue: usize, - sync_contribution_queue: usize, - gossip_voluntary_exit_queue: usize, - gossip_proposer_slashing_queue: usize, - gossip_attester_slashing_queue: usize, - unknown_light_client_update_queue: usize, - rpc_block_queue: usize, - rpc_blob_queue: usize, - rpc_custody_column_queue: usize, - column_reconstruction_queue: usize, - chain_segment_queue: usize, - backfill_chain_segment: usize, - gossip_block_queue: usize, - gossip_blob_queue: usize, - gossip_data_column_queue: usize, - delayed_block_queue: usize, - status_queue: usize, - bbrange_queue: usize, - bbroots_queue: usize, - blbroots_queue: usize, - blbrange_queue: usize, - dcbroots_queue: usize, - dcbrange_queue: usize, - gossip_bls_to_execution_change_queue: usize, - lc_gossip_finality_update_queue: usize, - lc_gossip_optimistic_update_queue: usize, - lc_bootstrap_queue: usize, - lc_rpc_optimistic_update_queue: usize, - lc_rpc_finality_update_queue: usize, - lc_update_range_queue: usize, - api_request_p0_queue: usize, - api_request_p1_queue: usize, -} - -impl BeaconProcessorQueueLengths { - pub fn from_state( - state: &BeaconState, - spec: &ChainSpec, - ) -> Result { - let active_validator_count = - match state.get_cached_active_validator_indices(RelativeEpoch::Current) { - Ok(indices) => indices.len(), - Err(_) => state - .get_active_validator_indices(state.current_epoch(), spec) - .map_err(|e| format!("Error computing active indices: {:?}", e))? - .len(), - }; - let active_validator_count = - (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; - let slots_per_epoch = E::slots_per_epoch() as usize; - - Ok(Self { - aggregate_queue: 4096, - unknown_block_aggregate_queue: 1024, - // Capacity for a full slot's worth of attestations if subscribed to all subnets - attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - // Capacity for a full slot's worth of attestations if subscribed to all subnets - unknown_block_attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - sync_message_queue: 2048, - sync_contribution_queue: 1024, - gossip_voluntary_exit_queue: 4096, - gossip_proposer_slashing_queue: 4096, - gossip_attester_slashing_queue: 4096, - unknown_light_client_update_queue: 128, - rpc_block_queue: 1024, - rpc_blob_queue: 1024, - // We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit - // this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB. - rpc_custody_column_queue: 64, - column_reconstruction_queue: 1, - chain_segment_queue: 64, - backfill_chain_segment: 64, - gossip_block_queue: 1024, - gossip_blob_queue: 1024, - gossip_data_column_queue: 1024, - delayed_block_queue: 1024, - status_queue: 1024, - bbrange_queue: 1024, - bbroots_queue: 1024, - blbroots_queue: 1024, - blbrange_queue: 1024, - dcbroots_queue: 1024, - dcbrange_queue: 1024, - gossip_bls_to_execution_change_queue: 16384, - lc_gossip_finality_update_queue: 1024, - lc_gossip_optimistic_update_queue: 1024, - lc_bootstrap_queue: 1024, - lc_rpc_optimistic_update_queue: 512, - lc_rpc_finality_update_queue: 512, - lc_update_range_queue: 512, - api_request_p0_queue: 1024, - api_request_p1_queue: 1024, - }) - } -} - /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -279,89 +159,6 @@ impl Default for BeaconProcessorChannels { } } -/// A simple first-in-first-out queue with a maximum length. -struct FifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl FifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the queue. - /// - /// Drops `item` if the queue is full. - pub fn push(&mut self, item: T, item_desc: &str) { - if self.queue.len() == self.max_length { - error!( - msg = "the system has insufficient resources for load", - queue_len = self.max_length, - queue = item_desc, - "Work queue is full" - ) - } else { - self.queue.push_back(item); - } - } - - /// Remove the next item from the queue. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - -/// A simple last-in-first-out queue with a maximum length. -struct LifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl LifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the front of the queue. - /// - /// If the queue is full, the item at the back of the queue is dropped. - pub fn push(&mut self, item: T) { - if self.queue.len() == self.max_length { - self.queue.pop_back(); - } - self.queue.push_front(item); - } - - /// Remove the next item from the queue. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns `true` if the queue is full. - pub fn is_full(&self) -> bool { - self.queue.len() >= self.max_length - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - /// A handle that sends a message on the provided channel to a receiver when it gets dropped. /// /// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache` @@ -834,74 +631,8 @@ impl BeaconProcessor { // Used by workers to communicate that they are finished a task. let (idle_tx, idle_rx) = mpsc::channel::(MAX_IDLE_QUEUE_LEN); - // Using LIFO queues for attestations since validator profits rely upon getting fresh - // attestations into blocks. Additionally, later attestations contain more information than - // earlier ones, so we consider them more valuable. - let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); - let mut aggregate_debounce = TimeLatch::default(); - let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_debounce = TimeLatch::default(); - let mut unknown_block_aggregate_queue = - LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); - let mut unknown_block_attestation_queue = - LifoQueue::new(queue_lengths.unknown_block_attestation_queue); - - let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); - let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); - - // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have - // a strong feeling about queue type for exits. - let mut gossip_voluntary_exit_queue = - FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); - - // Using a FIFO queue for slashing to prevent people from flushing their slashings from the - // queues with lots of junk messages. - let mut gossip_proposer_slashing_queue = - FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); - let mut gossip_attester_slashing_queue = - FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); - - // Using a FIFO queue since blocks need to be imported sequentially. - let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); - let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); - let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); - let mut column_reconstruction_queue = - LifoQueue::new(queue_lengths.column_reconstruction_queue); - let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); - let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); - let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); - let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); - let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); - let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); - - let mut status_queue = FifoQueue::new(queue_lengths.status_queue); - let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); - let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); - let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); - let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); - let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); - let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); - - let mut gossip_bls_to_execution_change_queue = - FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); - - // Using FIFO queues for light client updates to maintain sequence order. - let mut lc_gossip_finality_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); - let mut lc_gossip_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); - let mut unknown_light_client_update_queue = - FifoQueue::new(queue_lengths.unknown_light_client_update_queue); - let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); - let mut lc_rpc_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); - let mut lc_rpc_finality_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); - let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue); - - let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); - let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); + // Initialize the worker queues. + let mut work_queues: WorkQueues = WorkQueues::new(queue_lengths); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). @@ -1030,228 +761,240 @@ impl BeaconProcessor { None if can_spawn => { // Check for chain segments first, they're the most efficient way to get // blocks into the system. - let work_event: Option> = - if let Some(item) = chain_segment_queue.pop() { - Some(item) - // Check sync blocks before gossip blocks, since we've already explicitly - // requested these blocks. - } else if let Some(item) = rpc_block_queue.pop() { - Some(item) - } else if let Some(item) = rpc_blob_queue.pop() { - Some(item) - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - // Check delayed blocks before gossip blocks, the gossip blocks might rely - // on the delayed ones. - } else if let Some(item) = delayed_block_queue.pop() { - Some(item) - // Check gossip blocks before gossip attestations, since a block might be - // required to verify some attestations. - } else if let Some(item) = gossip_block_queue.pop() { - Some(item) - } else if let Some(item) = gossip_blob_queue.pop() { - Some(item) - } else if let Some(item) = gossip_data_column_queue.pop() { - Some(item) - } else if let Some(item) = column_reconstruction_queue.pop() { - Some(item) - // Check the priority 0 API requests after blocks and blobs, but before attestations. - } else if let Some(item) = api_request_p0_queue.pop() { - Some(item) - // Check the aggregates, *then* the unaggregates since we assume that - // aggregates are more valuable to local validators and effectively give us - // more information with less signature verification time. - } else if aggregate_queue.len() > 0 { - let batch_size = cmp::min( - aggregate_queue.len(), - self.config.max_gossip_aggregate_batch_size, - ); - - if batch_size < 2 { - // One single aggregate is in the queue, process it individually. - aggregate_queue.pop() - } else { - // Collect two or more aggregates into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAggregate` item into a - // `Work::GossipAggregateBatch` item. - let mut aggregates = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = aggregate_queue.pop() { - match item { - Work::GossipAggregate { - aggregate, - process_individual: _, - process_batch, - } => { - aggregates.push(*aggregate); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } - } - _ => { - error!("Invalid item in aggregate queue"); + let work_event: Option> = if let Some(item) = + work_queues.chain_segment_queue.pop() + { + Some(item) + // Check sync blocks before gossip blocks, since we've already explicitly + // requested these blocks. + } else if let Some(item) = work_queues.rpc_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + // Check delayed blocks before gossip blocks, the gossip blocks might rely + // on the delayed ones. + } else if let Some(item) = work_queues.delayed_block_queue.pop() { + Some(item) + // Check gossip blocks before gossip attestations, since a block might be + // required to verify some attestations. + } else if let Some(item) = work_queues.gossip_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_data_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.column_reconstruction_queue.pop() { + Some(item) + // Check the priority 0 API requests after blocks and blobs, but before attestations. + } else if let Some(item) = work_queues.api_request_p0_queue.pop() { + Some(item) + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively give us + // more information with less signature verification time. + } else if !work_queues.aggregate_queue.is_empty() { + let batch_size = cmp::min( + work_queues.aggregate_queue.len(), + self.config.max_gossip_aggregate_batch_size, + ); + + if batch_size < 2 { + // One single aggregate is in the queue, process it individually. + work_queues.aggregate_queue.pop() + } else { + // Collect two or more aggregates into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAggregate` item into a + // `Work::GossipAggregateBatch` item. + let mut aggregates = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.aggregate_queue.pop() { + match item { + Work::GossipAggregate { + aggregate, + process_individual: _, + process_batch, + } => { + aggregates.push(*aggregate); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } } + _ => { + error!("Invalid item in aggregate queue"); + } } } - - if let Some(process_batch) = process_batch_opt { - // Process all aggregates with a single worker. - Some(Work::GossipAggregateBatch { - aggregates, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing aggregate work"); - None - } } - // Check the unaggregated attestation queue. - // - // Potentially use batching. - } else if attestation_queue.len() > 0 { - let batch_size = cmp::min( - attestation_queue.len(), - self.config.max_gossip_attestation_batch_size, - ); - - if batch_size < 2 { - // One single attestation is in the queue, process it individually. - attestation_queue.pop() + + if let Some(process_batch) = process_batch_opt { + // Process all aggregates with a single worker. + Some(Work::GossipAggregateBatch { + aggregates, + process_batch, + }) } else { - // Collect two or more attestations into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAttestation` item into a - // `Work::GossipAttestationBatch` item. - let mut attestations = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = attestation_queue.pop() { - match item { - Work::GossipAttestation { - attestation, - process_individual: _, - process_batch, - } => { - attestations.push(*attestation); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing aggregate work"); + None + } + } + // Check the unaggregated attestation queue. + // + // Potentially use batching. + } else if !work_queues.attestation_queue.is_empty() { + let batch_size = cmp::min( + work_queues.attestation_queue.len(), + self.config.max_gossip_attestation_batch_size, + ); + + if batch_size < 2 { + // One single attestation is in the queue, process it individually. + work_queues.attestation_queue.pop() + } else { + // Collect two or more attestations into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAttestation` item into a + // `Work::GossipAttestationBatch` item. + let mut attestations = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.attestation_queue.pop() { + match item { + Work::GossipAttestation { + attestation, + process_individual: _, + process_batch, + } => { + attestations.push(*attestation); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } - _ => error!("Invalid item in attestation queue"), } + _ => error!("Invalid item in attestation queue"), } } - - if let Some(process_batch) = process_batch_opt { - // Process all attestations with a single worker. - Some(Work::GossipAttestationBatch { - attestations, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing attestations work"); - None - } } - // Convert any gossip attestations that need to be converted. - } else if let Some(item) = attestation_to_convert_queue.pop() { - Some(item) - // Check sync committee messages after attestations as their rewards are lesser - // and they don't influence fork choice. - } else if let Some(item) = sync_contribution_queue.pop() { - Some(item) - } else if let Some(item) = sync_message_queue.pop() { - Some(item) - // Aggregates and unaggregates queued for re-processing are older and we - // care about fresher ones, so check those first. - } else if let Some(item) = unknown_block_aggregate_queue.pop() { - Some(item) - } else if let Some(item) = unknown_block_attestation_queue.pop() { - Some(item) - // Check RPC methods next. Status messages are needed for sync so - // prioritize them over syncing requests from other peers (BlocksByRange - // and BlocksByRoot) - } else if let Some(item) = status_queue.pop() { - Some(item) - } else if let Some(item) = bbrange_queue.pop() { - Some(item) - } else if let Some(item) = bbroots_queue.pop() { - Some(item) - } else if let Some(item) = blbrange_queue.pop() { - Some(item) - } else if let Some(item) = blbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbrange_queue.pop() { - Some(item) - // Check slashings after all other consensus messages so we prioritize - // following head. - // - // Check attester slashings before proposer slashings since they have the - // potential to slash multiple validators at once. - } else if let Some(item) = gossip_attester_slashing_queue.pop() { - Some(item) - } else if let Some(item) = gossip_proposer_slashing_queue.pop() { - Some(item) - // Check exits and address changes late since our validators don't get - // rewards from them. - } else if let Some(item) = gossip_voluntary_exit_queue.pop() { - Some(item) - } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { - Some(item) - // Check the priority 1 API requests after we've - // processed all the interesting things from the network - // and things required for us to stay in good repute - // with our P2P peers. - } else if let Some(item) = api_request_p1_queue.pop() { - Some(item) - // Handle backfill sync chain segments. - } else if let Some(item) = backfill_chain_segment.pop() { - Some(item) - // Handle light client requests. - } else if let Some(item) = lc_gossip_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = unknown_light_client_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_bootstrap_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_update_range_queue.pop() { - Some(item) - // This statement should always be the final else statement. - } else { - // Let the journal know that a worker is freed and there's nothing else - // for it to do. - if let Some(work_journal_tx) = &work_journal_tx { - // We don't care if this message was successfully sent, we only use the journal - // during testing. - let _ = work_journal_tx.try_send(NOTHING_TO_DO); + + if let Some(process_batch) = process_batch_opt { + // Process all attestations with a single worker. + Some(Work::GossipAttestationBatch { + attestations, + process_batch, + }) + } else { + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing attestations work"); + None } - None - }; + } + // Convert any gossip attestations that need to be converted. + } else if let Some(item) = work_queues.attestation_to_convert_queue.pop() { + Some(item) + // Check sync committee messages after attestations as their rewards are lesser + // and they don't influence fork choice. + } else if let Some(item) = work_queues.sync_contribution_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.sync_message_queue.pop() { + Some(item) + // Aggregates and unaggregates queued for re-processing are older and we + // care about fresher ones, so check those first. + } else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.unknown_block_attestation_queue.pop() + { + Some(item) + // Check RPC methods next. Status messages are needed for sync so + // prioritize them over syncing requests from other peers (BlocksByRange + // and BlocksByRoot) + } else if let Some(item) = work_queues.status_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbrange_queue.pop() { + Some(item) + // Check slashings after all other consensus messages so we prioritize + // following head. + // + // Check attester slashings before proposer slashings since they have the + // potential to slash multiple validators at once. + } else if let Some(item) = work_queues.gossip_attester_slashing_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.gossip_proposer_slashing_queue.pop() + { + Some(item) + // Check exits and address changes late since our validators don't get + // rewards from them. + } else if let Some(item) = work_queues.gossip_voluntary_exit_queue.pop() { + Some(item) + } else if let Some(item) = + work_queues.gossip_bls_to_execution_change_queue.pop() + { + Some(item) + // Check the priority 1 API requests after we've + // processed all the interesting things from the network + // and things required for us to stay in good repute + // with our P2P peers. + } else if let Some(item) = work_queues.api_request_p1_queue.pop() { + Some(item) + // Handle backfill sync chain segments. + } else if let Some(item) = work_queues.backfill_chain_segment.pop() { + Some(item) + // Handle light client requests. + } else if let Some(item) = work_queues.lc_gossip_finality_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.lc_gossip_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.unknown_light_client_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_bootstrap_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_finality_update_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_update_range_queue.pop() { + Some(item) + // This statement should always be the final else statement. + } else { + // Let the journal know that a worker is freed and there's nothing else + // for it to do. + if let Some(work_journal_tx) = &work_journal_tx { + // We don't care if this message was successfully sent, we only use the journal + // during testing. + let _ = work_journal_tx.try_send(NOTHING_TO_DO); + } + None + }; if let Some(work_event) = work_event { let work_type = work_event.to_type(); @@ -1304,14 +1047,16 @@ impl BeaconProcessor { } } _ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx), - Work::GossipAttestation { .. } => attestation_queue.push(work), + Work::GossipAttestation { .. } => { + work_queues.attestation_queue.push(work) + } // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( work_type = "GossipAttestationBatch", "Unsupported inbound event" ), - Work::GossipAggregate { .. } => aggregate_queue.push(work), + Work::GossipAggregate { .. } => work_queues.aggregate_queue.push(work), // Aggregate batches are formed internally within the `BeaconProcessor`, // they are not sent from external services. Work::GossipAggregateBatch { .. } => { @@ -1320,82 +1065,104 @@ impl BeaconProcessor { "Unsupported inbound event" ) } - Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id), - Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id), + Work::GossipBlock { .. } => { + work_queues.gossip_block_queue.push(work, work_id) + } + Work::GossipBlobSidecar { .. } => { + work_queues.gossip_blob_queue.push(work, work_id) + } Work::GossipDataColumnSidecar { .. } => { - gossip_data_column_queue.push(work, work_id) + work_queues.gossip_data_column_queue.push(work, work_id) } Work::DelayedImportBlock { .. } => { - delayed_block_queue.push(work, work_id) + work_queues.delayed_block_queue.push(work, work_id) } Work::GossipVoluntaryExit { .. } => { - gossip_voluntary_exit_queue.push(work, work_id) - } - Work::GossipProposerSlashing { .. } => { - gossip_proposer_slashing_queue.push(work, work_id) + work_queues.gossip_voluntary_exit_queue.push(work, work_id) } - Work::GossipAttesterSlashing { .. } => { - gossip_attester_slashing_queue.push(work, work_id) + Work::GossipProposerSlashing { .. } => work_queues + .gossip_proposer_slashing_queue + .push(work, work_id), + Work::GossipAttesterSlashing { .. } => work_queues + .gossip_attester_slashing_queue + .push(work, work_id), + Work::GossipSyncSignature { .. } => { + work_queues.sync_message_queue.push(work) } - Work::GossipSyncSignature { .. } => sync_message_queue.push(work), Work::GossipSyncContribution { .. } => { - sync_contribution_queue.push(work) - } - Work::GossipLightClientFinalityUpdate { .. } => { - lc_gossip_finality_update_queue.push(work, work_id) - } - Work::GossipLightClientOptimisticUpdate { .. } => { - lc_gossip_optimistic_update_queue.push(work, work_id) + work_queues.sync_contribution_queue.push(work) } + Work::GossipLightClientFinalityUpdate { .. } => work_queues + .lc_gossip_finality_update_queue + .push(work, work_id), + Work::GossipLightClientOptimisticUpdate { .. } => work_queues + .lc_gossip_optimistic_update_queue + .push(work, work_id), Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { - rpc_block_queue.push(work, work_id) + work_queues.rpc_block_queue.push(work, work_id) } - Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id), + Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), Work::RpcCustodyColumn { .. } => { - rpc_custody_column_queue.push(work, work_id) + work_queues.rpc_custody_column_queue.push(work, work_id) + } + Work::ColumnReconstruction(_) => { + work_queues.column_reconstruction_queue.push(work) + } + Work::ChainSegment { .. } => { + work_queues.chain_segment_queue.push(work, work_id) } - Work::ColumnReconstruction(_) => column_reconstruction_queue.push(work), - Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id), Work::ChainSegmentBackfill { .. } => { - backfill_chain_segment.push(work, work_id) + work_queues.backfill_chain_segment.push(work, work_id) } - Work::Status { .. } => status_queue.push(work, work_id), - Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id), - Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id), - Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id), - Work::LightClientBootstrapRequest { .. } => { - lc_bootstrap_queue.push(work, work_id) + Work::Status { .. } => work_queues.status_queue.push(work, work_id), + Work::BlocksByRangeRequest { .. } => { + work_queues.bbrange_queue.push(work, work_id) } - Work::LightClientOptimisticUpdateRequest { .. } => { - lc_rpc_optimistic_update_queue.push(work, work_id) + Work::BlocksByRootsRequest { .. } => { + work_queues.bbroots_queue.push(work, work_id) + } + Work::BlobsByRangeRequest { .. } => { + work_queues.blbrange_queue.push(work, work_id) + } + Work::LightClientBootstrapRequest { .. } => { + work_queues.lc_bootstrap_queue.push(work, work_id) } + Work::LightClientOptimisticUpdateRequest { .. } => work_queues + .lc_rpc_optimistic_update_queue + .push(work, work_id), Work::LightClientFinalityUpdateRequest { .. } => { - lc_rpc_finality_update_queue.push(work, work_id) + work_queues.lc_rpc_finality_update_queue.push(work, work_id) } Work::LightClientUpdatesByRangeRequest { .. } => { - lc_update_range_queue.push(work, work_id) + work_queues.lc_update_range_queue.push(work, work_id) } Work::UnknownBlockAttestation { .. } => { - unknown_block_attestation_queue.push(work) + work_queues.unknown_block_attestation_queue.push(work) } Work::UnknownBlockAggregate { .. } => { - unknown_block_aggregate_queue.push(work) + work_queues.unknown_block_aggregate_queue.push(work) } - Work::GossipBlsToExecutionChange { .. } => { - gossip_bls_to_execution_change_queue.push(work, work_id) + Work::GossipBlsToExecutionChange { .. } => work_queues + .gossip_bls_to_execution_change_queue + .push(work, work_id), + Work::BlobsByRootsRequest { .. } => { + work_queues.blbroots_queue.push(work, work_id) } - Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id), Work::DataColumnsByRootsRequest { .. } => { - dcbroots_queue.push(work, work_id) + work_queues.dcbroots_queue.push(work, work_id) } Work::DataColumnsByRangeRequest { .. } => { - dcbrange_queue.push(work, work_id) + work_queues.dcbrange_queue.push(work, work_id) } - Work::UnknownLightClientOptimisticUpdate { .. } => { - unknown_light_client_update_queue.push(work, work_id) + Work::UnknownLightClientOptimisticUpdate { .. } => work_queues + .unknown_light_client_update_queue + .push(work, work_id), + Work::ApiRequestP0 { .. } => { + work_queues.api_request_p0_queue.push(work, work_id) + } + Work::ApiRequestP1 { .. } => { + work_queues.api_request_p1_queue.push(work, work_id) } - Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), - Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id), }; Some(work_type) } @@ -1403,57 +1170,81 @@ impl BeaconProcessor { if let Some(modified_queue_id) = modified_queue_id { let queue_len = match modified_queue_id { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestation => work_queues.attestation_queue.len(), + WorkType::GossipAttestationToConvert => { + work_queues.attestation_to_convert_queue.len() + } + WorkType::UnknownBlockAttestation => { + work_queues.unknown_block_attestation_queue.len() + } WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::GossipAggregate => work_queues.aggregate_queue.len(), + WorkType::UnknownBlockAggregate => { + work_queues.unknown_block_aggregate_queue.len() + } WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() + work_queues.unknown_light_client_update_queue.len() } WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipBlock => work_queues.gossip_block_queue.len(), + WorkType::GossipBlobSidecar => work_queues.gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => { + work_queues.gossip_data_column_queue.len() + } + WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => { + work_queues.gossip_voluntary_exit_queue.len() + } + WorkType::GossipProposerSlashing => { + work_queues.gossip_proposer_slashing_queue.len() + } + WorkType::GossipAttesterSlashing => { + work_queues.gossip_attester_slashing_queue.len() + } + WorkType::GossipSyncSignature => work_queues.sync_message_queue.len(), + WorkType::GossipSyncContribution => { + work_queues.sync_contribution_queue.len() + } WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() + work_queues.lc_gossip_finality_update_queue.len() } WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() + work_queues.lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { + work_queues.rpc_blob_queue.len() + } + WorkType::RpcCustodyColumn => work_queues.rpc_custody_column_queue.len(), + WorkType::ColumnReconstruction => { + work_queues.column_reconstruction_queue.len() } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::ChainSegment => work_queues.chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => work_queues.backfill_chain_segment.len(), + WorkType::Status => work_queues.status_queue.len(), + WorkType::BlocksByRangeRequest => work_queues.blbrange_queue.len(), + WorkType::BlocksByRootsRequest => work_queues.blbroots_queue.len(), + WorkType::BlobsByRangeRequest => work_queues.bbrange_queue.len(), + WorkType::BlobsByRootsRequest => work_queues.bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => work_queues.dcbrange_queue.len(), WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() + work_queues.gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => { + work_queues.lc_bootstrap_queue.len() } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() + work_queues.lc_rpc_optimistic_update_queue.len() } WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() + work_queues.lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => { + work_queues.lc_update_range_queue.len() } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::ApiRequestP0 => work_queues.api_request_p0_queue.len(), + WorkType::ApiRequestP1 => work_queues.api_request_p1_queue.len(), WorkType::Reprocess => 0, }; metrics::observe_vec( @@ -1463,18 +1254,21 @@ impl BeaconProcessor { ); } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { + if work_queues.aggregate_queue.is_full() && work_queues.aggregate_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = aggregate_queue.max_length, + queue_len = work_queues.aggregate_queue.max_length, "Aggregate attestation queue full" ) } - if attestation_queue.is_full() && attestation_debounce.elapsed() { + if work_queues.attestation_queue.is_full() + && work_queues.attestation_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = attestation_queue.max_length, + queue_len = work_queues.attestation_queue.max_length, "Attestation queue full" ) } @@ -1722,21 +1516,3 @@ impl Drop for SendOnDrop { } } } - -#[cfg(test)] -mod tests { - use super::*; - use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; - - #[test] - fn min_queue_len() { - // State with no validators. - let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet()); - let genesis_time = 0; - let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); - assert_eq!(state.validators().len(), 0); - let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap(); - assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN); - assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN); - } -} diff --git a/beacon_node/beacon_processor/src/scheduler/mod.rs b/beacon_node/beacon_processor/src/scheduler/mod.rs index e1a076a7c54..96393f9f3b8 100644 --- a/beacon_node/beacon_processor/src/scheduler/mod.rs +++ b/beacon_node/beacon_processor/src/scheduler/mod.rs @@ -1 +1,4 @@ +pub mod work_queue; pub mod work_reprocessing_queue; + +pub use work_queue::BeaconProcessorQueueLengths; diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs new file mode 100644 index 00000000000..bfebbf12737 --- /dev/null +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -0,0 +1,388 @@ +use crate::Work; +use logging::TimeLatch; +use std::collections::VecDeque; +use tracing::error; +use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; + +/// Over-provision queues based on active validator count by some factor. The beacon chain has +/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning +/// slightly, we don't need to adjust the queues during the lifetime of a process. +const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; + +/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues +/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that +/// seems reasonable. +const MIN_QUEUE_LEN: usize = 128; + +/// A simple first-in-first-out queue with a maximum length. +pub struct FifoQueue { + queue: VecDeque, + max_length: usize, +} + +impl FifoQueue { + /// Create a new, empty queue with the given length. + pub fn new(max_length: usize) -> Self { + Self { + queue: VecDeque::default(), + max_length, + } + } + + /// Add a new item to the queue. + /// + /// Drops `item` if the queue is full. + pub fn push(&mut self, item: T, item_desc: &str) { + if self.queue.len() == self.max_length { + error!( + queue = item_desc, + queue_len = self.max_length, + msg = "the system has insufficient resources for load", + "Work queue is full", + ) + } else { + self.queue.push_back(item); + } + } + + /// Remove the next item from the queue. + pub fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + /// Returns the current length of the queue. + pub fn len(&self) -> usize { + self.queue.len() + } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } +} + +/// A simple last-in-first-out queue with a maximum length. +pub struct LifoQueue { + queue: VecDeque, + pub max_length: usize, +} + +impl LifoQueue { + /// Create a new, empty queue with the given length. + pub fn new(max_length: usize) -> Self { + Self { + queue: VecDeque::default(), + max_length, + } + } + + /// Add a new item to the front of the queue. + /// + /// If the queue is full, the item at the back of the queue is dropped. + pub fn push(&mut self, item: T) { + if self.queue.len() == self.max_length { + self.queue.pop_back(); + } + self.queue.push_front(item); + } + + /// Remove the next item from the queue. + pub fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + /// Returns `true` if the queue is full. + pub fn is_full(&self) -> bool { + self.queue.len() >= self.max_length + } + + /// Returns the current length of the queue. + pub fn len(&self) -> usize { + self.queue.len() + } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } +} + +/// Maximum number of queued items that will be stored before dropping them +pub struct BeaconProcessorQueueLengths { + aggregate_queue: usize, + attestation_queue: usize, + unknown_block_aggregate_queue: usize, + unknown_block_attestation_queue: usize, + sync_message_queue: usize, + sync_contribution_queue: usize, + gossip_voluntary_exit_queue: usize, + gossip_proposer_slashing_queue: usize, + gossip_attester_slashing_queue: usize, + unknown_light_client_update_queue: usize, + rpc_block_queue: usize, + rpc_blob_queue: usize, + rpc_custody_column_queue: usize, + column_reconstruction_queue: usize, + chain_segment_queue: usize, + backfill_chain_segment: usize, + gossip_block_queue: usize, + gossip_blob_queue: usize, + gossip_data_column_queue: usize, + delayed_block_queue: usize, + status_queue: usize, + bbrange_queue: usize, + bbroots_queue: usize, + blbroots_queue: usize, + blbrange_queue: usize, + dcbroots_queue: usize, + dcbrange_queue: usize, + gossip_bls_to_execution_change_queue: usize, + lc_bootstrap_queue: usize, + lc_rpc_optimistic_update_queue: usize, + lc_rpc_finality_update_queue: usize, + lc_gossip_finality_update_queue: usize, + lc_gossip_optimistic_update_queue: usize, + lc_update_range_queue: usize, + api_request_p0_queue: usize, + api_request_p1_queue: usize, +} + +impl BeaconProcessorQueueLengths { + pub fn from_state( + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + let active_validator_count = + match state.get_cached_active_validator_indices(RelativeEpoch::Current) { + Ok(indices) => indices.len(), + Err(_) => state + .get_active_validator_indices(state.current_epoch(), spec) + .map_err(|e| format!("Error computing active indices: {:?}", e))? + .len(), + }; + let active_validator_count = + (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; + let slots_per_epoch = E::slots_per_epoch() as usize; + + Ok(Self { + aggregate_queue: 4096, + unknown_block_aggregate_queue: 1024, + // Capacity for a full slot's worth of attestations if subscribed to all subnets + attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), + // Capacity for a full slot's worth of attestations if subscribed to all subnets + unknown_block_attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), + sync_message_queue: 2048, + sync_contribution_queue: 1024, + gossip_voluntary_exit_queue: 4096, + gossip_proposer_slashing_queue: 4096, + gossip_attester_slashing_queue: 4096, + unknown_light_client_update_queue: 128, + rpc_block_queue: 1024, + rpc_blob_queue: 1024, + // We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit + // this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB. + rpc_custody_column_queue: 64, + column_reconstruction_queue: 1, + chain_segment_queue: 64, + backfill_chain_segment: 64, + gossip_block_queue: 1024, + gossip_blob_queue: 1024, + gossip_data_column_queue: 1024, + delayed_block_queue: 1024, + status_queue: 1024, + bbrange_queue: 1024, + bbroots_queue: 1024, + blbroots_queue: 1024, + blbrange_queue: 1024, + dcbroots_queue: 1024, + dcbrange_queue: 1024, + gossip_bls_to_execution_change_queue: 16384, + lc_gossip_finality_update_queue: 1024, + lc_gossip_optimistic_update_queue: 1024, + lc_bootstrap_queue: 1024, + lc_rpc_optimistic_update_queue: 512, + lc_rpc_finality_update_queue: 512, + lc_update_range_queue: 512, + api_request_p0_queue: 1024, + api_request_p1_queue: 1024, + }) + } +} + +pub struct WorkQueues { + pub aggregate_queue: LifoQueue>, + pub aggregate_debounce: TimeLatch, + pub attestation_queue: LifoQueue>, + pub attestation_to_convert_queue: LifoQueue>, + pub attestation_debounce: TimeLatch, + pub unknown_block_aggregate_queue: LifoQueue>, + pub unknown_block_attestation_queue: LifoQueue>, + pub sync_message_queue: LifoQueue>, + pub sync_contribution_queue: LifoQueue>, + pub gossip_voluntary_exit_queue: FifoQueue>, + pub gossip_proposer_slashing_queue: FifoQueue>, + pub gossip_attester_slashing_queue: FifoQueue>, + pub unknown_light_client_update_queue: FifoQueue>, + pub rpc_block_queue: FifoQueue>, + pub rpc_blob_queue: FifoQueue>, + pub rpc_custody_column_queue: FifoQueue>, + pub column_reconstruction_queue: LifoQueue>, + pub chain_segment_queue: FifoQueue>, + pub backfill_chain_segment: FifoQueue>, + pub gossip_block_queue: FifoQueue>, + pub gossip_blob_queue: FifoQueue>, + pub gossip_data_column_queue: FifoQueue>, + pub delayed_block_queue: FifoQueue>, + pub status_queue: FifoQueue>, + pub bbrange_queue: FifoQueue>, + pub bbroots_queue: FifoQueue>, + pub blbroots_queue: FifoQueue>, + pub blbrange_queue: FifoQueue>, + pub dcbroots_queue: FifoQueue>, + pub dcbrange_queue: FifoQueue>, + pub gossip_bls_to_execution_change_queue: FifoQueue>, + pub lc_gossip_finality_update_queue: FifoQueue>, + pub lc_gossip_optimistic_update_queue: FifoQueue>, + pub lc_bootstrap_queue: FifoQueue>, + pub lc_rpc_optimistic_update_queue: FifoQueue>, + pub lc_rpc_finality_update_queue: FifoQueue>, + pub lc_update_range_queue: FifoQueue>, + pub api_request_p0_queue: FifoQueue>, + pub api_request_p1_queue: FifoQueue>, +} + +impl WorkQueues { + pub fn new(queue_lengths: BeaconProcessorQueueLengths) -> Self { + // Using LIFO queues for attestations since validator profits rely upon getting fresh + // attestations into blocks. Additionally, later attestations contain more information than + // earlier ones, so we consider them more valuable. + let aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); + let aggregate_debounce = TimeLatch::default(); + let attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); + let attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue); + let attestation_debounce = TimeLatch::default(); + let unknown_block_aggregate_queue = + LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); + let unknown_block_attestation_queue = + LifoQueue::new(queue_lengths.unknown_block_attestation_queue); + + let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); + let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); + + // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have + // a strong feeling about queue type for exits. + let gossip_voluntary_exit_queue = FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); + + // Using a FIFO queue for slashing to prevent people from flushing their slashings from the + // queues with lots of junk messages. + let gossip_proposer_slashing_queue = + FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); + let gossip_attester_slashing_queue = + FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); + + // Using a FIFO queue for light client updates to maintain sequence order. + let unknown_light_client_update_queue = + FifoQueue::new(queue_lengths.unknown_light_client_update_queue); + // Using a FIFO queue since blocks need to be imported sequentially. + let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); + let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); + let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); + let column_reconstruction_queue = LifoQueue::new(queue_lengths.column_reconstruction_queue); + let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); + let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); + let gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); + let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); + let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); + let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + + let status_queue = FifoQueue::new(queue_lengths.status_queue); + let bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); + let bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); + let blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); + let blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); + let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); + let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); + + let gossip_bls_to_execution_change_queue = + FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); + + let lc_gossip_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); + let lc_gossip_finality_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); + let lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); + let lc_rpc_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); + let lc_rpc_finality_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); + let lc_update_range_queue: FifoQueue> = + FifoQueue::new(queue_lengths.lc_update_range_queue); + + let api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); + let api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); + + WorkQueues { + aggregate_queue, + aggregate_debounce, + attestation_queue, + attestation_to_convert_queue, + attestation_debounce, + unknown_block_aggregate_queue, + unknown_block_attestation_queue, + sync_message_queue, + sync_contribution_queue, + gossip_voluntary_exit_queue, + gossip_proposer_slashing_queue, + gossip_attester_slashing_queue, + unknown_light_client_update_queue, + rpc_block_queue, + rpc_blob_queue, + rpc_custody_column_queue, + chain_segment_queue, + column_reconstruction_queue, + backfill_chain_segment, + gossip_block_queue, + gossip_blob_queue, + gossip_data_column_queue, + delayed_block_queue, + status_queue, + bbrange_queue, + bbroots_queue, + blbroots_queue, + blbrange_queue, + dcbroots_queue, + dcbrange_queue, + gossip_bls_to_execution_change_queue, + lc_gossip_optimistic_update_queue, + lc_gossip_finality_update_queue, + lc_bootstrap_queue, + lc_rpc_optimistic_update_queue, + lc_rpc_finality_update_queue, + lc_update_range_queue, + api_request_p0_queue, + api_request_p1_queue, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; + + #[test] + fn min_queue_len() { + // State with no validators. + let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet()); + let genesis_time = 0; + let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); + assert_eq!(state.validators().len(), 0); + let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap(); + assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN); + assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN); + } +}