|
10 | 10 | //! sudo HOME=$HOME $(which cargo) run --example bdp_throughput -p msg-sim |
11 | 11 | //! ``` |
12 | 12 |
|
13 | | -use std::{ |
14 | | - net::{IpAddr, Ipv4Addr, SocketAddr}, |
15 | | - time::Instant, |
16 | | -}; |
17 | | - |
18 | | -use futures::StreamExt; |
19 | | -use msg_sim::{ |
20 | | - ip::Subnet, |
21 | | - network::{Link, Network, PeerIdExt}, |
22 | | - tc::impairment::LinkImpairment, |
23 | | -}; |
24 | | -use msg_socket::{RepSocket, ReqSocket}; |
25 | | -use msg_transport::tcp::Tcp; |
26 | | -use tracing_subscriber::EnvFilter; |
27 | | - |
28 | | -const BANDWIDTH_MBIT: f64 = 10.0; |
29 | | -const MSG_SIZE: usize = 256 * 1024; // 256 KB per message |
30 | | -const NUM_MESSAGES: usize = 20; // Send multiple to let cwnd grow |
31 | | -const LATENCY_MS: u32 = 20; // 20ms one-way = 40ms RTT |
32 | | - |
33 | | -const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem"; |
34 | | -const TCP_WINDOW_SCALING: &str = "/proc/sys/net/ipv4/tcp_window_scaling"; |
35 | | - |
36 | | -/// Transfer multiple messages and measure throughput |
37 | | -async fn transfer(network: &Network, sender: usize, receiver: usize, addr: SocketAddr) -> f64 { |
38 | | - let server = network |
39 | | - .run_in_namespace(receiver, move |_| { |
40 | | - Box::pin(async move { |
41 | | - let mut rep = RepSocket::new(Tcp::default()); |
42 | | - rep.bind(addr).await.unwrap(); |
43 | | - for _ in 0..NUM_MESSAGES { |
44 | | - if let Some(req) = rep.next().await { |
45 | | - req.respond("ok".into()).unwrap(); |
| 13 | +#[cfg(not(target_os = "linux"))] |
| 14 | +fn main() {} |
| 15 | + |
| 16 | +#[cfg(target_os = "linux")] |
| 17 | +#[tokio::main] |
| 18 | +async fn main() -> Result<(), Box<dyn std::error::Error>> { |
| 19 | + use std::{ |
| 20 | + net::{IpAddr, Ipv4Addr, SocketAddr}, |
| 21 | + time::Instant, |
| 22 | + }; |
| 23 | + |
| 24 | + use futures::StreamExt; |
| 25 | + use msg_sim::{ |
| 26 | + ip::Subnet, |
| 27 | + network::{Link, Network, PeerIdExt}, |
| 28 | + tc::impairment::LinkImpairment, |
| 29 | + }; |
| 30 | + use msg_socket::{RepSocket, ReqSocket}; |
| 31 | + use msg_transport::tcp::Tcp; |
| 32 | + use tracing_subscriber::EnvFilter; |
| 33 | + |
| 34 | + const BANDWIDTH_MBIT: f64 = 10.0; |
| 35 | + const MSG_SIZE: usize = 256 * 1024; // 256 KB per message |
| 36 | + const NUM_MESSAGES: usize = 20; // Send multiple to let cwnd grow |
| 37 | + const LATENCY_MS: u32 = 20; // 20ms one-way = 40ms RTT |
| 38 | + |
| 39 | + const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem"; |
| 40 | + const TCP_WINDOW_SCALING: &str = "/proc/sys/net/ipv4/tcp_window_scaling"; |
| 41 | + |
| 42 | + /// Transfer multiple messages and measure throughput |
| 43 | + async fn transfer(network: &Network, sender: usize, receiver: usize, addr: SocketAddr) -> f64 { |
| 44 | + let server = network |
| 45 | + .run_in_namespace(receiver, move |_| { |
| 46 | + Box::pin(async move { |
| 47 | + let mut rep = RepSocket::new(Tcp::default()); |
| 48 | + rep.bind(addr).await.unwrap(); |
| 49 | + for _ in 0..NUM_MESSAGES { |
| 50 | + if let Some(req) = rep.next().await { |
| 51 | + req.respond("ok".into()).unwrap(); |
| 52 | + } |
46 | 53 | } |
47 | | - } |
| 54 | + }) |
48 | 55 | }) |
49 | | - }) |
50 | | - .await |
51 | | - .unwrap(); |
52 | | - |
53 | | - tokio::time::sleep(std::time::Duration::from_millis(100)).await; |
54 | | - |
55 | | - let client = network |
56 | | - .run_in_namespace(sender, move |_| { |
57 | | - Box::pin(async move { |
58 | | - let mut req = ReqSocket::new(Tcp::default()); |
59 | | - req.connect_sync(addr); |
60 | | - let payload = vec![0u8; MSG_SIZE]; |
61 | | - let start = Instant::now(); |
62 | | - for _ in 0..NUM_MESSAGES { |
63 | | - req.request(payload.clone().into()).await.unwrap(); |
64 | | - } |
65 | | - start.elapsed() |
| 56 | + .await |
| 57 | + .unwrap(); |
| 58 | + |
| 59 | + tokio::time::sleep(std::time::Duration::from_millis(100)).await; |
| 60 | + |
| 61 | + let client = network |
| 62 | + .run_in_namespace(sender, move |_| { |
| 63 | + Box::pin(async move { |
| 64 | + let mut req = ReqSocket::new(Tcp::default()); |
| 65 | + req.connect_sync(addr); |
| 66 | + let payload = vec![0u8; MSG_SIZE]; |
| 67 | + let start = Instant::now(); |
| 68 | + for _ in 0..NUM_MESSAGES { |
| 69 | + req.request(payload.clone().into()).await.unwrap(); |
| 70 | + } |
| 71 | + start.elapsed() |
| 72 | + }) |
66 | 73 | }) |
67 | | - }) |
68 | | - .await |
69 | | - .unwrap(); |
| 74 | + .await |
| 75 | + .unwrap(); |
70 | 76 |
|
71 | | - let (elapsed, _) = tokio::try_join!(client, server).unwrap(); |
72 | | - println!("Transfer elapsed: {elapsed:?}"); |
| 77 | + let (elapsed, _) = tokio::try_join!(client, server).unwrap(); |
| 78 | + println!("Transfer elapsed: {elapsed:?}"); |
73 | 79 |
|
74 | | - let total_bytes = MSG_SIZE * NUM_MESSAGES; |
75 | | - (total_bytes as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0) |
76 | | -} |
| 80 | + let total_bytes = MSG_SIZE * NUM_MESSAGES; |
| 81 | + (total_bytes as f64 * 8.0) / (elapsed.as_secs_f64() * 1_000_000.0) |
| 82 | + } |
77 | 83 |
|
78 | | -#[tokio::main] |
79 | | -async fn main() -> Result<(), Box<dyn std::error::Error>> { |
80 | 84 | tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).init(); |
81 | 85 |
|
82 | 86 | let rtt_ms = LATENCY_MS * 2; |
|
0 commit comments