Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/apollo_network_benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ testing = []

[dependencies]
clap = { workspace = true, features = ["derive", "env"] }
futures.workspace = true
lazy_static.workspace = true
metrics-exporter-prometheus.workspace = true
tokio = { workspace = true, features = ["full", "sync"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use tracing::Level;
mod message_test;

mod message;
mod stress_test_node;

use apollo_network_benchmark::node_args::NodeArgs;
use stress_test_node::BroadcastNetworkStressTestNode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -48,5 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.describe_and_run(),
);

Ok(())
// Create and run the stress test node
let stress_test_node = BroadcastNetworkStressTestNode::new(args).await;
stress_test_node.run().await
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::time::Duration;

use apollo_network_benchmark::node_args::NodeArgs;
use futures::future::{select_all, BoxFuture};
use tokio::task::JoinHandle;
use tracing::{info, warn};

/// The main stress test node that manages network communication and monitoring
pub struct BroadcastNetworkStressTestNode {
args: NodeArgs,
}

impl BroadcastNetworkStressTestNode {
/// Creates a new BroadcastNetworkStressTestNode instance
pub async fn new(args: NodeArgs) -> Self {
Self { args }
}

/// Gets all the tasks that need to be run
async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
Vec::new()
}

/// Unified run function that handles both simple and network reset modes
pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
let test_timeout = Duration::from_secs(self.args.user.timeout);
let start_time = tokio::time::Instant::now();
// Main loop - restart if network reset is enabled, otherwise run once

info!("Starting/restarting all tasks");

// Start all common tasks
let tasks = self.get_tasks().await;

// Wait for either timeout or any task completion
let remaining_time = test_timeout.saturating_sub(start_time.elapsed());
let spawned_tasks: Vec<_> = tasks.into_iter().map(|task| tokio::spawn(task)).collect();
let task_completed =
tokio::time::timeout(remaining_time, race_and_kill_tasks(spawned_tasks)).await.is_ok();

if !task_completed {
info!("Test timeout reached");
return Err("Test timeout".into());
}

Err("Tasks should never end".into())
}
}

pub async fn race_and_kill_tasks(spawned_tasks: Vec<JoinHandle<()>>) {
if spawned_tasks.is_empty() {
return;
}

// Wait for any task to complete
let (result, _index, remaining_tasks) = select_all(spawned_tasks).await;

// Log the result of the completed task
if let Err(e) = result {
warn!("Task completed with error: {:?}", e);
}

// Abort all remaining tasks
for task in remaining_tasks {
task.abort();
}
}
5 changes: 5 additions & 0 deletions crates/apollo_network_benchmark/src/node_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub struct UserArgs {
/// Set the verbosity level of the logger, the higher the more verbose
#[arg(short, long, env, default_value = "2")]
pub verbosity: u8,

/// The timeout in seconds for the node.
/// When the node runs for longer than this, it will be killed.
#[arg(long, env, default_value = "4000")]
pub timeout: u64,
}

#[derive(Parser, Debug, Clone)]
Expand Down
Loading