diff --git a/Cargo.lock b/Cargo.lock
index b97e470ba8..4053f997ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4225,6 +4225,7 @@ dependencies = [
  "locktick",
  "lru 0.16.2",
  "mockall",
+ "once_cell",
  "open",
  "parking_lot",
  "paste",
@@ -4234,6 +4235,8 @@ dependencies = [
  "rand_chacha 0.3.1",
  "rand_distr",
  "rayon",
+ "serde",
+ "serde_json",
  "sha2",
  "snarkos-account",
  "snarkos-node-bft",
diff --git a/Cargo.toml b/Cargo.toml
index 9c441c07cb..b35700fd0b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -296,10 +296,15 @@ serial = [
   "snarkos-node-sync/serial",
   "snarkvm/serial"
 ]
+tokio_console = [ "snarkos-cli/tokio_console" ]
+# Set low puzzle targets for easier testing.
 test_targets = [ "snarkos-cli/test_targets" ]
+# Set custom consensus heights for easier testing.
 test_consensus_heights = [ "snarkos-cli/test_consensus_heights" ]
+# Set low puzzle targets and custom consensus heights for easier testing.
 test_network = [ "snarkos-cli/test_network" ]
-tokio_console = [ "snarkos-cli/tokio_console" ]
+# Track duration of consensus stages.
+test_consensus_tracking = [ "snarkos-node/test_consensus_tracking" ]
 
 [dependencies.clap]
 workspace = true
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 063755359c..4f66977fcd 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -54,6 +54,7 @@ serial = [
   "snarkvm/serial",
   "snarkos-node-bft/serial"
 ]
+test_consensus_tracking = [ "snarkos-node-bft/test_consensus_tracking", "snarkos-node-consensus/test_consensus_tracking" ]
 test = []
 
 [dependencies.aleo-std]
diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml
index 3ae7b8d5f9..8e3ffb07f2 100644
--- a/node/bft/Cargo.toml
+++ b/node/bft/Cargo.toml
@@ -45,6 +45,7 @@ test = [
   "snarkos-node-bft-ledger-service/test",
   "snarkos-node-bft-storage-service/test"
 ]
+test_consensus_tracking = [ ]
 serial = [
   "snarkos-node-metrics/serial",
   "snarkos-node-bft-ledger-service/serial"
@@ -97,6 +98,16 @@ workspace = true
 [dependencies.rayon]
 workspace = true
 
+[dependencies.serde]
+workspace = true
+features = [ "derive" ]
+
+[dependencies.serde_json]
+workspace = true
+
+[dependencies.once_cell]
+version = "1.0"
+
 [dependencies.sha2]
 version = "0.10"
 default-features = false
diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs
index 0ce559dab7..a8db38e67b 100644
--- a/node/bft/src/bft.rs
+++ b/node/bft/src/bft.rs
@@ -13,6 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(feature = "test_consensus_tracking")]
+use crate::helpers::{ConsensusStage, SubdagStage, record_event, start_subdag_stage};
 use crate::{
     MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
     Primary,
@@ -506,6 +508,9 @@ impl<N: Network> BFT<N> {
         // Insert the certificate into the DAG.
         self.dag.write().insert(certificate);
 
+        #[cfg(feature = "test_consensus_tracking")]
+        record_event(certificate_round, ConsensusStage::CertificateAdded);
+
         // ### Second, determine if a new leader certificate can be committed. ###
         let commit_round = certificate_round.saturating_sub(1);
 
@@ -596,6 +601,9 @@ impl<N: Network> BFT<N> {
         // Fetch the leader round.
         let latest_leader_round = leader_certificate.round();
+
+        #[cfg(feature = "test_consensus_tracking")]
+        start_subdag_stage(latest_leader_round.saturating_sub(2), latest_leader_round, SubdagStage::SubdagProcessing);
 
         // Determine the list of all previous leader certificates since the last committed round.
         // The order of the leader certificates is from **newest** to **oldest**.
         let mut leader_certificates = vec![leader_certificate.clone()];
diff --git a/node/bft/src/helpers/mod.rs b/node/bft/src/helpers/mod.rs
index 28f27da3cb..e644e556f8 100644
--- a/node/bft/src/helpers/mod.rs
+++ b/node/bft/src/helpers/mod.rs
@@ -51,6 +51,11 @@ pub use telemetry::*;
 pub mod timestamp;
 pub use timestamp::*;
 
+#[cfg(feature = "test_consensus_tracking")]
+pub mod timing;
+#[cfg(feature = "test_consensus_tracking")]
+pub use timing::*;
+
 /// Formats an ID into a truncated identifier (for logging purposes).
 pub fn fmt_id(id: impl ToString) -> String {
     let id = id.to_string();
diff --git a/node/bft/src/helpers/timing.rs b/node/bft/src/helpers/timing.rs
new file mode 100644
index 0000000000..203ec73d26
--- /dev/null
+++ b/node/bft/src/helpers/timing.rs
@@ -0,0 +1,299 @@
+// Copyright (c) 2019-2025 Provable Inc.
+// This file is part of the snarkOS library.
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashMap, sync::Arc, time::SystemTime};
+
+#[cfg(feature = "locktick")]
+use locktick::parking_lot::RwLock;
+#[cfg(not(feature = "locktick"))]
+use parking_lot::RwLock;
+
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+/// Global storage for round-based event data
+static ROUND_EVENTS: Lazy<Arc<RwLock<HashMap<u64, RoundEvents>>>> = Lazy::new(Default::default);
+
+/// Global storage for subdag-based timing data (using lowest/highest rounds from subdag)
+static SUBDAG_TIMINGS: Lazy<Arc<RwLock<HashMap<(u64, u64), SubdagTimings>>>> = Lazy::new(Default::default);
+
+/// Consensus stages that occur per round
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ConsensusStage {
+    ProposalSeen,
+    ProposalCreated,
+    CertificateAdded,
+}
+
+/// Block processing stages that occur per subdag
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SubdagStage {
+    SubdagProcessing,
+    PrepareAdvanceToNextQuorumBlock,
+    CheckNextBlock,
+    AdvanceToNextBlock,
+}
+
+/// Timing event for a specific consensus stage
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TimingEvent {
+    pub round: u64,
+    pub timestamp: SystemTime,
+    pub event_type: String,
+}
+
+/// Timing data for round-based events
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RoundEvents {
+    pub round: u64,
+    pub proposal_seen: Vec<TimingEvent>,
+    pub proposal_created: Vec<TimingEvent>,
+    pub certificate_added: Vec<TimingEvent>,
+}
+
+/// Timing data for a specific subdag (identified by lowest and highest rounds)
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SubdagTimings {
+    pub lowest_round: u64,
+    pub highest_round: u64,
+    pub subdag_processing: Option<(SystemTime, Option<SystemTime>)>,
+    pub prepare_advance_to_next_quorum_block: Option<(SystemTime, Option<SystemTime>)>,
+    pub check_next_block: Option<(SystemTime, Option<SystemTime>)>,
+    pub advance_to_next_block: Option<(SystemTime, Option<SystemTime>)>,
+}
+
+/// Combined timing data for export
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TimingSnapshot {
+    pub timestamp: SystemTime,
+    pub round_events: HashMap<String, RoundEvents>,
+    pub subdag_timings: HashMap<String, SubdagTimings>,
+}
+
+impl RoundEvents {
+    fn new(round: u64) -> Self {
+        Self { round, proposal_seen: Vec::new(), proposal_created: Vec::new(), certificate_added: Vec::new() }
+    }
+
+    fn add_event(&mut self, stage: ConsensusStage, timestamp: SystemTime) {
+        let event = TimingEvent {
+            round: self.round,
+            timestamp,
+            event_type: match stage {
+                ConsensusStage::ProposalSeen => "proposal_seen".to_string(),
+                ConsensusStage::ProposalCreated => "proposal_created".to_string(),
+                ConsensusStage::CertificateAdded => "certificate_added".to_string(),
+            },
+        };
+
+        match stage {
+            ConsensusStage::ProposalSeen => self.proposal_seen.push(event),
+            ConsensusStage::ProposalCreated => self.proposal_created.push(event),
+            ConsensusStage::CertificateAdded => self.certificate_added.push(event),
+        }
+    }
+}
+
+impl SubdagTimings {
+    fn new(lowest_round: u64, highest_round: u64) -> Self {
+        Self {
+            lowest_round,
+            highest_round,
+            subdag_processing: None,
+            prepare_advance_to_next_quorum_block: None,
+            check_next_block: None,
+            advance_to_next_block: None,
+        }
+    }
+
+    fn start_stage(&mut self, stage: SubdagStage) {
+        let now = SystemTime::now();
+        match stage {
+            SubdagStage::SubdagProcessing => {
+                self.subdag_processing = Some((now, None));
+            }
+            SubdagStage::PrepareAdvanceToNextQuorumBlock => {
+                self.prepare_advance_to_next_quorum_block = Some((now, None));
+            }
+            SubdagStage::CheckNextBlock => {
+                self.check_next_block = Some((now, None));
+            }
+            SubdagStage::AdvanceToNextBlock => {
+                self.advance_to_next_block = Some((now, None));
+            }
+        }
+    }
+
+    fn end_stage(&mut self, stage: SubdagStage) {
+        let now = SystemTime::now();
+        match stage {
+            SubdagStage::SubdagProcessing => {
+                if let Some((start, _)) = self.subdag_processing {
+                    self.subdag_processing = Some((start, Some(now)));
+                } else {
+                    warn!("SubdagProcessing stage not started");
+                }
+            }
+            SubdagStage::PrepareAdvanceToNextQuorumBlock => {
+                if let Some((start, _)) = self.prepare_advance_to_next_quorum_block {
+                    self.prepare_advance_to_next_quorum_block = Some((start, Some(now)));
+                } else {
+                    warn!("PrepareAdvanceToNextQuorumBlock stage not started");
+                }
+            }
+            SubdagStage::CheckNextBlock => {
+                if let Some((start, _)) = self.check_next_block {
+                    self.check_next_block = Some((start, Some(now)));
+                } else {
+                    warn!("CheckNextBlock stage not started");
+                }
+            }
+            SubdagStage::AdvanceToNextBlock => {
+                if let Some((start, _)) = self.advance_to_next_block {
+                    self.advance_to_next_block = Some((start, Some(now)));
+                } else {
+                    warn!("AdvanceToNextBlock stage not started");
+                }
+            }
+        }
+    }
+}
+
+/// Record a consensus event for a specific round
+pub fn record_event(round: u64, stage: ConsensusStage) {
+    let now = SystemTime::now();
+    let mut round_events = ROUND_EVENTS.write();
+    let round_event = round_events.entry(round).or_insert_with(|| RoundEvents::new(round));
+    round_event.add_event(stage, now);
+}
+
+/// Record a consensus event with custom timestamp for a specific round
+pub fn record_event_with_timestamp(round: u64, stage: ConsensusStage, timestamp: SystemTime) {
+    let mut events = ROUND_EVENTS.write();
+    let round_events = events.entry(round).or_insert_with(|| RoundEvents::new(round));
+    round_events.add_event(stage, timestamp);
+}
+
+/// Record the start of a subdag processing stage
+pub fn start_subdag_stage(lowest_round: u64, highest_round: u64, stage: SubdagStage) {
+    let key = (lowest_round, highest_round);
+    let mut timings = SUBDAG_TIMINGS.write();
+    let subdag_timing = timings.entry(key).or_insert_with(|| SubdagTimings::new(lowest_round, highest_round));
+    subdag_timing.start_stage(stage);
+}
+
+/// Record the end of a subdag processing stage
+pub fn end_subdag_stage(lowest_round: u64, highest_round: u64, stage: SubdagStage) {
+    let key = (lowest_round, highest_round);
+    let mut timings = SUBDAG_TIMINGS.write();
+    if let Some(subdag_timing) = timings.get_mut(&key) {
+        subdag_timing.end_stage(stage);
+    }
+}
+
+/// Get event data for a specific round
+pub fn get_round_events(round: u64) -> Option<RoundEvents> {
+    ROUND_EVENTS.read().get(&round).cloned()
+}
+
+/// Get timing data for a specific subdag
+pub fn get_subdag_timings(lowest_round: u64, highest_round: u64) -> Option<SubdagTimings> {
+    let key = (lowest_round, highest_round);
+    SUBDAG_TIMINGS.read().get(&key).cloned()
+}
+
+/// Get a snapshot of all current timing data
+pub fn get_timing_snapshot() -> TimingSnapshot {
+    let round_events = ROUND_EVENTS.read();
+    let subdag_timings = SUBDAG_TIMINGS.read();
+
+    // Convert keys to strings for JSON serialization
+    let round_events_str: HashMap<String, RoundEvents> =
+        round_events.iter().map(|(k, v)| (k.to_string(), v.clone())).collect();
+
+    let subdag_timings_str: HashMap<String, SubdagTimings> =
+        subdag_timings.iter().map(|((low, high), v)| (format!("{low}-{high}"), v.clone())).collect();
+
+    TimingSnapshot { timestamp: SystemTime::now(), round_events: round_events_str, subdag_timings: subdag_timings_str }
+}
+
+/// Export current timing state to a JSON file
+pub fn export_to_json(file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
+    let snapshot = get_timing_snapshot();
+    let json = serde_json::to_string_pretty(&snapshot)?;
+    std::fs::write(file_path, json)?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::{thread, time::Duration};
+
+    #[test]
+    fn test_event_recording() {
+        let round = 12345;
+
+        record_event(round, ConsensusStage::ProposalCreated);
+        record_event(round, ConsensusStage::ProposalSeen);
+
+        let events = get_round_events(round).unwrap();
+        assert_eq!(events.proposal_created.len(), 1);
+        assert_eq!(events.proposal_seen.len(), 1);
+        assert_eq!(events.certificate_added.len(), 0);
+    }
+
+    #[test]
+    fn test_subdag_timing() {
+        let (low, high) = (100, 105);
+
+        start_subdag_stage(low, high, SubdagStage::SubdagProcessing);
+        thread::sleep(Duration::from_millis(10));
+        end_subdag_stage(low, high, SubdagStage::SubdagProcessing);
+
+        let timings = get_subdag_timings(low, high).unwrap();
+        assert!(timings.subdag_processing.is_some());
+
+        if let Some((start, Some(end))) = timings.subdag_processing {
+            assert!(end > start);
+        } else {
+            panic!("Expected complete timing data");
+        }
+    }
+
+    #[test]
+    fn test_json_export() {
+        let round = 54321;
+        let (low, high) = (200, 210);
+
+        // Add some timing data
+        record_event(round, ConsensusStage::ProposalCreated);
+        thread::sleep(Duration::from_millis(5));
+        record_event(round, ConsensusStage::ProposalCreated);
+
+        start_subdag_stage(low, high, SubdagStage::SubdagProcessing);
+        thread::sleep(Duration::from_millis(5));
+        end_subdag_stage(low, high, SubdagStage::SubdagProcessing);
+
+        // Test JSON export
+        let snapshot = get_timing_snapshot();
+        let json_result = serde_json::to_string_pretty(&snapshot);
+        assert!(json_result.is_ok(), "JSON serialization should succeed");
+
+        let json_str = json_result.unwrap();
+        assert!(json_str.contains(&round.to_string()), "JSON should contain round data");
+        assert!(json_str.contains(&format!("{low}-{high}")), "JSON should contain subdag data");
+    }
+}
diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs
index f04389ae72..fa1dc9fa3e 100644
--- a/node/bft/src/primary.rs
+++ b/node/bft/src/primary.rs
@@ -13,6 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(feature = "test_consensus_tracking")]
+use crate::helpers::{ConsensusStage, record_event};
 use crate::{
     Gateway,
     MAX_BATCH_DELAY_IN_MS,
@@ -715,6 +717,10 @@ impl<N: Network> Primary<N> {
                 error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
             }
         })?;
+
+        #[cfg(feature = "test_consensus_tracking")]
+        record_event(round, ConsensusStage::ProposalCreated);
+
         // Broadcast the batch to all validators for signing.
         self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
         // Set the timestamp of the latest proposed batch.
@@ -782,6 +788,9 @@ impl<N: Network> Primary<N> {
             );
         }
 
+        #[cfg(feature = "test_consensus_tracking")]
+        record_event(batch_round, ConsensusStage::ProposalSeen);
+
         // Retrieve the cached round and batch ID for this validator.
         if let Some((signed_round, signed_batch_id, signature)) =
             self.signed_proposals.read().get(&batch_author).copied()
diff --git a/node/consensus/Cargo.toml b/node/consensus/Cargo.toml
index 7d6b75a66a..47ffc26bc7 100644
--- a/node/consensus/Cargo.toml
+++ b/node/consensus/Cargo.toml
@@ -28,6 +28,7 @@ locktick = [
 metrics = [ "dep:snarkos-node-metrics" ]
 telemetry = [ "snarkos-node-bft/telemetry" ]
 cuda = [ "snarkvm/cuda", "snarkos-account/cuda", "snarkos-node-bft-ledger-service/cuda" ]
+test_consensus_tracking = [ ]
 serial = [
   "snarkos-node-bft-ledger-service/serial",
   "snarkos-node-metrics/serial",
diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs
index c72a112f8b..42ca716720 100644
--- a/node/consensus/src/lib.rs
+++ b/node/consensus/src/lib.rs
@@ -25,6 +25,8 @@ extern crate tracing;
 extern crate snarkos_node_metrics as metrics;
 
 use snarkos_account::Account;
+#[cfg(feature = "test_consensus_tracking")]
+use snarkos_node_bft::helpers::{SubdagStage, end_subdag_stage, start_subdag_stage};
 use snarkos_node_bft::{
     BFT,
     MAX_BATCH_DELAY_IN_MS,
@@ -552,12 +554,55 @@ impl<N: Network> Consensus<N> {
         #[cfg(feature = "metrics")]
         let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
 
+        // Extract lowest and highest rounds from the subdag
+        let _lowest_round = subdag.first_key_value().map(|(k, _)| *k).unwrap_or(0);
+        let _highest_round = subdag.last_key_value().map(|(k, _)| *k).unwrap_or(0);
+
+        #[cfg(feature = "test_consensus_tracking")]
+        {
+            // NOTE: we use `_highest_round.saturating_sub(2)` because this is
+            // the only information available to us at the respective
+            // start_subdag_stage.
+            end_subdag_stage(_highest_round.saturating_sub(2), _highest_round, SubdagStage::SubdagProcessing);
+            start_subdag_stage(_lowest_round, _highest_round, SubdagStage::PrepareAdvanceToNextQuorumBlock);
+        }
+
         // Create the candidate next block.
         let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
+
+        #[cfg(feature = "test_consensus_tracking")]
+        {
+            end_subdag_stage(_lowest_round, _highest_round, SubdagStage::PrepareAdvanceToNextQuorumBlock);
+            start_subdag_stage(_lowest_round, _highest_round, SubdagStage::CheckNextBlock);
+        }
+
         // Check that the block is well-formed.
         self.ledger.check_next_block(&next_block)?;
+
+        #[cfg(feature = "test_consensus_tracking")]
+        {
+            end_subdag_stage(_lowest_round, _highest_round, SubdagStage::CheckNextBlock);
+            start_subdag_stage(_lowest_round, _highest_round, SubdagStage::AdvanceToNextBlock);
+        }
+
         // Advance to the next block.
         self.ledger.advance_to_next_block(&next_block)?;
+
+        // Finalize subdag stage tracking and export timing data to JSON after block generation
+        #[cfg(feature = "test_consensus_tracking")]
+        {
+            end_subdag_stage(_lowest_round, _highest_round, SubdagStage::AdvanceToNextBlock);
+            let json_filename = match self.bft().primary().gateway().dev() {
+                Some(dev) => format!("consensus_timing_block_{dev}.json"),
+                None => "consensus_timing_block_prod.json".to_string(),
+            };
+            if let Err(e) = snarkos_node_bft::helpers::export_to_json(&json_filename) {
+                warn!("Failed to export timing data to {}: {}", json_filename, e);
+            } else {
+                info!("Exported timing data to {}", json_filename);
+            }
+        }
+
         #[cfg(feature = "telemetry")]
         // Fetch the latest committee
         let latest_committee = self.ledger.current_committee()?;
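
Below is a minimal, illustrative sketch of how the exported consensus_timing_block_*.json files could be inspected offline. It is not part of the patch: the mirror structs only re-declare the SubdagTimings fields introduced above, the standalone binary assumes serde (with the "derive" feature) and serde_json as dependencies, and the consensus_timing_block_0.json filename is a hypothetical example of the --dev naming scheme.

// Sketch: read a timing snapshot produced by `export_to_json` and print
// per-subdag stage durations. Unknown JSON fields (e.g. `round_events`) are ignored.
use std::{collections::HashMap, time::SystemTime};

use serde::Deserialize;

/// Each stage is stored as `(start, Option<end>)`, mirroring `SubdagTimings` above.
type Stage = Option<(SystemTime, Option<SystemTime>)>;

#[derive(Deserialize)]
struct SubdagTimings {
    lowest_round: u64,
    highest_round: u64,
    subdag_processing: Stage,
    prepare_advance_to_next_quorum_block: Stage,
    check_next_block: Stage,
    advance_to_next_block: Stage,
}

#[derive(Deserialize)]
struct Snapshot {
    // Keys are "lowest-highest" round strings, as produced by `get_timing_snapshot`.
    subdag_timings: HashMap<String, SubdagTimings>,
}

/// Returns the stage duration in milliseconds, if both start and end were recorded.
fn duration_ms(stage: &Stage) -> Option<u128> {
    let (start, end) = stage.as_ref()?;
    Some(end.as_ref()?.duration_since(*start).ok()?.as_millis())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file name; a node started with `--dev 0` would use this suffix.
    let json = std::fs::read_to_string("consensus_timing_block_0.json")?;
    let snapshot: Snapshot = serde_json::from_str(&json)?;

    for t in snapshot.subdag_timings.values() {
        println!(
            "subdag {}..{}: processing={:?} prepare={:?} check={:?} advance={:?} (ms)",
            t.lowest_round,
            t.highest_round,
            duration_ms(&t.subdag_processing),
            duration_ms(&t.prepare_advance_to_next_quorum_block),
            duration_ms(&t.check_next_block),
            duration_ms(&t.advance_to_next_block),
        );
    }
    Ok(())
}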