Skip to content

Commit 5b995ca

Browse files
apollo_network_benchmark: added message receiving task
1 parent 45412de commit 5b995ca

File tree

3 files changed

+36
-2
lines changed

3 files changed

+36
-2
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use std::time::SystemTime;
2+
3+
use libp2p::PeerId;
4+
use tracing::trace;
5+
6+
use crate::message::StressTestMessage;
7+
8+
pub fn receive_stress_test_message(received_message: Vec<u8>, sender_peer_id: Option<PeerId>) {
9+
let end_time = SystemTime::now();
10+
11+
let received_message: StressTestMessage = received_message.into();
12+
let start_time = received_message.metadata.time;
13+
let delay_seconds = end_time
14+
.duration_since(start_time)
15+
.expect("End time should be after start time (Probably clock misalignment)")
16+
.as_secs_f64();
17+
18+
// TODO(AndrewL): Replace this with metric updates
19+
trace!("Received stress test message from {sender_peer_id:?} in {delay_seconds} seconds");
20+
}

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tracing::Level;
1010
#[cfg(test)]
1111
mod message_test;
1212

13+
mod handlers;
1314
mod message;
1415
mod protocol;
1516
mod stress_test_node;

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use libp2p::Multiaddr;
1010
use tokio::task::JoinHandle;
1111
use tracing::{info, warn};
1212

13+
use crate::handlers::receive_stress_test_message;
1314
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};
1415

1516
/// The main stress test node that manages network communication and monitoring
@@ -19,8 +20,6 @@ pub struct BroadcastNetworkStressTestNode {
1920
// TODO(AndrewL): Remove this once they are used
2021
#[allow(dead_code)]
2122
message_sender: Option<MessageSender>,
22-
// TODO(AndrewL): Remove this once they are used
23-
#[allow(dead_code)]
2423
message_receiver: Option<MessageReceiver>,
2524
}
2625

@@ -83,10 +82,24 @@ impl BroadcastNetworkStressTestNode {
8382
.boxed()
8483
}
8584

85+
/// Starts the message receiving task
86+
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
87+
let message_receiver =
88+
self.message_receiver.take().expect("message_receiver should be available");
89+
90+
async move {
91+
info!("Starting message receiver");
92+
message_receiver.for_each(receive_stress_test_message).await;
93+
info!("Message receiver task ended");
94+
}
95+
.boxed()
96+
}
97+
8698
/// Gets all the tasks that need to be run
8799
async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
88100
let mut tasks = Vec::new();
89101
tasks.push(self.start_network_manager().await);
102+
tasks.push(self.make_message_receiver_task().await);
90103

91104
tasks
92105
}

0 commit comments

Comments
 (0)