diff --git a/Cargo.lock b/Cargo.lock index 4e2d48b7c38..ef0b2549860 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2095,6 +2095,7 @@ dependencies = [ "starknet_api", "strum 0.25.0", "strum_macros 0.25.3", + "sysinfo", "thiserror 1.0.69", "tokio", "tokio-retry", @@ -7355,7 +7356,7 @@ dependencies = [ "rtnetlink", "system-configuration 0.6.1", "tokio", - "windows", + "windows 0.53.0", ] [[package]] @@ -9214,6 +9215,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -9499,6 +9509,16 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "objc2-io-surface" version = "0.3.2" @@ -12680,6 +12700,20 @@ dependencies = [ "syn 2.0.110", ] +[[package]] +name = "sysinfo" +version = "0.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.61.3", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -14053,6 +14087,28 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core 0.61.2", + "windows-future", + "windows-link 0.1.3", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core 0.61.2", +] + [[package]] name = "windows-core" version = "0.53.0" @@ -14063,6 +14119,19 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -14076,6 +14145,17 @@ dependencies = [ "windows-strings 0.5.1", ] +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -14110,6 +14190,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", +] + [[package]] name = "windows-registry" version = "0.5.3" @@ -14259,6 +14349,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 61c7998fbdc..2ec07de56da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -363,6 +363,8 @@ statistical = "1.0.0" strum = "0.25.0" strum_macros = "0.25.2" syn = "2.0.39" +sysinfo = "0.37.1" +tar = "0.4.38" tempfile = "3.7.0" test-case = "3.2.1" test-log = "0.2.14" diff --git a/crates/apollo_network/Cargo.toml b/crates/apollo_network/Cargo.toml index f1804bfccf3..153c7cde912 100644 --- a/crates/apollo_network/Cargo.toml +++ b/crates/apollo_network/Cargo.toml @@ -40,6 +40,7 @@ serde = { workspace = true, features = ["derive"] } starknet_api.workspace = true strum.workspace = true strum_macros.workspace = true +sysinfo.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["full", "sync"] } tokio-retry.workspace = true diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs index f9318598dcf..50a4f3a8d09 100644 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs @@ -21,7 +21,7 @@ use futures::future::join_all; use futures::StreamExt; use libp2p::gossipsub::{Sha256Topic, Topic}; use libp2p::Multiaddr; -use metrics::{counter, gauge}; +// use metrics::{counter, gauge}; use metrics_exporter_prometheus::PrometheusBuilder; use tokio::time::Duration; use tracing::{info, trace, Level}; @@ -30,6 +30,7 @@ use tracing::{info, trace, Level}; mod converters_test; mod converters; +pub mod metrics; mod utils; lazy_static::lazy_static! { @@ -94,7 +95,7 @@ async fn send_stress_test_messages( message.metadata.message_index = i; broadcast_topic_client.broadcast_message(message.clone()).await.unwrap(); trace!("Sent message {i}: {:?}", message); - counter!("sent_messages").increment(1); + // counter!("sent_messages").increment(1); tokio::time::sleep(duration).await; } } @@ -112,24 +113,25 @@ fn receive_stress_test_message( Err(_) => panic!("Got a negative duration, the clocks are not synced!"), }; - let delay_seconds = duration.as_secs_f64(); - let delay_micros = duration.as_micros().try_into().unwrap(); + let _delay_seconds = duration.as_secs_f64(); + // let delay_micros = duration.as_micros().try_into().unwrap(); // TODO(AndrewL): Concentrate all string metrics to constants in a different file - counter!("message_received").increment(1); - counter!(format!("message_received_from_{}", received_message.metadata.sender_id)).increment(1); + // counter!("message_received").increment(1); + // counter!(format!("message_received_from_{}", + // received_message.metadata.sender_id)).increment(1); // TODO(AndrewL): This should be a historgram - gauge!("message_received_delay_seconds").set(delay_seconds); - gauge!(format!("message_received_delay_seconds_from_{}", received_message.metadata.sender_id)) - .set(delay_seconds); - - counter!("message_received_delay_micros_sum").increment(delay_micros); - counter!(format!( - "message_received_delay_micros_sum_from_{}", - received_message.metadata.sender_id - )) - .increment(delay_micros); + // gauge!("message_received_delay_seconds").set(delay_seconds); + // gauge!(format!("message_received_delay_seconds_from_{}", + // received_message.metadata.sender_id)) .set(delay_seconds); + + // counter!("message_received_delay_micros_sum").increment(delay_micros); + // counter!(format!( + // "message_received_delay_micros_sum_from_{}", + // received_message.metadata.sender_id + // )) + // .increment(delay_micros); // TODO(AndrewL): Figure out what to log here } diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs new file mode 100644 index 00000000000..59051f58b08 --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs @@ -0,0 +1,226 @@ +use std::collections::HashMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use apollo_metrics::define_metrics; +use apollo_metrics::metrics::LossyIntoF64; +use apollo_network::metrics::{ + BroadcastNetworkMetrics, + EventMetrics, + LatencyMetrics, + NetworkMetrics, + SqmrNetworkMetrics, + EVENT_TYPE_LABELS, + NETWORK_BROADCAST_DROP_LABELS, +}; +use libp2p::gossipsub::{Sha256Topic, Topic}; +use sysinfo::{Networks, System}; +use tokio::time::interval; +use tracing::warn; + +use crate::converters::StressTestMessage; + +lazy_static::lazy_static! { + pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string()); +} + +define_metrics!( + Infra => { + MetricGauge { BROADCAST_MESSAGE_HEARTBEAT_MILLIS, "broadcast_message_theoretical_heartbeat_millis", "The number of ms we sleep between each two consecutive broadcasts" }, + MetricGauge { BROADCAST_MESSAGE_THROUGHPUT, "broadcast_message_theoretical_throughput", "Throughput in bytes/second of the broadcasted " }, + MetricGauge { BROADCAST_MESSAGE_BYTES, "broadcast_message_bytes", "Size of the stress test sent message in bytes" }, + MetricCounter { BROADCAST_MESSAGE_COUNT, "broadcast_message_count", "Number of stress test messages sent via broadcast", init = 0 }, + MetricCounter { BROADCAST_MESSAGE_BYTES_SUM, "broadcast_message_bytes_sum", "Sum of the stress test messages sent via broadcast", init = 0 }, + MetricHistogram { BROADCAST_MESSAGE_SEND_DELAY_SECONDS, "broadcast_message_send_delay_seconds", "Message sending delay in seconds" }, + + MetricGauge { RECEIVE_MESSAGE_BYTES, "receive_message_bytes", "Size of the stress test received message in bytes" }, + MetricCounter { RECEIVE_MESSAGE_COUNT, "receive_message_count", "Number of stress test messages received via broadcast", init = 0 }, + MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 }, + MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" }, + MetricHistogram { RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS, "receive_message_negative_delay_seconds", "Negative message delay in seconds" }, + + MetricGauge { NETWORK_CONNECTED_PEERS, "network_connected_peers", "Number of connected peers in the network" }, + MetricGauge { NETWORK_BLACKLISTED_PEERS, "network_blacklisted_peers", "Number of blacklisted peers in the network" }, + MetricGauge { NETWORK_ACTIVE_INBOUND_SESSIONS, "network_active_inbound_sessions", "Number of active inbound SQMR sessions" }, + MetricGauge { NETWORK_ACTIVE_OUTBOUND_SESSIONS, "network_active_outbound_sessions", "Number of active outbound SQMR sessions" }, + MetricCounter { NETWORK_STRESS_TEST_SENT_MESSAGES, "network_stress_test_sent_messages", "Number of stress test messages sent via broadcast", init = 0 }, + MetricCounter { NETWORK_STRESS_TEST_RECEIVED_MESSAGES, "network_stress_test_received_messages", "Number of stress test messages received via broadcast", init = 0 }, + + MetricGauge { SYSTEM_PROCESS_CPU_USAGE_PERCENT, "system_process_cpu_usage_percent", "CPU usage percentage of the current process" }, + MetricGauge { SYSTEM_PROCESS_MEMORY_USAGE_BYTES, "system_process_memory_usage_bytes", "Memory usage in bytes of the current process" }, + MetricGauge { SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES, "system_process_virtual_memory_usage_bytes", "Virtual memory usage in bytes of the current process" }, + MetricGauge { SYSTEM_NETWORK_BYTES_SENT_TOTAL, "system_network_bytes_sent_total", "Total bytes sent across all network interfaces since system start" }, + MetricGauge { SYSTEM_NETWORK_BYTES_RECEIVED_TOTAL, "system_network_bytes_received_total", "Total bytes received across all network interfaces since system start" }, + MetricGauge { SYSTEM_NETWORK_BYTES_SENT_CURRENT, "system_network_bytes_sent_current", "Bytes sent across all network interfaces since last measurement" }, + MetricGauge { SYSTEM_NETWORK_BYTES_RECEIVED_CURRENT, "system_network_bytes_received_current", "Bytes received across all network interfaces since last measurement" }, + MetricGauge { SYSTEM_TOTAL_MEMORY_BYTES, "system_total_memory_bytes", "Total system memory in bytes" }, + MetricGauge { SYSTEM_AVAILABLE_MEMORY_BYTES, "system_available_memory_bytes", "Available system memory in bytes" }, + MetricGauge { SYSTEM_USED_MEMORY_BYTES, "system_used_memory_bytes", "Used system memory in bytes" }, + MetricGauge { SYSTEM_CPU_COUNT, "system_cpu_count", "Number of logical CPU cores in the system" }, + + MetricCounter { NETWORK_RESET_TOTAL, "network_reset_total", "Total number of network resets performed", init = 0 }, + LabeledMetricCounter { NETWORK_DROPPED_BROADCAST_MESSAGES, "network_dropped_broadcast_messages", "Number of dropped broadcast messages by reason", init = 0, labels = NETWORK_BROADCAST_DROP_LABELS }, + LabeledMetricCounter { NETWORK_EVENT_COUNTER, "network_event_counter", "Network events counter by type", init = 0, labels = EVENT_TYPE_LABELS }, + + MetricHistogram { PING_LATENCY_SECONDS, "ping_latency_seconds", "Ping latency in seconds" }, + }, +); + +pub fn update_broadcast_metrics(message_size_bytes: usize, broadcast_heartbeat: Duration) { + BROADCAST_MESSAGE_HEARTBEAT_MILLIS.set(broadcast_heartbeat.as_millis().into_f64()); + BROADCAST_MESSAGE_THROUGHPUT.set(get_throughput(message_size_bytes, broadcast_heartbeat)); +} + +pub fn receive_stress_test_message(received_message: Vec) { + let end_time = SystemTime::now(); + + let received_message: StressTestMessage = received_message.into(); + let start_time = received_message.metadata.time; + let delay_seconds = match end_time.duration_since(start_time) { + Ok(duration) => duration.as_secs_f64(), + Err(_) => { + let negative_duration = start_time.duration_since(end_time).unwrap(); + -negative_duration.as_secs_f64() + } + }; + + // Use apollo_metrics for all metrics including labeled ones + RECEIVE_MESSAGE_BYTES.set(received_message.len().into_f64()); + RECEIVE_MESSAGE_COUNT.increment(1); + RECEIVE_MESSAGE_BYTES_SUM.increment( + u64::try_from(received_message.len()).expect("Message length too large for u64"), + ); + + // Use apollo_metrics histograms for latency measurements + if delay_seconds.is_sign_positive() { + RECEIVE_MESSAGE_DELAY_SECONDS.record(delay_seconds); + } else { + RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS.record(-delay_seconds); + } +} + +pub fn seconds_since_epoch() -> u64 { + let now = SystemTime::now(); + now.duration_since(UNIX_EPOCH).unwrap().as_secs() +} + +/// Calculates the throughput given the message and how much to sleep between each two consecutive +/// broadcasts +pub fn get_throughput(message_size_bytes: usize, heartbeat_duration: Duration) -> f64 { + let tps = Duration::from_secs(1).as_secs_f64() / heartbeat_duration.as_secs_f64(); + tps * message_size_bytes.into_f64() +} + +/// Creates comprehensive network metrics for monitoring the stress test network performance. +/// Uses the lazy static metrics defined above. +pub fn create_network_metrics() -> NetworkMetrics { + // Create broadcast metrics for the stress test topic + let stress_test_broadcast_metrics = BroadcastNetworkMetrics { + num_sent_broadcast_messages: NETWORK_STRESS_TEST_SENT_MESSAGES, + num_dropped_broadcast_messages: NETWORK_DROPPED_BROADCAST_MESSAGES, + num_received_broadcast_messages: NETWORK_STRESS_TEST_RECEIVED_MESSAGES, + }; + + // Create a map with broadcast metrics for our stress test topic + let mut broadcast_metrics_by_topic = HashMap::new(); + broadcast_metrics_by_topic.insert(TOPIC.hash(), stress_test_broadcast_metrics); + + // Create SQMR metrics for session monitoring + let sqmr_metrics = SqmrNetworkMetrics { + num_active_inbound_sessions: NETWORK_ACTIVE_INBOUND_SESSIONS, + num_active_outbound_sessions: NETWORK_ACTIVE_OUTBOUND_SESSIONS, + }; + + // Create event metrics for network events monitoring + let event_metrics = EventMetrics { event_counter: NETWORK_EVENT_COUNTER }; + + // Create latency metrics for ping monitoring + let latency_metrics = LatencyMetrics { ping_latency_seconds: PING_LATENCY_SECONDS }; + + NetworkMetrics { + num_connected_peers: NETWORK_CONNECTED_PEERS, + num_blacklisted_peers: NETWORK_BLACKLISTED_PEERS, + broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic), + sqmr_metrics: Some(sqmr_metrics), + event_metrics: Some(event_metrics), + latency_metrics: Some(latency_metrics), + } +} + +pub async fn monitor_process_metrics(interval_seconds: u64) { + let mut interval = interval(Duration::from_secs(interval_seconds)); + let current_pid = sysinfo::get_current_pid().expect("Failed to get current process PID"); + + // Initialize networks for network interface monitoring + let mut networks = Networks::new_with_refreshed_list(); + + // Initialize empty system for CPU monitoring - we'll refresh only what we need + let mut system = System::new_all(); + + loop { + interval.tick().await; + + // Refresh only the specific data we need + // system.refresh_memory_specifics(MemoryRefreshKind::new().with_ram()); + // system.refresh_cpu_specifics(CpuRefreshKind::new().with_cpu_usage()); + // system.refresh_processes_specifics( + // ProcessesToUpdate::Some(&[current_pid]), + // false, + // ProcessRefreshKind::everything(), + // ); + // system.refresh_specifics( + // RefreshKind::new() + // .with_cpu(CpuRefreshKind::everything()) + // .with_memory(MemoryRefreshKind::everything()) + // .with_processes(ProcessRefreshKind::new().spe), + // ); + system.refresh_all(); + let total_memory: f64 = system.total_memory().into_f64(); + let available_memory: f64 = system.available_memory().into_f64(); + let used_memory: f64 = system.used_memory().into_f64(); + let cpu_count: f64 = system.cpus().len().into_f64(); + // let load_avg: f64 = system.load_average().one.into_f64(); + + SYSTEM_TOTAL_MEMORY_BYTES.set(total_memory); + SYSTEM_AVAILABLE_MEMORY_BYTES.set(available_memory); + SYSTEM_USED_MEMORY_BYTES.set(used_memory); + SYSTEM_CPU_COUNT.set(cpu_count); + + if let Some(process) = system.process(current_pid) { + let cpu_usage: f64 = process.cpu_usage().into(); + let memory_usage: f64 = process.memory().into_f64(); + let virtual_memory_usage: f64 = process.virtual_memory().into_f64(); + + SYSTEM_PROCESS_CPU_USAGE_PERCENT.set(cpu_usage); + SYSTEM_PROCESS_MEMORY_USAGE_BYTES.set(memory_usage); + SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES.set(virtual_memory_usage); + } else { + warn!("Could not find process information for PID: {}", current_pid); + } + + // Refresh network statistics and collect metrics + networks.refresh(false); + + let mut total_bytes_sent: u64 = 0; + let mut total_bytes_received: u64 = 0; + let mut current_bytes_sent: u64 = 0; + let mut current_bytes_received: u64 = 0; + + for (interface_name, data) in &networks { + // Skip virtual interfaces used for traffic control and loopback to avoid + // double-counting + if interface_name == "lo" || interface_name.starts_with("ifb") { + continue; + } + + total_bytes_sent += data.total_transmitted(); + total_bytes_received += data.total_received(); + current_bytes_sent += data.transmitted(); + current_bytes_received += data.received(); + } + + SYSTEM_NETWORK_BYTES_SENT_TOTAL.set(total_bytes_sent.into_f64()); + SYSTEM_NETWORK_BYTES_RECEIVED_TOTAL.set(total_bytes_received.into_f64()); + SYSTEM_NETWORK_BYTES_SENT_CURRENT.set(current_bytes_sent.into_f64()); + SYSTEM_NETWORK_BYTES_RECEIVED_CURRENT.set(current_bytes_received.into_f64()); + } +}