diff --git a/Cargo.lock b/Cargo.lock index 4e2d48b7c38..50ab14e44b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2077,7 +2077,6 @@ dependencies = [ "async-stream", "async-trait", "bytes", - "clap", "deadqueue", "defaultmap", "derive_more 0.99.20", @@ -2085,9 +2084,6 @@ dependencies = [ "lazy_static", "libp2p", "libp2p-swarm-test", - "metrics", - "metrics-exporter-prometheus", - "mockall", "pretty_assertions", "replace_with", "rstest", @@ -2100,10 +2096,8 @@ dependencies = [ "tokio-retry", "tokio-stream", "tracing", - "tracing-subscriber", "unsigned-varint 0.8.0", "validator", - "void", "waker-fn", ] @@ -13807,12 +13801,6 @@ version = "0.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" -[[package]] -name = "void" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" - [[package]] name = "wait-timeout" version = "0.2.1" diff --git a/crates/apollo_network/Cargo.toml b/crates/apollo_network/Cargo.toml index f1804bfccf3..2240f34d83b 100644 --- a/crates/apollo_network/Cargo.toml +++ b/crates/apollo_network/Cargo.toml @@ -15,7 +15,6 @@ apollo_network_types.workspace = true async-stream.workspace = true async-trait.workspace = true bytes.workspace = true -clap = { workspace = true, features = ["derive"] } derive_more.workspace = true futures.workspace = true lazy_static.workspace = true @@ -33,8 +32,6 @@ libp2p = { workspace = true, features = [ "tokio", "yamux", ] } -metrics.workspace = true -metrics-exporter-prometheus.workspace = true replace_with.workspace = true serde = { workspace = true, features = ["derive"] } starknet_api.workspace = true @@ -44,7 +41,6 @@ thiserror.workspace = true tokio = { workspace = true, features = ["full", "sync"] } tokio-retry.workspace = true tracing.workspace = true -tracing-subscriber.workspace = true unsigned-varint = { workspace = true, features = ["std"] } validator = { workspace = true, features = ["derive"] } @@ -55,12 +51,10 @@ assert_matches.workspace = true deadqueue = { workspace = true, features = ["unlimited"] } defaultmap.workspace = true libp2p-swarm-test.workspace = true -mockall.workspace = true pretty_assertions.workspace = true rstest.workspace = true tokio = { workspace = true, features = ["full", "sync", "test-util"] } tokio-stream.workspace = true -void.workspace = true waker-fn.workspace = true [lints] diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md deleted file mode 100644 index 2889f2a83f4..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md +++ /dev/null @@ -1,41 +0,0 @@ -# Network Stress Test - -## Setup and Run Stress Test - -1. **Create Remote Engines** - - Create 5 gcloud VM instances. Make sure to have the necessary RAM and disk space. Each instance should be named in the following pattern: - - ``` - -0, ... ,-4 - ``` - -2. **Set Bootstrap Node** - - Find the internal IP of your bootstrap node in the VM instances chart on google cloud console. Paste it into the test_config.json file into the bootstrap_peer_multaddr value instead of its placeholder. - -3. **Install Rust and clone repository** - - For all 5 instances run: - - ``` - gcloud compute ssh -0 --project -- 'cd && sudo apt install -y git unzip clang && curl https://sh.rustup.rs -sSf | sh -s -- -y && source "$HOME/.cargo/env" && git clone https://github.com/starkware-libs/sequencer.git; cd sequencer && sudo scripts/dependencies.sh cargo build --release -p apollo_network --bin network_stress_test' - ``` - -4. **Run test** - - ``` - PROJECT_ID= BASE_INSTANCE_NAME= ZONE= ./run_broadcast_stress_test.sh - ``` - -5. **Results** - - Results are retrieved from VM instances and saved to /output.csv. You can change the default path by adjusting the config file. - -## Pull repo updates to virtual machines - -1. **Run** - - ``` - PROJECT_ID= BASE_INSTANCE_NAME= ZONE= ./pull_stress_test.sh - ``` diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters.rs deleted file mode 100644 index c520aec4d96..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use lazy_static::lazy_static; - -lazy_static! { - // Calculate actual metadata size based on serialized empty message - pub static ref METADATA_SIZE: usize = { - let empty_message = StressTestMessage::new(0, 0, vec![]); - let serialized: Vec = empty_message.into(); - serialized.len() - }; -} - -#[derive(Debug, Clone, Copy)] -pub struct StressTestMessageMetadata { - pub sender_id: u64, - pub message_index: u64, - pub time: SystemTime, -} - -#[derive(Debug, Clone)] -pub struct StressTestMessage { - pub metadata: StressTestMessageMetadata, - pub payload: Vec, -} - -impl StressTestMessage { - pub fn new(sender_id: u64, message_index: u64, payload: Vec) -> Self { - StressTestMessage { - metadata: StressTestMessageMetadata { - sender_id, - message_index, - time: SystemTime::now(), - }, - payload, - } - } - - #[cfg(test)] - pub fn slow_len(self) -> usize { - let seq = Vec::::from(self); - seq.len() - } - - pub fn len(&self) -> usize { - *METADATA_SIZE + self.payload.len() - } -} - -impl From for Vec { - fn from(value: StressTestMessage) -> Self { - let payload_len: u64 = value.payload.len().try_into().unwrap(); - let duration = value.metadata.time.duration_since(UNIX_EPOCH).unwrap(); - [ - &value.metadata.sender_id.to_be_bytes()[..], - &value.metadata.message_index.to_be_bytes()[..], - &duration.as_secs().to_be_bytes()[..], - &duration.subsec_nanos().to_be_bytes()[..], - &payload_len.to_be_bytes()[..], - &value.payload[..], - ] - .concat() - } -} - -impl From> for StressTestMessage { - fn from(bytes: Vec) -> Self { - let mut i = 0; - let mut get = |n: usize| { - let r = &bytes[i..i + n]; - i += n; - r - }; - - let sender_id = u64::from_be_bytes(get(8).try_into().unwrap()); - let message_index = u64::from_be_bytes(get(8).try_into().unwrap()); - let secs = u64::from_be_bytes(get(8).try_into().unwrap()); - let nanos = u32::from_be_bytes(get(4).try_into().unwrap()); - let time = UNIX_EPOCH + Duration::new(secs, nanos); - let payload_len = u64::from_be_bytes(get(8).try_into().unwrap()).try_into().unwrap(); - let payload = get(payload_len).to_vec(); - - StressTestMessage { - metadata: StressTestMessageMetadata { sender_id, message_index, time }, - payload, - } - } -} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters_test.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters_test.rs deleted file mode 100644 index 5a52f7186ff..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/converters_test.rs +++ /dev/null @@ -1,52 +0,0 @@ -use rstest::rstest; - -use crate::converters::{StressTestMessage, METADATA_SIZE}; - -#[rstest] -#[case::one_byte_len(10)] -#[case::two_byte_len(300)] -#[case::three_byte_len(20_000)] -fn test_message_size(#[case] vec_len: usize) { - let payload = vec![0xAA; vec_len]; - let message = StressTestMessage::new(1, 7070, payload.clone()); - let expected_size = *METADATA_SIZE + vec_len; - assert_eq!(message.len(), expected_size); - assert_eq!(message.slow_len(), expected_size); -} - -#[test] -fn test_serialization_and_deserilization() { - let original_message = - StressTestMessage::new(u64::MAX - 1, u64::MAX - 2, vec![0xa1, 0xb2, 0xc3, 0xd4, 0xe5]); - - // Serialize to bytes - let serialized_bytes: Vec = original_message.clone().into(); - - // Deserialize back to message - let deserialized_message: StressTestMessage = serialized_bytes.into(); - - // Verify all fields match - assert_eq!(deserialized_message.metadata.sender_id, original_message.metadata.sender_id); - assert_eq!( - deserialized_message.metadata.message_index, - original_message.metadata.message_index - ); - assert_eq!(deserialized_message.payload, original_message.payload); - assert_eq!(deserialized_message.metadata.time, original_message.metadata.time); -} - -#[test] -fn test_empty_payload() { - let original_message = StressTestMessage::new(1, 42, vec![]); - - let serialized_bytes: Vec = original_message.clone().into(); - let deserialized_message: StressTestMessage = serialized_bytes.into(); - - assert_eq!(deserialized_message.metadata.sender_id, original_message.metadata.sender_id); - assert_eq!( - deserialized_message.metadata.message_index, - original_message.metadata.message_index - ); - assert_eq!(deserialized_message.payload, original_message.payload); - assert_eq!(deserialized_message.metadata.time, original_message.metadata.time); -} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs deleted file mode 100644 index f9318598dcf..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs +++ /dev/null @@ -1,241 +0,0 @@ -//! Runs a node that stress tests the p2p communication of the network. - -use std::convert::Infallible; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::str::FromStr; -use std::time::SystemTime; -use std::vec; - -use apollo_network::network_manager::{ - BroadcastTopicChannels, - BroadcastTopicClient, - BroadcastTopicClientTrait, - BroadcastTopicServer, - NetworkManager, -}; -use apollo_network::NetworkConfig; -use apollo_network_types::network_types::BroadcastedMessageMetadata; -use clap::Parser; -use converters::{StressTestMessage, METADATA_SIZE}; -use futures::future::join_all; -use futures::StreamExt; -use libp2p::gossipsub::{Sha256Topic, Topic}; -use libp2p::Multiaddr; -use metrics::{counter, gauge}; -use metrics_exporter_prometheus::PrometheusBuilder; -use tokio::time::Duration; -use tracing::{info, trace, Level}; - -#[cfg(test)] -mod converters_test; - -mod converters; -mod utils; - -lazy_static::lazy_static! { - static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string()); -} - -#[derive(Parser, Debug, Clone)] -#[command(version, about, long_about = None)] -struct Args { - /// ID for Prometheus logging - #[arg(short, long, env)] - id: usize, - - /// The port to run the Prometheus metrics server on - #[arg(long, env, default_value_t = 2000)] - metric_port: u16, - - /// The port to run the P2P network on - #[arg(short, env, long, default_value_t = 10000)] - p2p_port: u16, - - /// The address to the bootstrap peer - #[arg(long, env)] - bootstrap: Option, - - /// Set the verbosity level of the logger, the higher the more verbose - #[arg(short, long, env, default_value_t = 0)] - verbosity: u8, - - /// Buffer size for the broadcast topic - // Default from crates/apollo_consensus_manager/src/config.rs - #[arg(short, long, env, default_value_t = 10000)] - buffer_size: usize, - - /// Size of StressTestMessage - #[arg(short, long, env, default_value_t = 1 << 10)] - message_size_bytes: usize, - - /// The time to sleep between broadcasts of StressTestMessage in milliseconds - #[arg(long, env, default_value_t = 1)] - heartbeat_millis: u64, - - /// Maximum duration in seconds to run the node for - #[arg(short, long, env, default_value_t = 3_600)] - timeout: u64, -} - -async fn send_stress_test_messages( - mut broadcast_topic_client: BroadcastTopicClient, - args: &Args, - _peer_id: String, -) { - let mut message = StressTestMessage::new( - args.id.try_into().unwrap(), - 0, // message_index, will be updated in loop - vec![0; args.message_size_bytes - *METADATA_SIZE], - ); - let duration = Duration::from_millis(args.heartbeat_millis); - - for i in 0.. { - message.metadata.time = SystemTime::now(); - message.metadata.message_index = i; - broadcast_topic_client.broadcast_message(message.clone()).await.unwrap(); - trace!("Sent message {i}: {:?}", message); - counter!("sent_messages").increment(1); - tokio::time::sleep(duration).await; - } -} - -fn receive_stress_test_message( - message_result: Result, - _metadata: BroadcastedMessageMetadata, -) { - let end_time = SystemTime::now(); - - let received_message = message_result.unwrap(); - let start_time = received_message.metadata.time; - let duration = match end_time.duration_since(start_time) { - Ok(duration) => duration, - Err(_) => panic!("Got a negative duration, the clocks are not synced!"), - }; - - let delay_seconds = duration.as_secs_f64(); - let delay_micros = duration.as_micros().try_into().unwrap(); - - // TODO(AndrewL): Concentrate all string metrics to constants in a different file - counter!("message_received").increment(1); - counter!(format!("message_received_from_{}", received_message.metadata.sender_id)).increment(1); - - // TODO(AndrewL): This should be a historgram - gauge!("message_received_delay_seconds").set(delay_seconds); - gauge!(format!("message_received_delay_seconds_from_{}", received_message.metadata.sender_id)) - .set(delay_seconds); - - counter!("message_received_delay_micros_sum").increment(delay_micros); - counter!(format!( - "message_received_delay_micros_sum_from_{}", - received_message.metadata.sender_id - )) - .increment(delay_micros); - // TODO(AndrewL): Figure out what to log here -} - -async fn receive_stress_test_messages( - broadcasted_messages_receiver: BroadcastTopicServer, -) { - broadcasted_messages_receiver - .for_each(|result| async { - let (message_result, metadata) = result; - tokio::task::spawn_blocking(|| receive_stress_test_message(message_result, metadata)); - }) - .await; - unreachable!("BroadcastTopicServer stream should never terminate..."); -} - -fn create_peer_private_key(peer_index: usize) -> [u8; 32] { - let peer_index: u64 = peer_index.try_into().expect("Failed converting usize to u64"); - let array = peer_index.to_le_bytes(); - assert_eq!(array.len(), 8); - let mut private_key = [0u8; 32]; - private_key[0..8].copy_from_slice(&array); - private_key -} - -#[tokio::main] -async fn main() { - let args = Args::parse(); - - let level = match args.verbosity { - 0 => None, - 1 => Some(Level::ERROR), - 2 => Some(Level::WARN), - 3 => Some(Level::INFO), - 4 => Some(Level::DEBUG), - _ => Some(Level::TRACE), - }; - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder().with_max_level(level).finish(), - ) - .expect("Failed to set global default subscriber"); - - println!("Starting network stress test with args:\n{args:?}"); - - assert!( - args.message_size_bytes >= *METADATA_SIZE, - "Message size must be at least {} bytes", - *METADATA_SIZE - ); - - let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::UNSPECIFIED, - args.metric_port, - ))); - - builder.install().expect("Failed to install prometheus recorder/exporter"); - - let peer_private_key = create_peer_private_key(args.id); - let peer_private_key_hex = - peer_private_key.iter().map(|byte| format!("{byte:02x}")).collect::(); - info!("Secret Key: {peer_private_key_hex:#?}"); - - let mut network_config = NetworkConfig { - port: args.p2p_port, - secret_key: Some(peer_private_key.to_vec()), - ..Default::default() - }; - if let Some(peer) = &args.bootstrap { - let bootstrap_peer: Multiaddr = Multiaddr::from_str(peer).unwrap(); - network_config.bootstrap_peer_multiaddr = Some(vec![bootstrap_peer]); - } - - let mut network_manager = NetworkManager::new(network_config, None, None); - - let peer_id = network_manager.get_local_peer_id(); - info!("My PeerId: {peer_id}"); - - let network_channels = network_manager - .register_broadcast_topic::(TOPIC.clone(), args.buffer_size) - .unwrap(); - let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = - network_channels; - - let mut tasks = Vec::new(); - - tasks.push(tokio::spawn(async move { - // Start the network manager to handle incoming connections and messages. - network_manager.run().await.unwrap(); - unreachable!("Network manager should not exit"); - })); - - tasks.push(tokio::spawn(async move { - receive_stress_test_messages(broadcasted_messages_receiver).await; - unreachable!("Broadcast topic receiver should not exit"); - })); - - let args_clone = args.clone(); - tasks.push(tokio::spawn(async move { - send_stress_test_messages(broadcast_topic_client, &args_clone, peer_id).await; - unreachable!("Broadcast topic client should not exit"); - })); - - let test_timeout = Duration::from_secs(args.timeout); - match tokio::time::timeout(test_timeout, join_all(tasks.into_iter())).await { - Ok(_) => unreachable!(), - Err(e) => { - info!("Test timeout after {e}"); - } - } -} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile deleted file mode 100644 index b58d4bf57f5..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -# syntax = devthefuture/dockerfile-x -# -# Run with the project root context: -# docker build -f crates/apollo_network/src/bin/network_stress_test/cluster/Dockerfile . - -INCLUDE deployments/images/base/Dockerfile - -FROM base AS builder -WORKDIR /usr/src/rust_code -COPY . . -# TODO(AndrewL): use cargo chef for better caching -RUN cargo build --release --bin network_stress_test - -FROM ubuntu:24.04 AS final_stage -COPY --from=builder /usr/src/rust_code/target/release/network_stress_test /usr/local/bin/network_stress_test - - -ENTRYPOINT ["network_stress_test"] diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/utils.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/utils.rs deleted file mode 100644 index faa02bd79f5..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/utils.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::collections::{BTreeMap, HashSet}; -use std::net::Ipv4Addr; -use std::vec; - -use apollo_config::dumping::{prepend_sub_config_name, ser_param, SerializeConfig}; -use apollo_config::{ParamPath, ParamPrivacyInput, SerializedParam}; -use apollo_network::utils::make_multiaddr; -use apollo_network::NetworkConfig; -use libp2p::identity::Keypair; -use serde::{Deserialize, Serialize}; - -pub const BOOTSTRAP_CONFIG_FILE_PATH: &str = - "crates/apollo_network/src/bin/network_stress_test/bootstrap_test_config.json"; -pub const BOOTSTRAP_OUTPUT_FILE_PATH: &str = - "crates/apollo_network/src/bin/network_stress_test/bootstrap_output.csv"; -pub const DEFAULT_CONFIG_FILE_PATH: &str = - "crates/apollo_network/src/bin/network_stress_test/test_config.json"; -pub const DEFAULT_OUTPUT_FILE_PATH: &str = - "crates/apollo_network/src/bin/network_stress_test/output.csv"; - -#[derive(Debug, Deserialize, Serialize)] -pub struct TestConfig { - pub network_config: NetworkConfig, - pub buffer_size: usize, - pub message_size: usize, - pub num_messages: u32, - pub output_path: String, -} - -impl SerializeConfig for TestConfig { - fn dump(&self) -> BTreeMap { - let mut config = BTreeMap::from_iter([ - ser_param( - "buffer_size", - &self.buffer_size, - "The buffer size for the network receiver.", - ParamPrivacyInput::Public, - ), - ser_param( - "message_size", - &self.message_size, - "The size of the payload for the test messages.", - ParamPrivacyInput::Public, - ), - ser_param( - "num_messages", - &self.num_messages, - "The amount of messages to send and receive.", - ParamPrivacyInput::Public, - ), - ser_param( - "output_path", - &self.output_path, - "The path of the output file.", - ParamPrivacyInput::Public, - ), - ]); - config.extend(prepend_sub_config_name(self.network_config.dump(), "network_config")); - config - } -} - -impl Default for TestConfig { - fn default() -> Self { - Self { - network_config: NetworkConfig::default(), - buffer_size: 1000, - message_size: 1000, - num_messages: 10000, - output_path: BOOTSTRAP_OUTPUT_FILE_PATH.to_string(), - } - } -} - -impl TestConfig { - #[allow(dead_code)] - pub fn create_config_files() { - let secret_key = vec![0; 32]; - let keypair = Keypair::ed25519_from_bytes(secret_key.clone()).unwrap(); - let peer_id = keypair.public().to_peer_id(); - - let _ = TestConfig { - network_config: NetworkConfig { - port: 10000, - secret_key: Some(secret_key), - ..Default::default() - }, - ..Default::default() - } - .dump_to_file(&vec![], &HashSet::new(), BOOTSTRAP_CONFIG_FILE_PATH); - let _ = TestConfig { - network_config: NetworkConfig { - port: 10002, - bootstrap_peer_multiaddr: Some(vec![make_multiaddr( - Ipv4Addr::LOCALHOST, - 10000, - Some(peer_id), - )]), - ..Default::default() - }, - output_path: DEFAULT_OUTPUT_FILE_PATH.to_string(), - ..Default::default() - } - .dump_to_file(&vec![], &HashSet::new(), DEFAULT_CONFIG_FILE_PATH); - } -}