diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs new file mode 100644 index 00000000000..c02e8120ac0 --- /dev/null +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/handlers.rs @@ -0,0 +1,20 @@ +use std::time::SystemTime; + +use libp2p::PeerId; +use tracing::trace; + +use crate::message::StressTestMessage; + +pub fn receive_stress_test_message(received_message: Vec, sender_peer_id: Option) { + let end_time = SystemTime::now(); + + let received_message: StressTestMessage = received_message.into(); + let start_time = received_message.metadata.time; + let delay_seconds = end_time + .duration_since(start_time) + .expect("End time should be after start time (Probably clock misalignment)") + .as_secs_f64(); + + // TODO(AndrewL): Replace this with metric updates + trace!("Received stress test message from {sender_peer_id:?} in {delay_seconds} seconds"); +} diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs index 91846d89c9a..e90ccd282da 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs @@ -10,6 +10,7 @@ use tracing::Level; #[cfg(test)] mod message_test; +mod handlers; mod message; mod protocol; mod stress_test_node; diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs index 171dee600ee..80551cddd32 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs @@ -10,6 +10,7 @@ use libp2p::Multiaddr; use tokio::task::JoinHandle; use tracing::{info, warn}; +use crate::handlers::receive_stress_test_message; use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender}; /// The main stress test node that manages network communication and monitoring @@ -19,8 +20,6 @@ pub struct BroadcastNetworkStressTestNode { // TODO(AndrewL): Remove this once they are used #[allow(dead_code)] message_sender: Option, - // TODO(AndrewL): Remove this once they are used - #[allow(dead_code)] message_receiver: Option, } @@ -83,10 +82,24 @@ impl BroadcastNetworkStressTestNode { .boxed() } + /// Starts the message receiving task + pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> { + let message_receiver = + self.message_receiver.take().expect("message_receiver should be available"); + + async move { + info!("Starting message receiver"); + message_receiver.for_each(receive_stress_test_message).await; + info!("Message receiver task ended"); + } + .boxed() + } + /// Gets all the tasks that need to be run async fn get_tasks(&mut self) -> Vec> { let mut tasks = Vec::new(); tasks.push(self.start_network_manager().await); + tasks.push(self.make_message_receiver_task().await); tasks }