From 9cb64f090f1ea44d4a45a27ef37f74fedd479ed0 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 4 Dec 2025 15:17:10 +0100 Subject: [PATCH 1/5] Add latency bench --- .../tests/zombie_ci/statement_store_bench.rs | 248 +++++++++++++++++- 1 file changed, 247 insertions(+), 1 deletion(-) diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs index a284344ec703a..d796fdf4b5e4a 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs @@ -9,7 +9,7 @@ use log::{debug, info, trace}; use sc_statement_store::{DEFAULT_MAX_TOTAL_SIZE, DEFAULT_MAX_TOTAL_STATEMENTS}; use sp_core::{blake2_256, sr25519, Bytes, Pair}; use sp_statement_store::{Channel, Statement, Topic}; -use std::{cell::Cell, collections::HashMap, time::Duration}; +use std::{cell::Cell, collections::HashMap, sync::Arc, time::Duration}; use tokio::time::timeout; use zombienet_sdk::{ subxt::{backend::rpc::RpcClient, ext::subxt_rpcs::rpc_params}, @@ -762,3 +762,249 @@ fn channel_message(sender: &sr25519::Public, receiver: &sr25519::Public) -> Chan data.extend_from_slice(receiver.as_ref()); blake2_256(&data) } + +struct LatencyBenchConfig { + num_rounds: usize, + num_nodes: usize, + num_clients: u32, + max_retries: u32, + retry_delay_ms: u64, + propagation_delay_ms: u64, + interval_ms: u64, + req_timeout_ms: u64, + messages_pattern: &'static [(usize, usize)], +} + +impl LatencyBenchConfig { + fn messages_per_client(&self) -> usize { + self.messages_pattern.iter().map(|(count, _)| count).sum() + } +} + +#[derive(Debug, Clone)] +struct RoundStats { + send_duration: Duration, + receive_duration: Duration, + full_latency: Duration, + sent_count: u32, + received_count: u32, + receive_attempts: u32, +} + +#[tokio::test(flavor = "multi_thread")] +async fn statement_store_latency_bench() -> Result<(), anyhow::Error> { + let _ = env_logger::try_init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let config = Arc::new(LatencyBenchConfig { + num_nodes: 5, + num_clients: 50000, + interval_ms: 10000, + num_rounds: 1, + messages_pattern: &[(5, 1024 / 2)], + max_retries: 500, + retry_delay_ms: 100, + req_timeout_ms: 3000, + propagation_delay_ms: 2000, + }); + + let collator_names: Vec = + (0..config.num_nodes).map(|i| format!("collator{}", i)).collect(); + let collator_names: Vec<&str> = collator_names.iter().map(|s| s.as_str()).collect(); + + let network = spawn_network(&collator_names).await?; + + info!("Starting Latency benchmark"); + info!(""); + info!("Clients: {}", config.num_clients); + info!("Nodes: {}", config.num_nodes); + info!("Rounds: {}", config.num_rounds); + info!("Interval, ms: {}", config.interval_ms); + info!("Messages, per round: {}", config.messages_per_client() as u32 * config.num_clients); + info!("Message pattern:"); + for &(count, size) in config.messages_pattern { + info!(" - {} messages {} bytes", count, size); + } + + let mut rpc_clients = Vec::new(); + for &name in &collator_names { + let node = network.get_node(name)?; + let rpc_client = node.rpc().await?; + rpc_clients.push(rpc_client); + } + + let handles: Vec<_> = (0..config.num_clients) + .map(|client_id| { + let config = Arc::clone(&config); + let (keyring, _) = sr25519::Pair::generate(); + let node_idx = (client_id as usize) % config.num_nodes; + let rpc_client = rpc_clients[node_idx].clone(); + let neighbour_id = (client_id + 1) % config.num_clients; + let neighbour_node_idx = (neighbour_id as usize) % config.num_nodes; + if node_idx == neighbour_node_idx && config.num_nodes > 1 { + panic!( + "Client {} and neighbour {} are on the same node {}!", + client_id, neighbour_id, node_idx + ); + } + + tokio::spawn(async move { + let mut rounds_stats = Vec::new(); + for round in 0..config.num_rounds { + let round_start = std::time::Instant::now(); + + let send_start = std::time::Instant::now(); + let mut msg_idx: u32 = 0; + + for &(count, size) in config.messages_pattern { + for _ in 0..count { + let mut statement = Statement::new(); + let topic = blake2_256( + format!("{}-{}-{}", client_id, round, msg_idx).as_bytes(), + ); + let channel = blake2_256(msg_idx.to_le_bytes().as_ref()); + + statement.set_channel(channel); + statement.set_priority(round as u32); + statement.set_topic(0, topic); + statement.set_plain_data(vec![0u8; size]); + statement.sign_sr25519_private(&keyring); + + let encoded: Bytes = statement.encode().into(); + rpc_client + .request::<()>("statement_submit", rpc_params![encoded]) + .await?; + + msg_idx += 1; + } + } + + let sent_count = msg_idx; + let send_duration = send_start.elapsed(); + + tokio::time::sleep(Duration::from_millis(config.propagation_delay_ms)).await; + + let receive_start = std::time::Instant::now(); + let mut received_count = 0; + let mut receive_attempts = 0; + + for msg_idx in 0..config.messages_per_client() as u32 { + let topic = blake2_256( + format!("{}-{}-{}", neighbour_id, round, msg_idx).as_bytes(), + ); + + for retry in 0..config.max_retries { + receive_attempts += 1; + match timeout( + Duration::from_millis(config.req_timeout_ms), + rpc_client.request::>( + "statement_broadcastsStatement", + rpc_params![vec![topic]], + ), + ) + .await + { + Ok(Ok(statements)) if !statements.is_empty() => { + received_count += statements.len() as u32; + break; + }, + _ if retry < config.max_retries - 1 => { + tokio::time::sleep(Duration::from_millis( + config.retry_delay_ms, + )) + .await; + }, + _ => { + return Err(anyhow!( + "Client {}: Failed to retrieve message {} from neighbour {} after {} retries", + client_id, msg_idx, neighbour_id, config.max_retries + )); + }, + } + } + } + + let receive_duration = receive_start.elapsed(); + let full_latency = round_start.elapsed(); + if full_latency < Duration::from_millis(config.interval_ms) { + tokio::time::sleep( + Duration::from_millis(config.interval_ms) - full_latency, + ) + .await; + } + + rounds_stats.push(RoundStats { + send_duration, + receive_duration, + full_latency, + sent_count, + received_count, + receive_attempts, + }); + } + + // Verify all messages were sent and received + let expected_count = config.messages_per_client() as u32; + for stats in &rounds_stats { + if stats.sent_count != expected_count { + return Err(anyhow!( + "Client {}: Expected {} messages sent, but got {}", + client_id, + expected_count, + stats.sent_count + )); + } + if stats.received_count != expected_count { + return Err(anyhow!( + "Client {}: Expected {} messages received, but got {}", + client_id, + expected_count, + stats.received_count + )); + } + } + + Ok::<_, anyhow::Error>(rounds_stats) + }) + }) + .collect(); + + let mut all_round_stats = Vec::new(); + for handle in handles { + let stats = handle.await??; + all_round_stats.extend(stats); + } + + let calc_stats = |values: Vec| -> (f64, f64, f64) { + let min = values.iter().copied().fold(f64::INFINITY, f64::min); + let max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max); + let avg = values.iter().sum::() / values.len() as f64; + (min, avg, max) + }; + + let send_s = + calc_stats(all_round_stats.iter().map(|s| s.send_duration.as_secs_f64()).collect()); + let read_s = + calc_stats(all_round_stats.iter().map(|s| s.receive_duration.as_secs_f64()).collect()); + let latency_s = + calc_stats(all_round_stats.iter().map(|s| s.full_latency.as_secs_f64()).collect()); + let attempts = calc_stats(all_round_stats.iter().map(|s| s.receive_attempts as f64).collect()); + let attempts_per_msg = ( + attempts.0 / config.messages_per_client() as f64, + attempts.1 / config.messages_per_client() as f64, + attempts.2 / config.messages_per_client() as f64, + ); + + info!(""); + info!(" Min Avg Max"); + info!("Send, s {:>8.3} {:>8.3} {:>8.3}", send_s.0, send_s.1, send_s.2); + info!("Receive, s {:>8.3} {:>8.3} {:>8.3}", read_s.0, read_s.1, read_s.2); + info!("Latency, s {:>8.3} {:>8.3} {:>8.3}", latency_s.0, latency_s.1, latency_s.2); + info!( + "Attempts, per msg {:>8.1} {:>8.1} {:>8.1}", + attempts_per_msg.0, attempts_per_msg.1, attempts_per_msg.2 + ); + + Ok(()) +} From 0b41ac24fb13a8821b5046a287c80325d980048a Mon Sep 17 00:00:00 2001 From: Javier Viola <363911+pepoviola@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:52:57 +0100 Subject: [PATCH 2/5] add bench test to run in k8s --- .github/workflows/zombienet_cumulus.yml | 2 +- .github/zombienet-tests/zombienet_cumulus_tests.yml | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/zombienet_cumulus.yml b/.github/workflows/zombienet_cumulus.yml index 1bc05e398502a..91aaef3d0c843 100644 --- a/.github/workflows/zombienet_cumulus.yml +++ b/.github/workflows/zombienet_cumulus.yml @@ -69,7 +69,7 @@ jobs: POLKADOT_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/polkadot-debug:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}" CUMULUS_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/${{ matrix.test.cumulus-image }}:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}" RUST_LOG: ${{ needs.preflight.outputs.RUST_LOG }} - ZOMBIE_PROVIDER: ${{ needs.preflight.outputs.ZOMBIE_PROVIDER }} + ZOMBIE_PROVIDER: ${{ matrix.test.use-provider && matrix.test.use-provider || needs.preflight.outputs.ZOMBIE_PROVIDER }} strategy: fail-fast: false diff --git a/.github/zombienet-tests/zombienet_cumulus_tests.yml b/.github/zombienet-tests/zombienet_cumulus_tests.yml index 2de3c3160799d..db83fe664a9cd 100644 --- a/.github/zombienet-tests/zombienet_cumulus_tests.yml +++ b/.github/zombienet-tests/zombienet_cumulus_tests.yml @@ -86,3 +86,10 @@ cumulus-image: "test-parachain" use-zombienet-sdk: true needs-wasm-binary: true + +- job-name: "zombienet-cumulus-statement_store_latency_bench" + test-filter: "zombie_ci::statement_store_bench::statement_store_latency_bench" + runner-type: "default" + cumulus-image: "test-parachain" + use-zombienet-sdk: true + use-provider: "k8s" From 214d579d1e9974b12a4241df2a9634c8d0971a61 Mon Sep 17 00:00:00 2001 From: Javier Viola <363911+pepoviola@users.noreply.github.com> Date: Tue, 9 Dec 2025 09:39:48 +0100 Subject: [PATCH 3/5] clean code --- .../tests/zombie_ci/statement_store_bench.rs | 62 ++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs index d796fdf4b5e4a..ffe0aeecda461 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs @@ -15,6 +15,7 @@ use zombienet_sdk::{ subxt::{backend::rpc::RpcClient, ext::subxt_rpcs::rpc_params}, LocalFileSystem, Network, NetworkConfigBuilder, }; +use std::env; const GROUP_SIZE: u32 = 6; const PARTICIPANT_SIZE: u32 = GROUP_SIZE * 8333; // Target ~50,000 total @@ -312,14 +313,40 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> { async fn spawn_network(collators: &[&str]) -> Result, anyhow::Error> { assert!(collators.len() >= 2); let images = zombienet_sdk::environment::get_images_from_env(); + let maybe_req_cpu = env::var("ZOMBIE_K8S_REQUEST_CPU"); + let maybe_req_mem = env::var("ZOMBIE_K8S_REQUEST_MEM"); let config = NetworkConfigBuilder::new() .with_relaychain(|r| { - r.with_chain("rococo-local") + let r = r + .with_chain("rococo-local") .with_default_command("polkadot") .with_default_image(images.polkadot.as_str()) - .with_default_args(vec!["-lparachain=debug".into()]) - .with_node(|node| node.with_name("validator-0")) - .with_node(|node| node.with_name("validator-1")) + .with_default_args(vec!["-lparachain=debug".into()]); + + + let r = match (&maybe_req_cpu, &maybe_req_mem) { + (Err(_), Err(_)) => r, + _ => { + r.with_default_resources(|resources| { + let resources = if let Ok(cpu_req) = &maybe_req_cpu { + resources.with_request_cpu(cpu_req.as_str()) + } else { + resources + }; + + let resources = if let Ok(mem_req) = &maybe_req_mem { + resources.with_request_memory(mem_req.as_str()) + } else { + resources + }; + resources + }) + } + }; + + r + .with_node(|node| node.with_name("validator-0")) + .with_node(|node| node.with_name("validator-1")) }) .with_parachain(|p| { let p = p @@ -332,9 +359,30 @@ async fn spawn_network(collators: &[&str]) -> Result, a "-lstatement-store=info,statement-gossip=info,error".into(), "--enable-statement-store".into(), "--rpc-max-connections=50000".into(), - ]) - // Have to set outside of the loop below, so that `p` has the right type. - .with_collator(|n| n.with_name(collators[0])); + ]); + + let p = match (&maybe_req_cpu, &maybe_req_mem) { + (Err(_), Err(_)) => p, + _ => { + p.with_default_resources(|resources| { + let resources = if let Ok(cpu_req) = maybe_req_cpu { + resources.with_request_cpu(cpu_req.as_str()) + } else { + resources + }; + + let resources = if let Ok(mem_req) = maybe_req_mem { + resources.with_request_memory(mem_req.as_str()) + } else { + resources + }; + resources + }) + } + }; + + // Have to set outside of the loop below, so that `p` has the right type. + let p = p.with_collator(|n| n.with_name(collators[0])); collators[1..] .iter() From 3a157df0940ae4ef0d089c3e20e5247330a8cd9f Mon Sep 17 00:00:00 2001 From: Javier Viola <363911+pepoviola@users.noreply.github.com> Date: Tue, 9 Dec 2025 09:41:19 +0100 Subject: [PATCH 4/5] fmt --- .../tests/zombie_ci/statement_store_bench.rs | 65 +++++++++---------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs index ffe0aeecda461..dff0632c834dc 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs @@ -9,13 +9,12 @@ use log::{debug, info, trace}; use sc_statement_store::{DEFAULT_MAX_TOTAL_SIZE, DEFAULT_MAX_TOTAL_STATEMENTS}; use sp_core::{blake2_256, sr25519, Bytes, Pair}; use sp_statement_store::{Channel, Statement, Topic}; -use std::{cell::Cell, collections::HashMap, sync::Arc, time::Duration}; +use std::{cell::Cell, collections::HashMap, env, sync::Arc, time::Duration}; use tokio::time::timeout; use zombienet_sdk::{ subxt::{backend::rpc::RpcClient, ext::subxt_rpcs::rpc_params}, LocalFileSystem, Network, NetworkConfigBuilder, }; -use std::env; const GROUP_SIZE: u32 = 6; const PARTICIPANT_SIZE: u32 = GROUP_SIZE * 8333; // Target ~50,000 total @@ -323,30 +322,26 @@ async fn spawn_network(collators: &[&str]) -> Result, a .with_default_image(images.polkadot.as_str()) .with_default_args(vec!["-lparachain=debug".into()]); - let r = match (&maybe_req_cpu, &maybe_req_mem) { (Err(_), Err(_)) => r, - _ => { - r.with_default_resources(|resources| { - let resources = if let Ok(cpu_req) = &maybe_req_cpu { - resources.with_request_cpu(cpu_req.as_str()) - } else { - resources - }; - - let resources = if let Ok(mem_req) = &maybe_req_mem { - resources.with_request_memory(mem_req.as_str()) - } else { - resources - }; + _ => r.with_default_resources(|resources| { + let resources = if let Ok(cpu_req) = &maybe_req_cpu { + resources.with_request_cpu(cpu_req.as_str()) + } else { resources - }) - } + }; + + let resources = if let Ok(mem_req) = &maybe_req_mem { + resources.with_request_memory(mem_req.as_str()) + } else { + resources + }; + resources + }), }; - r - .with_node(|node| node.with_name("validator-0")) - .with_node(|node| node.with_name("validator-1")) + r.with_node(|node| node.with_name("validator-0")) + .with_node(|node| node.with_name("validator-1")) }) .with_parachain(|p| { let p = p @@ -363,22 +358,20 @@ async fn spawn_network(collators: &[&str]) -> Result, a let p = match (&maybe_req_cpu, &maybe_req_mem) { (Err(_), Err(_)) => p, - _ => { - p.with_default_resources(|resources| { - let resources = if let Ok(cpu_req) = maybe_req_cpu { - resources.with_request_cpu(cpu_req.as_str()) - } else { - resources - }; - - let resources = if let Ok(mem_req) = maybe_req_mem { - resources.with_request_memory(mem_req.as_str()) - } else { - resources - }; + _ => p.with_default_resources(|resources| { + let resources = if let Ok(cpu_req) = maybe_req_cpu { + resources.with_request_cpu(cpu_req.as_str()) + } else { resources - }) - } + }; + + let resources = if let Ok(mem_req) = maybe_req_mem { + resources.with_request_memory(mem_req.as_str()) + } else { + resources + }; + resources + }), }; // Have to set outside of the loop below, so that `p` has the right type. From e7b2fde85637509776db9cb7122b05fb199f355a Mon Sep 17 00:00:00 2001 From: Javier Viola <363911+pepoviola@users.noreply.github.com> Date: Tue, 9 Dec 2025 11:10:20 +0100 Subject: [PATCH 5/5] comment k8s_auth in action --- .github/actions/zombienet-sdk/action.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/actions/zombienet-sdk/action.yml b/.github/actions/zombienet-sdk/action.yml index 197b90809976a..32d302106e39e 100644 --- a/.github/actions/zombienet-sdk/action.yml +++ b/.github/actions/zombienet-sdk/action.yml @@ -71,12 +71,12 @@ runs: gh-token: ${{ inputs.gh-token }} run-id: ${{ inputs.build-id }} - - name: k8s_auth - if: env.ZOMBIE_PROVIDER == 'k8s' - shell: bash - run: | - . /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh - k8s_auth + # - name: k8s_auth + # if: env.ZOMBIE_PROVIDER == 'k8s' + # shell: bash + # run: | + # . /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + # k8s_auth - name: zombie_test shell: bash