Skip to content

Commit 36e3ff5

Browse files
committed
feat(msg-sim): configurable runtime
1 parent 2e57baf commit 36e3ff5

File tree

5 files changed

+164
-104
lines changed

5 files changed

+164
-104
lines changed

msg-sim/examples/bdp_throughput.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2424
use futures::StreamExt;
2525
use msg_sim::{
2626
ip::Subnet,
27-
network::{Link, Network, PeerIdExt},
27+
network::{HubOptions, Link, Network, PeerIdExt, PeerOptions},
2828
tc::impairment::LinkImpairment,
2929
};
3030
use msg_socket::{RepSocket, ReqSocket};
@@ -92,9 +92,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9292
println!("Transfer: {} messages × {} KB = {} MB\n", NUM_MESSAGES, MSG_SIZE / 1024, total_mb);
9393

9494
let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 100, 0, 0)), 16);
95-
let mut network = Network::new(subnet).await?;
96-
let sender = network.add_peer().await?;
97-
let receiver = network.add_peer().await?;
95+
let mut network = Network::new(subnet, HubOptions::default()).await?;
96+
let sender = network.add_peer(PeerOptions::default()).await?;
97+
let receiver = network.add_peer(PeerOptions::default()).await?;
9898

9999
let impairment =
100100
LinkImpairment::default().with_latency_ms(LATENCY_MS).with_bandwidth_mbit_s(BANDWIDTH_MBIT);

msg-sim/examples/sim_multi_region.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3232

3333
use msg_sim::{
3434
ip::Subnet,
35-
network::{Link, Network, PeerIdExt},
35+
network::{HubOptions, Link, Network, PeerIdExt, PeerOptions},
3636
};
3737
use tracing_subscriber::EnvFilter;
3838

@@ -161,13 +161,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
161161

162162
// Create network
163163
let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24);
164-
let mut network = Network::new(subnet).await?;
164+
let mut network = Network::new(subnet, HubOptions::default()).await?;
165165

166166
// Add peers
167-
let eu = network.add_peer().await?;
168-
let us1 = network.add_peer().await?;
169-
let us2 = network.add_peer().await?;
170-
let tokyo = network.add_peer().await?;
167+
let eu = network.add_peer(PeerOptions::default()).await?;
168+
let us1 = network.add_peer(PeerOptions::default()).await?;
169+
let us2 = network.add_peer(PeerOptions::default()).await?;
170+
let tokyo = network.add_peer(PeerOptions::default()).await?;
171171

172172
println!("Peers:");
173173
for id in [eu, us1, us2, tokyo] {

msg-sim/examples/tcp_tuning.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn main() {}
1414
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1515
use std::net::{IpAddr, Ipv4Addr};
1616

17-
use msg_sim::{ip::Subnet, network::Network};
17+
use msg_sim::{ip::Subnet, network::{HubOptions, Network, PeerOptions}};
1818
use tracing_subscriber::EnvFilter;
1919

2020
const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem";
@@ -41,8 +41,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4141

4242
// Create network with one peer
4343
let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24);
44-
let mut network = Network::new(subnet).await?;
45-
let peer = network.add_peer().await?;
44+
let mut network = Network::new(subnet, HubOptions::default()).await?;
45+
let peer = network.add_peer(PeerOptions::default()).await?;
4646

4747
// Tune TCP buffers in peer's namespace (min, default, max)
4848
let tuned_buffers = "4096 1048576 16777216"; // 4KB / 1MB / 16MB

msg-sim/src/namespace.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tokio::sync::oneshot;
88

99
use crate::dynch::{DynCh, DynRequestSender};
1010
use crate::namespace::helpers::current_netns;
11+
use crate::network::RuntimeMakerFn;
1112

1213
/// Base directory for named network namespaces.
1314
///
@@ -230,6 +231,7 @@ impl NetworkNamespaceInner {
230231
/// `setns(2)`, which is thread-local.
231232
pub fn spawn<Ctx: 'static>(
232233
self,
234+
make_runtime: RuntimeMakerFn,
233235
make_ctx: impl FnOnce() -> Ctx + Send + 'static,
234236
) -> (std::thread::JoinHandle<Result<()>>, DynRequestSender<Ctx>) {
235237
let (tx, mut rx) = DynCh::<Ctx>::channel(8);
@@ -245,7 +247,7 @@ impl NetworkNamespaceInner {
245247
// Create mount namespace and remount /proc for namespace-specific sysctl access
246248
helpers::setup_mount_namespace()?;
247249

248-
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
250+
let rt = make_runtime();
249251

250252
tracing::debug!("started runtime");
251253
drop(_span);
@@ -286,6 +288,7 @@ pub struct NetworkNamespace<Ctx = ()> {
286288
impl NetworkNamespace {
287289
pub async fn new<Ctx: 'static>(
288290
name: impl Into<String>,
291+
make_runtime: RuntimeMakerFn,
289292
make_ctx: impl FnOnce() -> Ctx + Send + 'static,
290293
) -> Result<NetworkNamespace<Ctx>> {
291294
let name = name.into();
@@ -296,7 +299,7 @@ impl NetworkNamespace {
296299
let file = tokio::fs::File::open(path).await?.into_std().await;
297300

298301
let inner = NetworkNamespaceInner { name, file };
299-
let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_ctx);
302+
let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_runtime, make_ctx);
300303

301304
Ok(NetworkNamespace::<Ctx> { inner, task_sender, _receiver_task })
302305
}
@@ -349,11 +352,18 @@ mod tests {
349352

350353
const TCP_SLOW_START_AFTER_IDLE: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle";
351354

355+
fn default_runtime() -> tokio::runtime::Runtime {
356+
tokio::runtime::Builder::new_multi_thread()
357+
.enable_all()
358+
.build()
359+
.expect("to create runtime")
360+
}
361+
352362
#[tokio::test(flavor = "multi_thread")]
353363
async fn mount_namespace_isolates_proc() {
354364
// Create two namespaces
355-
let ns1 = NetworkNamespace::new("test-ns-mount-1", || ()).await.unwrap();
356-
let ns2 = NetworkNamespace::new("test-ns-mount-2", || ()).await.unwrap();
365+
let ns1 = NetworkNamespace::new("test-ns-mount-1", Box::new(default_runtime), || ()).await.unwrap();
366+
let ns2 = NetworkNamespace::new("test-ns-mount-2", Box::new(default_runtime), || ()).await.unwrap();
357367

358368
// Verify /proc is mounted in ns1 by checking /proc/self/ns/net exists
359369
let proc_mounted_ns1: bool = ns1
@@ -385,8 +395,8 @@ mod tests {
385395
#[tokio::test(flavor = "multi_thread")]
386396
async fn sysctl_values_are_namespace_specific() {
387397
// Create two namespaces
388-
let ns1 = NetworkNamespace::new("test-ns-sysctl-1", || ()).await.unwrap();
389-
let ns2 = NetworkNamespace::new("test-ns-sysctl-2", || ()).await.unwrap();
398+
let ns1 = NetworkNamespace::new("test-ns-sysctl-1", Box::new(default_runtime), || ()).await.unwrap();
399+
let ns2 = NetworkNamespace::new("test-ns-sysctl-2", Box::new(default_runtime), || ()).await.unwrap();
390400

391401
// Set different values in each namespace
392402
let write_result_ns1: std::io::Result<()> = ns1
@@ -446,24 +456,19 @@ mod tests {
446456

447457
assert_eq!(value_ns1, "0", "ns1 should have tcp_slow_start_after_idle=0");
448458
assert_eq!(value_ns2, "1", "ns2 should have tcp_slow_start_after_idle=1");
449-
assert_ne!(
450-
value_ns1, value_ns2,
451-
"sysctls should be isolated between namespaces"
452-
);
459+
assert_ne!(value_ns1, value_ns2, "sysctls should be isolated between namespaces");
453460
}
454461

455462
#[tokio::test(flavor = "multi_thread")]
456463
async fn namespace_has_isolated_network_identity() {
457464
// Create a namespace
458-
let ns = NetworkNamespace::new("test-ns-identity", || ()).await.unwrap();
465+
let ns = NetworkNamespace::new("test-ns-identity", Box::new(default_runtime), || ()).await.unwrap();
459466

460467
// Get the network namespace inode from inside the namespace
461468
let ns_inode_inside: u64 = ns
462469
.task_sender
463470
.submit(|_: &mut ()| -> DynFuture<'_, u64> {
464-
Box::pin(async {
465-
helpers::current_netns().map(|id| id.inode).unwrap_or(0)
466-
})
471+
Box::pin(async { helpers::current_netns().map(|id| id.inode).unwrap_or(0) })
467472
})
468473
.await
469474
.unwrap()
@@ -476,9 +481,6 @@ mod tests {
476481

477482
assert_ne!(ns_inode_inside, 0, "should get valid inode inside namespace");
478483
assert_ne!(host_inode, 0, "should get valid host inode");
479-
assert_ne!(
480-
ns_inode_inside, host_inode,
481-
"namespace inode should differ from host"
482-
);
484+
assert_ne!(ns_inode_inside, host_inode, "namespace inode should differ from host");
483485
}
484486
}

0 commit comments

Comments
 (0)