Skip to content

Commit c202c0e

Browse files
apollo_network: broadcast network stress test node draft
1 parent 0abe6bf commit c202c0e

File tree

12 files changed

+1433
-3
lines changed

12 files changed

+1433
-3
lines changed

Cargo.lock

Lines changed: 130 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ statistical = "1.0.0"
366366
strum = "0.25.0"
367367
strum_macros = "0.25.2"
368368
syn = "2.0.39"
369+
sysinfo = "0.37.1"
370+
tar = "0.4.38"
369371
tempfile = "3.7.0"
370372
test-case = "3.2.1"
371373
test-log = "0.2.14"

crates/apollo_network_benchmark/Cargo.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,32 @@ license-file.workspace = true
88
[features]
99
testing = []
1010

11+
1112
[dependencies]
13+
apollo_metrics.workspace = true
14+
apollo_network.workspace = true
15+
chrono.workspace = true
1216
clap = { workspace = true, features = ["derive", "env"] }
17+
futures.workspace = true
18+
hex.workspace = true
1319
lazy_static.workspace = true
20+
libp2p = { workspace = true, features = ["identify"] }
1421
metrics-exporter-prometheus.workspace = true
22+
serde.workspace = true
23+
serde_json.workspace = true
24+
serde_yaml.workspace = true
25+
sysinfo.workspace = true
1526
tokio = { workspace = true, features = ["full", "sync"] }
1627
tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] }
1728
tracing.workspace = true
1829
tracing-subscriber.workspace = true
1930

2031

2132
[dev-dependencies]
33+
assert_matches.workspace = true
2234
rstest.workspace = true
35+
tokio = { workspace = true, features = ["full", "sync", "test-util"] }
36+
tokio-stream.workspace = true
2337

2438
[lints]
2539
workspace = true
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::time::Duration;
2+
3+
use apollo_network_benchmark::node_args::NodeArgs;
4+
5+
use crate::metrics::{get_throughput, seconds_since_epoch};
6+
7+
const EXPLORE_MESSAGE_SIZES_BYTES: [usize; 13] = [
8+
1 << 10,
9+
1 << 11,
10+
1 << 12,
11+
1 << 13,
12+
1 << 14,
13+
1 << 15,
14+
1 << 16,
15+
1 << 17,
16+
1 << 18,
17+
1 << 19,
18+
1 << 20,
19+
1 << 21,
20+
1 << 22,
21+
];
22+
const EXPLORE_MESSAGE_HEARTBEAT_MILLIS: [u64; 16] =
23+
[1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 100, 150, 200, 250, 500, 1000];
24+
25+
#[derive(Debug, Clone, Copy, PartialEq)]
26+
pub enum ExplorePhase {
27+
/// In cooldown period - no broadcasting should occur
28+
CoolDown,
29+
/// In running period - broadcasting should occur (if this node is the broadcaster)
30+
Running,
31+
}
32+
33+
#[derive(Clone)]
34+
pub struct ExploreConfiguration {
35+
sorted_configurations: Vec<(usize, Duration)>,
36+
/// The broadcaster configuration index
37+
configuration_index: usize,
38+
/// Duration of the Running phase of the cycle
39+
run_duration_seconds: u64,
40+
/// Total duration for one complete cycle (cooldown + run_duration_seconds)
41+
cycle_duration_seconds: u64,
42+
}
43+
44+
impl ExploreConfiguration {
45+
pub fn new(
46+
cool_down_duration_seconds: u64,
47+
run_duration_seconds: u64,
48+
min_throughput_byte_per_seconds: f64,
49+
min_message_size_bytes: usize,
50+
) -> ExploreConfiguration {
51+
let mut sorted_configurations = Vec::with_capacity(
52+
EXPLORE_MESSAGE_SIZES_BYTES.len() * EXPLORE_MESSAGE_HEARTBEAT_MILLIS.len(),
53+
);
54+
for message_size in EXPLORE_MESSAGE_SIZES_BYTES {
55+
for heartbeat_millis in EXPLORE_MESSAGE_HEARTBEAT_MILLIS {
56+
sorted_configurations.push((message_size, Duration::from_millis(heartbeat_millis)));
57+
}
58+
}
59+
sorted_configurations.retain(|(size, duration)| {
60+
*size >= min_message_size_bytes
61+
&& get_throughput(*size, *duration) >= min_throughput_byte_per_seconds
62+
});
63+
sorted_configurations
64+
.sort_by_cached_key(|(size, duration)| get_throughput(*size, *duration) as u64);
65+
66+
let cycle_duration_seconds = cool_down_duration_seconds + run_duration_seconds;
67+
68+
Self {
69+
sorted_configurations,
70+
configuration_index: 0,
71+
run_duration_seconds,
72+
cycle_duration_seconds,
73+
}
74+
}
75+
76+
/// Gets the current phase within the current configuration cycle
77+
pub fn get_current_phase(&self) -> ExplorePhase {
78+
let now_seconds = seconds_since_epoch();
79+
let position_in_cycle_seconds = now_seconds % self.cycle_duration_seconds;
80+
81+
if position_in_cycle_seconds < self.run_duration_seconds {
82+
ExplorePhase::Running
83+
} else {
84+
ExplorePhase::CoolDown
85+
}
86+
}
87+
88+
/// Gets the current message size and duration based on synchronized time
89+
pub fn get_current_size_and_heartbeat(&mut self) -> (usize, Duration) {
90+
let config_index = self.configuration_index;
91+
self.configuration_index += 1;
92+
if self.configuration_index >= self.sorted_configurations.len() {
93+
self.configuration_index = 0;
94+
}
95+
self.sorted_configurations[config_index]
96+
}
97+
}
98+
99+
/// Extracts explore mode parameters from arguments with validation
100+
pub fn extract_explore_params(args: &NodeArgs) -> (u64, u64, f64, usize) {
101+
let cool_down = args.user.explore_cool_down_duration_seconds;
102+
let run_duration = args.user.explore_run_duration_seconds;
103+
let min_throughput = args.user.explore_min_throughput_byte_per_seconds;
104+
let min_message_size = args.user.explore_min_message_size_bytes;
105+
106+
(cool_down, run_duration, min_throughput, min_message_size)
107+
}

0 commit comments

Comments
 (0)