Skip to content

Commit a294977

Browse files
apollo_network_benchmark: added message receiving task
1 parent 9731b32 commit a294977

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use std::time::SystemTime;
2+
3+
use libp2p::PeerId;
4+
use tracing::trace;
5+
6+
use crate::message::StressTestMessage;
7+
8+
pub fn receive_stress_test_message(received_message: Vec<u8>, sender_peer_id: Option<PeerId>) {
9+
let end_time = SystemTime::now();
10+
11+
let received_message: StressTestMessage = received_message.into();
12+
let start_time = received_message.metadata.time;
13+
let delay_seconds = match end_time.duration_since(start_time) {
14+
Ok(duration) => duration.as_secs_f64(),
15+
Err(_) => {
16+
let negative_duration = start_time.duration_since(end_time).unwrap();
17+
-negative_duration.as_secs_f64()
18+
}
19+
};
20+
21+
// TODO(AndrewL): Replace this with metric updates
22+
trace!("Received stress test message from {sender_peer_id:?} in {delay_seconds} seconds");
23+
}

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tracing::Level;
1010
#[cfg(test)]
1111
mod message_test;
1212

13+
mod handlers;
1314
mod message;
1415
mod protocol;
1516
mod stress_test_node;

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/stress_test_node.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use libp2p::Multiaddr;
1010
use tokio::task::JoinHandle;
1111
use tracing::{info, warn};
1212

13+
use crate::handlers::receive_stress_test_message;
1314
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};
1415

1516
/// The main stress test node that manages network communication and monitoring
@@ -19,8 +20,6 @@ pub struct BroadcastNetworkStressTestNode {
1920
// TODO(AndrewL): Remove this once they are used
2021
#[allow(dead_code)]
2122
message_sender: Option<MessageSender>,
22-
// TODO(AndrewL): Remove this once they are used
23-
#[allow(dead_code)]
2423
message_receiver: Option<MessageReceiver>,
2524
}
2625

@@ -85,10 +84,24 @@ impl BroadcastNetworkStressTestNode {
8584
.boxed()
8685
}
8786

87+
/// Starts the message receiving task
88+
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
89+
let message_receiver =
90+
self.message_receiver.take().expect("message_receiver should be available");
91+
92+
async move {
93+
info!("Starting message receiver");
94+
message_receiver.for_each(receive_stress_test_message).await;
95+
info!("Message receiver task ended");
96+
}
97+
.boxed()
98+
}
99+
88100
/// Gets all the tasks that need to be run
89101
async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
90102
let mut tasks = Vec::new();
91103
tasks.push(self.start_network_manager().await);
104+
tasks.push(self.make_message_receiver_task().await);
92105

93106
tasks
94107
}

0 commit comments

Comments
 (0)