Skip to content

Commit 1b3dcbe

Browse files
apollo_network: broadcast network stress test run draft
1 parent 9996360 commit 1b3dcbe

File tree

16 files changed

+3001
-1
lines changed

16 files changed

+3001
-1
lines changed

crates/apollo_network_benchmark/README.md

Lines changed: 604 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# does not build, only copies the binary and entrypoint.sh from the local machine
2+
FROM ubuntu:24.04
3+
4+
RUN apt update && apt -y install iproute2 kmod chrony && apt clean && rm -rf /var/lib/apt/lists/*
5+
6+
COPY --from=tmp broadcast_network_stress_test_node /usr/local/bin/broadcast_network_stress_test_node
7+
COPY ./crates/apollo_network_benchmark/run/entrypoint.sh /entrypoint.sh
8+
9+
RUN chmod +x /entrypoint.sh
10+
11+
ENTRYPOINT ["/entrypoint.sh"]
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
# syntax = devthefuture/dockerfile-x
#
# Run with the project root context:
# docker build -f crates/apollo_network/src/bin/broadcast_network_stress_test_node/cluster/Dockerfile .
#
# For time synchronization, you can either:
# 1. Use the built-in chrony service (configured below), or
# 2. Mount host time at runtime with: -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro

INCLUDE deployments/images/base/Dockerfile

# --- Stage 1: Install cargo-chef and prepare recipe ---
FROM base AS planner
WORKDIR /app
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

# --- Stage 2: Build dependencies ---
FROM base AS builder
WORKDIR /app
COPY --from=planner /app/recipe.json recipe.json
# Cook with the same profile and RUSTFLAGS as the final build below;
# otherwise the dependency cache is compiled for a different configuration
# (debug, without tokio_unstable) and `cargo build --release` recompiles
# every dependency from scratch, defeating the cargo-chef layer cache.
RUN RUSTFLAGS="--cfg tokio_unstable" cargo chef cook --release --recipe-path recipe.json
COPY . .
# using tokio_unstable to get additional tokio metrics
RUN RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin broadcast_network_stress_test_node

# --- Final Stage: Runtime image ---
FROM ubuntu:24.04 AS final_stage

# iproute2/kmod are required by entrypoint.sh for traffic shaping; chrony
# keeps node clocks in sync. apt-get is the script-stable apt frontend.
RUN apt-get update && apt-get -y install iproute2 kmod chrony && apt-get clean && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/broadcast_network_stress_test_node /usr/local/bin/broadcast_network_stress_test_node
COPY --from=builder /app/crates/apollo_network_benchmark/run/entrypoint.sh /entrypoint.sh

RUN chmod +x /entrypoint.sh

ENTRYPOINT ["/entrypoint.sh"]
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
#!/bin/bash
# Entrypoint for the broadcast network stress test node container.
# Sets a stable hostname for Indexed Jobs, prints machine identification,
# applies optional traffic shaping (driven by the LATENCY / THROUGHPUT env
# vars), resolves the node ID, and finally execs the test binary.

set -e

echo "Starting container with hostname: $(hostname)"

# For Indexed Jobs: Set stable hostname based on JOB_COMPLETION_INDEX
# This enables stable DNS names like: broadcast-network-stress-test-0.broadcast-network-stress-test-headless
if [ -n "$JOB_COMPLETION_INDEX" ]; then
    NEW_HOSTNAME="broadcast-network-stress-test-${JOB_COMPLETION_INDEX}"
    hostname "$NEW_HOSTNAME" || echo "Warning: Could not set hostname to $NEW_HOSTNAME"
    echo "Set hostname to: $(hostname) (based on JOB_COMPLETION_INDEX=$JOB_COMPLETION_INDEX)"
fi

# ********************************* machine information *********************************

echo "Machine identification:"
echo " Container ID: $(cat /proc/self/cgroup 2>/dev/null | head -1 | cut -d/ -f3 | cut -c1-12 || echo 'N/A')"
echo " Host IP addresses:"
ip addr show | grep -E 'inet [0-9]' | awk '{print " " $2}' || echo " IP info unavailable"
echo " Machine ID: $(cat /etc/machine-id 2>/dev/null || echo 'N/A')"
echo " Kernel: $(uname -r)"
echo " Architecture: $(uname -m)"
if [ -n "$NODE_NAME" ]; then
    echo " Kubernetes Node: $NODE_NAME"
fi
if [ -n "$KUBERNETES_NODE_NAME" ]; then
    echo " K8s Node Name: $KUBERNETES_NODE_NAME"
fi

# ***************************** throttling connection start *****************************

INTERFACE="eth0" # Default Docker interface

# Load ifb module for ingress shaping (may already be built in or loaded).
modprobe ifb || echo "ifb module already loaded or not needed"

# Set up ifb0 for ingress
ip link add ifb0 type ifb || true
ip link set ifb0 up || true

# Redirect all ingress traffic to ifb0 so it can be shaped like egress.
tc qdisc add dev "$INTERFACE" ingress handle ffff: || true
tc filter add dev "$INTERFACE" parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev ifb0 || true

# Apply shaping (htb for bandwidth + netem for latency) to a device.
#   $1 = device, $2 = parent spec ("root" or "parent X:Y"), $3 = qdisc handle
apply_shaping() {
    local dev=$1
    local parent=$2
    local handle=$3 # e.g., 1 (no trailing :)

    # If throughput is set, calculate rate in kbit/s (assuming THROUGHPUT in KB/s)
    if [ -n "${THROUGHPUT}" ]; then
        RATE=$((THROUGHPUT * 8))
        tc qdisc add dev "$dev" $parent handle ${handle}: htb default 1 || true
        tc class add dev "$dev" parent ${handle}: classid ${handle}:1 htb rate ${RATE}kbit ceil ${RATE}kbit || true
        netem_parent="${handle}:1"
    else
        netem_parent="root"
    fi

    # If latency is set, add netem (delay in ms)
    if [ -n "${LATENCY}" ]; then
        # Very large queue limit (in packets) so packets delayed by netem are
        # queued rather than dropped.
        LIMIT=1000000
        tc qdisc add dev "$dev" parent $netem_parent netem delay ${LATENCY}ms limit ${LIMIT} || true
        echo "Applied netem delay ${LATENCY}ms with queue limit ${LIMIT} packets"
    fi
}

# Apply to egress (eth0)
# apply_shaping $INTERFACE "root" "1"

# Apply to ingress (ifb0)
apply_shaping ifb0 "root" "1"

# ***************************** throttling connection end *****************************

# Call broadcast_network_stress_test_node
# Use ID from environment variable if set, otherwise use JOB_COMPLETION_INDEX (for Indexed Jobs)
if [ -z "$ID" ]; then
    if [ -n "$JOB_COMPLETION_INDEX" ]; then
        # Used when running in Indexed Job
        export ID=$JOB_COMPLETION_INDEX
        echo "ID not set in environment, using JOB_COMPLETION_INDEX: $ID"
    else
        # Fallback: extract from hostname (legacy StatefulSet support)
        export ID=$(hostname | grep -o '[0-9]*$')
        echo "ID not set in environment, extracted from hostname: $ID"
    fi
else
    # Used when running locally
    echo "Using ID from environment variable: $ID"
fi

# exec replaces the shell so the node binary receives signals directly.
exec broadcast_network_stress_test_node
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use std::collections::HashMap;

use apollo_network_benchmark::node_args::UserArgs;
use clap::Parser;
use serde::Serialize;

/// Configuration for the orchestrator that spawns multiple nodes
#[derive(Parser, Debug, Clone, Serialize)]
pub struct SharedArgs {
    /// Number of nodes to run
    #[arg(long, default_value = "3")]
    pub num_nodes: u32,

    /// Sets the multi-addresses to use UDP/QUIC instead of TCP
    #[arg(long, default_value = "false")]
    pub quic: bool,

    // Per-node arguments forwarded to each spawned node; clap flattens them
    // so they appear as top-level flags on the orchestrator CLI.
    #[command(flatten)]
    pub user: UserArgs,
}
21+
22+
pub fn get_arguments(
23+
id: Option<u32>,
24+
metric_port: u16,
25+
p2p_port: u16,
26+
bootstrap_nodes: &[String],
27+
args: &SharedArgs,
28+
) -> Vec<(String, String)> {
29+
let broadcaster =
30+
args.user.broadcaster.and_then(|b| u32::try_from(b).ok()).unwrap_or(args.num_nodes - 1);
31+
32+
let mut result = vec![];
33+
34+
if let Some(id) = id {
35+
result.push(("--id".to_string(), id.to_string()));
36+
}
37+
38+
result.extend_from_slice(&[
39+
("--metric-port".to_string(), metric_port.to_string()),
40+
("--p2p-port".to_string(), p2p_port.to_string()),
41+
("--bootstrap".to_string(), bootstrap_nodes.join(",")),
42+
("--timeout".to_string(), args.user.timeout.to_string()),
43+
("--verbosity".to_string(), args.user.verbosity.to_string()),
44+
("--buffer-size".to_string(), args.user.buffer_size.to_string()),
45+
("--message-size-bytes".to_string(), args.user.message_size_bytes.to_string()),
46+
("--heartbeat-millis".to_string(), args.user.heartbeat_millis.to_string()),
47+
("--mode".to_string(), args.user.mode.to_string()),
48+
("--network-protocol".to_string(), args.user.network_protocol.to_string()),
49+
("--broadcaster".to_string(), broadcaster.to_string()),
50+
("--round-duration-seconds".to_string(), args.user.round_duration_seconds.to_string()),
51+
(
52+
"--explore-cool-down-duration-seconds".to_string(),
53+
args.user.explore_cool_down_duration_seconds.to_string(),
54+
),
55+
(
56+
"--explore-run-duration-seconds".to_string(),
57+
args.user.explore_run_duration_seconds.to_string(),
58+
),
59+
(
60+
"--explore-min-throughput-byte-per-seconds".to_string(),
61+
args.user.explore_min_throughput_byte_per_seconds.to_string(),
62+
),
63+
(
64+
"--explore-min-message-size-bytes".to_string(),
65+
args.user.explore_min_message_size_bytes.to_string(),
66+
),
67+
("--num-nodes".to_string(), args.num_nodes.to_string()),
68+
]);
69+
70+
result
71+
}
72+
73+
/// Converts the node's command-line arguments (plus optional traffic-shaping
/// settings) into Kubernetes-style env var entries: one map per variable with
/// `"name"` and `"value"` keys.
///
/// Flags such as `--metric-port` become names such as `METRIC_PORT`.
/// `latency` / `throughput` become `LATENCY` / `THROUGHPUT`, which
/// entrypoint.sh reads to configure tc shaping.
pub fn get_env_vars(
    id: Option<u32>,
    metric_port: u16,
    p2p_port: u16,
    bootstrap_nodes: &[String],
    args: &SharedArgs,
    latency: Option<u32>,
    throughput: Option<u32>,
) -> Vec<HashMap<String, String>> {
    // Build a single {"name": ..., "value": ...} entry.
    fn entry(name: String, value: String) -> HashMap<String, String> {
        HashMap::from([("name".to_string(), name), ("value".to_string(), value)])
    }

    let arguments = get_arguments(id, metric_port, p2p_port, bootstrap_nodes, args);

    let mut env_vars = vec![];

    // Convert "--some-flag" to "SOME_FLAG". strip_prefix avoids the panic
    // that slicing with name[2..] would cause on a name shorter than two
    // bytes or without the "--" prefix.
    for (name, value) in arguments {
        let flag = name.strip_prefix("--").unwrap_or(&name);
        env_vars.push(entry(flag.replace('-', "_").to_uppercase(), value));
    }

    // Add latency and throughput if provided.
    if let Some(latency) = latency {
        env_vars.push(entry("LATENCY".to_string(), latency.to_string()));
    }
    if let Some(throughput) = throughput {
        env_vars.push(entry("THROUGHPUT".to_string(), throughput.to_string()));
    }

    env_vars
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use crate::mod_utils::{cluster_deployment_file_path, read_deployment_file, run_cmd};
2+
3+
pub fn run() -> Result<(), String> {
4+
let deployment_data = read_deployment_file(&cluster_deployment_file_path())?;
5+
6+
let num_nodes = deployment_data
7+
.get("args")
8+
.and_then(|a| a.get("shared"))
9+
.and_then(|s| s.get("num_nodes"))
10+
.and_then(|n| n.as_u64())
11+
.ok_or("Failed to get num_nodes from deployment file")?;
12+
13+
let namespace_name = deployment_data
14+
.get("namespace")
15+
.and_then(|n| n.as_str())
16+
.ok_or("Failed to get namespace from deployment file")?;
17+
18+
run_cmd(
19+
&format!("kubectl get pods -n {}", namespace_name),
20+
"Check if pods are running",
21+
false,
22+
)?;
23+
24+
for i in 0..num_nodes {
25+
run_cmd(
26+
&format!(
27+
"timeout 5 kubectl logs -n {} -l \
28+
app=broadcast-network-stress-test,batch.kubernetes.io/job-completion-index={} > \
29+
/tmp/broadcast-network-stress-test-{}.logs.txt",
30+
namespace_name, i, i
31+
),
32+
&format!("Check logs for node {}", i),
33+
true,
34+
)?;
35+
}
36+
37+
run_cmd(
38+
&format!("kubectl get pods -n {}", namespace_name),
39+
"Check if pods are running",
40+
false,
41+
)?;
42+
43+
println!("Cluster logs have been saved to /tmp/broadcast-network-stress-test-*.logs.txt");
44+
Ok(())
45+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::process::{Command, Stdio};
2+
use std::thread;
3+
4+
use crate::mod_utils::{cluster_deployment_file_path, connect_to_cluster, read_deployment_file};
5+
use crate::pr;
6+
use crate::yaml_maker::PROMETHEUS_SERVICE_NAME;
7+
8+
fn port_forward(service_name: &str, local_port: u16, remote_port: u16, namespace: &str) {
9+
let _ = Command::new("kubectl")
10+
.args([
11+
"port-forward",
12+
&format!("service/{}", service_name),
13+
&format!("{}:{}", local_port, remote_port),
14+
"-n",
15+
namespace,
16+
])
17+
.stdout(Stdio::inherit())
18+
.stderr(Stdio::inherit())
19+
.status();
20+
}
21+
22+
pub fn run() -> Result<(), String> {
23+
let deployment_data = read_deployment_file(&cluster_deployment_file_path())?;
24+
25+
let namespace = deployment_data
26+
.get("namespace")
27+
.and_then(|n| n.as_str())
28+
.ok_or("No namespace found in deployment file")?;
29+
30+
connect_to_cluster()?;
31+
32+
pr!("Port forwarding in namespace: {}", namespace);
33+
pr!(" → Grafana: http://localhost:3000");
34+
pr!(" → Prometheus: http://localhost:9090");
35+
pr!("\nPress Ctrl+C to stop\n");
36+
37+
let namespace_grafana = namespace.to_string();
38+
let namespace_prometheus = namespace.to_string();
39+
40+
// Start both port-forwards in separate threads
41+
let grafana_thread = thread::spawn(move || {
42+
port_forward("grafana-service", 3000, 3000, &namespace_grafana);
43+
});
44+
45+
let prometheus_thread = thread::spawn(move || {
46+
port_forward(PROMETHEUS_SERVICE_NAME, 9090, 9090, &namespace_prometheus);
47+
});
48+
49+
// Wait for threads (they'll run until interrupted)
50+
let _ = grafana_thread.join();
51+
let _ = prometheus_thread.join();
52+
53+
Ok(())
54+
}

0 commit comments

Comments
 (0)