Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/actions/zombienet-sdk/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ runs:
gh-token: ${{ inputs.gh-token }}
run-id: ${{ inputs.build-id }}

- name: k8s_auth
if: env.ZOMBIE_PROVIDER == 'k8s'
shell: bash
run: |
. /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
k8s_auth
# - name: k8s_auth
# if: env.ZOMBIE_PROVIDER == 'k8s'
# shell: bash
# run: |
# . /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
# k8s_auth

- name: zombie_test
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/zombienet_cumulus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
POLKADOT_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/polkadot-debug:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}"
CUMULUS_IMAGE: "${{ needs.preflight.outputs.TEMP_IMAGES_BASE }}/${{ matrix.test.cumulus-image }}:${{ needs.preflight.outputs.DOCKER_IMAGES_VERSION }}"
RUST_LOG: ${{ needs.preflight.outputs.RUST_LOG }}
ZOMBIE_PROVIDER: ${{ needs.preflight.outputs.ZOMBIE_PROVIDER }}
ZOMBIE_PROVIDER: ${{ matrix.test.use-provider && matrix.test.use-provider || needs.preflight.outputs.ZOMBIE_PROVIDER }}

strategy:
fail-fast: false
Expand Down
7 changes: 7 additions & 0 deletions .github/zombienet-tests/zombienet_cumulus_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,10 @@
cumulus-image: "test-parachain"
use-zombienet-sdk: true
needs-wasm-binary: true

- job-name: "zombienet-cumulus-statement_store_latency_bench"
test-filter: "zombie_ci::statement_store_bench::statement_store_latency_bench"
runner-type: "default"
cumulus-image: "test-parachain"
use-zombienet-sdk: true
use-provider: "k8s"
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use log::{debug, info, trace};
use sc_statement_store::{DEFAULT_MAX_TOTAL_SIZE, DEFAULT_MAX_TOTAL_STATEMENTS};
use sp_core::{blake2_256, sr25519, Bytes, Pair};
use sp_statement_store::{Channel, Statement, Topic};
use std::{cell::Cell, collections::HashMap, time::Duration};
use std::{cell::Cell, collections::HashMap, env, sync::Arc, time::Duration};
use tokio::time::timeout;
use zombienet_sdk::{
subxt::{backend::rpc::RpcClient, ext::subxt_rpcs::rpc_params},
Expand Down Expand Up @@ -312,13 +312,35 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {
async fn spawn_network(collators: &[&str]) -> Result<Network<LocalFileSystem>, anyhow::Error> {
assert!(collators.len() >= 2);
let images = zombienet_sdk::environment::get_images_from_env();
let maybe_req_cpu = env::var("ZOMBIE_K8S_REQUEST_CPU");
let maybe_req_mem = env::var("ZOMBIE_K8S_REQUEST_MEM");
let config = NetworkConfigBuilder::new()
.with_relaychain(|r| {
r.with_chain("rococo-local")
let r = r
.with_chain("rococo-local")
.with_default_command("polkadot")
.with_default_image(images.polkadot.as_str())
.with_default_args(vec!["-lparachain=debug".into()])
.with_node(|node| node.with_name("validator-0"))
.with_default_args(vec!["-lparachain=debug".into()]);

let r = match (&maybe_req_cpu, &maybe_req_mem) {
(Err(_), Err(_)) => r,
_ => r.with_default_resources(|resources| {
let resources = if let Ok(cpu_req) = &maybe_req_cpu {
resources.with_request_cpu(cpu_req.as_str())
} else {
resources
};

let resources = if let Ok(mem_req) = &maybe_req_mem {
resources.with_request_memory(mem_req.as_str())
} else {
resources
};
resources
}),
};

r.with_node(|node| node.with_name("validator-0"))
.with_node(|node| node.with_name("validator-1"))
})
.with_parachain(|p| {
Expand All @@ -332,9 +354,28 @@ async fn spawn_network(collators: &[&str]) -> Result<Network<LocalFileSystem>, a
"-lstatement-store=info,statement-gossip=info,error".into(),
"--enable-statement-store".into(),
"--rpc-max-connections=50000".into(),
])
// Have to set outside of the loop below, so that `p` has the right type.
.with_collator(|n| n.with_name(collators[0]));
]);

let p = match (&maybe_req_cpu, &maybe_req_mem) {
(Err(_), Err(_)) => p,
_ => p.with_default_resources(|resources| {
let resources = if let Ok(cpu_req) = maybe_req_cpu {
resources.with_request_cpu(cpu_req.as_str())
} else {
resources
};

let resources = if let Ok(mem_req) = maybe_req_mem {
resources.with_request_memory(mem_req.as_str())
} else {
resources
};
resources
}),
};

// Have to set outside of the loop below, so that `p` has the right type.
let p = p.with_collator(|n| n.with_name(collators[0]));

collators[1..]
.iter()
Expand Down Expand Up @@ -762,3 +803,249 @@ fn channel_message(sender: &sr25519::Public, receiver: &sr25519::Public) -> Chan
data.extend_from_slice(receiver.as_ref());
blake2_256(&data)
}

struct LatencyBenchConfig {
num_rounds: usize,
num_nodes: usize,
num_clients: u32,
max_retries: u32,
retry_delay_ms: u64,
propagation_delay_ms: u64,
interval_ms: u64,
req_timeout_ms: u64,
messages_pattern: &'static [(usize, usize)],
}

impl LatencyBenchConfig {
fn messages_per_client(&self) -> usize {
self.messages_pattern.iter().map(|(count, _)| count).sum()
}
}

#[derive(Debug, Clone)]
struct RoundStats {
send_duration: Duration,
receive_duration: Duration,
full_latency: Duration,
sent_count: u32,
received_count: u32,
receive_attempts: u32,
}

#[tokio::test(flavor = "multi_thread")]
async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
let _ = env_logger::try_init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

let config = Arc::new(LatencyBenchConfig {
num_nodes: 5,
num_clients: 50000,
interval_ms: 10000,
num_rounds: 1,
messages_pattern: &[(5, 1024 / 2)],
max_retries: 500,
retry_delay_ms: 100,
req_timeout_ms: 3000,
propagation_delay_ms: 2000,
});

let collator_names: Vec<String> =
(0..config.num_nodes).map(|i| format!("collator{}", i)).collect();
let collator_names: Vec<&str> = collator_names.iter().map(|s| s.as_str()).collect();

let network = spawn_network(&collator_names).await?;

info!("Starting Latency benchmark");
info!("");
info!("Clients: {}", config.num_clients);
info!("Nodes: {}", config.num_nodes);
info!("Rounds: {}", config.num_rounds);
info!("Interval, ms: {}", config.interval_ms);
info!("Messages, per round: {}", config.messages_per_client() as u32 * config.num_clients);
info!("Message pattern:");
for &(count, size) in config.messages_pattern {
info!(" - {} messages {} bytes", count, size);
}

let mut rpc_clients = Vec::new();
for &name in &collator_names {
let node = network.get_node(name)?;
let rpc_client = node.rpc().await?;
rpc_clients.push(rpc_client);
}

let handles: Vec<_> = (0..config.num_clients)
.map(|client_id| {
let config = Arc::clone(&config);
let (keyring, _) = sr25519::Pair::generate();
let node_idx = (client_id as usize) % config.num_nodes;
let rpc_client = rpc_clients[node_idx].clone();
let neighbour_id = (client_id + 1) % config.num_clients;
let neighbour_node_idx = (neighbour_id as usize) % config.num_nodes;
if node_idx == neighbour_node_idx && config.num_nodes > 1 {
panic!(
"Client {} and neighbour {} are on the same node {}!",
client_id, neighbour_id, node_idx
);
}

tokio::spawn(async move {
let mut rounds_stats = Vec::new();
for round in 0..config.num_rounds {
let round_start = std::time::Instant::now();

let send_start = std::time::Instant::now();
let mut msg_idx: u32 = 0;

for &(count, size) in config.messages_pattern {
for _ in 0..count {
let mut statement = Statement::new();
let topic = blake2_256(
format!("{}-{}-{}", client_id, round, msg_idx).as_bytes(),
);
let channel = blake2_256(msg_idx.to_le_bytes().as_ref());

statement.set_channel(channel);
statement.set_priority(round as u32);
statement.set_topic(0, topic);
statement.set_plain_data(vec![0u8; size]);
statement.sign_sr25519_private(&keyring);

let encoded: Bytes = statement.encode().into();
rpc_client
.request::<()>("statement_submit", rpc_params![encoded])
.await?;

msg_idx += 1;
}
}

let sent_count = msg_idx;
let send_duration = send_start.elapsed();

tokio::time::sleep(Duration::from_millis(config.propagation_delay_ms)).await;

let receive_start = std::time::Instant::now();
let mut received_count = 0;
let mut receive_attempts = 0;

for msg_idx in 0..config.messages_per_client() as u32 {
let topic = blake2_256(
format!("{}-{}-{}", neighbour_id, round, msg_idx).as_bytes(),
);

for retry in 0..config.max_retries {
receive_attempts += 1;
match timeout(
Duration::from_millis(config.req_timeout_ms),
rpc_client.request::<Vec<Bytes>>(
"statement_broadcastsStatement",
rpc_params![vec![topic]],
),
)
.await
{
Ok(Ok(statements)) if !statements.is_empty() => {
received_count += statements.len() as u32;
break;
},
_ if retry < config.max_retries - 1 => {
tokio::time::sleep(Duration::from_millis(
config.retry_delay_ms,
))
.await;
},
_ => {
return Err(anyhow!(
"Client {}: Failed to retrieve message {} from neighbour {} after {} retries",
client_id, msg_idx, neighbour_id, config.max_retries
));
},
}
}
}

let receive_duration = receive_start.elapsed();
let full_latency = round_start.elapsed();
if full_latency < Duration::from_millis(config.interval_ms) {
tokio::time::sleep(
Duration::from_millis(config.interval_ms) - full_latency,
)
.await;
}

rounds_stats.push(RoundStats {
send_duration,
receive_duration,
full_latency,
sent_count,
received_count,
receive_attempts,
});
}

// Verify all messages were sent and received
let expected_count = config.messages_per_client() as u32;
for stats in &rounds_stats {
if stats.sent_count != expected_count {
return Err(anyhow!(
"Client {}: Expected {} messages sent, but got {}",
client_id,
expected_count,
stats.sent_count
));
}
if stats.received_count != expected_count {
return Err(anyhow!(
"Client {}: Expected {} messages received, but got {}",
client_id,
expected_count,
stats.received_count
));
}
}

Ok::<_, anyhow::Error>(rounds_stats)
})
})
.collect();

let mut all_round_stats = Vec::new();
for handle in handles {
let stats = handle.await??;
all_round_stats.extend(stats);
}

let calc_stats = |values: Vec<f64>| -> (f64, f64, f64) {
let min = values.iter().copied().fold(f64::INFINITY, f64::min);
let max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);
let avg = values.iter().sum::<f64>() / values.len() as f64;
(min, avg, max)
};

let send_s =
calc_stats(all_round_stats.iter().map(|s| s.send_duration.as_secs_f64()).collect());
let read_s =
calc_stats(all_round_stats.iter().map(|s| s.receive_duration.as_secs_f64()).collect());
let latency_s =
calc_stats(all_round_stats.iter().map(|s| s.full_latency.as_secs_f64()).collect());
let attempts = calc_stats(all_round_stats.iter().map(|s| s.receive_attempts as f64).collect());
let attempts_per_msg = (
attempts.0 / config.messages_per_client() as f64,
attempts.1 / config.messages_per_client() as f64,
attempts.2 / config.messages_per_client() as f64,
);

info!("");
info!(" Min Avg Max");
info!("Send, s {:>8.3} {:>8.3} {:>8.3}", send_s.0, send_s.1, send_s.2);
info!("Receive, s {:>8.3} {:>8.3} {:>8.3}", read_s.0, read_s.1, read_s.2);
info!("Latency, s {:>8.3} {:>8.3} {:>8.3}", latency_s.0, latency_s.1, latency_s.2);
info!(
"Attempts, per msg {:>8.1} {:>8.1} {:>8.1}",
attempts_per_msg.0, attempts_per_msg.1, attempts_per_msg.2
);

Ok(())
}
Loading