Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use apollo_network_benchmark::node_args::NodeArgs;
use libp2p::PeerId;

use crate::message::{StressTestMessage, METADATA_SIZE};
use crate::message_index_detector::MessageIndexTracker;
use crate::metrics::{
get_throughput,
BROADCAST_MESSAGE_BYTES,
Expand All @@ -17,6 +18,7 @@ use crate::metrics::{
RECEIVE_MESSAGE_BYTES_SUM,
RECEIVE_MESSAGE_COUNT,
RECEIVE_MESSAGE_DELAY_SECONDS,
RECEIVE_MESSAGE_PENDING_COUNT,
};
use crate::protocol::MessageSender;

Expand Down Expand Up @@ -62,7 +64,11 @@ pub async fn send_stress_test_messages(
}
}

pub fn receive_stress_test_message(received_message: Vec<u8>, _sender_peer_id: Option<PeerId>) {
pub fn receive_stress_test_message(
received_message: Vec<u8>,
_sender_peer_id: Option<PeerId>,
tx: tokio::sync::mpsc::UnboundedSender<(usize, u64)>,
) {
let end_time = SystemTime::now();

let received_message: StressTestMessage = received_message.into();
Expand All @@ -81,4 +87,30 @@ pub fn receive_stress_test_message(received_message: Vec<u8>, _sender_peer_id: O

// Use apollo_metrics histograms for latency measurements
RECEIVE_MESSAGE_DELAY_SECONDS.record(delay_seconds);

// Send to index tracker
tx.send((
usize::try_from(received_message.metadata.sender_id)
.expect("sender_id too large for usize"),
received_message.metadata.message_index,
))
.unwrap();
}

pub async fn record_indexed_message(
mut rx: tokio::sync::mpsc::UnboundedReceiver<(usize, u64)>,
num_peers: usize,
) {
let mut index_tracker = vec![MessageIndexTracker::default(); num_peers];
let mut all_pending = 0;
while let Some((peer_id, index)) = rx.recv().await {
let old_pending = index_tracker[peer_id].pending_messages_count();
index_tracker[peer_id].seen_message(index);
let new_pending = index_tracker[peer_id].pending_messages_count();

all_pending -= old_pending;
all_pending += new_pending;

RECEIVE_MESSAGE_PENDING_COUNT.set(all_pending.into_f64());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// TODO(AndrewL): remove this once the struct is used
#![allow(dead_code)]

#[derive(Default, Clone, Copy)]
pub struct MessageIndexTracker {
seen_messages_count: u64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ define_metrics!(
MetricCounter { RECEIVE_MESSAGE_COUNT, "receive_message_count", "Number of stress test messages received via broadcast", init = 0 },
MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 },
MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" },
MetricGauge { RECEIVE_MESSAGE_PENDING_COUNT, "receive_message_pending_count", "Number of stress test messages pending to be received" },

// network metrics from the network manager
MetricGauge { NETWORK_CONNECTED_PEERS, "network_connected_peers", "Number of connected peers in the network" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use libp2p::{Multiaddr, PeerId};
use tokio::task::JoinHandle;
use tracing::{info, warn};

use crate::handlers::{receive_stress_test_message, send_stress_test_messages};
use crate::handlers::{
receive_stress_test_message,
record_indexed_message,
send_stress_test_messages,
};
use crate::metrics::create_network_metrics;
use crate::protocol::{register_protocol_channels, MessageReceiver, MessageSender};

Expand Down Expand Up @@ -138,24 +142,39 @@ impl BroadcastNetworkStressTestNode {
)
}

/// Starts the message receiving task
pub async fn make_message_receiver_task(&mut self) -> BoxFuture<'static, ()> {
/// Starts the message receiving tasks (receiver + index tracker)
pub async fn make_message_receiver_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
let message_receiver =
self.message_receiver.take().expect("message_receiver should be available");

async move {
info!("Starting message receiver");
message_receiver.for_each(receive_stress_test_message).await;
info!("Message receiver task ended");
}
.boxed()
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let num_peers = self.args.runner.bootstrap.len();

vec![
async move {
record_indexed_message(rx, num_peers).await;
}
.boxed(),
async move {
info!("Starting message receiver");
let tx_clone = tx.clone();
message_receiver
.for_each(|message, peer_id| {
let tx_clone = tx_clone.clone();
receive_stress_test_message(message, peer_id, tx_clone);
})
.await;
info!("Message receiver task ended");
}
.boxed(),
]
}

/// Gets all the tasks that need to be run
async fn get_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
let mut tasks = Vec::new();
tasks.push(self.start_network_manager().await);
tasks.push(self.make_message_receiver_task().await);
tasks.extend(self.make_message_receiver_tasks().await);

if let Some(sender_task) = self.start_message_sender().await {
tasks.push(sender_task);
Expand Down
Loading