Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 81 additions & 67 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use strum::{EnumIter, IntoEnumIterator, IntoStaticStr};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
Expand Down Expand Up @@ -627,7 +627,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
}
}

#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
#[derive(IntoStaticStr, EnumIter, PartialEq, Eq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
Expand Down Expand Up @@ -731,6 +731,17 @@ impl<E: EthSpec> Work<E> {
}
}

impl WorkType {
/// Return the type of work that was batched to create this type of work, if any.
pub fn batched_from(self) -> Option<WorkType> {
match self {
WorkType::GossipAggregateBatch => Some(WorkType::GossipAggregate),
WorkType::GossipAttestationBatch => Some(WorkType::GossipAttestation),
_ => None,
}
}
}

/// Unifies all the messages processed by the `BeaconProcessor`.
enum InboundEvent<E: EthSpec> {
/// A worker has completed a task and is free.
Expand Down Expand Up @@ -1022,7 +1033,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
.is_some_and(|event| event.drop_during_sync);

let idle_tx = idle_tx.clone();
let modified_queue_id = match work_event {
match work_event {
// There is no new work event, but we are able to spawn a new worker.
//
// We don't check the `work.drop_during_sync` here. We assume that if it made
Expand Down Expand Up @@ -1403,68 +1414,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
};

if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
WorkType::GossipAttestation => attestation_queue.len(),
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => aggregate_queue.len(),
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
WorkType::UnknownLightClientOptimisticUpdate => {
unknown_light_client_update_queue.len()
}
WorkType::GossipAggregateBatch => 0, // No queue
WorkType::GossipBlock => gossip_block_queue.len(),
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
WorkType::DelayedImportBlock => delayed_block_queue.len(),
WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
WorkType::GossipSyncSignature => sync_message_queue.len(),
WorkType::GossipSyncContribution => sync_contribution_queue.len(),
WorkType::GossipLightClientFinalityUpdate => {
lc_gossip_finality_update_queue.len()
}
WorkType::GossipLightClientOptimisticUpdate => {
lc_gossip_optimistic_update_queue.len()
}
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
WorkType::BlocksByRangeRequest => blbrange_queue.len(),
WorkType::BlocksByRootsRequest => blbroots_queue.len(),
WorkType::BlobsByRangeRequest => bbrange_queue.len(),
WorkType::BlobsByRootsRequest => bbroots_queue.len(),
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
WorkType::GossipBlsToExecutionChange => {
gossip_bls_to_execution_change_queue.len()
}
WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
WorkType::LightClientOptimisticUpdateRequest => {
lc_rpc_optimistic_update_queue.len()
}
WorkType::LightClientFinalityUpdateRequest => {
lc_rpc_finality_update_queue.len()
}
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
&[modified_queue_id.into()],
queue_len as f64,
);
}

if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
msg = "the system has insufficient resources for load",
Expand All @@ -1480,6 +1429,71 @@ impl<E: EthSpec> BeaconProcessor<E> {
"Attestation queue full"
)
}

let get_queue_len = |work_type| match work_type {
WorkType::GossipAttestation => attestation_queue.len(),
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => aggregate_queue.len(),
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
WorkType::UnknownLightClientOptimisticUpdate => {
unknown_light_client_update_queue.len()
}
WorkType::GossipAggregateBatch => 0, // No queue
WorkType::GossipBlock => gossip_block_queue.len(),
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
WorkType::DelayedImportBlock => delayed_block_queue.len(),
WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
WorkType::GossipSyncSignature => sync_message_queue.len(),
WorkType::GossipSyncContribution => sync_contribution_queue.len(),
WorkType::GossipLightClientFinalityUpdate => {
lc_gossip_finality_update_queue.len()
}
WorkType::GossipLightClientOptimisticUpdate => {
lc_gossip_optimistic_update_queue.len()
}
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
WorkType::BlocksByRangeRequest => blbrange_queue.len(),
WorkType::BlocksByRootsRequest => blbroots_queue.len(),
WorkType::BlobsByRangeRequest => bbrange_queue.len(),
WorkType::BlobsByRootsRequest => bbroots_queue.len(),
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
WorkType::GossipBlsToExecutionChange => {
gossip_bls_to_execution_change_queue.len()
}
WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
WorkType::LightClientOptimisticUpdateRequest => {
lc_rpc_optimistic_update_queue.len()
}
WorkType::LightClientFinalityUpdateRequest => {
lc_rpc_finality_update_queue.len()
}
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};

// Update metrics for all work events.
for work_type in WorkType::iter() {
// Update the metric for the length of this work type's queue.
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
&[work_type.into()],
get_queue_len(work_type) as f64,
);
}
}
};

Expand Down Expand Up @@ -1690,10 +1704,10 @@ impl Drop for SendOnDrop {
fn drop(&mut self) {
metrics::dec_gauge_vec(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
&[self.work_type.clone().into()],
&[self.work_type.into()],
);

if let Err(e) = self.tx.try_send(self.work_type.clone()) {
if let Err(e) = self.tx.try_send(self.work_type) {
warn!(
msg = "did not free worker, shutdown may be underway",
error = %e,
Expand Down
Loading