Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
}
}

#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
Expand Down Expand Up @@ -731,6 +731,17 @@ impl<E: EthSpec> Work<E> {
}
}

impl WorkType {
/// Return the type of work that was batched to create this type of work, if any.
pub fn batched_from(self) -> Option<WorkType> {
match self {
WorkType::GossipAggregateBatch => Some(WorkType::GossipAggregate),
WorkType::GossipAttestationBatch => Some(WorkType::GossipAttestation),
_ => None,
}
}
}

/// Unifies all the messages processed by the `BeaconProcessor`.
enum InboundEvent<E: EthSpec> {
/// A worker has completed a task and is free.
Expand Down Expand Up @@ -1404,7 +1415,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
};

if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
let get_queue_len = |work_type| match work_type {
WorkType::GossipAttestation => attestation_queue.len(),
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
Expand Down Expand Up @@ -1458,11 +1469,24 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};

// Update the metric for the length of this work type's queue.
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
&[modified_queue_id.into()],
queue_len as f64,
get_queue_len(modified_queue_id) as f64,
);

// If the work was batched from another type of work, also update that work
// event's queue length. There was previously a bug here where those queues
// metrics could go a long time without being updated at all.
if let Some(batched_work_type) = modified_queue_id.batched_from() {
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
&[batched_work_type.into()],
get_queue_len(batched_work_type) as f64,
);
}
}

if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
Expand Down Expand Up @@ -1690,10 +1714,10 @@ impl Drop for SendOnDrop {
fn drop(&mut self) {
metrics::dec_gauge_vec(
&metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
&[self.work_type.clone().into()],
&[self.work_type.into()],
);

if let Err(e) = self.tx.try_send(self.work_type.clone()) {
if let Err(e) = self.tx.try_send(self.work_type) {
warn!(
msg = "did not free worker, shutdown may be underway",
error = %e,
Expand Down
Loading