diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ab9ab045f4e..7df2b378833 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -627,7 +627,7 @@ impl fmt::Debug for Work { } } -#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)] +#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, @@ -1022,7 +1022,7 @@ impl BeaconProcessor { .is_some_and(|event| event.drop_during_sync); let idle_tx = idle_tx.clone(); - let modified_queue_id = match work_event { + match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made @@ -1254,11 +1254,7 @@ impl BeaconProcessor { }; if let Some(work_event) = work_event { - let work_type = work_event.to_type(); self.spawn_worker(work_event, created_timestamp, idle_tx); - Some(work_type) - } else { - None } } // There is no new work event and we are unable to spawn a new worker. @@ -1269,7 +1265,6 @@ impl BeaconProcessor { msg = "no new work and cannot spawn worker", "Unexpected gossip processor condition" ); - None } // The chain is syncing and this event should be dropped during sync. Some(work_event) @@ -1286,7 +1281,6 @@ impl BeaconProcessor { work_id = work_id, "Gossip processor skipping work" ); - None } // There is a new work event and the chain is not syncing. Process it or queue // it. @@ -1294,6 +1288,88 @@ impl BeaconProcessor { let work_id = work.str_id(); let work_type = work.to_type(); + // Sample the length of the queue for this event type. It is important that + // we only sample upon receiving events, in order to make the full set of + // samples representative. + // + // See: https://github.com/sigp/lighthouse/pull/8020 + + // This closure gets the current length of the queue for a given + // `work_type`. + // + // It should be relocated when we shift the queues from being local + // variables to fields of the struct. + let get_queue_len = |work_type| match work_type { + WorkType::GossipAttestation => attestation_queue.len(), + WorkType::GossipAttestationToConvert => { + attestation_to_convert_queue.len() + } + WorkType::UnknownBlockAttestation => { + unknown_block_attestation_queue.len() + } + WorkType::GossipAttestationBatch => 0, // No queue + WorkType::GossipAggregate => aggregate_queue.len(), + WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::UnknownLightClientOptimisticUpdate => { + unknown_light_client_update_queue.len() + } + WorkType::GossipAggregateBatch => 0, // No queue + WorkType::GossipBlock => gossip_block_queue.len(), + WorkType::GossipBlobSidecar => gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), + WorkType::DelayedImportBlock => delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), + WorkType::GossipProposerSlashing => { + gossip_proposer_slashing_queue.len() + } + WorkType::GossipAttesterSlashing => { + gossip_attester_slashing_queue.len() + } + WorkType::GossipSyncSignature => sync_message_queue.len(), + WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipLightClientFinalityUpdate => { + lc_gossip_finality_update_queue.len() + } + WorkType::GossipLightClientOptimisticUpdate => { + lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), + WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), + WorkType::ChainSegment => chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::Status => status_queue.len(), + WorkType::BlocksByRangeRequest => blbrange_queue.len(), + WorkType::BlocksByRootsRequest => blbroots_queue.len(), + WorkType::BlobsByRangeRequest => bbrange_queue.len(), + WorkType::BlobsByRootsRequest => bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::GossipBlsToExecutionChange => { + gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), + WorkType::LightClientOptimisticUpdateRequest => { + lc_rpc_optimistic_update_queue.len() + } + WorkType::LightClientFinalityUpdateRequest => { + lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => { + lc_update_range_queue.len() + } + WorkType::ApiRequestP0 => api_request_p0_queue.len(), + WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, + }; + + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[work_type.into()], + get_queue_len(work_type) as f64, + ); + match work { Work::Reprocess(work_event) => { if let Err(e) = reprocess_work_tx.try_send(work_event) { @@ -1399,72 +1475,9 @@ impl BeaconProcessor { Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id), }; - Some(work_type) } }; - if let Some(modified_queue_id) = modified_queue_id { - let queue_len = match modified_queue_id { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), - WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), - WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() - } - WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), - WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() - } - WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() - } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), - WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() - } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), - WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() - } - WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() - } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), - WorkType::Reprocess => 0, - }; - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[modified_queue_id.into()], - queue_len as f64, - ); - } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( msg = "the system has insufficient resources for load", @@ -1690,10 +1703,10 @@ impl Drop for SendOnDrop { fn drop(&mut self) { metrics::dec_gauge_vec( &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE, - &[self.work_type.clone().into()], + &[self.work_type.into()], ); - if let Err(e) = self.tx.try_send(self.work_type.clone()) { + if let Err(e) = self.tx.try_send(self.work_type) { warn!( msg = "did not free worker, shutdown may be underway", error = %e,