Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions msg-sim/examples/bdp_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
use futures::StreamExt;
use msg_sim::{
ip::Subnet,
network::{Link, Network, PeerIdExt},
network::{Link, Network, PeerIdExt, PeerOptions},
tc::impairment::LinkImpairment,
};
use msg_socket::{RepSocket, ReqSocket};
Expand Down Expand Up @@ -93,8 +93,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 100, 0, 0)), 16);
let mut network = Network::new(subnet).await?;
let sender = network.add_peer().await?;
let receiver = network.add_peer().await?;
let sender = network.add_peer(PeerOptions::default()).await?;
let receiver = network.add_peer(PeerOptions::default()).await?;

let impairment =
LinkImpairment::default().with_latency_ms(LATENCY_MS).with_bandwidth_mbit_s(BANDWIDTH_MBIT);
Expand Down
10 changes: 5 additions & 5 deletions msg-sim/examples/sim_multi_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

use msg_sim::{
ip::Subnet,
network::{Link, Network, PeerIdExt},
network::{Link, Network, PeerIdExt, PeerOptions},
};
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -164,10 +164,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut network = Network::new(subnet).await?;

// Add peers
let eu = network.add_peer().await?;
let us1 = network.add_peer().await?;
let us2 = network.add_peer().await?;
let tokyo = network.add_peer().await?;
let eu = network.add_peer(PeerOptions::default()).await?;
let us1 = network.add_peer(PeerOptions::default()).await?;
let us2 = network.add_peer(PeerOptions::default()).await?;
let tokyo = network.add_peer(PeerOptions::default()).await?;

println!("Peers:");
for id in [eu, us1, us2, tokyo] {
Expand Down
7 changes: 5 additions & 2 deletions msg-sim/examples/tcp_tuning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ fn main() {}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::net::{IpAddr, Ipv4Addr};

use msg_sim::{ip::Subnet, network::Network};
use msg_sim::{
ip::Subnet,
network::{Network, PeerOptions},
};
use tracing_subscriber::EnvFilter;

const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem";
Expand Down Expand Up @@ -42,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create network with one peer
let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24);
let mut network = Network::new(subnet).await?;
let peer = network.add_peer().await?;
let peer = network.add_peer(PeerOptions::default()).await?;

// Tune TCP buffers in peer's namespace (min, default, max)
let tuned_buffers = "4096 1048576 16777216"; // 4KB / 1MB / 16MB
Expand Down
45 changes: 27 additions & 18 deletions msg-sim/src/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::oneshot;

use crate::dynch::{DynCh, DynRequestSender};
use crate::namespace::helpers::current_netns;
use crate::network::RuntimeFactory;

/// Base directory for named network namespaces.
///
Expand Down Expand Up @@ -230,6 +231,7 @@ impl NetworkNamespaceInner {
/// `setns(2)`, which is thread-local.
pub fn spawn<Ctx: 'static>(
self,
runtime_factory: RuntimeFactory,
make_ctx: impl FnOnce() -> Ctx + Send + 'static,
) -> (std::thread::JoinHandle<Result<()>>, DynRequestSender<Ctx>) {
let (tx, mut rx) = DynCh::<Ctx>::channel(8);
Expand All @@ -245,7 +247,7 @@ impl NetworkNamespaceInner {
// Create mount namespace and remount /proc for namespace-specific sysctl access
helpers::setup_mount_namespace()?;

let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
let rt = runtime_factory();

tracing::debug!("started runtime");
drop(_span);
Expand Down Expand Up @@ -286,6 +288,7 @@ pub struct NetworkNamespace<Ctx = ()> {
impl NetworkNamespace {
pub async fn new<Ctx: 'static>(
name: impl Into<String>,
runtime_factory: RuntimeFactory,
make_ctx: impl FnOnce() -> Ctx + Send + 'static,
) -> Result<NetworkNamespace<Ctx>> {
let name = name.into();
Expand All @@ -296,7 +299,7 @@ impl NetworkNamespace {
let file = tokio::fs::File::open(path).await?.into_std().await;

let inner = NetworkNamespaceInner { name, file };
let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_ctx);
let (_receiver_task, task_sender) = inner.try_clone()?.spawn(runtime_factory, make_ctx);

Ok(NetworkNamespace::<Ctx> { inner, task_sender, _receiver_task })
}
Expand Down Expand Up @@ -349,11 +352,19 @@ mod tests {

const TCP_SLOW_START_AFTER_IDLE: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle";

fn default_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread().enable_all().build().expect("to create runtime")
}

#[tokio::test(flavor = "multi_thread")]
async fn mount_namespace_isolates_proc() {
// Create two namespaces
let ns1 = NetworkNamespace::new("test-ns-mount-1", || ()).await.unwrap();
let ns2 = NetworkNamespace::new("test-ns-mount-2", || ()).await.unwrap();
let ns1 = NetworkNamespace::new("test-ns-mount-1", Box::new(default_runtime), || ())
.await
.unwrap();
let ns2 = NetworkNamespace::new("test-ns-mount-2", Box::new(default_runtime), || ())
.await
.unwrap();

// Verify /proc is mounted in ns1 by checking /proc/self/ns/net exists
let proc_mounted_ns1: bool = ns1
Expand Down Expand Up @@ -385,8 +396,12 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn sysctl_values_are_namespace_specific() {
// Create two namespaces
let ns1 = NetworkNamespace::new("test-ns-sysctl-1", || ()).await.unwrap();
let ns2 = NetworkNamespace::new("test-ns-sysctl-2", || ()).await.unwrap();
let ns1 = NetworkNamespace::new("test-ns-sysctl-1", Box::new(default_runtime), || ())
.await
.unwrap();
let ns2 = NetworkNamespace::new("test-ns-sysctl-2", Box::new(default_runtime), || ())
.await
.unwrap();

// Set different values in each namespace
let write_result_ns1: std::io::Result<()> = ns1
Expand Down Expand Up @@ -446,24 +461,21 @@ mod tests {

assert_eq!(value_ns1, "0", "ns1 should have tcp_slow_start_after_idle=0");
assert_eq!(value_ns2, "1", "ns2 should have tcp_slow_start_after_idle=1");
assert_ne!(
value_ns1, value_ns2,
"sysctls should be isolated between namespaces"
);
assert_ne!(value_ns1, value_ns2, "sysctls should be isolated between namespaces");
}

#[tokio::test(flavor = "multi_thread")]
async fn namespace_has_isolated_network_identity() {
// Create a namespace
let ns = NetworkNamespace::new("test-ns-identity", || ()).await.unwrap();
let ns = NetworkNamespace::new("test-ns-identity", Box::new(default_runtime), || ())
.await
.unwrap();

// Get the network namespace inode from inside the namespace
let ns_inode_inside: u64 = ns
.task_sender
.submit(|_: &mut ()| -> DynFuture<'_, u64> {
Box::pin(async {
helpers::current_netns().map(|id| id.inode).unwrap_or(0)
})
Box::pin(async { helpers::current_netns().map(|id| id.inode).unwrap_or(0) })
})
.await
.unwrap()
Expand All @@ -476,9 +488,6 @@ mod tests {

assert_ne!(ns_inode_inside, 0, "should get valid inode inside namespace");
assert_ne!(host_inode, 0, "should get valid host inode");
assert_ne!(
ns_inode_inside, host_inode,
"namespace inode should differ from host"
);
assert_ne!(ns_inode_inside, host_inode, "namespace inode should differ from host");
}
}
Loading
Loading