diff --git a/Cargo.lock b/Cargo.lock index ebbec661b13..2e81911f194 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2450,8 +2450,20 @@ version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" dependencies = [ - "bytecheck_derive", - "ptr_meta", + "bytecheck_derive 0.6.12", + "ptr_meta 0.1.4", + "simdutf8", +] + +[[package]] +name = "bytecheck" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0caa33a2c0edca0419d15ac723dff03f1956f7978329b1e3b5fdaaaed9d3ca8b" +dependencies = [ + "bytecheck_derive 0.8.2", + "ptr_meta 0.3.1", + "rancor", "simdutf8", ] @@ -2466,6 +2478,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecheck_derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89385e82b5d1821d2219e0b095efa2cc1f246cbf99080f3be46a1a85c0d392d9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "bytemuck" version = "1.23.2" @@ -2587,7 +2610,7 @@ dependencies = [ "portpicker", "prometheus", "rand 0.8.5", - "rkyv", + "rkyv 0.7.45", "tokio", "tracing", "tracing-subscriber 0.3.20", @@ -2644,7 +2667,7 @@ dependencies = [ "rand 0.8.5", "rcgen", "redis", - "rkyv", + "rkyv 0.7.45", "rustls 0.23.31", "rustls-pki-types", "sqlx", @@ -2952,6 +2975,14 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "collector-common" +version = "0.1.0" +dependencies = [ + "anyhow", + "rkyv 0.8.12", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -5202,6 +5233,7 @@ dependencies = [ "cdn-client", "cdn-marshal", "chrono", + "collector-common", "committable", "dashmap", "derive_more 2.0.1", @@ -5259,6 +5291,7 @@ dependencies = [ "bytemuck", "coarsetime", "committable", + "csv", "derive_more 2.0.1", "futures", "hotshot", @@ -5276,6 +5309,7 @@ dependencies = [ "sha2 0.10.9", "tagged-base64", "tide-disco", + "time 0.3.43", "tokio", "tracing", "tracing-subscriber 0.3.20", @@ -5535,6 +5569,7 @@ dependencies = [ "csv", "futures", "hotshot-types", + "itertools 0.12.1", "libp2p-identity", "multiaddr", "serde", @@ -5674,12 +5709,14 @@ dependencies = [ "async-trait", "bincode", "chrono", + "collector-common", "committable", "csv", "either", "futures", "hotshot-builder-api", "hotshot-contract-adapter", + "hotshot-orchestrator", "hotshot-task", "hotshot-types", "hotshot-utils", @@ -7696,6 +7733,26 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "munge" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e17401f259eba956ca16491461b6e8f72913a0a114e39736ce404410f915a0c" +dependencies = [ + "munge_macro", +] + +[[package]] +name = "munge_macro" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -8909,7 +8966,16 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" dependencies = [ - "ptr_meta_derive", + "ptr_meta_derive 0.1.4", +] + +[[package]] +name = "ptr_meta" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b9a0cf95a1196af61d4f1cbdab967179516d9a4a4312af1f31948f8f6224a79" +dependencies = [ + "ptr_meta_derive 0.3.1", ] [[package]] @@ -8923,6 +8989,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "ptr_meta_derive" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -9040,6 +9117,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rancor" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a063ea72381527c2a0561da9c80000ef822bdd7c3241b1cc1b12100e3df081ee" +dependencies = [ + "ptr_meta 0.3.1", +] + [[package]] name = "rand" version = "0.7.3" @@ -9357,7 +9443,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" dependencies = [ - "bytecheck", + "bytecheck 0.6.12", +] + +[[package]] +name = "rend" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cadadef317c2f20755a64d7fdc48f9e7178ee6b0e1f7fce33fa60f1d68a276e6" +dependencies = [ + "bytecheck 0.8.2", ] [[package]] @@ -9483,17 +9578,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" dependencies = [ "bitvec", - "bytecheck", + "bytecheck 0.6.12", "bytes 1.10.1", "hashbrown 0.12.3", - "ptr_meta", - "rend", - "rkyv_derive", + "ptr_meta 0.1.4", + "rend 0.4.2", + "rkyv_derive 0.7.45", "seahash", "tinyvec", "uuid", ] +[[package]] +name = "rkyv" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35a640b26f007713818e9a9b65d34da1cf58538207b052916a83d80e43f3ffa4" +dependencies = [ + "bytecheck 0.8.2", + "bytes 1.10.1", + "hashbrown 0.15.5", + "indexmap 2.11.3", + "munge", + "ptr_meta 0.3.1", + "rancor", + "rend 0.5.3", + "rkyv_derive 0.8.12", + "tinyvec", + "uuid", +] + [[package]] name = "rkyv_derive" version = "0.7.45" @@ -9505,6 +9619,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rkyv_derive" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd83f5f173ff41e00337d97f6572e416d022ef8a19f371817259ae960324c482" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "rlp" version = "0.5.2" @@ -9698,7 +9823,7 @@ dependencies = [ "bytes 1.10.1", "num-traits", "rand 0.8.5", - "rkyv", + "rkyv 0.7.45", "serde", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index 4e16d38925b..c5c84ef085e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,6 +178,10 @@ bytesize = "1.3" committable = { git = "https://github.com/EspressoSystems/commit.git", features = ["ark-serialize"] } derivative = "2.2" itertools = "0.12" +jf-advz = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-advz-v0.2.1", features = [ + "std", + "parallel", +] } jf-crhf = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf-crhf-v0.2.0" } jf-merkle-tree-compat = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-merkle-tree-compat-v0.1.0", features = [ "std", @@ -200,10 +204,6 @@ jf-signature = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf "std", ] } jf-utils = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf-utils-v0.5.0" } -jf-advz = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-advz-v0.2.1", features = [ - "std", - "parallel", -] } libp2p = { package = "libp2p", version = "0.56", default-features = false, features = [ "macros", "autonat", diff --git a/benchmark-stats/src/main.rs b/benchmark-stats/src/main.rs index eb3a3516ab2..c510c9e74a5 100644 --- a/benchmark-stats/src/main.rs +++ b/benchmark-stats/src/main.rs @@ -4,9 +4,10 @@ use std::{ }; use clap::{Parser, Subcommand}; -use espresso_types::SeqTypes; -use hotshot_task_impls::stats::{LeaderViewStats, ReplicaViewStats}; -use hotshot_types::data::ViewNumber; +use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, + data::ViewNumber, +}; use plotly::{ common::{HoverInfo, Line, Marker, MarkerSymbol, Mode}, layout::{self, Axis, GridPattern, LayoutGrid}, @@ -81,14 +82,14 @@ struct ReplicaStats { /// Read replica stats from CSV into a BTreeMap fn read_replica_view_stats( path: &Path, -) -> Result>, Box> { +) -> Result>, Box> { println!("\n**--- Replica Stats ---**"); let mut reader = csv::Reader::from_path(path) .map_err(|e| format!("Failed to open replica stats CSV at {path:?}: {e}"))?; let mut replica_view_stats = BTreeMap::new(); for result in reader.deserialize() { - let record: ReplicaViewStats = result?; + let record: ReplicaViewStats = result?; replica_view_stats.insert(record.view, record); } @@ -97,7 +98,7 @@ fn read_replica_view_stats( /// Generate plots of replica stats fn plot_replica_stats( - replica_view_stats: &BTreeMap>, + replica_view_stats: &BTreeMap>, output_file: &Path, ) -> Result<(), Box> { let mut x_views = Vec::new(); @@ -294,7 +295,7 @@ fn plot_replica_stats( /// it generates the time difference for VID/DAC/Proposal /// from the view change event fn generate_replica_stats( - replica_view_stats: &BTreeMap>, + replica_view_stats: &BTreeMap>, ) -> ReplicaStats { let mut vid_deltas_from_vc = Vec::new(); let mut dac_deltas_from_vc = Vec::new(); @@ -336,21 +337,21 @@ fn print_replica_stats(stats: &ReplicaStats) { /// Read leader stats from CSV into a BTreeMap fn read_leader_view_stats( path: &Path, -) -> Result>, Box> { +) -> Result>, Box> { println!("\n**--- Leader Stats ---**"); let mut reader = csv::Reader::from_path(path) .map_err(|e| format!("Failed to open leader stats CSV at {path:?}: {e}"))?; - let mut leader_view_stats = BTreeMap::>::new(); + let mut leader_view_stats = BTreeMap::>::new(); for result in reader.deserialize() { - let record: LeaderViewStats = result?; + let record: LeaderViewStats = result?; leader_view_stats.insert(record.view, record); } Ok(leader_view_stats) } fn plot_and_print_leader_stats( - leader_view_stats: &BTreeMap>, + leader_view_stats: &BTreeMap>, output_file: &Path, ) -> Result<(), Box> { let mut views = Vec::new(); diff --git a/crates/collector-common/.gitignore b/crates/collector-common/.gitignore new file mode 100644 index 00000000000..ea8c4bf7f35 --- /dev/null +++ b/crates/collector-common/.gitignore @@ -0,0 +1 @@ +/target diff --git a/crates/collector-common/Cargo.toml b/crates/collector-common/Cargo.toml new file mode 100644 index 00000000000..a1d2e6bf50f --- /dev/null +++ b/crates/collector-common/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "collector-common" +version = "0.1.0" +edition = "2024" + +[dependencies] +rkyv = "0.8" +anyhow = "1" \ No newline at end of file diff --git a/crates/collector-common/src/lib.rs b/crates/collector-common/src/lib.rs new file mode 100644 index 00000000000..7c552a0ae9f --- /dev/null +++ b/crates/collector-common/src/lib.rs @@ -0,0 +1,90 @@ +use std::{ + net::{SocketAddr, UdpSocket}, + str::FromStr, + sync::{Arc, OnceLock}, + time::SystemTime, +}; + +use anyhow::{Context, Result}; +use rkyv::{Archive, Deserialize, Serialize}; + +/// A trace with a timestamp +#[derive(Serialize, Deserialize, Archive, Clone, Debug)] +pub struct TraceWithTimestamp { + pub trace: Trace, + pub timestamp: f64, +} + +/// The types of traces that can be collected +#[derive(Serialize, Deserialize, Archive, Clone, Debug)] +pub enum Trace { + ProposalSendEventGenerated(u64), + ProposalSent(u64), + ProposalReceived(u64), + ProposalReceivedEventGenerated(u64), +} + +/// The UDP socket for sending traces +const UDP_SOCKET: OnceLock> = OnceLock::new(); + +/// Send a trace with a specific timestamp (as seconds since the UNIX epoch) +pub fn send_trace_with_timestamp(trace: &Trace, timestamp: f64) -> Result<()> { + // Wrap it in our type that contains the timestamp + let trace_with_timestamp = TraceWithTimestamp { + trace: trace.clone(), + timestamp, + }; + + // Serialize it + let trace_bytes = rkyv::to_bytes(&trace_with_timestamp) + .map_err(|e: rkyv::rancor::Error| anyhow::anyhow!("failed to serialize trace: {}", e))?; + + // Create or get the UDP socket + let udp_socket = UDP_SOCKET.get().cloned(); + let udp_socket = match udp_socket { + Some(udp_socket) => udp_socket, + None => { + // Get the collector endpoint + let collector_endpoint = std::env::var("COLLECTOR_ENDPOINT") + .with_context(|| "failed to get collector endpoint")?; + + // Parse the collector endpoint + let collector_endpoint = SocketAddr::from_str(&collector_endpoint) + .with_context(|| "failed to parse collector endpoint")?; + + // Bind the UDP socket + let udp_socket = + UdpSocket::bind("0.0.0.0:0").with_context(|| "failed to bind UDP socket")?; + + // Set it to nonblocking + udp_socket + .set_nonblocking(true) + .with_context(|| "failed to set nonblocking")?; + + let udp_socket = Arc::new((udp_socket, collector_endpoint)); + + // Set it in the once lock + let _ = UDP_SOCKET.set(udp_socket.clone()); + udp_socket + }, + }; + + // Send the trace + udp_socket + .0 + .send_to(&trace_bytes, udp_socket.1) + .with_context(|| "failed to send trace")?; + + Ok(()) +} + +/// Write a trace to a stream along with the +pub fn send_trace(trace: &Trace) -> Result<()> { + // Get the current timestamp for the trace + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .with_context(|| "failed to get current timestamp")? + .as_secs_f64(); + + send_trace_with_timestamp(trace, timestamp) +} diff --git a/crates/hotshot-builder/legacy/Cargo.toml b/crates/hotshot-builder/legacy/Cargo.toml index c8abf6dfdbb..6738b122560 100644 --- a/crates/hotshot-builder/legacy/Cargo.toml +++ b/crates/hotshot-builder/legacy/Cargo.toml @@ -15,6 +15,7 @@ bincode = { workspace = true } bytemuck = { version = "1.19", features = ["derive"] } coarsetime = "0.1.34" committable = { workspace = true } +csv = "1" derive_more = { workspace = true, features = ["deref", "deref_mut", "debug"] } futures = { workspace = true } hotshot = { workspace = true } @@ -26,6 +27,7 @@ serde = { workspace = true, features = ["derive"] } sha2 = { workspace = true } tagged-base64 = { workspace = true } tide-disco = { workspace = true } +time = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/hotshot-builder/legacy/src/service.rs b/crates/hotshot-builder/legacy/src/service.rs index 899f97eb857..09c6fc4f627 100644 --- a/crates/hotshot-builder/legacy/src/service.rs +++ b/crates/hotshot-builder/legacy/src/service.rs @@ -1,6 +1,7 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, fmt::Display, + fs::OpenOptions, num::NonZeroUsize, sync::Arc, time::{Duration, Instant}, @@ -11,6 +12,7 @@ use async_broadcast::{Sender as BroadcastSender, TrySendError}; use async_lock::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable}; +use csv::Writer; use futures::{future::BoxFuture, stream::StreamExt, Stream}; use hotshot::types::Event; use hotshot_builder_api::{ @@ -27,16 +29,18 @@ use hotshot_types::{ event::EventType, message::Proposal, traits::{ - block_contents::{BlockPayload, Transaction}, + block_contents::{BlockHeader, BlockPayload, Transaction}, node_implementation::{ConsensusTime, NodeType}, signature_key::{BuilderSignatureKey, SignatureKey}, }, utils::BuilderCommitment, + vote::HasViewNumber, }; use lru::LruCache; use sha2::{Digest, Sha256}; use tagged_base64::TaggedBase64; use tide_disco::method::ReadState; +use time::OffsetDateTime; use tokio::{ sync::{mpsc::unbounded_channel, oneshot}, time::{sleep, timeout}, @@ -47,7 +51,6 @@ use crate::builder_state::{ BuildBlockInfo, DaProposalMessage, DecideMessage, MessageType, QuorumProposalMessage, RequestMessage, ResponseMessage, TransactionSource, TriggerStatus, }; - // It holds all the necessary information for a block #[derive(Debug)] pub struct BlockInfo { @@ -180,6 +183,16 @@ pub struct GlobalState { /// /// Initial value may be updated by the `claim_block_with_num_nodes` endpoint. pub num_nodes: usize, + + /// Timestamps for stats + timestamps: BTreeMap, +} + +#[derive(Debug, Clone, Default)] +struct BuilderTimestamps { + pub events: BuilderEventTimestamps, + pub requests: BuilderRequestTimestamps, + pub responses: BuilderResponseTimestamps, } /// `GetChannelForMatchingBuilderError` is an error enum that represents the @@ -245,6 +258,7 @@ impl GlobalState { NonZeroUsize::new(max_txn_num).expect("max_txn_num must be greater than zero"), )), num_nodes, + timestamps: Default::default(), } } @@ -335,7 +349,7 @@ impl GlobalState { .insert(state_id, response_msg); if let Some(previous_builder_state_entry) = previous_builder_state_entry { - tracing::warn!( + tracing::info!( "block {id} overwrote previous block: {previous_builder_state_entry:?}. previous \ cache entry: {previous_cache_entry:?}" ); @@ -475,6 +489,134 @@ impl GlobalState { *builder_view == self.highest_view_num_builder_id.parent_view && !self.check_builder_state_existence_for_a_view(proposal_view) } + pub fn add_event_timestamp(&mut self, timestamp: i128, event: &EventType) { + match event { + EventType::DaProposal { proposal, .. } => { + self.timestamps + .entry(*proposal.data.view_number()) + .or_default() + .events + .da_proposal_recv + .get_or_insert(timestamp); + }, + EventType::QuorumProposal { proposal, .. } => { + self.timestamps + .entry(*proposal.data.view_number()) + .or_default() + .events + .quorum_proposal_recv + .get_or_insert(timestamp); + }, + EventType::Decide { leaf_chain, .. } => { + for leaf in leaf_chain.iter() { + self.timestamps + .entry(*leaf.leaf.view_number()) + .or_default() + .events + .decide_recv + .get_or_insert(timestamp); + } + }, + _ => {}, + } + } + pub fn add_available_blocks_request_timestamp(&mut self, view_number: u64, timestamp: i128) { + let entry = self.timestamps.entry(view_number).or_default(); + let replaced = entry.requests.available_blocks.replace(timestamp); + if replaced.is_some() { + entry.requests.retries += 1; + } + } + pub fn add_claim_block_request_timestamp(&mut self, view_number: u64, timestamp: i128) { + let entry = self.timestamps.entry(view_number).or_default(); + let replaced = entry.requests.claim_block.replace(timestamp); + if replaced.is_some() { + entry.requests.retries += 1; + } + } + pub fn add_claim_block_header_input_request_timestamp( + &mut self, + view_number: u64, + timestamp: i128, + ) { + let entry = self.timestamps.entry(view_number).or_default(); + let replaced = entry.requests.claim_block_header_input.replace(timestamp); + if replaced.is_some() { + entry.requests.retries += 1; + } + } + pub fn add_available_blocks_response_timestamp(&mut self, view_number: u64, timestamp: i128) { + self.timestamps + .entry(view_number) + .or_default() + .responses + .available_blocks = Some(timestamp); + } + pub fn add_claim_block_response_timestamp(&mut self, view_number: u64, timestamp: i128) { + self.timestamps + .entry(view_number) + .or_default() + .responses + .claim_block = Some(timestamp); + } + pub fn add_claim_block_header_input_response_timestamp( + &mut self, + view_number: u64, + timestamp: i128, + ) { + self.timestamps + .entry(view_number) + .or_default() + .responses + .claim_block_header_input = Some(timestamp); + } + + pub fn log_timestamps(&self) { + let leader_results_csv_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open("timestamps.csv") + .unwrap(); + let mut wtr = Writer::from_writer(leader_results_csv_file); + let _ = wtr.write_record([ + "view_number", + "proposal_recv", + "da_proposal_recv", + "decided", + "available_blocks_recv", + "claim_block_recv", + "claim_block_header_input_recv", + "retries", + "available_blocks_send", + "claim_block_send", + "claim_block_header_input_send", + ]); + for (view_number, entry) in self.timestamps.iter() { + let _ = wtr.write_record([ + view_number.to_string(), + entry.events.quorum_proposal_recv.unwrap_or(0).to_string(), + entry.events.da_proposal_recv.unwrap_or(0).to_string(), + entry.events.decide_recv.unwrap_or(0).to_string(), + entry.requests.available_blocks.unwrap_or(0).to_string(), + entry.requests.claim_block.unwrap_or(0).to_string(), + entry + .requests + .claim_block_header_input + .unwrap_or(0) + .to_string(), + entry.requests.retries.to_string(), + entry.responses.available_blocks.unwrap_or(0).to_string(), + entry.responses.claim_block.unwrap_or(0).to_string(), + entry + .responses + .claim_block_header_input + .unwrap_or(0) + .to_string(), + ]); + } + wtr.flush().unwrap(); + } } #[derive(derive_more::Deref, derive_more::DerefMut)] @@ -950,9 +1092,19 @@ where sender: Types::SignatureKey, signature: &::PureAssembledSignatureType, ) -> Result>, BuildError> { - Ok(self + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + self.global_state + .write_arc() + .await + .add_available_blocks_request_timestamp(view_number + 1, now); + let result = self .available_blocks_implementation(for_parent, view_number, sender, signature) - .await?) + .await?; + self.global_state + .write_arc() + .await + .add_available_blocks_response_timestamp(view_number + 1, now); + Ok(result) } async fn claim_block( @@ -962,9 +1114,24 @@ where sender: Types::SignatureKey, signature: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, ) -> Result, BuildError> { - Ok(self + self.global_state + .write_arc() + .await + .add_claim_block_request_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + let result = self .claim_block_implementation(block_hash, view_number, sender, signature) - .await?) + .await?; + self.global_state + .write_arc() + .await + .add_claim_block_response_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + Ok(result) } async fn claim_block_with_num_nodes( @@ -977,9 +1144,24 @@ where ) -> Result, BuildError> { // Update the stored `num_nodes` with the given value, which will be used for VID computation. self.global_state.write_arc().await.num_nodes = num_nodes; - - self.claim_block(block_hash, view_number, sender, signature) + self.global_state + .write_arc() .await + .add_claim_block_request_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + let result = self + .claim_block_implementation(block_hash, view_number, sender, signature) + .await?; + self.global_state + .write_arc() + .await + .add_claim_block_response_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + Ok(result) } async fn claim_block_header_input( @@ -989,9 +1171,24 @@ where sender: Types::SignatureKey, signature: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType, ) -> Result, BuildError> { - Ok(self + self.global_state + .write_arc() + .await + .add_claim_block_header_input_request_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + let result = self .claim_block_header_input_implementation(block_hash, view_number, sender, signature) - .await?) + .await?; + self.global_state + .write_arc() + .await + .add_claim_block_header_input_response_timestamp( + view_number, + OffsetDateTime::now_utc().unix_timestamp_nanos(), + ); + Ok(result) } /// Returns the public key of the builder @@ -1075,6 +1272,26 @@ impl ReadState for ProxyGlobalState { } } +#[derive(Debug, Clone, Default)] +struct BuilderEventTimestamps { + pub da_proposal_recv: Option, + pub quorum_proposal_recv: Option, + pub decide_recv: Option, +} +#[derive(Debug, Clone, Default)] +struct BuilderRequestTimestamps { + pub available_blocks: Option, + pub claim_block: Option, + pub claim_block_header_input: Option, + pub retries: u64, +} + +#[derive(Debug, Clone, Default)] +pub struct BuilderResponseTimestamps { + pub available_blocks: Option, + pub claim_block: Option, + pub claim_block_header_input: Option, +} /* Running Non-Permissioned Builder Service */ @@ -1110,6 +1327,11 @@ pub async fn run_non_permissioned_standalone_builder_service< let Some(event) = hotshot_event_stream.next().await else { anyhow::bail!("Event stream ended"); }; + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + global_state + .write_arc() + .await + .add_event_timestamp(now, &event.event); match event.event { EventType::Error { error } => { @@ -1170,6 +1392,9 @@ pub async fn run_non_permissioned_standalone_builder_service< }, // QC proposal event EventType::QuorumProposal { proposal, sender } => { + if proposal.data.block_header().block_number() % 1000 == 0 { + global_state.read_arc().await.log_timestamps(); + } // get the leader for current view handle_quorum_event(&quorum_sender, Arc::new(proposal), sender).await; }, diff --git a/crates/hotshot/examples/infra/mod.rs b/crates/hotshot/examples/infra/mod.rs index a3746a4ae05..c48602f17b8 100755 --- a/crates/hotshot/examples/infra/mod.rs +++ b/crates/hotshot/examples/infra/mod.rs @@ -64,7 +64,6 @@ use hotshot_types::{ node_implementation::{ConsensusTime, NodeType, Versions}, states::TestableState, }, - utils::genesis_epoch_from_version, HotShotConfig, PeerConfig, ValidatorConfig, }; use rand::{rngs::StdRng, SeedableRng}; @@ -366,6 +365,7 @@ pub trait RunDa< async fn initialize_state_and_hotshot( &self, membership: Arc::Membership>>, + orchestrator_url: Option, ) -> SystemContextHandle { let initializer = hotshot::HotShotInitializer::::from_genesis::( TestInstanceState::default(), @@ -401,6 +401,7 @@ pub trait RunDa< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + orchestrator_url, ) .await .expect("Could not init hotshot") @@ -415,7 +416,7 @@ pub trait RunDa< transactions: &mut Vec, transactions_to_send_per_round: u64, transaction_size_in_bytes: u64, - ) -> BenchResults { + ) -> BenchResults { let NetworkConfig { rounds, node_index, .. } = self.config(); @@ -525,16 +526,6 @@ pub trait RunDa< }, } } - // Panic if we don't have the genesis epoch, there is no recovery from that - let num_eligible_leaders = context - .hotshot - .membership_coordinator - .membership_for_epoch(genesis_epoch_from_version::()) - .await - .unwrap() - .stake_table() - .await - .len(); let consensus_lock = context.hotshot.consensus(); let consensus_reader = consensus_lock.read().await; let total_num_views = usize::try_from(consensus_reader.locked_view().u64()).unwrap(); @@ -566,23 +557,7 @@ pub trait RunDa< {avg_latency_in_sec} sec." ); - BenchResults { - partial_results: "Unset".to_string(), - avg_latency_in_sec, - num_latency, - minimum_latency_in_sec: minimum_latency, - maximum_latency_in_sec: maximum_latency, - throughput_bytes_per_sec, - total_transactions_committed, - transaction_size_in_bytes: transaction_size_in_bytes + 8, // extra 8 bytes for timestamp - total_time_elapsed_in_sec: total_time_elapsed.as_secs(), - total_num_views, - failed_num_views, - committee_type: format!( - "{} with {num_eligible_leaders} eligible leaders", - std::any::type_name::() - ), - } + BenchResults::default() } else { // all values with zero BenchResults::default() @@ -994,7 +969,9 @@ pub async fn main_entry_point< &membership, ) .await; - let hotshot = run.initialize_state_and_hotshot(membership).await; + let hotshot = run + .initialize_state_and_hotshot(membership, Some(args.url)) + .await; if let Some(task) = builder_task { task.start(Box::new(hotshot.event_stream())); @@ -1041,7 +1018,9 @@ pub async fn main_entry_point< (transaction_size + 8) as u64, // extra 8 bytes for transaction base, see `create_random_transaction`. ) .await; - orchestrator_client.post_bench_results(bench_results).await; + orchestrator_client + .post_bench_results::(bench_results) + .await; } /// Sets correct builder_url and registers a builder with orchestrator if this node is running one. diff --git a/crates/hotshot/hotshot/Cargo.toml b/crates/hotshot/hotshot/Cargo.toml index 53b89026b5f..0e072331746 100644 --- a/crates/hotshot/hotshot/Cargo.toml +++ b/crates/hotshot/hotshot/Cargo.toml @@ -49,6 +49,7 @@ rand = { workspace = true } serde = { workspace = true, features = ["rc"] } sha2 = { workspace = true } time = { workspace = true } +collector-common = { path = "../../collector-common" } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/hotshot/hotshot/src/lib.rs b/crates/hotshot/hotshot/src/lib.rs index ecbf041891e..db047ecff00 100644 --- a/crates/hotshot/hotshot/src/lib.rs +++ b/crates/hotshot/hotshot/src/lib.rs @@ -78,6 +78,7 @@ use hotshot_types::{ pub use rand; use tokio::{spawn, time::sleep}; use tracing::{debug, instrument, trace}; +use url::Url; // -- Rexports // External @@ -112,6 +113,9 @@ pub struct SystemContext, V: Versi /// Memberships used by consensus pub membership_coordinator: EpochMembershipCoordinator, + /// The orchestrator url + pub orchestrator_url: Option, + /// the metrics that the implementor is using. metrics: Arc, @@ -167,6 +171,7 @@ impl, V: Versions> Clone config: self.config.clone(), network: Arc::clone(&self.network), membership_coordinator: self.membership_coordinator.clone(), + orchestrator_url: self.orchestrator_url.clone(), metrics: Arc::clone(&self.metrics), consensus: self.consensus.clone(), instance_state: Arc::clone(&self.instance_state), @@ -209,6 +214,7 @@ impl, V: Versions> SystemContext, ) -> Arc { let internal_chan = broadcast(EVENT_CHANNEL_SIZE); let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE); @@ -227,6 +233,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext>>, ), external_channel: (Sender>, Receiver>), + orchestrator_url: Option, ) -> Arc { debug!("Creating a new hotshot"); @@ -414,6 +422,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext, ) -> Result< ( SystemContextHandle, @@ -697,6 +707,7 @@ impl, V: Versions> SystemContext { + let _ = send_trace(&Trace::ProposalReceived(*(proposal.data.view_number()))); + }, + MessageKind::Consensus(SequencingMessage::General( + GeneralConsensusMessage::Proposal2Legacy(proposal), + )) => { + let _ = send_trace(&Trace::ProposalReceived(*(proposal.data.view_number()))); + }, + MessageKind::Consensus(SequencingMessage::General( + GeneralConsensusMessage::Proposal2(proposal), + )) => { + let _ = send_trace(&Trace::ProposalReceived(*(proposal.data.view_number()))); + }, + _ => {}, + } + + // Special case: external messages (version 0.0). We want to make sure it is an external message // and warn and continue otherwise. if version == EXTERNAL_MESSAGE_VERSION @@ -364,6 +390,7 @@ where consensus_metrics, storage.clone(), storage_metrics, + None, ) .await; let consensus_registry = ConsensusTaskRegistry::new(); diff --git a/crates/hotshot/hotshot/src/tasks/task_state.rs b/crates/hotshot/hotshot/src/tasks/task_state.rs index 2a4d71bd832..6f8163d2942 100644 --- a/crates/hotshot/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/hotshot/src/tasks/task_state.rs @@ -348,11 +348,13 @@ impl, V: Versions> CreateTaskState { async fn create_from(handle: &SystemContextHandle) -> Self { StatsTaskState::::new( + handle.hotshot.id, handle.cur_view().await, handle.cur_epoch().await, handle.public_key().clone(), OuterConsensus::new(handle.hotshot.consensus()), handle.hotshot.membership_coordinator.clone(), + handle.hotshot.orchestrator_url.clone(), ) } } diff --git a/crates/hotshot/orchestrator/Cargo.toml b/crates/hotshot/orchestrator/Cargo.toml index 1df5928fcfc..30e161836e0 100644 --- a/crates/hotshot/orchestrator/Cargo.toml +++ b/crates/hotshot/orchestrator/Cargo.toml @@ -13,6 +13,7 @@ clap = { workspace = true } csv = "1" futures = { workspace = true } hotshot-types = { workspace = true } +itertools = { workspace = true } libp2p-identity = { workspace = true } multiaddr = { workspace = true } serde = { workspace = true } diff --git a/crates/hotshot/orchestrator/api.toml b/crates/hotshot/orchestrator/api.toml index 7a18fbb193d..23d71c4573c 100644 --- a/crates/hotshot/orchestrator/api.toml +++ b/crates/hotshot/orchestrator/api.toml @@ -102,3 +102,11 @@ METHOD = "POST" DOC = """ Register a builder URL to orchestrator's pool of builder URLs """ + + +# GET whether or not to start the run +[route.get_metrics] +PATH = ["metrics"] +DOC = """ +Get the normalized metrics for the most recent epoch +""" diff --git a/crates/hotshot/orchestrator/src/client.rs b/crates/hotshot/orchestrator/src/client.rs index c64d68cf93d..df896233f05 100644 --- a/crates/hotshot/orchestrator/src/client.rs +++ b/crates/hotshot/orchestrator/src/client.rs @@ -4,17 +4,23 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use std::{net::SocketAddr, time::Duration}; +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddr, + time::Duration, +}; use clap::Parser; use futures::{Future, FutureExt}; use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, network::{NetworkConfig, NetworkConfigSource}, - traits::node_implementation::NodeType, + traits::node_implementation::{ConsensusTime, NodeType}, PeerConfig, ValidatorConfig, }; use libp2p_identity::PeerId; use multiaddr::Multiaddr; +use serde::{Deserialize, Serialize}; use surf_disco::{error::ClientError, Client}; use tide_disco::Url; use tokio::time::sleep; @@ -30,102 +36,35 @@ pub struct OrchestratorClient { } /// Struct describing a benchmark result -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)] -pub struct BenchResults { - /// Whether it's partial collected results - pub partial_results: String, - /// The average latency of the transactions - pub avg_latency_in_sec: i64, - /// The number of transactions that were latency measured - pub num_latency: i64, - /// The minimum latency of the transactions - pub minimum_latency_in_sec: i64, - /// The maximum latency of the transactions - pub maximum_latency_in_sec: i64, - /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes - pub throughput_bytes_per_sec: u64, - /// The number of transactions committed during benchmarking - pub total_transactions_committed: u64, - /// The size of each transaction in bytes - pub transaction_size_in_bytes: u64, - /// The total time elapsed for benchmarking - pub total_time_elapsed_in_sec: u64, - /// The total number of views during benchmarking - pub total_num_views: usize, - /// The number of failed views during benchmarking - pub failed_num_views: usize, - /// The membership committee type used - pub committee_type: String, +#[derive(Serialize, Deserialize, Clone)] +pub struct BenchResults { + pub node_index: u64, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub leader_view_stats: BTreeMap>, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub replica_view_stats: BTreeMap>, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub latencies_by_view: BTreeMap, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub sizes_by_view: BTreeMap, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub timeouts: BTreeSet, + pub total_time_millis: i128, } -impl BenchResults { - /// printout the results of one example run - pub fn printout(&self) { - println!("====================="); - println!("{0} Benchmark results:", self.partial_results); - println!("Committee type: {}", self.committee_type); - println!( - "Average latency: {} seconds, Minimum latency: {} seconds, Maximum latency: {} seconds", - self.avg_latency_in_sec, self.minimum_latency_in_sec, self.maximum_latency_in_sec - ); - println!("Throughput: {} bytes/sec", self.throughput_bytes_per_sec); - println!( - "Total transactions committed: {}", - self.total_transactions_committed - ); - println!( - "Total number of views: {}, Failed number of views: {}", - self.total_num_views, self.failed_num_views - ); - println!("====================="); +impl Default for BenchResults { + fn default() -> Self { + Self { + node_index: 0, + leader_view_stats: BTreeMap::new(), + replica_view_stats: BTreeMap::new(), + latencies_by_view: BTreeMap::new(), + sizes_by_view: BTreeMap::new(), + timeouts: BTreeSet::new(), + total_time_millis: 0, + } } } - -/// Struct describing a benchmark result needed for download, also include the config -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)] -pub struct BenchResultsDownloadConfig { - // Config starting here - /// The commit this benchmark was run on - pub commit_sha: String, - /// Total number of nodes - pub total_nodes: usize, - /// The size of the da committee - pub da_committee_size: usize, - /// The number of fixed_leader_for_gpuvid when we enable the feature [fixed-leader-election] - pub fixed_leader_for_gpuvid: usize, - /// Number of transactions submitted per round - pub transactions_per_round: usize, - /// The size of each transaction in bytes - pub transaction_size: u64, - /// The number of rounds - pub rounds: usize, - - // Results starting here - /// Whether the results are partially collected - /// "One" when the results are collected for one node - /// "Half" when the results are collective for half running nodes if not all nodes terminate successfully - /// "Full" if the results are successfully collected from all nodes - pub partial_results: String, - /// The average latency of the transactions - pub avg_latency_in_sec: i64, - /// The minimum latency of the transactions - pub minimum_latency_in_sec: i64, - /// The maximum latency of the transactions - pub maximum_latency_in_sec: i64, - /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes - pub throughput_bytes_per_sec: u64, - /// The number of transactions committed during benchmarking - pub total_transactions_committed: u64, - /// The total time elapsed for benchmarking - pub total_time_elapsed_in_sec: u64, - /// The total number of views during benchmarking - pub total_num_views: usize, - /// The number of failed views during benchmarking - pub failed_num_views: usize, - /// The membership committee type used - pub committee_type: String, -} - // VALIDATOR #[derive(Parser, Debug, Clone)] @@ -493,7 +432,10 @@ impl OrchestratorClient { /// # Panics /// Panics if unable to post #[instrument(skip_all, name = "orchestrator metrics")] - pub async fn post_bench_results(&self, bench_results: BenchResults) { + pub async fn post_bench_results( + &self, + bench_results: BenchResults, + ) { let _send_metrics_f: Result<(), ClientError> = self .client .post("api/results") diff --git a/crates/hotshot/orchestrator/src/lib.rs b/crates/hotshot/orchestrator/src/lib.rs index 013af54d807..e8f85fa92ea 100644 --- a/crates/hotshot/orchestrator/src/lib.rs +++ b/crates/hotshot/orchestrator/src/lib.rs @@ -9,17 +9,19 @@ /// The orchestrator's clients pub mod client; +/// Metrics for a benchmark run, collected by the orchestrator +pub mod metrics; + use std::{ - collections::{HashMap, HashSet}, - fs, - fs::OpenOptions, - io, + collections::{BTreeMap, HashMap, HashSet}, + fs::{self, OpenOptions}, + io, thread, time::Duration, }; use alloy::primitives::U256; use async_lock::RwLock; -use client::{BenchResults, BenchResultsDownloadConfig}; +use client::BenchResults; use csv::Writer; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use hotshot_types::{ @@ -47,6 +49,8 @@ use vbs::{ BinarySerializer, }; +use crate::metrics::NormalizedViewTimeline; + /// Orchestrator is not, strictly speaking, bound to the network; it can have its own versioning. /// Orchestrator Version (major) pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0; @@ -91,7 +95,7 @@ struct OrchestratorState { /// The total nodes that have posted they are ready to start nodes_connected: HashSet>, /// The results of the benchmarks - bench_results: BenchResults, + bench_results: BTreeMap>, /// The number of nodes that have posted their results nodes_post_results: u64, /// Whether the orchestrator can be started manually @@ -135,7 +139,7 @@ impl OrchestratorState { pub_posted: HashMap::new(), nodes_connected: HashSet::new(), start: false, - bench_results: BenchResults::default(), + bench_results: BTreeMap::new(), nodes_post_results: 0, manual_start_allowed: true, accepting_new_keys: true, @@ -145,37 +149,73 @@ impl OrchestratorState { } /// Output the results to a csv file according to orchestrator state - pub fn output_to_csv(&self) { - let output_csv = BenchResultsDownloadConfig { - commit_sha: self.config.commit_sha.clone(), - total_nodes: self.config.config.num_nodes_with_stake.into(), - da_committee_size: self.config.config.da_staked_committee_size, - fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid, - transactions_per_round: self.config.transactions_per_round, - transaction_size: self.bench_results.transaction_size_in_bytes, - rounds: self.config.rounds, - partial_results: self.bench_results.partial_results.clone(), - avg_latency_in_sec: self.bench_results.avg_latency_in_sec, - minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec, - maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec, - throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec, - total_transactions_committed: self.bench_results.total_transactions_committed, - total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec, - total_num_views: self.bench_results.total_num_views, - failed_num_views: self.bench_results.failed_num_views, - committee_type: self.bench_results.committee_type.clone(), - }; + pub fn output_to_csv(result: BenchResults) { // Open the CSV file in append mode - let results_csv_file = OpenOptions::new() + let leader_results_csv_file = OpenOptions::new() + .write(true) + .create(true) + .open(format!("Leader_results_{}.csv", result.node_index)) + .unwrap(); + // Open a file for writing + let mut wtr = Writer::from_writer(leader_results_csv_file); + + for (_, leader_stats) in result.leader_view_stats.iter() { + let _ = wtr + .serialize(leader_stats) + .map_err(|e| tracing::warn!("Failed to serialize leader stats: {}", e)); + } + let _ = wtr.flush(); + println!( + "Results successfully saved in leader_results_{}.csv", + result.node_index + ); + + // Do the same for the replica results + let replica_results_csv_file = OpenOptions::new() + .write(true) .create(true) - .append(true) // Open in append mode - .open("scripts/benchmarks_results/results.csv") + .open(format!("replica_results_{}.csv", result.node_index)) .unwrap(); // Open a file for writing - let mut wtr = Writer::from_writer(results_csv_file); - let _ = wtr.serialize(output_csv); + let mut wtr = Writer::from_writer(replica_results_csv_file); + + for (_, replica_stats) in result.replica_view_stats.iter() { + let _ = wtr + .serialize(replica_stats) + .map_err(|e| tracing::warn!("Failed to serialize replica stats: {}", e)); + } let _ = wtr.flush(); - println!("Results successfully saved in scripts/benchmarks_results/results.csv"); + + // Log the Latencies of each block by view + let latency_results_csv_file = OpenOptions::new() + .write(true) + .create(true) + .open(format!("latency_results_{}.csv", result.node_index)) + .unwrap(); + let mut wtr = Writer::from_writer(latency_results_csv_file); + let _ = wtr.write_record(["view", "latency"]); + for (view, latency) in result.latencies_by_view { + let _ = wtr.write_record([view.to_string(), latency.to_string()]); + } + let _ = wtr.flush(); + + // Log the Sizes of each block by view + let sizes_results_csv_file = OpenOptions::new() + .write(true) + .create(true) + .open(format!("sizes_results_{}.csv", result.node_index)) + .unwrap(); + let mut wtr = Writer::from_writer(sizes_results_csv_file); + let _ = wtr.write_record(["view", "size"]); + for (view, size) in result.sizes_by_view { + let _ = wtr.write_record([view.to_string(), size.to_string()]); + } + let _ = wtr.flush(); + + println!( + "Results successfully saved in replica_results_{}.csv", + result.node_index + ); } } @@ -223,7 +263,11 @@ pub trait OrchestratorApi { /// post endpoint for the results of the run /// # Errors /// if unable to serve - fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>; + fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>; + /// get endpoint for the normalized metrics for the most recent epoch + /// # Errors + /// if unable to serve + fn get_metrics(&self) -> Result, ServerError>; /// A node POSTs its public key to let the orchestrator know that it is ready /// # Errors /// if unable to serve @@ -590,70 +634,33 @@ where } // Aggregates results of the run from all nodes - fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> { - if metrics.total_transactions_committed != 0 { - // Deal with the bench results - if self.bench_results.total_transactions_committed == 0 { - self.bench_results = metrics; - } else { - // Deal with the bench results from different nodes - let cur_metrics = self.bench_results.clone(); - self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec - * metrics.num_latency - + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency) - / (metrics.num_latency + cur_metrics.num_latency); - self.bench_results.num_latency += metrics.num_latency; - self.bench_results.minimum_latency_in_sec = metrics - .minimum_latency_in_sec - .min(cur_metrics.minimum_latency_in_sec); - self.bench_results.maximum_latency_in_sec = metrics - .maximum_latency_in_sec - .max(cur_metrics.maximum_latency_in_sec); - self.bench_results.throughput_bytes_per_sec = metrics - .throughput_bytes_per_sec - .max(cur_metrics.throughput_bytes_per_sec); - self.bench_results.total_transactions_committed = metrics - .total_transactions_committed - .max(cur_metrics.total_transactions_committed); - self.bench_results.total_time_elapsed_in_sec = metrics - .total_time_elapsed_in_sec - .max(cur_metrics.total_time_elapsed_in_sec); - self.bench_results.total_num_views = - metrics.total_num_views.min(cur_metrics.total_num_views); - self.bench_results.failed_num_views = - metrics.failed_num_views.max(cur_metrics.failed_num_views); - } - } + fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> { self.nodes_post_results += 1; - if self.bench_results.partial_results == "Unset" { - self.bench_results.partial_results = "One".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results == "One" - && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2) - { - self.bench_results.partial_results = "HalfDA".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results == "HalfDA" - && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2) - { - self.bench_results.partial_results = "Half".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results != "Full" - && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64) - { - self.bench_results.partial_results = "Full".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } + self.bench_results + .insert(metrics.node_index, metrics.clone()); + thread::spawn(move || { + OrchestratorState::::output_to_csv(metrics); + }); Ok(()) } + fn get_metrics(&self) -> Result, ServerError> { + let mut leader_stats = BTreeMap::new(); + let mut replica_stats = BTreeMap::new(); + for (_id, stats) in self.bench_results.iter() { + for (view, leader_stat) in stats.leader_view_stats.iter() { + leader_stats.insert(**view, leader_stat.clone().into()); + } + for (view, replica_stat) in stats.replica_view_stats.iter() { + replica_stats + .entry(**view) + .or_insert(Vec::new()) + .push(replica_stat.clone().into()); + } + } + Ok(metrics::get_metrics(&replica_stats, leader_stats)) + } + fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> { self.builders.push(builder); Ok(()) @@ -779,11 +786,14 @@ where })? .post("post_results", |req, state| { async move { - let metrics: Result = req.body_json(); + let metrics: Result, RequestError> = req.body_json(); state.post_run_results(metrics.unwrap()) } .boxed() })? + .get("get_metrics", |_req, state| { + async move { state.get_metrics() }.boxed() + })? .post("post_builder", |req, state| { async move { // Read the bytes from the body diff --git a/crates/hotshot/orchestrator/src/metrics.rs b/crates/hotshot/orchestrator/src/metrics.rs new file mode 100644 index 00000000000..390299ba550 --- /dev/null +++ b/crates/hotshot/orchestrator/src/metrics.rs @@ -0,0 +1,214 @@ +use std::collections::BTreeMap; + +use hotshot_types::{ + benchmarking::{ + LeaderViewStats as OtherLeaderViewStats, ReplicaViewStats as OtherReplicaViewStats, + }, + traits::node_implementation::ConsensusTime, +}; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ReplicaViewStats { + pub view: u64, + pub view_change: Option, + pub proposal_timestamp: Option, + pub proposal_recv: Option, + pub vote_send: Option, + pub timeout_vote_send: Option, + pub da_proposal_received: Option, + pub da_proposal_validated: Option, + pub da_certificate_recv: Option, + pub proposal_prelim_validated: Option, + pub proposal_validated: Option, + pub timeout_triggered: Option, + pub vid_share_validated: Option, + pub vid_share_recv: Option, +} + +impl From> for ReplicaViewStats { + fn from(stats: OtherReplicaViewStats) -> Self { + Self { + view: stats.view.u64(), + ..stats.into() + } + } +} + +impl From> for LeaderViewStats { + fn from(stats: OtherLeaderViewStats) -> Self { + Self { + view: stats.view.u64(), + ..stats.into() + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct LeaderViewStats { + pub view: u64, + pub prev_proposal_send: Option, + pub proposal_send: Option, + pub vote_recv: Option, + pub da_proposal_send: Option, + pub builder_start: Option, + pub block_built: Option, + pub vid_disperse_send: Option, + pub timeout_certificate_formed: Option, + pub qc_formed: Option, + pub da_cert_send: Option, +} +#[derive(Clone, Debug, Serialize)] +pub struct NormalizedViewTimeline { + view: u64, + previous_proposal_send_time: i128, + leader_received_proposal_time: i128, + previous_view_last_quorum_vote_time: i128, + qc_formed_time: i128, + block_built_time: i128, + proposal_send_time: i128, + da_proposal_send_time: i128, + vid_sent_time: i128, + proposal_recv_time: i128, + proposal_validation_time: i128, + vid_recv_time: i128, + da_cert_formed_time: i128, + da_cert_recv_time: i128, + quorum_vote_send_time: i128, +} + +impl NormalizedViewTimeline { + fn from_stats( + leader_record: &LeaderViewStats, + replica_records: &[ReplicaViewStats], + last_proposal_send_time: i128, + last_view_last_quorum_vote_time: i128, + ) -> Option { + Some(Self { + view: leader_record.view, + previous_proposal_send_time: last_proposal_send_time, + leader_received_proposal_time: leader_record.builder_start? / 1_000_000 + - last_proposal_send_time, + previous_view_last_quorum_vote_time: last_view_last_quorum_vote_time + - last_proposal_send_time, + qc_formed_time: leader_record.qc_formed? / 1_000_000 - last_proposal_send_time, + block_built_time: leader_record.block_built? / 1_000_000 - last_proposal_send_time, + proposal_send_time: leader_record.proposal_send? / 1_000_000 - last_proposal_send_time, + da_proposal_send_time: leader_record.da_proposal_send? / 1_000_000 + - last_proposal_send_time, + vid_sent_time: leader_record.vid_disperse_send? / 1_000_000 - last_proposal_send_time, + proposal_recv_time: replica_records + .iter() + .map(|r| r.proposal_recv.unwrap_or(0) / 1_000_000) + .sorted() + .nth((replica_records.len() * 2) / 3 + 1) + .unwrap() + - last_proposal_send_time, + proposal_validation_time: replica_records + .iter() + .map(|r| r.proposal_validated.unwrap_or(0) / 1_000_000) + .sorted() + .nth((replica_records.len() * 2) / 3 + 1) + .unwrap() + - last_proposal_send_time, + vid_recv_time: replica_records + .iter() + .filter(|r| r.vid_share_validated.is_some()) + .map(|r| r.vid_share_validated.unwrap_or(0) / 1_000_000) + .sorted() + .nth((replica_records.len() * 2) / 3 + 1) + .unwrap() + - last_proposal_send_time, + da_cert_formed_time: leader_record.da_cert_send.unwrap() / 1_000_000 + - last_proposal_send_time, + da_cert_recv_time: replica_records + .iter() + .map(|r| r.da_certificate_recv.unwrap_or(0) / 1_000_000) + .sorted() + .nth((replica_records.len() * 2) / 3 + 1) + .unwrap() + - last_proposal_send_time, + quorum_vote_send_time: replica_records + .iter() + .map(|r| r.vote_send.unwrap_or(0) / 1_000_000) + .sorted() + .nth((replica_records.len() * 2) / 3 + 1) + .unwrap() + - last_proposal_send_time, + }) + } +} + +pub fn remove_views_with_no_preceding( + normalized_views: &mut BTreeMap, +) { + let mut previous_view = 0; + let mut to_remove = Vec::new(); + for (view, _) in normalized_views.iter() { + if *view != previous_view + 1 { + to_remove.push(*view); + } + previous_view = *view; + } + for view in to_remove { + normalized_views.remove(&view); + } +} + +fn get_last_quorum_vote_time(records_by_view: &[ReplicaViewStats]) -> i128 { + records_by_view + .iter() + .map(|r| r.vote_send.unwrap() / 1_000_000) + .nth((records_by_view.len() * 2) / 3 + 1) + .unwrap() +} + +fn normalize_views( + records_by_view: &BTreeMap>, + leader_records_by_view: &BTreeMap, +) -> BTreeMap { + let mut normalized_views = BTreeMap::new(); + let first_leader_record = leader_records_by_view.iter().next().unwrap(); + let mut proposal_send_time = first_leader_record.1.proposal_send.unwrap() / 1_000_000; + let mut last_view_last_quorum_vote_time = + get_last_quorum_vote_time(records_by_view.get(first_leader_record.0).unwrap()); + for (view, record) in leader_records_by_view.iter().skip(1) { + let Some(replica_records) = records_by_view.get(view) else { + println!("Replica records not found for view: {}", view); + continue; + }; + let Some(normalized_view) = NormalizedViewTimeline::from_stats( + record, + replica_records, + proposal_send_time, + last_view_last_quorum_vote_time, + ) else { + println!("Normalized view not found for view: {}", view); + continue; + }; + proposal_send_time = record.proposal_send.unwrap() / 1_000_000; + last_view_last_quorum_vote_time = get_last_quorum_vote_time(replica_records); + normalized_views.insert(*view, normalized_view); + } + normalized_views +} + +pub fn get_metrics( + records_by_view: &BTreeMap>, + leader_records_by_view: BTreeMap, +) -> BTreeMap { + let mut records_by_view = records_by_view.clone(); + + records_by_view.retain(|_, records| records.len() > 60); + let mut leader_records_by_view = leader_records_by_view.clone(); + let first_view = *records_by_view.keys().next().unwrap(); + println!("First view: {}", first_view); + leader_records_by_view = leader_records_by_view.split_off(&(first_view)); + + let mut normalized_views = normalize_views(&records_by_view, &leader_records_by_view); + + // let processed_view_stats = process_view_stats(records_by_view, leader_records_by_view); + // let durations = calculate_durations(processed_view_stats); + remove_views_with_no_preceding(&mut normalized_views); + normalized_views +} diff --git a/crates/hotshot/task-impls/Cargo.toml b/crates/hotshot/task-impls/Cargo.toml index 79db91b226e..be2303b21fc 100644 --- a/crates/hotshot/task-impls/Cargo.toml +++ b/crates/hotshot/task-impls/Cargo.toml @@ -25,6 +25,7 @@ either = { workspace = true } futures = { workspace = true } hotshot-builder-api = { workspace = true } hotshot-contract-adapter = { workspace = true } +hotshot-orchestrator = { workspace = true } hotshot-task = { workspace = true } hotshot-types = { workspace = true } hotshot-utils = { workspace = true } @@ -42,6 +43,7 @@ tracing = { workspace = true } url = { workspace = true } vbs = { workspace = true } vec1 = { workspace = true } +collector-common = { path = "../../collector-common" } [lints] workspace = true diff --git a/crates/hotshot/task-impls/src/da.rs b/crates/hotshot/task-impls/src/da.rs index e15fd93e14e..d53ecc7efef 100644 --- a/crates/hotshot/task-impls/src/da.rs +++ b/crates/hotshot/task-impls/src/da.rs @@ -10,7 +10,7 @@ use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; use hotshot_task::task::TaskState; use hotshot_types::{ - consensus::{Consensus, OuterConsensus, PayloadWithMetadata}, + consensus::{OuterConsensus, PayloadWithMetadata}, data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle}, epoch_membership::EpochMembershipCoordinator, event::{Event, EventType}, @@ -19,7 +19,6 @@ use hotshot_types::{ simple_vote::{DaData2, DaVote2}, storage_metrics::StorageMetricsValue, traits::{ - network::ConnectedNetwork, node_implementation::{NodeImplementation, NodeType, Versions}, signature_key::SignatureKey, storage::Storage, @@ -30,7 +29,7 @@ use hotshot_types::{ }; use hotshot_utils::anytrace::*; use sha2::{Digest, Sha256}; -use tokio::{spawn, task::spawn_blocking}; +use tokio::task::spawn_blocking; use tracing::instrument; use crate::{ @@ -284,76 +283,6 @@ impl, V: Versions> DaTaskState( - OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), - view_number, - target_epoch, - membership.coordinator.clone(), - &pk, - &upgrade_lock, - ) - .await; - if let Some(vid_share) = consensus - .read() - .await - .vid_shares() - .get(&view_number) - .and_then(|key_map| key_map.get(&public_key)) - .and_then(|epoch_map| epoch_map.get(&target_epoch)) - { - tracing::debug!( - "Primary network is down. Calculated own VID share for epoch \ - {target_epoch:?}, my id {my_id}" - ); - broadcast_event( - Arc::new(HotShotEvent::VidShareRecv( - public_key.clone(), - vid_share.clone(), - )), - &chan, - ) - .await; - } - } - }); - } }, HotShotEvent::DaVoteRecv(ref vote) => { tracing::debug!("DA vote recv, Main Task {}", vote.view_number()); diff --git a/crates/hotshot/task-impls/src/network.rs b/crates/hotshot/task-impls/src/network.rs index 7b9b4d3e60d..8daac31f9fb 100644 --- a/crates/hotshot/task-impls/src/network.rs +++ b/crates/hotshot/task-impls/src/network.rs @@ -12,6 +12,7 @@ use std::{ use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; +use collector_common::{send_trace, Trace}; use hotshot_task::task::TaskState; use hotshot_types::{ consensus::OuterConsensus, @@ -101,7 +102,19 @@ impl NetworkMessageTaskState { ); return; } - HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender) + + let proposal_view_number = proposal.data.view_number(); + let event = HotShotEvent::QuorumProposalRecv( + convert_proposal(proposal), + sender, + ); + + // Send the trace when we receive the proposal + let _ = send_trace(&Trace::ProposalReceivedEventGenerated( + *proposal_view_number, + )); + + event }, GeneralConsensusMessage::Proposal2Legacy(proposal) => { if !self @@ -116,7 +129,19 @@ impl NetworkMessageTaskState { ); return; } - HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender) + + let proposal_view_number = proposal.data.view_number(); + let event = HotShotEvent::QuorumProposalRecv( + convert_proposal(proposal), + sender, + ); + + // Send the trace when we receive the proposal + let _ = send_trace(&Trace::ProposalReceivedEventGenerated( + *proposal_view_number, + )); + + event }, GeneralConsensusMessage::Proposal2(proposal) => { if !self @@ -131,7 +156,19 @@ impl NetworkMessageTaskState { ); return; } - HotShotEvent::QuorumProposalRecv(convert_proposal(proposal), sender) + + let proposal_view_number = proposal.data.view_number(); + let event = HotShotEvent::QuorumProposalRecv( + convert_proposal(proposal), + sender, + ); + + // Send the trace when we receive the proposal + let _ = send_trace(&Trace::ProposalReceivedEventGenerated( + *proposal_view_number, + )); + + event }, GeneralConsensusMessage::ProposalRequested(req, sig) => { HotShotEvent::QuorumProposalRequestRecv(req, sig) @@ -1523,6 +1560,26 @@ impl< }, }; + // Match on the message kind + match &message.kind { + MessageKind::Consensus(SequencingMessage::General( + GeneralConsensusMessage::Proposal(proposal), + )) => { + let _ = send_trace(&Trace::ProposalSent(*(proposal.data.view_number()))); + }, + MessageKind::Consensus(SequencingMessage::General( + GeneralConsensusMessage::Proposal2(proposal), + )) => { + let _ = send_trace(&Trace::ProposalSent(*(proposal.data.view_number()))); + }, + MessageKind::Consensus(SequencingMessage::General( + GeneralConsensusMessage::Proposal2Legacy(proposal), + )) => { + let _ = send_trace(&Trace::ProposalSent(*(proposal.data.view_number()))); + }, + _ => {}, + } + let transmit_result = match transmit { TransmitType::Direct(recipient) => { network.direct_message(serialized_message, recipient).await diff --git a/crates/hotshot/task-impls/src/quorum_proposal/handlers.rs b/crates/hotshot/task-impls/src/quorum_proposal/handlers.rs index 4a430909571..c2edd68a2a8 100644 --- a/crates/hotshot/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/hotshot/task-impls/src/quorum_proposal/handlers.rs @@ -14,6 +14,7 @@ use std::{ }; use async_broadcast::{Receiver, Sender}; +use collector_common::{Trace, send_trace}; use committable::{Commitment, Committable}; use hotshot_task::dependency_task::HandleDepOutput; use hotshot_types::{ @@ -653,6 +654,9 @@ impl ProposalDependencyHandle { proposed_leaf.justify_qc().view_number() ); + // Send the trace when we generate the proposal event + let _ = send_trace(&Trace::ProposalSendEventGenerated(*(proposed_leaf.view_number()))); + broadcast_event( Arc::new(HotShotEvent::QuorumProposalSend( message.clone(), diff --git a/crates/hotshot/task-impls/src/quorum_proposal/mod.rs b/crates/hotshot/task-impls/src/quorum_proposal/mod.rs index e9560375d5e..3b1612b82ed 100644 --- a/crates/hotshot/task-impls/src/quorum_proposal/mod.rs +++ b/crates/hotshot/task-impls/src/quorum_proposal/mod.rs @@ -40,7 +40,7 @@ use crate::{ quorum_proposal::handlers::handle_eqc_formed, }; -mod handlers; +pub mod handlers; /// The state for the quorum proposal task. pub struct QuorumProposalTaskState, V: Versions> { diff --git a/crates/hotshot/task-impls/src/stats.rs b/crates/hotshot/task-impls/src/stats.rs index 10c18150462..0a3839dc682 100644 --- a/crates/hotshot/task-impls/src/stats.rs +++ b/crates/hotshot/task-impls/src/stats.rs @@ -6,10 +6,13 @@ use std::{ use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; use either::Either; +use hotshot_orchestrator::client::{BenchResults, OrchestratorClient}; use hotshot_task::task::TaskState; use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, consensus::OuterConsensus, epoch_membership::EpochMembershipCoordinator, + simple_vote::HasEpoch, traits::{ block_contents::BlockHeader, node_implementation::{ConsensusTime, NodeType}, @@ -21,106 +24,39 @@ use hotshot_utils::{ anytrace::{Error, Level, Result}, line_info, warn, }; -use serde::{Deserialize, Serialize}; use time::OffsetDateTime; +use url::Url; use crate::events::HotShotEvent; -#[derive(Serialize, Deserialize)] -pub struct LeaderViewStats { - pub view: TYPES::View, - pub prev_proposal_send: Option, - pub proposal_send: Option, - pub vote_recv: Option, - pub da_proposal_send: Option, - pub builder_start: Option, - pub block_built: Option, - pub vid_disperse_send: Option, - pub timeout_certificate_formed: Option, - pub qc_formed: Option, - pub da_cert_send: Option, -} - -#[derive(Serialize, Deserialize)] -pub struct ReplicaViewStats { - pub view: TYPES::View, - pub view_change: Option, - pub proposal_timestamp: Option, - pub proposal_recv: Option, - pub vote_send: Option, - pub timeout_vote_send: Option, - pub da_proposal_received: Option, - pub da_proposal_validated: Option, - pub da_certificate_recv: Option, - pub proposal_prelim_validated: Option, - pub proposal_validated: Option, - pub timeout_triggered: Option, - pub vid_share_validated: Option, - pub vid_share_recv: Option, -} - -impl LeaderViewStats { - fn new(view: TYPES::View) -> Self { - Self { - view, - prev_proposal_send: None, - proposal_send: None, - vote_recv: None, - da_proposal_send: None, - builder_start: None, - block_built: None, - vid_disperse_send: None, - timeout_certificate_formed: None, - qc_formed: None, - da_cert_send: None, - } - } -} - -impl ReplicaViewStats { - fn new(view: TYPES::View) -> Self { - Self { - view, - view_change: None, - proposal_timestamp: None, - proposal_recv: None, - vote_send: None, - timeout_vote_send: None, - da_proposal_received: None, - da_proposal_validated: None, - da_certificate_recv: None, - proposal_prelim_validated: None, - proposal_validated: None, - timeout_triggered: None, - vid_share_validated: None, - vid_share_recv: None, - } - } -} - pub struct StatsTaskState { + node_index: u64, view: TYPES::View, epoch: Option, public_key: TYPES::SignatureKey, consensus: OuterConsensus, membership_coordinator: EpochMembershipCoordinator, - leader_stats: BTreeMap>, - replica_stats: BTreeMap>, + leader_stats: BTreeMap>, + replica_stats: BTreeMap>, latencies_by_view: BTreeMap, sizes_by_view: BTreeMap, epoch_start_times: BTreeMap, timeouts: BTreeSet, + orchestrator_client: Option, } impl StatsTaskState { pub fn new( + node_index: u64, view: TYPES::View, epoch: Option, public_key: TYPES::SignatureKey, consensus: OuterConsensus, membership_coordinator: EpochMembershipCoordinator, + orchestrator_url: Option, ) -> Self { Self { + node_index, view, epoch, public_key, @@ -132,14 +68,15 @@ impl StatsTaskState { sizes_by_view: BTreeMap::new(), epoch_start_times: BTreeMap::new(), timeouts: BTreeSet::new(), + orchestrator_client: orchestrator_url.map(OrchestratorClient::new), } } - fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats { + fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats { self.leader_stats .entry(view) .or_insert_with(|| LeaderViewStats::new(view)) } - fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats { + fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats { self.replica_stats .entry(view) .or_insert_with(|| ReplicaViewStats::new(view)) @@ -184,16 +121,21 @@ impl StatsTaskState { Ok(()) } - fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) { + fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) -> i128 { let num_views = self.latencies_by_view.len(); let total_size = self.sizes_by_view.values().sum::(); // Either we have no views logged yet, no TXNs or we are not in the DA committee and don't know block sizes if num_views == 0 || total_size == 0 { - return; + return 0; } let total_latency = self.latencies_by_view.values().sum::(); + let elapsed_time = if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) { + now - epoch_start_time + } else { + 0 + }; let average_latency = total_latency / num_views as i128; tracing::warn!("Average latency: {}ms", average_latency); tracing::warn!( @@ -201,12 +143,18 @@ impl StatsTaskState { epoch, self.timeouts.len() ); - if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) { - let elapsed_time = now - epoch_start_time; + let total_size = self.sizes_by_view.values().sum::(); + if total_size == 0 { + // Either no TXNs or we are not in the DA committee and don't know block sizes + return elapsed_time; + } + if elapsed_time > 0 { // multiply by 1000 to convert to seconds let throughput = (total_size / elapsed_time) * 1000; tracing::warn!("Throughput: {} bytes/s", throughput); + return elapsed_time; } + elapsed_time } } @@ -224,33 +172,54 @@ impl TaskState for StatsTaskState { match event.as_ref() { HotShotEvent::BlockRecv(block_recv) => { - self.leader_entry(block_recv.view_number).block_built = Some(now); + self.leader_entry(block_recv.view_number) + .block_built + .get_or_insert(now); }, HotShotEvent::QuorumProposalRecv(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .proposal_recv = Some(now); + .proposal_recv + .get_or_insert(now); + let leader = self + .membership_coordinator + .membership_for_epoch(proposal.data.epoch()) + .await? + .leader(proposal.data.view_number() + 1) + .await?; + if leader == self.public_key { + self.leader_entry(proposal.data.view_number() + 1) + .builder_start + .get_or_insert(now); + } }, HotShotEvent::QuorumVoteRecv(_vote) => {}, HotShotEvent::TimeoutVoteRecv(_vote) => {}, HotShotEvent::TimeoutVoteSend(vote) => { - self.replica_entry(vote.view_number()).timeout_vote_send = Some(now); + self.replica_entry(vote.view_number()) + .timeout_vote_send + .get_or_insert(now); }, HotShotEvent::DaProposalRecv(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .da_proposal_received = Some(now); + .da_proposal_received + .get_or_insert(now); }, HotShotEvent::DaProposalValidated(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .da_proposal_validated = Some(now); + .da_proposal_validated + .get_or_insert(now); }, HotShotEvent::DaVoteRecv(_simple_vote) => {}, HotShotEvent::DaCertificateRecv(simple_certificate) => { self.replica_entry(simple_certificate.view_number()) - .da_certificate_recv = Some(now); + .da_certificate_recv + .get_or_insert(now); }, HotShotEvent::DaCertificateValidated(_simple_certificate) => {}, HotShotEvent::QuorumProposalSend(proposal, _) => { - self.leader_entry(proposal.data.view_number()).proposal_send = Some(now); + self.leader_entry(proposal.data.view_number()) + .proposal_send + .get_or_insert(now); // If the last view succeeded, add the metric for time between proposals if proposal.data.view_change_evidence().is_none() { @@ -259,7 +228,8 @@ impl TaskState for StatsTaskState { .proposal_recv { self.leader_entry(proposal.data.view_number()) - .prev_proposal_send = Some(previous_proposal_time); + .prev_proposal_send + .get_or_insert(previous_proposal_time); // calculate the elapsed time as milliseconds (from nanoseconds) let elapsed_time = (now - previous_proposal_time) / 1_000_000; @@ -277,58 +247,67 @@ impl TaskState for StatsTaskState { } }, HotShotEvent::QuorumVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::QuorumProposalValidated(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .proposal_validated = Some(now); + .proposal_validated + .get_or_insert(now); self.replica_entry(proposal.data.view_number()) .proposal_timestamp = Some(proposal.data.block_header().timestamp_millis() as i128); }, HotShotEvent::DaProposalSend(proposal, _) => { self.leader_entry(proposal.data.view_number()) - .da_proposal_send = Some(now); + .da_proposal_send + .get_or_insert(now); }, HotShotEvent::DaVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::QcFormed(either) => { match either { - Either::Left(qc) => { - self.leader_entry(qc.view_number() + 1).qc_formed = Some(now) - }, - Either::Right(tc) => { - self.leader_entry(tc.view_number()) - .timeout_certificate_formed = Some(now) - }, + Either::Left(qc) => self + .leader_entry(qc.view_number() + 1) + .qc_formed + .get_or_insert(now), + Either::Right(tc) => self + .leader_entry(tc.view_number()) + .timeout_certificate_formed + .get_or_insert(now), }; }, HotShotEvent::Qc2Formed(either) => { match either { - Either::Left(qc) => { - self.leader_entry(qc.view_number() + 1).qc_formed = Some(now) - }, - Either::Right(tc) => { - self.leader_entry(tc.view_number()) - .timeout_certificate_formed = Some(now) - }, + Either::Left(qc) => self + .leader_entry(qc.view_number() + 1) + .qc_formed + .get_or_insert(now), + Either::Right(tc) => self + .leader_entry(tc.view_number()) + .timeout_certificate_formed + .get_or_insert(now), }; }, HotShotEvent::DacSend(simple_certificate, _) => { self.leader_entry(simple_certificate.view_number()) - .da_cert_send = Some(now); + .da_cert_send + .get_or_insert(now); }, HotShotEvent::ViewChange(view, epoch) => { // Record the timestamp of the first observed view change // This can happen when transitioning to the next view, either due to voting // or receiving a proposal, but we only store the first one - if self.replica_entry(*view + 1).view_change.is_none() { - self.replica_entry(*view + 1).view_change = Some(now); - } + self.replica_entry(*view + 1).view_change.get_or_insert(now); if *epoch <= self.epoch && *view <= self.view { return Ok(()); @@ -347,10 +326,25 @@ impl TaskState for StatsTaskState { } if new_epoch { - if let Some(prev_epoch) = prev_epoch { - self.log_basic_stats(now, &prev_epoch); - } + let elapsed_time = if let Some(prev_epoch) = prev_epoch { + self.log_basic_stats(now, &prev_epoch) + } else { + 0 + }; let _ = self.dump_stats(); + if let Some(orchestrator_client) = self.orchestrator_client.as_ref() { + orchestrator_client + .post_bench_results::(BenchResults:: { + node_index: self.node_index, + leader_view_stats: self.leader_stats.clone(), + replica_view_stats: self.replica_stats.clone(), + latencies_by_view: self.latencies_by_view.clone(), + sizes_by_view: self.sizes_by_view.clone(), + timeouts: self.timeouts.clone(), + total_time_millis: elapsed_time, + }) + .await; + } self.garbage_collect(*view - 1); } @@ -361,7 +355,7 @@ impl TaskState for StatsTaskState { .leader(*view) .await?; if leader == self.public_key { - self.leader_entry(*view).builder_start = Some(now); + self.leader_entry(*view).builder_start.get_or_insert(now); } }, HotShotEvent::Timeout(view, _) => { @@ -373,19 +367,24 @@ impl TaskState for StatsTaskState { // #3526 https://github.com/EspressoSystems/espresso-network/issues/3526 }, HotShotEvent::SendPayloadCommitmentAndMetadata(_, _, _, view, _) => { - self.leader_entry(*view).vid_disperse_send = Some(now); + self.leader_entry(*view) + .vid_disperse_send + .get_or_insert(now); }, HotShotEvent::VidShareRecv(_, proposal) => { self.replica_entry(proposal.data.view_number()) - .vid_share_recv = Some(now); + .vid_share_recv + .get_or_insert(now); }, HotShotEvent::VidShareValidated(proposal) => { self.replica_entry(proposal.data.view_number()) - .vid_share_validated = Some(now); + .vid_share_validated + .get_or_insert(now); }, HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => { self.replica_entry(proposal.data.view_number()) - .proposal_prelim_validated = Some(now); + .proposal_prelim_validated + .get_or_insert(now); }, HotShotEvent::LeavesDecided(leaves) => { for leaf in leaves { diff --git a/crates/hotshot/task-impls/src/transactions.rs b/crates/hotshot/task-impls/src/transactions.rs index bd05abb1ebe..525903ddc4d 100644 --- a/crates/hotshot/task-impls/src/transactions.rs +++ b/crates/hotshot/task-impls/src/transactions.rs @@ -21,7 +21,7 @@ use hotshot_types::{ event::{Event, EventType}, message::UpgradeLock, traits::{ - block_contents::{BuilderFee, EncodeBytes}, + block_contents::{BlockHeader, BuilderFee, EncodeBytes}, node_implementation::{ConsensusTime, NodeType, Versions}, signature_key::{BuilderSignatureKey, SignatureKey}, BlockPayload, @@ -116,16 +116,9 @@ impl TransactionTaskState { event_stream: &Sender>>, block_view: TYPES::View, block_epoch: Option, + vid: Option, ) -> Option { - let _version = match self.upgrade_lock.version(block_view).await { - Ok(v) => v, - Err(e) => { - tracing::error!("Failed to calculate version: {e:?}"); - return None; - }, - }; - - self.handle_view_change_legacy(event_stream, block_view, block_epoch) + self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid) .await } @@ -136,6 +129,7 @@ impl TransactionTaskState { event_stream: &Sender>>, block_view: TYPES::View, block_epoch: Option, + vid: Option, ) -> Option { let version = match self.upgrade_lock.version(block_view).await { Ok(v) => v, @@ -219,7 +213,7 @@ impl TransactionTaskState { { None } else { - self.wait_for_block(block_view).await + self.wait_for_block(block_view, vid).await } }; @@ -366,7 +360,50 @@ impl TransactionTaskState { .leader(view) .await?; if leader == self.public_key { - self.handle_view_change(&event_stream, view, *epoch).await; + self.handle_view_change(&event_stream, view, *epoch, None) + .await; + return Ok(()); + } + }, + HotShotEvent::QuorumProposalValidated(proposal, _leaf) => { + let view_number = proposal.data.view_number(); + let next_view = view_number + 1; + + let version = match self.upgrade_lock.version(next_view).await { + Ok(v) => v, + Err(e) => { + tracing::error!("Failed to calculate version: {e:?}"); + return Ok(()); + }, + }; + + if version < V::DrbAndHeaderUpgrade::VERSION { + return Ok(()); + } + + let vid = proposal.data.block_header().payload_commitment(); + let block_height = proposal.data.block_header().block_number(); + if is_epoch_transition(block_height, self.epoch_height) { + return Ok(()); + } + if is_last_block(block_height, self.epoch_height) { + return Ok(()); + } + if next_view <= self.cur_view { + return Ok(()); + } + // move to next view for this task only + self.cur_view = next_view; + + let leader = self + .membership_coordinator + .membership_for_epoch(self.cur_epoch) + .await? + .leader(next_view) + .await?; + if leader == self.public_key { + self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid)) + .await; return Ok(()); } }, @@ -444,19 +481,27 @@ impl TransactionTaskState { } #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")] - async fn wait_for_block(&self, block_view: TYPES::View) -> Option> { + async fn wait_for_block( + &self, + block_view: TYPES::View, + vid: Option, + ) -> Option> { let task_start_time = Instant::now(); // Find commitment to the block we want to build upon - let (parent_view, parent_comm) = match self - .last_vid_commitment_retry(block_view, task_start_time) - .await - { - Ok((v, c)) => (v, c), - Err(e) => { - tracing::warn!("Failed to find last vid commitment in time: {e}"); - return None; - }, + let (parent_view, parent_comm) = if let Some(vid) = vid { + (block_view - 1, vid) + } else { + match self + .last_vid_commitment_retry(block_view, task_start_time) + .await + { + Ok((v, c)) => (v, c), + Err(e) => { + tracing::warn!("Failed to find last vid commitment in time: {e}"); + return None; + }, + } }; let parent_comm_sig = match <::SignatureKey as SignatureKey>::sign( diff --git a/crates/hotshot/testing/src/helpers.rs b/crates/hotshot/testing/src/helpers.rs index 9b32a6211f3..7d165d2e161 100644 --- a/crates/hotshot/testing/src/helpers.rs +++ b/crates/hotshot/testing/src/helpers.rs @@ -147,6 +147,7 @@ pub async fn build_system_handle_from_launcher< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await .expect("Could not init hotshot"); diff --git a/crates/hotshot/testing/src/test_builder.rs b/crates/hotshot/testing/src/test_builder.rs index 669882d30d4..5dda87f90f4 100644 --- a/crates/hotshot/testing/src/test_builder.rs +++ b/crates/hotshot/testing/src/test_builder.rs @@ -337,6 +337,7 @@ pub async fn create_test_handle< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await; diff --git a/crates/hotshot/testing/src/test_runner.rs b/crates/hotshot/testing/src/test_runner.rs index 51c8cda3d49..d656a30a840 100644 --- a/crates/hotshot/testing/src/test_runner.rs +++ b/crates/hotshot/testing/src/test_runner.rs @@ -553,6 +553,7 @@ where ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await } @@ -595,6 +596,7 @@ where StorageMetricsValue::default(), internal_channel, external_channel, + None, ) .await } diff --git a/crates/hotshot/types/Cargo.toml b/crates/hotshot/types/Cargo.toml index 9b912384eae..8ee76603364 100644 --- a/crates/hotshot/types/Cargo.toml +++ b/crates/hotshot/types/Cargo.toml @@ -38,12 +38,12 @@ dyn-clone = "1.0.17" either = { workspace = true } futures = { workspace = true, features = ["alloc"] } hotshot-utils = { workspace = true } +jf-advz = { workspace = true } jf-crhf = { workspace = true } jf-pcs = { workspace = true } jf-rescue = { workspace = true } jf-signature = { workspace = true, features = ["bls", "schnorr"] } jf-utils = { workspace = true } -jf-advz = { workspace = true } lazy_static = { workspace = true } libp2p-identity = { workspace = true } memoize = { workspace = true } diff --git a/crates/hotshot/types/src/benchmarking.rs b/crates/hotshot/types/src/benchmarking.rs new file mode 100644 index 00000000000..852edd59f1d --- /dev/null +++ b/crates/hotshot/types/src/benchmarking.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct LeaderViewStats { + pub view: V, + pub prev_proposal_send: Option, + pub proposal_send: Option, + pub vote_recv: Option, + pub da_proposal_send: Option, + pub builder_start: Option, + pub block_built: Option, + pub vid_disperse_send: Option, + pub timeout_certificate_formed: Option, + pub qc_formed: Option, + pub da_cert_send: Option, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ReplicaViewStats { + pub view: V, + pub view_change: Option, + pub proposal_timestamp: Option, + pub proposal_recv: Option, + pub vote_send: Option, + pub timeout_vote_send: Option, + pub da_proposal_received: Option, + pub da_proposal_validated: Option, + pub da_certificate_recv: Option, + pub proposal_prelim_validated: Option, + pub proposal_validated: Option, + pub timeout_triggered: Option, + pub vid_share_validated: Option, + pub vid_share_recv: Option, +} + +impl LeaderViewStats { + pub fn new(view: V) -> Self { + Self { + view, + prev_proposal_send: None, + proposal_send: None, + vote_recv: None, + da_proposal_send: None, + builder_start: None, + block_built: None, + vid_disperse_send: None, + timeout_certificate_formed: None, + qc_formed: None, + da_cert_send: None, + } + } +} + +impl ReplicaViewStats { + pub fn new(view: V) -> Self { + Self { + view, + view_change: None, + proposal_timestamp: None, + proposal_recv: None, + vote_send: None, + timeout_vote_send: None, + da_proposal_received: None, + da_proposal_validated: None, + da_certificate_recv: None, + proposal_prelim_validated: None, + proposal_validated: None, + timeout_triggered: None, + vid_share_validated: None, + vid_share_recv: None, + } + } +} diff --git a/crates/hotshot/types/src/lib.rs b/crates/hotshot/types/src/lib.rs index 30762b036d1..63644cc80f7 100644 --- a/crates/hotshot/types/src/lib.rs +++ b/crates/hotshot/types/src/lib.rs @@ -52,6 +52,8 @@ pub mod utils; pub mod vid; pub mod vote; +pub mod benchmarking; + /// Pinned future that is Send and Sync pub type BoxSyncFuture<'a, T> = Pin + Send + Sync + 'a>>; diff --git a/data/genesis/benchmark.toml b/data/genesis/benchmark.toml index 2f19b9d7b1b..bd7985db0fe 100644 --- a/data/genesis/benchmark.toml +++ b/data/genesis/benchmark.toml @@ -1,7 +1,7 @@ base_version = "0.4" upgrade_version = "0.4" genesis_version = "0.4" -epoch_height = 3000 +epoch_height = 1000 drb_difficulty = 10 drb_upgrade_difficulty = 10 epoch_start_block = 1000 @@ -12,7 +12,7 @@ capacity = 200 [chain_config] chain_id = 999999999 base_fee = "1 wei" -max_block_size = "100mb" +max_block_size = "5mb" fee_recipient = "0x0000000000000000000000000000000000000000" fee_contract = "0xa15bb66138824a1c7167f5e85b957d04dd34e468" stake_table_contract = "0x196dbcbb54b8ec4958c959d8949ebfe87ac2aaaf" @@ -23,7 +23,7 @@ timestamp = "1970-01-01T00:00:00Z" [header.chain_config] chain_id = 999999999 base_fee = "1 wei" -max_block_size = "100mb" +max_block_size = "5mb" fee_recipient = "0x0000000000000000000000000000000000000000" fee_contract = "0xa15bb66138824a1c7167f5e85b957d04dd34e468" stake_table_contract = "0x196dbcbb54b8ec4958c959d8949ebfe87ac2aaaf" diff --git a/hotshot-query-service/Cargo.toml b/hotshot-query-service/Cargo.toml index b8018a692ae..a2dfda0a4e1 100644 --- a/hotshot-query-service/Cargo.toml +++ b/hotshot-query-service/Cargo.toml @@ -74,10 +74,10 @@ hotshot-types = { workspace = true } # Dependencies enabled by feature "sql-data-source". include_dir = { version = "0.7", optional = true } itertools = "0.12.1" -jf-merkle-tree-compat = { workspace = true , features = [ +jf-advz = { workspace = true } +jf-merkle-tree-compat = { workspace = true, features = [ "std", ] } -jf-advz = { workspace = true } lazy_static = "1" log = { version = "0.4", optional = true } portpicker = { version = "0.1", optional = true } diff --git a/hotshot-query-service/examples/simple-server.rs b/hotshot-query-service/examples/simple-server.rs index 8d45406e0a6..613a64e2a5f 100644 --- a/hotshot-query-service/examples/simple-server.rs +++ b/hotshot-query-service/examples/simple-server.rs @@ -269,6 +269,7 @@ async fn init_consensus( ConsensusMetricsValue::new(&*data_source.populate_metrics()), storage, StorageMetricsValue::new(&*data_source.populate_metrics()), + None, ) .await .unwrap() diff --git a/hotshot-query-service/src/testing/consensus.rs b/hotshot-query-service/src/testing/consensus.rs index d1e71879072..a795c9a73d7 100644 --- a/hotshot-query-service/src/testing/consensus.rs +++ b/hotshot-query-service/src/testing/consensus.rs @@ -225,6 +225,7 @@ impl MockNetwork { ConsensusMetricsValue::new(&*data_source.populate_metrics()), hs_storage, StorageMetricsValue::new(&*data_source.populate_metrics()), + None, ) .await .unwrap() diff --git a/sdks/crypto-helper/Cargo.toml b/sdks/crypto-helper/Cargo.toml index 0b8cfca4779..9d1f25acef7 100644 --- a/sdks/crypto-helper/Cargo.toml +++ b/sdks/crypto-helper/Cargo.toml @@ -16,11 +16,11 @@ ark-ff = { workspace = true } ark-serialize = { workspace = true } committable = { workspace = true } espresso-types = { path = "../../types" } -primitive-types = { version = "0.13" } hotshot-query-service = { workspace = true } hotshot-types = { workspace = true } jf-crhf = { workspace = true } jf-merkle-tree-compat = { workspace = true, features = ["std"] } +primitive-types = { version = "0.13" } serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index 129695e5a3f..22ea3f67cfb 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -81,8 +81,8 @@ jf-crhf = { workspace = true } jf-merkle-tree-compat = { workspace = true } jf-rescue = { workspace = true } -jf-signature = { workspace = true, features = ["bls", "schnorr"] } jf-advz = { workspace = true } +jf-signature = { workspace = true, features = ["bls", "schnorr"] } libp2p = { workspace = true } num_enum = "0.7" parking_lot = "0.12" diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index f22437e2736..97b5aceda8a 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -108,6 +108,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer: impl PersistenceEventConsumer + 'static, _: V, proposal_fetcher_cfg: ProposalFetcherConfig, + orchestrator_url: Option, ) -> anyhow::Result { let config = &network_config.config; let pub_key = validator_config.public_key; @@ -152,6 +153,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence ConsensusMetricsValue::new(metrics), Arc::clone(&persistence), StorageMetricsValue::new(metrics), + orchestrator_url, ) .await? .0; diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 48f11eb3396..04a546adb9c 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -31,7 +31,6 @@ use libp2p::Multiaddr; use network::libp2p::split_off_peer_id; use options::Identity; use proposal_fetcher::ProposalFetcherConfig; -use tokio::select; use tracing::info; use url::Url; @@ -45,9 +44,8 @@ use espresso_types::v0::traits::SequencerPersistence; pub use genesis::Genesis; use hotshot::{ traits::implementations::{ - derive_libp2p_multiaddr, derive_libp2p_peer_id, CdnMetricsValue, CdnTopic, - CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork, - RequestResponseConfig, WrappedSignatureKey, + derive_libp2p_multiaddr, derive_libp2p_peer_id, CdnMetricsValue, CdnTopic, GossipConfig, + KeyPair, MemoryNetwork, PushCdnNetwork, RequestResponseConfig, WrappedSignatureKey, }, types::SignatureKey, }; @@ -292,7 +290,7 @@ where info!("Libp2p advertise address: {}", libp2p_advertise_address); // Orchestrator client - let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url); + let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url.clone()); let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key); let validator_config = ValidatorConfig { public_key: pub_key, @@ -563,43 +561,31 @@ where // Initialize the Libp2p network let network = { - let p2p_network = Libp2pNetwork::from_config( - network_config.clone(), - persistence.clone(), - coordinator.membership().clone(), - gossip_config, - request_response_config, - libp2p_bind_address, - &validator_config.public_key, - // We need the private key so we can derive our Libp2p keypair - // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html) - &validator_config.private_key, - hotshot::traits::implementations::Libp2pMetricsValue::new(metrics), - ) - .await - .with_context(|| { - format!( - "Failed to create libp2p network on node {node_index}; binding to {:?}", - network_params.libp2p_bind_address - ) - })?; - - tracing::warn!("Waiting for at least one connection to be initialized"); - select! { - _ = cdn_network.wait_for_ready() => { - tracing::warn!("CDN connection initialized"); - }, - _ = p2p_network.wait_for_ready() => { - tracing::warn!("P2P connection initialized"); - }, - }; + // let p2p_network = Libp2pNetwork::from_config( + // network_config.clone(), + // persistence.clone(), + // gossip_config, + // request_response_config, + // libp2p_bind_address, + // &validator_config.public_key, + // // We need the private key so we can derive our Libp2p keypair + // // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html) + // &validator_config.private_key, + // hotshot::traits::implementations::Libp2pMetricsValue::new(metrics), + // ) + // .await + // .with_context(|| { + // format!( + // "Failed to create libp2p network on node {node_index}; binding to {:?}", + // network_params.libp2p_bind_address + // ) + // })?; + + tracing::warn!("Waiting for the CDN connection to be initialized"); + cdn_network.wait_for_ready().await; // Combine the CDN and P2P networks - Arc::from(CombinedNetworks::new( - cdn_network, - p2p_network, - Some(Duration::from_secs(1)), - )) + Arc::from(cdn_network) }; let mut ctx = SequencerContext::init( @@ -617,6 +603,7 @@ where event_consumer, seq_versions, proposal_fetcher_config, + Some(network_params.orchestrator_url), ) .await?; if wait_for_orchestrator { @@ -1327,6 +1314,7 @@ pub mod testing { event_consumer, bind_version, Default::default(), + None, ) .await .unwrap() diff --git a/sequencer/src/network/mod.rs b/sequencer/src/network/mod.rs index d16c35f9993..74818648966 100644 --- a/sequencer/src/network/mod.rs +++ b/sequencer/src/network/mod.rs @@ -5,6 +5,6 @@ use super::*; pub mod cdn; pub mod libp2p; -pub type Production = CombinedNetworks; +pub type Production = PushCdnNetwork; pub type Memory = MemoryNetwork; diff --git a/types/Cargo.toml b/types/Cargo.toml index 68f2b4add89..e65b818f3f1 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -47,9 +47,9 @@ humantime = "2" indexmap = { workspace = true } insta = { version = "1.43", features = ["yaml"] } itertools = { workspace = true } +jf-advz = { workspace = true } jf-merkle-tree-compat = { workspace = true } jf-utils = { workspace = true } # TODO temporary: used only for test_rng() -jf-advz = { workspace = true } lru = { workspace = true } num-traits = { workspace = true } parking_lot = "0.12"