Skip to content

Commit 00e689e

Browse files
apollo_network: broadcast_network_stress_test_node updated message
1 parent bde7fae commit 00e689e

File tree

3 files changed

+139
-44
lines changed

3 files changed

+139
-44
lines changed
Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,88 @@
1-
use std::mem::size_of;
2-
use std::str::FromStr;
3-
use std::time::{Duration, SystemTime};
1+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
42

5-
use libp2p::PeerId;
3+
use lazy_static::lazy_static;
64

7-
pub const METADATA_SIZE: usize = size_of::<u32>() + size_of::<u64>() + size_of::<u32>() + 38;
5+
lazy_static! {
6+
// Calculate actual metadata size based on serialized empty message
7+
pub static ref METADATA_SIZE: usize = {
8+
let empty_message = StressTestMessage::new(0, 0, vec![]);
9+
let serialized: Vec<u8> = empty_message.into();
10+
serialized.len()
11+
};
12+
}
13+
14+
#[derive(Debug, Clone, Copy)]
15+
pub struct StressTestMessageMetadata {
16+
pub sender_id: u64,
17+
pub message_index: u64,
18+
pub time: SystemTime,
19+
}
820

921
#[derive(Debug, Clone)]
1022
pub struct StressTestMessage {
11-
pub id: u32,
23+
pub metadata: StressTestMessageMetadata,
1224
pub payload: Vec<u8>,
13-
pub time: SystemTime,
14-
pub peer_id: String,
1525
}
1626

1727
impl StressTestMessage {
18-
pub fn new(id: u32, payload: Vec<u8>, peer_id: String) -> Self {
19-
StressTestMessage { id, payload, time: SystemTime::now(), peer_id }
28+
pub fn new(sender_id: u64, message_index: u64, payload: Vec<u8>) -> Self {
29+
StressTestMessage {
30+
metadata: StressTestMessageMetadata {
31+
sender_id,
32+
message_index,
33+
time: SystemTime::now(),
34+
},
35+
payload,
36+
}
37+
}
38+
39+
#[cfg(test)]
40+
pub fn slow_len(self) -> usize {
41+
let seq = Vec::<u8>::from(self);
42+
seq.len()
43+
}
44+
45+
pub fn len(&self) -> usize {
46+
*METADATA_SIZE + self.payload.len()
2047
}
2148
}
2249

2350
impl From<StressTestMessage> for Vec<u8> {
2451
fn from(value: StressTestMessage) -> Self {
25-
let StressTestMessage { id, mut payload, time, peer_id } = value;
26-
let id = id.to_be_bytes().to_vec();
27-
let time = time.duration_since(SystemTime::UNIX_EPOCH).unwrap();
28-
let seconds = time.as_secs().to_be_bytes().to_vec();
29-
let nanos = time.subsec_nanos().to_be_bytes().to_vec();
30-
let peer_id = PeerId::from_str(&peer_id).unwrap().to_bytes();
31-
payload.extend(id);
32-
payload.extend(seconds);
33-
payload.extend(nanos);
34-
payload.extend(peer_id);
35-
payload
52+
let payload_len: u64 = value.payload.len().try_into().unwrap();
53+
let duration = value.metadata.time.duration_since(UNIX_EPOCH).unwrap();
54+
[
55+
&value.metadata.sender_id.to_be_bytes()[..],
56+
&value.metadata.message_index.to_be_bytes()[..],
57+
&duration.as_secs().to_be_bytes()[..],
58+
&duration.subsec_nanos().to_be_bytes()[..],
59+
&payload_len.to_be_bytes()[..],
60+
&value.payload[..],
61+
]
62+
.concat()
3663
}
3764
}
3865

3966
impl From<Vec<u8>> for StressTestMessage {
40-
// This auto implements TryFrom<Vec<u8>> for StressTestMessage
41-
fn from(mut value: Vec<u8>) -> Self {
42-
let vec_size = value.len();
43-
let payload_size = vec_size - METADATA_SIZE;
44-
let id_and_time = value.split_off(payload_size);
45-
let id = u32::from_be_bytes(id_and_time[0..4].try_into().unwrap());
46-
let seconds = u64::from_be_bytes(id_and_time[4..12].try_into().unwrap());
47-
let nanos = u32::from_be_bytes(id_and_time[12..16].try_into().unwrap());
48-
let time = SystemTime::UNIX_EPOCH + Duration::new(seconds, nanos);
49-
let peer_id = PeerId::from_bytes(&id_and_time[16..]).unwrap().to_string();
50-
StressTestMessage { id, payload: value, time, peer_id }
67+
fn from(bytes: Vec<u8>) -> Self {
68+
let mut i = 0;
69+
let mut get = |n: usize| {
70+
let r = &bytes[i..i + n];
71+
i += n;
72+
r
73+
};
74+
75+
let sender_id = u64::from_be_bytes(get(8).try_into().unwrap());
76+
let message_index = u64::from_be_bytes(get(8).try_into().unwrap());
77+
let secs = u64::from_be_bytes(get(8).try_into().unwrap());
78+
let nanos = u32::from_be_bytes(get(4).try_into().unwrap());
79+
let time = UNIX_EPOCH + Duration::new(secs, nanos);
80+
let payload_len = u64::from_be_bytes(get(8).try_into().unwrap()).try_into().unwrap();
81+
let payload = get(payload_len).to_vec();
82+
83+
StressTestMessage {
84+
metadata: StressTestMessageMetadata { sender_id, message_index, time },
85+
payload,
86+
}
5187
}
5288
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use rstest::rstest;
2+
3+
use crate::converters::{StressTestMessage, METADATA_SIZE};
4+
5+
#[rstest]
6+
#[case::one_byte_len(10)]
7+
#[case::two_byte_len(300)]
8+
#[case::three_byte_len(20_000)]
9+
fn test_message_size(#[case] vec_len: usize) {
10+
let payload = vec![0xAA; vec_len];
11+
let message = StressTestMessage::new(1, 7070, payload.clone());
12+
let expected_size = *METADATA_SIZE + vec_len;
13+
assert_eq!(message.len(), expected_size);
14+
assert_eq!(message.slow_len(), expected_size);
15+
}
16+
17+
#[test]
18+
fn test_serialization_and_deserilization() {
19+
let original_message =
20+
StressTestMessage::new(u64::MAX - 1, u64::MAX - 2, vec![0xa1, 0xb2, 0xc3, 0xd4, 0xe5]);
21+
22+
// Serialize to bytes
23+
let serialized_bytes: Vec<u8> = original_message.clone().into();
24+
25+
// Deserialize back to message
26+
let deserialized_message: StressTestMessage = serialized_bytes.into();
27+
28+
// Verify all fields match
29+
assert_eq!(deserialized_message.metadata.sender_id, original_message.metadata.sender_id);
30+
assert_eq!(
31+
deserialized_message.metadata.message_index,
32+
original_message.metadata.message_index
33+
);
34+
assert_eq!(deserialized_message.payload, original_message.payload);
35+
assert_eq!(deserialized_message.metadata.time, original_message.metadata.time);
36+
}
37+
38+
#[test]
39+
fn test_empty_payload() {
40+
let original_message = StressTestMessage::new(1, 42, vec![]);
41+
42+
let serialized_bytes: Vec<u8> = original_message.clone().into();
43+
let deserialized_message: StressTestMessage = serialized_bytes.into();
44+
45+
assert_eq!(deserialized_message.metadata.sender_id, original_message.metadata.sender_id);
46+
assert_eq!(
47+
deserialized_message.metadata.message_index,
48+
original_message.metadata.message_index
49+
);
50+
assert_eq!(deserialized_message.payload, original_message.payload);
51+
assert_eq!(deserialized_message.metadata.time, original_message.metadata.time);
52+
}

crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use metrics_exporter_prometheus::PrometheusBuilder;
2626
use tokio::time::Duration;
2727
use tracing::{info, trace, Level};
2828

29+
#[cfg(test)]
30+
mod converters_test;
31+
2932
mod converters;
3033
mod utils;
3134

@@ -77,18 +80,18 @@ struct Args {
7780
async fn send_stress_test_messages(
7881
mut broadcast_topic_client: BroadcastTopicClient<StressTestMessage>,
7982
args: &Args,
80-
peer_id: String,
83+
_peer_id: String,
8184
) {
8285
let mut message = StressTestMessage::new(
8386
args.id.try_into().unwrap(),
84-
vec![0; args.message_size_bytes - METADATA_SIZE],
85-
peer_id.clone(),
87+
0, // message_index, will be updated in loop
88+
vec![0; args.message_size_bytes - *METADATA_SIZE],
8689
);
8790
let duration = Duration::from_millis(args.heartbeat_millis);
8891

8992
for i in 0.. {
90-
message.time = SystemTime::now();
91-
// message.id = i;
93+
message.metadata.time = SystemTime::now();
94+
message.metadata.message_index = i;
9295
broadcast_topic_client.broadcast_message(message.clone()).await.unwrap();
9396
trace!("Sent message {i}: {:?}", message);
9497
counter!("sent_messages").increment(1);
@@ -103,7 +106,7 @@ fn receive_stress_test_message(
103106
let end_time = SystemTime::now();
104107

105108
let received_message = message_result.unwrap();
106-
let start_time = received_message.time;
109+
let start_time = received_message.metadata.time;
107110
let duration = match end_time.duration_since(start_time) {
108111
Ok(duration) => duration,
109112
Err(_) => panic!("Got a negative duration, the clocks are not synced!"),
@@ -114,16 +117,19 @@ fn receive_stress_test_message(
114117

115118
// TODO(AndrewL): Concentrate all string metrics to constants in a different file
116119
counter!("message_received").increment(1);
117-
counter!(format!("message_received_from_{}", received_message.id)).increment(1);
120+
counter!(format!("message_received_from_{}", received_message.metadata.sender_id)).increment(1);
118121

119122
// TODO(AndrewL): This should be a historgram
120123
gauge!("message_received_delay_seconds").set(delay_seconds);
121-
gauge!(format!("message_received_delay_seconds_from_{}", received_message.id))
124+
gauge!(format!("message_received_delay_seconds_from_{}", received_message.metadata.sender_id))
122125
.set(delay_seconds);
123126

124127
counter!("message_received_delay_micros_sum").increment(delay_micros);
125-
counter!(format!("message_received_delay_micros_sum_from_{}", received_message.id))
126-
.increment(delay_micros);
128+
counter!(format!(
129+
"message_received_delay_micros_sum_from_{}",
130+
received_message.metadata.sender_id
131+
))
132+
.increment(delay_micros);
127133
// TODO(AndrewL): Figure out what to log here
128134
}
129135

@@ -168,8 +174,9 @@ async fn main() {
168174
println!("Starting network stress test with args:\n{args:?}");
169175

170176
assert!(
171-
args.message_size_bytes >= METADATA_SIZE,
172-
"Message size must be at least {METADATA_SIZE} bytes"
177+
args.message_size_bytes >= *METADATA_SIZE,
178+
"Message size must be at least {} bytes",
179+
*METADATA_SIZE
173180
);
174181

175182
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new(

0 commit comments

Comments
 (0)