diff --git a/Cargo.lock b/Cargo.lock index ebbec661b13..be13f46014b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5680,6 +5680,7 @@ dependencies = [ "futures", "hotshot-builder-api", "hotshot-contract-adapter", + "hotshot-orchestrator", "hotshot-task", "hotshot-types", "hotshot-utils", diff --git a/Cargo.toml b/Cargo.toml index 4e16d38925b..c5c84ef085e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,6 +178,10 @@ bytesize = "1.3" committable = { git = "https://github.com/EspressoSystems/commit.git", features = ["ark-serialize"] } derivative = "2.2" itertools = "0.12" +jf-advz = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-advz-v0.2.1", features = [ + "std", + "parallel", +] } jf-crhf = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf-crhf-v0.2.0" } jf-merkle-tree-compat = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-merkle-tree-compat-v0.1.0", features = [ "std", @@ -200,10 +204,6 @@ jf-signature = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf "std", ] } jf-utils = { git = "https://github.com/EspressoSystems/jellyfish", tag = "jf-utils-v0.5.0" } -jf-advz = { git = "https://github.com/EspressoSystems/jellyfish-compat", tag = "jf-advz-v0.2.1", features = [ - "std", - "parallel", -] } libp2p = { package = "libp2p", version = "0.56", default-features = false, features = [ "macros", "autonat", diff --git a/benchmark-stats/src/main.rs b/benchmark-stats/src/main.rs index eb3a3516ab2..c510c9e74a5 100644 --- a/benchmark-stats/src/main.rs +++ b/benchmark-stats/src/main.rs @@ -4,9 +4,10 @@ use std::{ }; use clap::{Parser, Subcommand}; -use espresso_types::SeqTypes; -use hotshot_task_impls::stats::{LeaderViewStats, ReplicaViewStats}; -use hotshot_types::data::ViewNumber; +use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, + data::ViewNumber, +}; use plotly::{ common::{HoverInfo, Line, Marker, MarkerSymbol, Mode}, layout::{self, Axis, GridPattern, LayoutGrid}, @@ -81,14 +82,14 @@ struct ReplicaStats { /// Read replica stats from CSV into a BTreeMap fn read_replica_view_stats( path: &Path, -) -> Result>, Box> { +) -> Result>, Box> { println!("\n**--- Replica Stats ---**"); let mut reader = csv::Reader::from_path(path) .map_err(|e| format!("Failed to open replica stats CSV at {path:?}: {e}"))?; let mut replica_view_stats = BTreeMap::new(); for result in reader.deserialize() { - let record: ReplicaViewStats = result?; + let record: ReplicaViewStats = result?; replica_view_stats.insert(record.view, record); } @@ -97,7 +98,7 @@ fn read_replica_view_stats( /// Generate plots of replica stats fn plot_replica_stats( - replica_view_stats: &BTreeMap>, + replica_view_stats: &BTreeMap>, output_file: &Path, ) -> Result<(), Box> { let mut x_views = Vec::new(); @@ -294,7 +295,7 @@ fn plot_replica_stats( /// it generates the time difference for VID/DAC/Proposal /// from the view change event fn generate_replica_stats( - replica_view_stats: &BTreeMap>, + replica_view_stats: &BTreeMap>, ) -> ReplicaStats { let mut vid_deltas_from_vc = Vec::new(); let mut dac_deltas_from_vc = Vec::new(); @@ -336,21 +337,21 @@ fn print_replica_stats(stats: &ReplicaStats) { /// Read leader stats from CSV into a BTreeMap fn read_leader_view_stats( path: &Path, -) -> Result>, Box> { +) -> Result>, Box> { println!("\n**--- Leader Stats ---**"); let mut reader = csv::Reader::from_path(path) .map_err(|e| format!("Failed to open leader stats CSV at {path:?}: {e}"))?; - let mut leader_view_stats = BTreeMap::>::new(); + let mut 
leader_view_stats = BTreeMap::>::new(); for result in reader.deserialize() { - let record: LeaderViewStats = result?; + let record: LeaderViewStats = result?; leader_view_stats.insert(record.view, record); } Ok(leader_view_stats) } fn plot_and_print_leader_stats( - leader_view_stats: &BTreeMap>, + leader_view_stats: &BTreeMap>, output_file: &Path, ) -> Result<(), Box> { let mut views = Vec::new(); diff --git a/crates/hotshot-builder/legacy/src/service.rs b/crates/hotshot-builder/legacy/src/service.rs index 899f97eb857..841718b92a2 100644 --- a/crates/hotshot-builder/legacy/src/service.rs +++ b/crates/hotshot-builder/legacy/src/service.rs @@ -335,7 +335,7 @@ impl GlobalState { .insert(state_id, response_msg); if let Some(previous_builder_state_entry) = previous_builder_state_entry { - tracing::warn!( + tracing::info!( "block {id} overwrote previous block: {previous_builder_state_entry:?}. previous \ cache entry: {previous_cache_entry:?}" ); diff --git a/crates/hotshot/examples/infra/mod.rs b/crates/hotshot/examples/infra/mod.rs index a3746a4ae05..c48602f17b8 100755 --- a/crates/hotshot/examples/infra/mod.rs +++ b/crates/hotshot/examples/infra/mod.rs @@ -64,7 +64,6 @@ use hotshot_types::{ node_implementation::{ConsensusTime, NodeType, Versions}, states::TestableState, }, - utils::genesis_epoch_from_version, HotShotConfig, PeerConfig, ValidatorConfig, }; use rand::{rngs::StdRng, SeedableRng}; @@ -366,6 +365,7 @@ pub trait RunDa< async fn initialize_state_and_hotshot( &self, membership: Arc::Membership>>, + orchestrator_url: Option, ) -> SystemContextHandle { let initializer = hotshot::HotShotInitializer::::from_genesis::( TestInstanceState::default(), @@ -401,6 +401,7 @@ pub trait RunDa< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + orchestrator_url, ) .await .expect("Could not init hotshot") @@ -415,7 +416,7 @@ pub trait RunDa< transactions: &mut Vec, transactions_to_send_per_round: u64, transaction_size_in_bytes: u64, - ) -> BenchResults { + ) -> BenchResults { let NetworkConfig { rounds, node_index, .. } = self.config(); @@ -525,16 +526,6 @@ pub trait RunDa< }, } } - // Panic if we don't have the genesis epoch, there is no recovery from that - let num_eligible_leaders = context - .hotshot - .membership_coordinator - .membership_for_epoch(genesis_epoch_from_version::()) - .await - .unwrap() - .stake_table() - .await - .len(); let consensus_lock = context.hotshot.consensus(); let consensus_reader = consensus_lock.read().await; let total_num_views = usize::try_from(consensus_reader.locked_view().u64()).unwrap(); @@ -566,23 +557,7 @@ pub trait RunDa< {avg_latency_in_sec} sec." 
); - BenchResults { - partial_results: "Unset".to_string(), - avg_latency_in_sec, - num_latency, - minimum_latency_in_sec: minimum_latency, - maximum_latency_in_sec: maximum_latency, - throughput_bytes_per_sec, - total_transactions_committed, - transaction_size_in_bytes: transaction_size_in_bytes + 8, // extra 8 bytes for timestamp - total_time_elapsed_in_sec: total_time_elapsed.as_secs(), - total_num_views, - failed_num_views, - committee_type: format!( - "{} with {num_eligible_leaders} eligible leaders", - std::any::type_name::() - ), - } + BenchResults::default() } else { // all values with zero BenchResults::default() @@ -994,7 +969,9 @@ pub async fn main_entry_point< &membership, ) .await; - let hotshot = run.initialize_state_and_hotshot(membership).await; + let hotshot = run + .initialize_state_and_hotshot(membership, Some(args.url)) + .await; if let Some(task) = builder_task { task.start(Box::new(hotshot.event_stream())); @@ -1041,7 +1018,9 @@ pub async fn main_entry_point< (transaction_size + 8) as u64, // extra 8 bytes for transaction base, see `create_random_transaction`. ) .await; - orchestrator_client.post_bench_results(bench_results).await; + orchestrator_client + .post_bench_results::(bench_results) + .await; } /// Sets correct builder_url and registers a builder with orchestrator if this node is running one. diff --git a/crates/hotshot/hotshot/src/lib.rs b/crates/hotshot/hotshot/src/lib.rs index ecbf041891e..db047ecff00 100644 --- a/crates/hotshot/hotshot/src/lib.rs +++ b/crates/hotshot/hotshot/src/lib.rs @@ -78,6 +78,7 @@ use hotshot_types::{ pub use rand; use tokio::{spawn, time::sleep}; use tracing::{debug, instrument, trace}; +use url::Url; // -- Rexports // External @@ -112,6 +113,9 @@ pub struct SystemContext, V: Versi /// Memberships used by consensus pub membership_coordinator: EpochMembershipCoordinator, + /// The orchestrator url + pub orchestrator_url: Option, + /// the metrics that the implementor is using. 
metrics: Arc, @@ -167,6 +171,7 @@ impl, V: Versions> Clone config: self.config.clone(), network: Arc::clone(&self.network), membership_coordinator: self.membership_coordinator.clone(), + orchestrator_url: self.orchestrator_url.clone(), metrics: Arc::clone(&self.metrics), consensus: self.consensus.clone(), instance_state: Arc::clone(&self.instance_state), @@ -209,6 +214,7 @@ impl, V: Versions> SystemContext, ) -> Arc { let internal_chan = broadcast(EVENT_CHANNEL_SIZE); let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE); @@ -227,6 +233,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext>>, ), external_channel: (Sender>, Receiver>), + orchestrator_url: Option, ) -> Arc { debug!("Creating a new hotshot"); @@ -414,6 +422,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext, ) -> Result< ( SystemContextHandle, @@ -697,6 +707,7 @@ impl, V: Versions> SystemContext, V: Versions> CreateTaskState { async fn create_from(handle: &SystemContextHandle) -> Self { StatsTaskState::::new( + handle.hotshot.id, handle.cur_view().await, handle.cur_epoch().await, handle.public_key().clone(), OuterConsensus::new(handle.hotshot.consensus()), handle.hotshot.membership_coordinator.clone(), + handle.hotshot.orchestrator_url.clone(), ) } } diff --git a/crates/hotshot/orchestrator/src/client.rs b/crates/hotshot/orchestrator/src/client.rs index c64d68cf93d..df896233f05 100644 --- a/crates/hotshot/orchestrator/src/client.rs +++ b/crates/hotshot/orchestrator/src/client.rs @@ -4,17 +4,23 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use std::{net::SocketAddr, time::Duration}; +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddr, + time::Duration, +}; use clap::Parser; use futures::{Future, FutureExt}; use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, network::{NetworkConfig, NetworkConfigSource}, - traits::node_implementation::NodeType, + traits::node_implementation::{ConsensusTime, NodeType}, PeerConfig, ValidatorConfig, }; use libp2p_identity::PeerId; use multiaddr::Multiaddr; +use serde::{Deserialize, Serialize}; use surf_disco::{error::ClientError, Client}; use tide_disco::Url; use tokio::time::sleep; @@ -30,102 +36,35 @@ pub struct OrchestratorClient { } /// Struct describing a benchmark result -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)] -pub struct BenchResults { - /// Whether it's partial collected results - pub partial_results: String, - /// The average latency of the transactions - pub avg_latency_in_sec: i64, - /// The number of transactions that were latency measured - pub num_latency: i64, - /// The minimum latency of the transactions - pub minimum_latency_in_sec: i64, - /// The maximum latency of the transactions - pub maximum_latency_in_sec: i64, - /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes - pub throughput_bytes_per_sec: u64, - /// The number of transactions committed during benchmarking - pub total_transactions_committed: u64, - /// The size of each transaction in bytes - pub transaction_size_in_bytes: u64, - /// The total time elapsed for benchmarking - pub total_time_elapsed_in_sec: u64, - /// The total number of views during benchmarking - pub total_num_views: usize, - /// The number of failed views during benchmarking - pub failed_num_views: usize, - /// The membership committee type used - pub committee_type: String, +#[derive(Serialize, Deserialize, 
Clone)] +pub struct BenchResults { + pub node_index: u64, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub leader_view_stats: BTreeMap>, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub replica_view_stats: BTreeMap>, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub latencies_by_view: BTreeMap, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub sizes_by_view: BTreeMap, + #[serde(bound(deserialize = "V: ConsensusTime"))] + pub timeouts: BTreeSet, + pub total_time_millis: i128, } -impl BenchResults { - /// printout the results of one example run - pub fn printout(&self) { - println!("====================="); - println!("{0} Benchmark results:", self.partial_results); - println!("Committee type: {}", self.committee_type); - println!( - "Average latency: {} seconds, Minimum latency: {} seconds, Maximum latency: {} seconds", - self.avg_latency_in_sec, self.minimum_latency_in_sec, self.maximum_latency_in_sec - ); - println!("Throughput: {} bytes/sec", self.throughput_bytes_per_sec); - println!( - "Total transactions committed: {}", - self.total_transactions_committed - ); - println!( - "Total number of views: {}, Failed number of views: {}", - self.total_num_views, self.failed_num_views - ); - println!("====================="); +impl Default for BenchResults { + fn default() -> Self { + Self { + node_index: 0, + leader_view_stats: BTreeMap::new(), + replica_view_stats: BTreeMap::new(), + latencies_by_view: BTreeMap::new(), + sizes_by_view: BTreeMap::new(), + timeouts: BTreeSet::new(), + total_time_millis: 0, + } } } - -/// Struct describing a benchmark result needed for download, also include the config -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)] -pub struct BenchResultsDownloadConfig { - // Config starting here - /// The commit this benchmark was run on - pub commit_sha: String, - /// Total number of nodes - pub total_nodes: usize, - /// The size of the da committee - pub da_committee_size: usize, - /// The number of fixed_leader_for_gpuvid when we enable the feature [fixed-leader-election] - pub fixed_leader_for_gpuvid: usize, - /// Number of transactions submitted per round - pub transactions_per_round: usize, - /// The size of each transaction in bytes - pub transaction_size: u64, - /// The number of rounds - pub rounds: usize, - - // Results starting here - /// Whether the results are partially collected - /// "One" when the results are collected for one node - /// "Half" when the results are collective for half running nodes if not all nodes terminate successfully - /// "Full" if the results are successfully collected from all nodes - pub partial_results: String, - /// The average latency of the transactions - pub avg_latency_in_sec: i64, - /// The minimum latency of the transactions - pub minimum_latency_in_sec: i64, - /// The maximum latency of the transactions - pub maximum_latency_in_sec: i64, - /// The throughput of the consensus protocol = number of transactions committed per second * transaction size in bytes - pub throughput_bytes_per_sec: u64, - /// The number of transactions committed during benchmarking - pub total_transactions_committed: u64, - /// The total time elapsed for benchmarking - pub total_time_elapsed_in_sec: u64, - /// The total number of views during benchmarking - pub total_num_views: usize, - /// The number of failed views during benchmarking - pub failed_num_views: usize, - /// The membership committee type used - pub committee_type: String, -} - // VALIDATOR #[derive(Parser, Debug, Clone)] 
@@ -493,7 +432,10 @@ impl OrchestratorClient { /// # Panics /// Panics if unable to post #[instrument(skip_all, name = "orchestrator metrics")] - pub async fn post_bench_results(&self, bench_results: BenchResults) { + pub async fn post_bench_results( + &self, + bench_results: BenchResults, + ) { let _send_metrics_f: Result<(), ClientError> = self .client .post("api/results") diff --git a/crates/hotshot/orchestrator/src/lib.rs b/crates/hotshot/orchestrator/src/lib.rs index 013af54d807..970e1eeac53 100644 --- a/crates/hotshot/orchestrator/src/lib.rs +++ b/crates/hotshot/orchestrator/src/lib.rs @@ -10,16 +10,15 @@ pub mod client; use std::{ - collections::{HashMap, HashSet}, - fs, - fs::OpenOptions, - io, + collections::{BTreeMap, HashMap, HashSet}, + fs::{self, OpenOptions}, + io, thread, time::Duration, }; use alloy::primitives::U256; use async_lock::RwLock; -use client::{BenchResults, BenchResultsDownloadConfig}; +use client::BenchResults; use csv::Writer; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use hotshot_types::{ @@ -91,7 +90,7 @@ struct OrchestratorState { /// The total nodes that have posted they are ready to start nodes_connected: HashSet>, /// The results of the benchmarks - bench_results: BenchResults, + bench_results: BTreeMap>, /// The number of nodes that have posted their results nodes_post_results: u64, /// Whether the orchestrator can be started manually @@ -135,7 +134,7 @@ impl OrchestratorState { pub_posted: HashMap::new(), nodes_connected: HashSet::new(), start: false, - bench_results: BenchResults::default(), + bench_results: BTreeMap::new(), nodes_post_results: 0, manual_start_allowed: true, accepting_new_keys: true, @@ -145,37 +144,69 @@ impl OrchestratorState { } /// Output the results to a csv file according to orchestrator state - pub fn output_to_csv(&self) { - let output_csv = BenchResultsDownloadConfig { - commit_sha: self.config.commit_sha.clone(), - total_nodes: self.config.config.num_nodes_with_stake.into(), - da_committee_size: self.config.config.da_staked_committee_size, - fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid, - transactions_per_round: self.config.transactions_per_round, - transaction_size: self.bench_results.transaction_size_in_bytes, - rounds: self.config.rounds, - partial_results: self.bench_results.partial_results.clone(), - avg_latency_in_sec: self.bench_results.avg_latency_in_sec, - minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec, - maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec, - throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec, - total_transactions_committed: self.bench_results.total_transactions_committed, - total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec, - total_num_views: self.bench_results.total_num_views, - failed_num_views: self.bench_results.failed_num_views, - committee_type: self.bench_results.committee_type.clone(), - }; + pub fn output_to_csv(result: BenchResults) { // Open the CSV file in append mode - let results_csv_file = OpenOptions::new() + let leader_results_csv_file = OpenOptions::new() .create(true) - .append(true) // Open in append mode - .open("scripts/benchmarks_results/results.csv") + .open(format!("Leader_results_{}.csv", result.node_index)) .unwrap(); // Open a file for writing - let mut wtr = Writer::from_writer(results_csv_file); - let _ = wtr.serialize(output_csv); + let mut wtr = Writer::from_writer(leader_results_csv_file); + + for (_, leader_stats) in result.leader_view_stats.iter() { 
+ let _ = wtr + .serialize(leader_stats) + .map_err(|e| tracing::warn!("Failed to serialize leader stats: {}", e)); + } let _ = wtr.flush(); - println!("Results successfully saved in scripts/benchmarks_results/results.csv"); + println!( + "Results successfully saved in leader_results_{}.csv", + result.node_index + ); + + // Do the same for the replica results + let replica_results_csv_file = OpenOptions::new() + .create(true) + .open(format!("replica_results_{}.csv", result.node_index)) + .unwrap(); + // Open a file for writing + let mut wtr = Writer::from_writer(replica_results_csv_file); + + for (_, replica_stats) in result.replica_view_stats.iter() { + let _ = wtr + .serialize(replica_stats) + .map_err(|e| tracing::warn!("Failed to serialize replica stats: {}", e)); + } + let _ = wtr.flush(); + + // Log the Latencies of each block by view + let latency_results_csv_file = OpenOptions::new() + .create(true) + .open(format!("latency_results_{}.csv", result.node_index)) + .unwrap(); + let mut wtr = Writer::from_writer(latency_results_csv_file); + let _ = wtr.write_record(["view", "latency"]); + for (view, latency) in result.latencies_by_view { + let _ = wtr.write_record([view.to_string(), latency.to_string()]); + } + let _ = wtr.flush(); + + // Log the Sizes of each block by view + let sizes_results_csv_file = OpenOptions::new() + .create(true) + .open(format!("sizes_results_{}.csv", result.node_index)) + .unwrap(); + let mut wtr = Writer::from_writer(sizes_results_csv_file); + let _ = wtr.write_record(["view", "size"]); + for (view, size) in result.sizes_by_view { + let _ = wtr.write_record([view.to_string(), size.to_string()]); + } + let _ = wtr.flush(); + + println!( + "Results successfully saved in replica_results_{}.csv", + result.node_index + ); } } @@ -223,7 +254,7 @@ pub trait OrchestratorApi { /// post endpoint for the results of the run /// # Errors /// if unable to serve - fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>; + fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>; /// A node POSTs its public key to let the orchestrator know that it is ready /// # Errors /// if unable to serve @@ -590,67 +621,13 @@ where } // Aggregates results of the run from all nodes - fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> { - if metrics.total_transactions_committed != 0 { - // Deal with the bench results - if self.bench_results.total_transactions_committed == 0 { - self.bench_results = metrics; - } else { - // Deal with the bench results from different nodes - let cur_metrics = self.bench_results.clone(); - self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec - * metrics.num_latency - + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency) - / (metrics.num_latency + cur_metrics.num_latency); - self.bench_results.num_latency += metrics.num_latency; - self.bench_results.minimum_latency_in_sec = metrics - .minimum_latency_in_sec - .min(cur_metrics.minimum_latency_in_sec); - self.bench_results.maximum_latency_in_sec = metrics - .maximum_latency_in_sec - .max(cur_metrics.maximum_latency_in_sec); - self.bench_results.throughput_bytes_per_sec = metrics - .throughput_bytes_per_sec - .max(cur_metrics.throughput_bytes_per_sec); - self.bench_results.total_transactions_committed = metrics - .total_transactions_committed - .max(cur_metrics.total_transactions_committed); - self.bench_results.total_time_elapsed_in_sec = metrics - .total_time_elapsed_in_sec - 
.max(cur_metrics.total_time_elapsed_in_sec); - self.bench_results.total_num_views = - metrics.total_num_views.min(cur_metrics.total_num_views); - self.bench_results.failed_num_views = - metrics.failed_num_views.max(cur_metrics.failed_num_views); - } - } + fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> { self.nodes_post_results += 1; - if self.bench_results.partial_results == "Unset" { - self.bench_results.partial_results = "One".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results == "One" - && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2) - { - self.bench_results.partial_results = "HalfDA".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results == "HalfDA" - && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2) - { - self.bench_results.partial_results = "Half".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } - if self.bench_results.partial_results != "Full" - && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64) - { - self.bench_results.partial_results = "Full".to_string(); - self.bench_results.printout(); - self.output_to_csv(); - } + self.bench_results + .insert(metrics.node_index, metrics.clone()); + thread::spawn(move || { + OrchestratorState::::output_to_csv(metrics); + }); Ok(()) } @@ -779,7 +756,7 @@ where })? .post("post_results", |req, state| { async move { - let metrics: Result = req.body_json(); + let metrics: Result, RequestError> = req.body_json(); state.post_run_results(metrics.unwrap()) } .boxed() diff --git a/crates/hotshot/task-impls/Cargo.toml b/crates/hotshot/task-impls/Cargo.toml index 79db91b226e..dbbc6efd62a 100644 --- a/crates/hotshot/task-impls/Cargo.toml +++ b/crates/hotshot/task-impls/Cargo.toml @@ -25,6 +25,7 @@ either = { workspace = true } futures = { workspace = true } hotshot-builder-api = { workspace = true } hotshot-contract-adapter = { workspace = true } +hotshot-orchestrator = { workspace = true } hotshot-task = { workspace = true } hotshot-types = { workspace = true } hotshot-utils = { workspace = true } diff --git a/crates/hotshot/task-impls/src/da.rs b/crates/hotshot/task-impls/src/da.rs index e15fd93e14e..d53ecc7efef 100644 --- a/crates/hotshot/task-impls/src/da.rs +++ b/crates/hotshot/task-impls/src/da.rs @@ -10,7 +10,7 @@ use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; use hotshot_task::task::TaskState; use hotshot_types::{ - consensus::{Consensus, OuterConsensus, PayloadWithMetadata}, + consensus::{OuterConsensus, PayloadWithMetadata}, data::{vid_commitment, vid_disperse::vid_total_weight, DaProposal2, PackedBundle}, epoch_membership::EpochMembershipCoordinator, event::{Event, EventType}, @@ -19,7 +19,6 @@ use hotshot_types::{ simple_vote::{DaData2, DaVote2}, storage_metrics::StorageMetricsValue, traits::{ - network::ConnectedNetwork, node_implementation::{NodeImplementation, NodeType, Versions}, signature_key::SignatureKey, storage::Storage, @@ -30,7 +29,7 @@ use hotshot_types::{ }; use hotshot_utils::anytrace::*; use sha2::{Digest, Sha256}; -use tokio::{spawn, task::spawn_blocking}; +use tokio::task::spawn_blocking; use tracing::instrument; use crate::{ @@ -284,76 +283,6 @@ impl, V: Versions> DaTaskState( - OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), - view_number, - target_epoch, - membership.coordinator.clone(), - &pk, - 
&upgrade_lock, - ) - .await; - if let Some(vid_share) = consensus - .read() - .await - .vid_shares() - .get(&view_number) - .and_then(|key_map| key_map.get(&public_key)) - .and_then(|epoch_map| epoch_map.get(&target_epoch)) - { - tracing::debug!( - "Primary network is down. Calculated own VID share for epoch \ - {target_epoch:?}, my id {my_id}" - ); - broadcast_event( - Arc::new(HotShotEvent::VidShareRecv( - public_key.clone(), - vid_share.clone(), - )), - &chan, - ) - .await; - } - } - }); - } }, HotShotEvent::DaVoteRecv(ref vote) => { tracing::debug!("DA vote recv, Main Task {}", vote.view_number()); diff --git a/crates/hotshot/task-impls/src/stats.rs b/crates/hotshot/task-impls/src/stats.rs index 10c18150462..bdd7d7e4f1d 100644 --- a/crates/hotshot/task-impls/src/stats.rs +++ b/crates/hotshot/task-impls/src/stats.rs @@ -6,8 +6,10 @@ use std::{ use async_broadcast::{Receiver, Sender}; use async_trait::async_trait; use either::Either; +use hotshot_orchestrator::client::{BenchResults, OrchestratorClient}; use hotshot_task::task::TaskState; use hotshot_types::{ + benchmarking::{LeaderViewStats, ReplicaViewStats}, consensus::OuterConsensus, epoch_membership::EpochMembershipCoordinator, traits::{ @@ -21,106 +23,39 @@ use hotshot_utils::{ anytrace::{Error, Level, Result}, line_info, warn, }; -use serde::{Deserialize, Serialize}; use time::OffsetDateTime; +use url::Url; use crate::events::HotShotEvent; -#[derive(Serialize, Deserialize)] -pub struct LeaderViewStats { - pub view: TYPES::View, - pub prev_proposal_send: Option, - pub proposal_send: Option, - pub vote_recv: Option, - pub da_proposal_send: Option, - pub builder_start: Option, - pub block_built: Option, - pub vid_disperse_send: Option, - pub timeout_certificate_formed: Option, - pub qc_formed: Option, - pub da_cert_send: Option, -} - -#[derive(Serialize, Deserialize)] -pub struct ReplicaViewStats { - pub view: TYPES::View, - pub view_change: Option, - pub proposal_timestamp: Option, - pub proposal_recv: Option, - pub vote_send: Option, - pub timeout_vote_send: Option, - pub da_proposal_received: Option, - pub da_proposal_validated: Option, - pub da_certificate_recv: Option, - pub proposal_prelim_validated: Option, - pub proposal_validated: Option, - pub timeout_triggered: Option, - pub vid_share_validated: Option, - pub vid_share_recv: Option, -} - -impl LeaderViewStats { - fn new(view: TYPES::View) -> Self { - Self { - view, - prev_proposal_send: None, - proposal_send: None, - vote_recv: None, - da_proposal_send: None, - builder_start: None, - block_built: None, - vid_disperse_send: None, - timeout_certificate_formed: None, - qc_formed: None, - da_cert_send: None, - } - } -} - -impl ReplicaViewStats { - fn new(view: TYPES::View) -> Self { - Self { - view, - view_change: None, - proposal_timestamp: None, - proposal_recv: None, - vote_send: None, - timeout_vote_send: None, - da_proposal_received: None, - da_proposal_validated: None, - da_certificate_recv: None, - proposal_prelim_validated: None, - proposal_validated: None, - timeout_triggered: None, - vid_share_validated: None, - vid_share_recv: None, - } - } -} - pub struct StatsTaskState { + node_index: u64, view: TYPES::View, epoch: Option, public_key: TYPES::SignatureKey, consensus: OuterConsensus, membership_coordinator: EpochMembershipCoordinator, - leader_stats: BTreeMap>, - replica_stats: BTreeMap>, + leader_stats: BTreeMap>, + replica_stats: BTreeMap>, latencies_by_view: BTreeMap, sizes_by_view: BTreeMap, epoch_start_times: BTreeMap, timeouts: BTreeSet, + orchestrator_client: 
Option, } impl StatsTaskState { pub fn new( + node_index: u64, view: TYPES::View, epoch: Option, public_key: TYPES::SignatureKey, consensus: OuterConsensus, membership_coordinator: EpochMembershipCoordinator, + orchestrator_url: Option, ) -> Self { Self { + node_index, view, epoch, public_key, @@ -132,14 +67,15 @@ impl StatsTaskState { sizes_by_view: BTreeMap::new(), epoch_start_times: BTreeMap::new(), timeouts: BTreeSet::new(), + orchestrator_client: orchestrator_url.map(OrchestratorClient::new), } } - fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats { + fn leader_entry(&mut self, view: TYPES::View) -> &mut LeaderViewStats { self.leader_stats .entry(view) .or_insert_with(|| LeaderViewStats::new(view)) } - fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats { + fn replica_entry(&mut self, view: TYPES::View) -> &mut ReplicaViewStats { self.replica_stats .entry(view) .or_insert_with(|| ReplicaViewStats::new(view)) @@ -184,16 +120,21 @@ impl StatsTaskState { Ok(()) } - fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) { + fn log_basic_stats(&self, now: i128, epoch: &TYPES::Epoch) -> i128 { let num_views = self.latencies_by_view.len(); let total_size = self.sizes_by_view.values().sum::(); // Either we have no views logged yet, no TXNs or we are not in the DA committee and don't know block sizes if num_views == 0 || total_size == 0 { - return; + return 0; } let total_latency = self.latencies_by_view.values().sum::(); + let elapsed_time = if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) { + now - epoch_start_time + } else { + 0 + }; let average_latency = total_latency / num_views as i128; tracing::warn!("Average latency: {}ms", average_latency); tracing::warn!( @@ -201,12 +142,18 @@ impl StatsTaskState { epoch, self.timeouts.len() ); - if let Some(epoch_start_time) = self.epoch_start_times.get(epoch) { - let elapsed_time = now - epoch_start_time; + let total_size = self.sizes_by_view.values().sum::(); + if total_size == 0 { + // Either no TXNs or we are not in the DA committee and don't know block sizes + return elapsed_time; + } + if elapsed_time > 0 { // multiply by 1000 to convert to seconds let throughput = (total_size / elapsed_time) * 1000; tracing::warn!("Throughput: {} bytes/s", throughput); + return elapsed_time; } + elapsed_time } } @@ -224,33 +171,43 @@ impl TaskState for StatsTaskState { match event.as_ref() { HotShotEvent::BlockRecv(block_recv) => { - self.leader_entry(block_recv.view_number).block_built = Some(now); + self.leader_entry(block_recv.view_number) + .block_built + .get_or_insert(now); }, HotShotEvent::QuorumProposalRecv(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .proposal_recv = Some(now); + .proposal_recv + .get_or_insert(now); }, HotShotEvent::QuorumVoteRecv(_vote) => {}, HotShotEvent::TimeoutVoteRecv(_vote) => {}, HotShotEvent::TimeoutVoteSend(vote) => { - self.replica_entry(vote.view_number()).timeout_vote_send = Some(now); + self.replica_entry(vote.view_number()) + .timeout_vote_send + .get_or_insert(now); }, HotShotEvent::DaProposalRecv(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .da_proposal_received = Some(now); + .da_proposal_received + .get_or_insert(now); }, HotShotEvent::DaProposalValidated(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .da_proposal_validated = Some(now); + .da_proposal_validated + .get_or_insert(now); }, HotShotEvent::DaVoteRecv(_simple_vote) => {}, HotShotEvent::DaCertificateRecv(simple_certificate) => { 
self.replica_entry(simple_certificate.view_number()) - .da_certificate_recv = Some(now); + .da_certificate_recv + .get_or_insert(now); }, HotShotEvent::DaCertificateValidated(_simple_certificate) => {}, HotShotEvent::QuorumProposalSend(proposal, _) => { - self.leader_entry(proposal.data.view_number()).proposal_send = Some(now); + self.leader_entry(proposal.data.view_number()) + .proposal_send + .get_or_insert(now); // If the last view succeeded, add the metric for time between proposals if proposal.data.view_change_evidence().is_none() { @@ -259,7 +216,8 @@ impl TaskState for StatsTaskState { .proposal_recv { self.leader_entry(proposal.data.view_number()) - .prev_proposal_send = Some(previous_proposal_time); + .prev_proposal_send + .get_or_insert(previous_proposal_time); // calculate the elapsed time as milliseconds (from nanoseconds) let elapsed_time = (now - previous_proposal_time) / 1_000_000; @@ -277,58 +235,67 @@ impl TaskState for StatsTaskState { } }, HotShotEvent::QuorumVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::ExtendedQuorumVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::QuorumProposalValidated(proposal, _) => { self.replica_entry(proposal.data.view_number()) - .proposal_validated = Some(now); + .proposal_validated + .get_or_insert(now); self.replica_entry(proposal.data.view_number()) .proposal_timestamp = Some(proposal.data.block_header().timestamp_millis() as i128); }, HotShotEvent::DaProposalSend(proposal, _) => { self.leader_entry(proposal.data.view_number()) - .da_proposal_send = Some(now); + .da_proposal_send + .get_or_insert(now); }, HotShotEvent::DaVoteSend(simple_vote) => { - self.replica_entry(simple_vote.view_number()).vote_send = Some(now); + self.replica_entry(simple_vote.view_number()) + .vote_send + .get_or_insert(now); }, HotShotEvent::QcFormed(either) => { match either { - Either::Left(qc) => { - self.leader_entry(qc.view_number() + 1).qc_formed = Some(now) - }, - Either::Right(tc) => { - self.leader_entry(tc.view_number()) - .timeout_certificate_formed = Some(now) - }, + Either::Left(qc) => self + .leader_entry(qc.view_number() + 1) + .qc_formed + .get_or_insert(now), + Either::Right(tc) => self + .leader_entry(tc.view_number()) + .timeout_certificate_formed + .get_or_insert(now), }; }, HotShotEvent::Qc2Formed(either) => { match either { - Either::Left(qc) => { - self.leader_entry(qc.view_number() + 1).qc_formed = Some(now) - }, - Either::Right(tc) => { - self.leader_entry(tc.view_number()) - .timeout_certificate_formed = Some(now) - }, + Either::Left(qc) => self + .leader_entry(qc.view_number() + 1) + .qc_formed + .get_or_insert(now), + Either::Right(tc) => self + .leader_entry(tc.view_number()) + .timeout_certificate_formed + .get_or_insert(now), }; }, HotShotEvent::DacSend(simple_certificate, _) => { self.leader_entry(simple_certificate.view_number()) - .da_cert_send = Some(now); + .da_cert_send + .get_or_insert(now); }, HotShotEvent::ViewChange(view, epoch) => { // Record the timestamp of the first observed view change // This can happen when transitioning to the next view, either due to voting // or receiving a proposal, but we only store the first one - if self.replica_entry(*view + 1).view_change.is_none() { - self.replica_entry(*view + 1).view_change = 
Some(now); - } + self.replica_entry(*view + 1).view_change.get_or_insert(now); if *epoch <= self.epoch && *view <= self.view { return Ok(()); @@ -347,10 +314,25 @@ impl TaskState for StatsTaskState { } if new_epoch { - if let Some(prev_epoch) = prev_epoch { - self.log_basic_stats(now, &prev_epoch); - } + let elapsed_time = if let Some(prev_epoch) = prev_epoch { + self.log_basic_stats(now, &prev_epoch) + } else { + 0 + }; let _ = self.dump_stats(); + if let Some(orchestrator_client) = self.orchestrator_client.as_ref() { + orchestrator_client + .post_bench_results::(BenchResults:: { + node_index: self.node_index, + leader_view_stats: self.leader_stats.clone(), + replica_view_stats: self.replica_stats.clone(), + latencies_by_view: self.latencies_by_view.clone(), + sizes_by_view: self.sizes_by_view.clone(), + timeouts: self.timeouts.clone(), + total_time_millis: elapsed_time, + }) + .await; + } self.garbage_collect(*view - 1); } @@ -377,15 +359,18 @@ impl TaskState for StatsTaskState { }, HotShotEvent::VidShareRecv(_, proposal) => { self.replica_entry(proposal.data.view_number()) - .vid_share_recv = Some(now); + .vid_share_recv + .get_or_insert(now); }, HotShotEvent::VidShareValidated(proposal) => { self.replica_entry(proposal.data.view_number()) - .vid_share_validated = Some(now); + .vid_share_validated + .get_or_insert(now); }, HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => { self.replica_entry(proposal.data.view_number()) - .proposal_prelim_validated = Some(now); + .proposal_prelim_validated + .get_or_insert(now); }, HotShotEvent::LeavesDecided(leaves) => { for leaf in leaves { diff --git a/crates/hotshot/task-impls/src/transactions.rs b/crates/hotshot/task-impls/src/transactions.rs index bd05abb1ebe..525903ddc4d 100644 --- a/crates/hotshot/task-impls/src/transactions.rs +++ b/crates/hotshot/task-impls/src/transactions.rs @@ -21,7 +21,7 @@ use hotshot_types::{ event::{Event, EventType}, message::UpgradeLock, traits::{ - block_contents::{BuilderFee, EncodeBytes}, + block_contents::{BlockHeader, BuilderFee, EncodeBytes}, node_implementation::{ConsensusTime, NodeType, Versions}, signature_key::{BuilderSignatureKey, SignatureKey}, BlockPayload, @@ -116,16 +116,9 @@ impl TransactionTaskState { event_stream: &Sender>>, block_view: TYPES::View, block_epoch: Option, + vid: Option, ) -> Option { - let _version = match self.upgrade_lock.version(block_view).await { - Ok(v) => v, - Err(e) => { - tracing::error!("Failed to calculate version: {e:?}"); - return None; - }, - }; - - self.handle_view_change_legacy(event_stream, block_view, block_epoch) + self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid) .await } @@ -136,6 +129,7 @@ impl TransactionTaskState { event_stream: &Sender>>, block_view: TYPES::View, block_epoch: Option, + vid: Option, ) -> Option { let version = match self.upgrade_lock.version(block_view).await { Ok(v) => v, @@ -219,7 +213,7 @@ impl TransactionTaskState { { None } else { - self.wait_for_block(block_view).await + self.wait_for_block(block_view, vid).await } }; @@ -366,7 +360,50 @@ impl TransactionTaskState { .leader(view) .await?; if leader == self.public_key { - self.handle_view_change(&event_stream, view, *epoch).await; + self.handle_view_change(&event_stream, view, *epoch, None) + .await; + return Ok(()); + } + }, + HotShotEvent::QuorumProposalValidated(proposal, _leaf) => { + let view_number = proposal.data.view_number(); + let next_view = view_number + 1; + + let version = match self.upgrade_lock.version(next_view).await { + Ok(v) => v, 
+ Err(e) => { + tracing::error!("Failed to calculate version: {e:?}"); + return Ok(()); + }, + }; + + if version < V::DrbAndHeaderUpgrade::VERSION { + return Ok(()); + } + + let vid = proposal.data.block_header().payload_commitment(); + let block_height = proposal.data.block_header().block_number(); + if is_epoch_transition(block_height, self.epoch_height) { + return Ok(()); + } + if is_last_block(block_height, self.epoch_height) { + return Ok(()); + } + if next_view <= self.cur_view { + return Ok(()); + } + // move to next view for this task only + self.cur_view = next_view; + + let leader = self + .membership_coordinator + .membership_for_epoch(self.cur_epoch) + .await? + .leader(next_view) + .await?; + if leader == self.public_key { + self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid)) + .await; return Ok(()); } }, @@ -444,19 +481,27 @@ impl TransactionTaskState { } #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")] - async fn wait_for_block(&self, block_view: TYPES::View) -> Option> { + async fn wait_for_block( + &self, + block_view: TYPES::View, + vid: Option, + ) -> Option> { let task_start_time = Instant::now(); // Find commitment to the block we want to build upon - let (parent_view, parent_comm) = match self - .last_vid_commitment_retry(block_view, task_start_time) - .await - { - Ok((v, c)) => (v, c), - Err(e) => { - tracing::warn!("Failed to find last vid commitment in time: {e}"); - return None; - }, + let (parent_view, parent_comm) = if let Some(vid) = vid { + (block_view - 1, vid) + } else { + match self + .last_vid_commitment_retry(block_view, task_start_time) + .await + { + Ok((v, c)) => (v, c), + Err(e) => { + tracing::warn!("Failed to find last vid commitment in time: {e}"); + return None; + }, + } }; let parent_comm_sig = match <::SignatureKey as SignatureKey>::sign( diff --git a/crates/hotshot/testing/src/helpers.rs b/crates/hotshot/testing/src/helpers.rs index 9b32a6211f3..7d165d2e161 100644 --- a/crates/hotshot/testing/src/helpers.rs +++ b/crates/hotshot/testing/src/helpers.rs @@ -147,6 +147,7 @@ pub async fn build_system_handle_from_launcher< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await .expect("Could not init hotshot"); diff --git a/crates/hotshot/testing/src/test_builder.rs b/crates/hotshot/testing/src/test_builder.rs index 669882d30d4..5dda87f90f4 100644 --- a/crates/hotshot/testing/src/test_builder.rs +++ b/crates/hotshot/testing/src/test_builder.rs @@ -337,6 +337,7 @@ pub async fn create_test_handle< ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await; diff --git a/crates/hotshot/testing/src/test_runner.rs b/crates/hotshot/testing/src/test_runner.rs index 51c8cda3d49..d656a30a840 100644 --- a/crates/hotshot/testing/src/test_runner.rs +++ b/crates/hotshot/testing/src/test_runner.rs @@ -553,6 +553,7 @@ where ConsensusMetricsValue::default(), storage, StorageMetricsValue::default(), + None, ) .await } @@ -595,6 +596,7 @@ where StorageMetricsValue::default(), internal_channel, external_channel, + None, ) .await } diff --git a/crates/hotshot/types/Cargo.toml b/crates/hotshot/types/Cargo.toml index 9b912384eae..8ee76603364 100644 --- a/crates/hotshot/types/Cargo.toml +++ b/crates/hotshot/types/Cargo.toml @@ -38,12 +38,12 @@ dyn-clone = "1.0.17" either = { workspace = true } futures = { workspace = true, features = ["alloc"] } hotshot-utils = { workspace = true } +jf-advz 
= { workspace = true }
 jf-crhf = { workspace = true }
 jf-pcs = { workspace = true }
 jf-rescue = { workspace = true }
 jf-signature = { workspace = true, features = ["bls", "schnorr"] }
 jf-utils = { workspace = true }
-jf-advz = { workspace = true }
 lazy_static = { workspace = true }
 libp2p-identity = { workspace = true }
 memoize = { workspace = true }
diff --git a/crates/hotshot/types/src/benchmarking.rs b/crates/hotshot/types/src/benchmarking.rs
new file mode 100644
index 00000000000..852edd59f1d
--- /dev/null
+++ b/crates/hotshot/types/src/benchmarking.rs
@@ -0,0 +1,73 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Clone)]
+pub struct LeaderViewStats<V> {
+    pub view: V,
+    pub prev_proposal_send: Option<i128>,
+    pub proposal_send: Option<i128>,
+    pub vote_recv: Option<i128>,
+    pub da_proposal_send: Option<i128>,
+    pub builder_start: Option<i128>,
+    pub block_built: Option<i128>,
+    pub vid_disperse_send: Option<i128>,
+    pub timeout_certificate_formed: Option<i128>,
+    pub qc_formed: Option<i128>,
+    pub da_cert_send: Option<i128>,
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+pub struct ReplicaViewStats<V> {
+    pub view: V,
+    pub view_change: Option<i128>,
+    pub proposal_timestamp: Option<i128>,
+    pub proposal_recv: Option<i128>,
+    pub vote_send: Option<i128>,
+    pub timeout_vote_send: Option<i128>,
+    pub da_proposal_received: Option<i128>,
+    pub da_proposal_validated: Option<i128>,
+    pub da_certificate_recv: Option<i128>,
+    pub proposal_prelim_validated: Option<i128>,
+    pub proposal_validated: Option<i128>,
+    pub timeout_triggered: Option<i128>,
+    pub vid_share_validated: Option<i128>,
+    pub vid_share_recv: Option<i128>,
+}
+
+impl<V> LeaderViewStats<V> {
+    pub fn new(view: V) -> Self {
+        Self {
+            view,
+            prev_proposal_send: None,
+            proposal_send: None,
+            vote_recv: None,
+            da_proposal_send: None,
+            builder_start: None,
+            block_built: None,
+            vid_disperse_send: None,
+            timeout_certificate_formed: None,
+            qc_formed: None,
+            da_cert_send: None,
+        }
+    }
+}
+
+impl<V> ReplicaViewStats<V> {
+    pub fn new(view: V) -> Self {
+        Self {
+            view,
+            view_change: None,
+            proposal_timestamp: None,
+            proposal_recv: None,
+            vote_send: None,
+            timeout_vote_send: None,
+            da_proposal_received: None,
+            da_proposal_validated: None,
+            da_certificate_recv: None,
+            proposal_prelim_validated: None,
+            proposal_validated: None,
+            timeout_triggered: None,
+            vid_share_validated: None,
+            vid_share_recv: None,
+        }
+    }
+}
diff --git a/crates/hotshot/types/src/lib.rs b/crates/hotshot/types/src/lib.rs
index 30762b036d1..63644cc80f7 100644
--- a/crates/hotshot/types/src/lib.rs
+++ b/crates/hotshot/types/src/lib.rs
@@ -52,6 +52,8 @@ pub mod utils;
 pub mod vid;
 pub mod vote;
 
+pub mod benchmarking;
+
 /// Pinned future that is Send and Sync
 pub type BoxSyncFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
 
diff --git a/hotshot-query-service/Cargo.toml b/hotshot-query-service/Cargo.toml
index b8018a692ae..a2dfda0a4e1 100644
--- a/hotshot-query-service/Cargo.toml
+++ b/hotshot-query-service/Cargo.toml
@@ -74,10 +74,10 @@ hotshot-types = { workspace = true }
 # Dependencies enabled by feature "sql-data-source".
 include_dir = { version = "0.7", optional = true }
 itertools = "0.12.1"
-jf-merkle-tree-compat = { workspace = true , features = [
+jf-advz = { workspace = true }
+jf-merkle-tree-compat = { workspace = true, features = [
     "std",
 ] }
-jf-advz = { workspace = true }
 lazy_static = "1"
 log = { version = "0.4", optional = true }
 portpicker = { version = "0.1", optional = true }
diff --git a/hotshot-query-service/examples/simple-server.rs b/hotshot-query-service/examples/simple-server.rs
index 8d45406e0a6..613a64e2a5f 100644
--- a/hotshot-query-service/examples/simple-server.rs
+++ b/hotshot-query-service/examples/simple-server.rs
@@ -269,6 +269,7 @@ async fn init_consensus(
         ConsensusMetricsValue::new(&*data_source.populate_metrics()),
         storage,
         StorageMetricsValue::new(&*data_source.populate_metrics()),
+        None,
     )
     .await
     .unwrap()
diff --git a/hotshot-query-service/src/testing/consensus.rs b/hotshot-query-service/src/testing/consensus.rs
index d1e71879072..a795c9a73d7 100644
--- a/hotshot-query-service/src/testing/consensus.rs
+++ b/hotshot-query-service/src/testing/consensus.rs
@@ -225,6 +225,7 @@ impl MockNetwork {
                 ConsensusMetricsValue::new(&*data_source.populate_metrics()),
                 hs_storage,
                 StorageMetricsValue::new(&*data_source.populate_metrics()),
+                None,
             )
             .await
             .unwrap()
diff --git a/sdks/crypto-helper/Cargo.toml b/sdks/crypto-helper/Cargo.toml
index 0b8cfca4779..9d1f25acef7 100644
--- a/sdks/crypto-helper/Cargo.toml
+++ b/sdks/crypto-helper/Cargo.toml
@@ -16,11 +16,11 @@ ark-ff = { workspace = true }
 ark-serialize = { workspace = true }
 committable = { workspace = true }
 espresso-types = { path = "../../types" }
-primitive-types = { version = "0.13" }
 hotshot-query-service = { workspace = true }
 hotshot-types = { workspace = true }
 jf-crhf = { workspace = true }
 jf-merkle-tree-compat = { workspace = true, features = ["std"] }
+primitive-types = { version = "0.13" }
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = { workspace = true }
diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml
index 129695e5a3f..22ea3f67cfb 100644
--- a/sequencer/Cargo.toml
+++ b/sequencer/Cargo.toml
@@ -81,8 +81,8 @@ jf-crhf = { workspace = true }
 jf-merkle-tree-compat = { workspace = true }
 jf-rescue = { workspace = true }
-jf-signature = { workspace = true, features = ["bls", "schnorr"] }
 jf-advz = { workspace = true }
+jf-signature = { workspace = true, features = ["bls", "schnorr"] }
 libp2p = { workspace = true }
 num_enum = "0.7"
 parking_lot = "0.12"
diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs
index f22437e2736..97b5aceda8a 100644
--- a/sequencer/src/context.rs
+++ b/sequencer/src/context.rs
@@ -108,6 +108,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence
         event_consumer: impl PersistenceEventConsumer + 'static,
         _: V,
         proposal_fetcher_cfg: ProposalFetcherConfig,
+        orchestrator_url: Option<Url>,
     ) -> anyhow::Result<Self> {
         let config = &network_config.config;
         let pub_key = validator_config.public_key;
@@ -152,6 +153,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence
             ConsensusMetricsValue::new(metrics),
             Arc::clone(&persistence),
             StorageMetricsValue::new(metrics),
+            orchestrator_url,
         )
         .await?
         .0;
diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs
index 48f11eb3396..10c7e04ea5f 100644
--- a/sequencer/src/lib.rs
+++ b/sequencer/src/lib.rs
@@ -292,7 +292,7 @@ where
     info!("Libp2p advertise address: {}", libp2p_advertise_address);
 
     // Orchestrator client
-    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
+    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url.clone());
     let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
     let validator_config = ValidatorConfig {
         public_key: pub_key,
@@ -617,6 +617,7 @@ where
         event_consumer,
         seq_versions,
         proposal_fetcher_config,
+        Some(network_params.orchestrator_url),
     )
     .await?;
     if wait_for_orchestrator {
@@ -1327,6 +1328,7 @@ pub mod testing {
             event_consumer,
             bind_version,
             Default::default(),
+            None,
         )
         .await
         .unwrap()
diff --git a/types/Cargo.toml b/types/Cargo.toml
index 68f2b4add89..e65b818f3f1 100644
--- a/types/Cargo.toml
+++ b/types/Cargo.toml
@@ -47,9 +47,9 @@ humantime = "2"
 indexmap = { workspace = true }
 insta = { version = "1.43", features = ["yaml"] }
 itertools = { workspace = true }
+jf-advz = { workspace = true }
 jf-merkle-tree-compat = { workspace = true }
 jf-utils = { workspace = true }  # TODO temporary: used only for test_rng()
-jf-advz = { workspace = true }
 lru = { workspace = true }
 num-traits = { workspace = true }
 parking_lot = "0.12"