Skip to content

Commit 7442cf7

Browse files
committed
chore(msg-sim): example sim_multi_region
1 parent ab0f6b4 commit 7442cf7

File tree

5 files changed

+409
-1
lines changed

5 files changed

+409
-1
lines changed

msg-sim/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,14 @@ futures.workspace = true
2525
rtnetlink = { version = "0.20.0" }
2626

2727
[dev-dependencies]
28-
tracing-subscriber = "0.3"
28+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
29+
tokio = { version = "1", features = [
30+
"rt-multi-thread",
31+
"macros",
32+
"time",
33+
"net",
34+
] }
35+
36+
[[example]]
37+
name = "sim_multi_region"
38+
path = "examples/sim_multi_region.rs"

msg-sim/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7171
}
7272
```
7373

74+
## Known Limitations
75+
76+
### Packet Duplication Restriction
77+
78+
The Linux kernel prevents creating additional netem qdiscs on a network interface
79+
once one with `duplicate > 0` exists. This means **packet duplication can only be
80+
used on at most one outgoing link per peer**.
81+
82+
For example, if peer A has links to peers B, C, and D:
83+
- You CAN set `duplicate > 0` on the A→B link
84+
- You CANNOT then set any impairments on A→C or A→D (even without duplication)
85+
86+
**Workaround**: If you need multiple outgoing links from a peer, either use
87+
`duplicate` on only one of them, or don't use `duplicate` at all from that peer.
88+
89+
This is enforced by the [`check_netem_in_tree()`][kernel-netem] function in the
90+
Linux kernel, which returns:
91+
> "netem: cannot mix duplicating netems with other netems in tree"
92+
93+
See [`LinkImpairment::duplicate`](src/tc/impairment.rs) for details.
94+
95+
[kernel-netem]: https://github.com/torvalds/linux/blob/master/net/sched/sch_netem.c
96+
7497
## Running Tests
7598

7699
Tests require root privileges to create network namespaces:
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
//! Multi-Region Network Simulation Example
2+
//!
3+
//! Simulates 4 peers in different geographic regions with realistic network impairments.
4+
//!
5+
//! # Topology
6+
//!
7+
//! ```text
8+
//! ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
9+
//! │ EU-West │ │ US-East-1│ │ US-East-2│ │ Tokyo │
10+
//! │ 10.0.0.1 │ │ 10.0.0.2 │ │ 10.0.0.3 │ │ 10.0.0.4 │
11+
//! └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
12+
//! └────────────────┴────────────────┴────────────────┘
13+
//! Hub Bridge
14+
//! ```
15+
//!
16+
//! # Running
17+
//!
18+
//! ```bash
19+
//! sudo HOME=$HOME RUST_LOG=info $(which cargo) run --example multi_region -p msg-sim
20+
//! ```
21+
22+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
23+
use std::time::{Duration, Instant};
24+
25+
use msg_sim::ip::Subnet;
26+
use msg_sim::network::{Link, Network, PeerIdExt};
27+
use msg_sim::tc::impairment::LinkImpairment;
28+
use tracing_subscriber::EnvFilter;
29+
30+
// Peer IDs (assigned sequentially by add_peer)
31+
const EU_WEST: usize = 1;
32+
const US_EAST_1: usize = 2;
33+
const US_EAST_2: usize = 3;
34+
const TOKYO: usize = 4;
35+
36+
fn region_name(id: usize) -> &'static str {
37+
match id {
38+
EU_WEST => "EU-West",
39+
US_EAST_1 => "US-East-1",
40+
US_EAST_2 => "US-East-2",
41+
TOKYO => "Tokyo",
42+
_ => "Unknown",
43+
}
44+
}
45+
46+
/// Network impairment profiles based on typical cloud latencies.
47+
mod profiles {
48+
use super::*;
49+
50+
// US-East-1 <-> US-East-2: Same region, very fast
51+
pub fn same_region() -> LinkImpairment {
52+
LinkImpairment {
53+
latency: 1_000, // 1ms
54+
jitter: 500, // 0.5ms
55+
loss: 0.01, // 0.01%
56+
..Default::default()
57+
}
58+
}
59+
60+
// EU <-> US: Transatlantic
61+
pub fn eu_to_us() -> LinkImpairment {
62+
LinkImpairment {
63+
latency: 40_000, // 40ms one-way
64+
jitter: 5_000, // 5ms
65+
loss: 0.1, // 0.1%
66+
bandwidth_mbit_s: Some(1000.0),
67+
..Default::default()
68+
}
69+
}
70+
71+
// US <-> Tokyo: Transpacific
72+
pub fn us_to_tokyo() -> LinkImpairment {
73+
LinkImpairment {
74+
latency: 80_000, // 80ms one-way
75+
jitter: 10_000, // 10ms
76+
loss: 0.2, // 0.2%
77+
bandwidth_mbit_s: Some(500.0),
78+
..Default::default()
79+
}
80+
}
81+
82+
// EU <-> Tokyo: Longest route
83+
pub fn eu_to_tokyo() -> LinkImpairment {
84+
LinkImpairment {
85+
latency: 120_000, // 120ms one-way
86+
jitter: 15_000, // 15ms
87+
loss: 0.5, // 0.5%
88+
bandwidth_mbit_s: Some(300.0),
89+
..Default::default()
90+
}
91+
}
92+
}
93+
94+
#[tokio::main]
95+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
96+
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();
97+
98+
println!("\n=== Multi-Region Network Simulation ===\n");
99+
100+
// Create network
101+
let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24);
102+
let mut network = Network::new(subnet).await?;
103+
104+
// Add peers
105+
let eu = network.add_peer().await?;
106+
let us1 = network.add_peer().await?;
107+
let us2 = network.add_peer().await?;
108+
let tokyo = network.add_peer().await?;
109+
110+
println!("Peers:");
111+
for id in [eu, us1, us2, tokyo] {
112+
println!(" {} @ {}", region_name(id), id.veth_address(subnet));
113+
}
114+
println!();
115+
116+
// Configure impairments
117+
println!("Configuring network impairments...");
118+
119+
// US-East-1 <-> US-East-2 (same region)
120+
let same = profiles::same_region();
121+
network.apply_impairment(Link::new(us1, us2), same).await?;
122+
network.apply_impairment(Link::new(us2, us1), same).await?;
123+
println!(
124+
" US-East-1 <-> US-East-2: {}ms ±{}ms (same region)",
125+
same.latency / 1000,
126+
same.jitter / 1000
127+
);
128+
129+
// EU <-> US
130+
let eu_us = profiles::eu_to_us();
131+
for us in [us1, us2] {
132+
network.apply_impairment(Link::new(eu, us), eu_us).await?;
133+
network.apply_impairment(Link::new(us, eu), eu_us).await?;
134+
}
135+
println!(
136+
" EU <-> US: {}ms ±{}ms, {} Mbit/s",
137+
eu_us.latency / 1000,
138+
eu_us.jitter / 1000,
139+
eu_us.bandwidth_mbit_s.unwrap() as u32
140+
);
141+
142+
// US <-> Tokyo
143+
let us_tokyo = profiles::us_to_tokyo();
144+
for us in [us1, us2] {
145+
network.apply_impairment(Link::new(us, tokyo), us_tokyo).await?;
146+
network.apply_impairment(Link::new(tokyo, us), us_tokyo).await?;
147+
}
148+
println!(
149+
" US <-> Tokyo: {}ms ±{}ms, {} Mbit/s",
150+
us_tokyo.latency / 1000,
151+
us_tokyo.jitter / 1000,
152+
us_tokyo.bandwidth_mbit_s.unwrap() as u32
153+
);
154+
155+
// EU <-> Tokyo
156+
let eu_tokyo = profiles::eu_to_tokyo();
157+
network.apply_impairment(Link::new(eu, tokyo), eu_tokyo).await?;
158+
network.apply_impairment(Link::new(tokyo, eu), eu_tokyo).await?;
159+
println!(
160+
" EU <-> Tokyo: {}ms ±{}ms, {:.1}% loss, {} Mbit/s",
161+
eu_tokyo.latency / 1000,
162+
eu_tokyo.jitter / 1000,
163+
eu_tokyo.loss,
164+
eu_tokyo.bandwidth_mbit_s.unwrap() as u32
165+
);
166+
167+
println!("\n=== Ping Tests ===\n");
168+
169+
// Test connectivity
170+
let tests = [
171+
(us1, us2, "US-East-1 -> US-East-2"),
172+
(eu, us1, "EU -> US-East-1"),
173+
(us1, tokyo, "US-East-1 -> Tokyo"),
174+
(eu, tokyo, "EU -> Tokyo"),
175+
];
176+
177+
for (src, dst, label) in tests {
178+
match ping(&network, src, dst, subnet).await {
179+
Ok(rtt) => println!(" {}: {:.1}ms", label, rtt.as_secs_f64() * 1000.0),
180+
Err(e) => println!(" {}: failed ({})", label, e),
181+
}
182+
}
183+
184+
println!("\n=== Done ===\n");
185+
Ok(())
186+
}
187+
188+
/// Simple UDP ping between two peers.
189+
async fn ping(
190+
network: &Network,
191+
src: usize,
192+
dst: usize,
193+
subnet: Subnet,
194+
) -> Result<Duration, String> {
195+
let dst_addr = SocketAddr::new(dst.veth_address(subnet), 9999);
196+
197+
// Start listener
198+
let listener = network
199+
.run_in_namespace(dst, |_| {
200+
Box::pin(async {
201+
let sock = tokio::net::UdpSocket::bind("0.0.0.0:9999").await?;
202+
let mut buf = [0u8; 32];
203+
let (n, addr) = sock.recv_from(&mut buf).await?;
204+
sock.send_to(&buf[..n], addr).await?;
205+
Ok::<_, std::io::Error>(())
206+
})
207+
})
208+
.await
209+
.map_err(|e| e.to_string())?;
210+
211+
tokio::time::sleep(Duration::from_millis(10)).await;
212+
213+
// Send ping
214+
let rtt = network
215+
.run_in_namespace(src, move |_| {
216+
Box::pin(async move {
217+
let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await?;
218+
let start = Instant::now();
219+
sock.send_to(b"ping", dst_addr).await?;
220+
let mut buf = [0u8; 32];
221+
tokio::time::timeout(Duration::from_secs(5), sock.recv(&mut buf))
222+
.await
223+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))??;
224+
Ok::<_, std::io::Error>(start.elapsed())
225+
})
226+
})
227+
.await
228+
.map_err(|e| e.to_string())?
229+
.await
230+
.map_err(|e| e.to_string())?
231+
.map_err(|e| e.to_string())?;
232+
233+
let _ = listener.await;
234+
Ok(rtt)
235+
}

0 commit comments

Comments
 (0)