diff --git a/hyperactor/Cargo.toml b/hyperactor/Cargo.toml index 008164ec5..5b650acbe 100644 --- a/hyperactor/Cargo.toml +++ b/hyperactor/Cargo.toml @@ -46,6 +46,7 @@ nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", " opentelemetry = "0.29" paste = "1.0.14" rand = { version = "0.8", features = ["small_rng"] } +rand_distr = "0.4" regex = "1.11.1" rustls-pemfile = "1.0.0" serde = { version = "1.0.219", features = ["derive", "rc"] } diff --git a/hyperactor/src/channel/sim.rs b/hyperactor/src/channel/sim.rs index 41a41487e..18f262331 100644 --- a/hyperactor/src/channel/sim.rs +++ b/hyperactor/src/channel/sim.rs @@ -10,13 +10,13 @@ // SimRx contains a way to receive messages. //! Local simulated channel implementation. +use std::any::Any; // send leads to add to network. use std::marker::PhantomData; use std::sync::Arc; use dashmap::DashMap; use regex::Regex; -use tokio::sync::Mutex; use super::*; use crate::channel; @@ -25,12 +25,9 @@ use crate::clock::RealClock; use crate::clock::SimClock; use crate::data::Serialized; use crate::mailbox::MessageEnvelope; -use crate::simnet; use crate::simnet::Dispatcher; use crate::simnet::Event; use crate::simnet::ScheduledEvent; -use crate::simnet::SimNetConfig; -use crate::simnet::SimNetEdge; use crate::simnet::SimNetError; use crate::simnet::simnet_handle; @@ -126,65 +123,38 @@ impl fmt::Display for SimAddr { /// Message Event that can be passed around in the simnet. #[derive(Debug)] pub(crate) struct MessageDeliveryEvent { - src_addr: Option, dest_addr: ChannelAddr, data: Serialized, - duration_ms: u64, + latency: u64, } impl MessageDeliveryEvent { /// Creates a new MessageDeliveryEvent. - pub fn new(src_addr: Option, dest_addr: ChannelAddr, data: Serialized) -> Self { + pub fn new(dest_addr: ChannelAddr, data: Serialized, latency: u64) -> Self { Self { - src_addr, dest_addr, data, - duration_ms: 100, + latency, } } } #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { // Send the message to the correct receiver. SENDER - .send( - self.src_addr.clone(), - self.dest_addr.clone(), - self.data.clone(), - ) + .send(self.dest_addr.clone(), self.data.clone()) .await?; Ok(()) } fn duration_ms(&self) -> u64 { - self.duration_ms + self.latency } fn summary(&self) -> String { - format!( - "Sending message from {} to {}", - self.src_addr - .as_ref() - .map_or("unknown".to_string(), |addr| addr.to_string()), - self.dest_addr.clone() - ) - } - - async fn read_simnet_config(&mut self, topology: &Arc>) { - if let Some(src_addr) = &self.src_addr { - let edge = SimNetEdge { - src: src_addr.clone(), - dst: self.dest_addr.clone(), - }; - self.duration_ms = topology - .lock() - .await - .topology - .get(&edge) - .map_or_else(|| 1, |v| v.latency.as_millis() as u64); - } + format!("Sending message to {}", self.dest_addr.clone()) } } @@ -194,12 +164,6 @@ pub async fn bind(addr: ChannelAddr) -> anyhow::Result<(), SimNetError> { simnet_handle()?.bind(addr) } -/// Update the configuration for simnet. -pub async fn update_config(config: simnet::NetworkConfig) -> anyhow::Result<(), SimNetError> { - // Only update network config for now, will add host config in the future. - simnet_handle()?.update_network_config(config).await -} - /// Returns a simulated channel address that is bound to "any" channel address. pub(crate) fn any(transport: ChannelTransport) -> ChannelAddr { ChannelAddr::Sim(SimAddr { @@ -274,12 +238,7 @@ fn create_egress_sender( #[async_trait] impl Dispatcher for SimDispatcher { - async fn send( - &self, - _src_addr: Option, - addr: ChannelAddr, - data: Serialized, - ) -> Result<(), SimNetError> { + async fn send(&self, addr: ChannelAddr, data: Serialized) -> Result<(), SimNetError> { self.dispatchers .get(&addr) .ok_or_else(|| { @@ -318,27 +277,34 @@ pub(crate) struct SimRx { } #[async_trait] -impl Tx for SimTx { +impl Tx for SimTx { fn try_post(&self, message: M, _return_handle: oneshot::Sender) -> Result<(), SendError> { let data = match Serialized::serialize(&message) { Ok(data) => data, Err(err) => return Err(SendError(err.into(), message)), }; + + let envelope = (&message as &dyn Any) + .downcast_ref::() + .expect("RemoteMessage should always be a MessageEnvelope"); + + let (sender, dest) = (envelope.sender().clone(), envelope.dest().0.clone()); + match simnet_handle() { - Ok(handle) => match &self.src_addr { - Some(_) if self.client => handle.send_scheduled_event(ScheduledEvent { - event: Box::new(MessageDeliveryEvent::new( - self.src_addr.clone(), - self.dst_addr.clone(), - data, - )), - time: SimClock.millis_since_start(RealClock.now()), - }), - _ => handle.send_event(Box::new(MessageDeliveryEvent::new( - self.src_addr.clone(), + Ok(handle) => { + let event = Box::new(MessageDeliveryEvent::new( self.dst_addr.clone(), data, - ))), + handle.sample_latency(sender.proc_id(), dest.proc_id()), + )); + + match &self.src_addr { + Some(_) if self.client => handle.send_scheduled_event(ScheduledEvent { + event, + time: SimClock.millis_since_start(RealClock.now()), + }), + _ => handle.send_event(event), + } } .map_err(|err: SimNetError| SendError(ChannelError::from(err), message)), Err(err) => Err(SendError(ChannelError::from(err), message)), @@ -410,12 +376,19 @@ impl Rx for SimRx { mod tests { use std::iter::zip; + use ndslice::extent; + use super::*; + use crate::PortId; + use crate::attrs::Attrs; use crate::clock::Clock; use crate::clock::RealClock; use crate::clock::SimClock; - use crate::simnet::NetworkConfig; + use crate::id; + use crate::simnet; + use crate::simnet::LatencyConfig; use crate::simnet::start; + use crate::simnet::start_with_config; #[tokio::test] async fn test_sim_basic() { @@ -423,6 +396,7 @@ mod tests { let srcs_ok = vec!["tcp:[::2]:1234", "tcp:127.0.0.2:8080", "local:124"]; start(); + let handle = simnet_handle().unwrap(); // TODO: New NodeAdd event should do this for you.. for addr in dst_ok.iter().chain(srcs_ok.iter()) { @@ -439,10 +413,24 @@ mod tests { ) .unwrap(); - let (_, mut rx) = sim::serve::(dst_addr.clone()).unwrap(); - let tx = sim::dial::(dst_addr).unwrap(); - tx.try_post(123, oneshot::channel().0).unwrap(); - assert_eq!(rx.recv().await.unwrap(), 123); + let (_, mut rx) = sim::serve::(dst_addr.clone()).unwrap(); + let tx = sim::dial::(dst_addr).unwrap(); + let data = Serialized::serialize(&456).unwrap(); + let sender = id!(world[0].hello); + let dest = id!(world[1].hello); + let ext = extent!(region = 1, dc = 1, rack = 4, host = 4, gpu = 8); + handle.register_proc( + sender.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 0]).unwrap(), + ); + handle.register_proc( + dest.proc_id().clone(), + ext.point(vec![0, 0, 0, 1, 0]).unwrap(), + ); + + let msg = MessageEnvelope::new(sender, PortId(dest, 0), data.clone(), Attrs::new()); + tx.try_post(msg, oneshot::channel().0).unwrap(); + assert_eq!(*rx.recv().await.unwrap().data(), data); } let records = sim::simnet_handle().unwrap().close().await.unwrap(); @@ -481,30 +469,47 @@ mod tests { #[tokio::test] async fn test_realtime_frontier() { - start(); - tokio::time::pause(); + // 1 second of latency + start_with_config(LatencyConfig { + inter_host: (100, 100), + ..Default::default() + }); + let sim_addr = SimAddr::new("unix:@dst".parse::().unwrap()).unwrap(); let sim_addr_with_src = SimAddr::new_with_src( "unix:@src".parse::().unwrap(), "unix:@dst".parse::().unwrap(), ) .unwrap(); - let (_, mut rx) = sim::serve::<()>(sim_addr.clone()).unwrap(); - let tx = sim::dial::<()>(sim_addr_with_src).unwrap(); - let simnet_config_yaml = r#" - edges: - - src: unix:@src - dst: unix:@dst - metadata: - latency: 100 - "#; - update_config(NetworkConfig::from_yaml(simnet_config_yaml).unwrap()) - .await - .unwrap(); + let (_, mut rx) = sim::serve::(sim_addr.clone()).unwrap(); + let tx = sim::dial::(sim_addr_with_src).unwrap(); + + let controller = id!(world[0].controller); + let dest = id!(world[1].dest); + let handle = simnet::simnet_handle().unwrap(); + + let ext = extent!(region = 1, dc = 1, zone = 1, rack = 4, host = 4, gpu = 8); + handle.register_proc( + controller.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(), + ); + handle.register_proc( + dest.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 1, 0]).unwrap(), + ); // This message will be delievered at simulator time = 100 seconds - tx.try_post((), oneshot::channel().0).unwrap(); + tx.try_post( + MessageEnvelope::new( + controller, + PortId(dest, 0), + Serialized::serialize(&456).unwrap(), + Attrs::new(), + ), + oneshot::channel().0, + ) + .unwrap(); { // Allow simnet to run tokio::task::yield_now().await; @@ -524,41 +529,74 @@ mod tests { #[tokio::test] async fn test_client_message_scheduled_realtime() { tokio::time::pause(); - start(); + // 1 second of latency + start_with_config(LatencyConfig { + inter_host: (1000, 1000), + ..Default::default() + }); + let controller_to_dst = SimAddr::new_with_src( "unix:@controller".parse::().unwrap(), "unix:@dst".parse::().unwrap(), ) .unwrap(); - let controller_tx = sim::dial::<()>(controller_to_dst.clone()).unwrap(); + + let controller_tx = sim::dial::(controller_to_dst.clone()).unwrap(); let client_to_dst = SimAddr::new_with_client_src( "unix:@client".parse::().unwrap(), "unix:@dst".parse::().unwrap(), ) .unwrap(); - let client_tx = sim::dial::<()>(client_to_dst).unwrap(); - - // 1 second of latency - let simnet_config_yaml = r#" - edges: - - src: unix:@controller - dst: unix:@dst - metadata: - latency: 1 - "#; - update_config(NetworkConfig::from_yaml(simnet_config_yaml).unwrap()) - .await - .unwrap(); + let client_tx = sim::dial::(client_to_dst).unwrap(); + + let controller = id!(world[0].controller); + let dest = id!(world[1].dest); + let client = id!(world[2].client); + + let handle = simnet::simnet_handle().unwrap(); + let ext = extent!(region = 1, dc = 1, zone = 1, rack = 4, host = 4, gpu = 8); + handle.register_proc( + controller.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(), + ); + handle.register_proc( + client.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap(), + ); + handle.register_proc( + dest.proc_id().clone(), + ext.point(vec![0, 0, 0, 0, 1, 0]).unwrap(), + ); assert_eq!(SimClock.millis_since_start(RealClock.now()), 0); // Fast forward real time to 5 seconds tokio::time::advance(tokio::time::Duration::from_secs(5)).await; { // Send client message - client_tx.try_post((), oneshot::channel().0).unwrap(); + client_tx + .try_post( + MessageEnvelope::new( + client.clone(), + PortId(dest.clone(), 0), + Serialized::serialize(&456).unwrap(), + Attrs::new(), + ), + oneshot::channel().0, + ) + .unwrap(); // Send system message - controller_tx.try_post((), oneshot::channel().0).unwrap(); + controller_tx + .try_post( + MessageEnvelope::new( + controller.clone(), + PortId(dest.clone(), 0), + Serialized::serialize(&456).unwrap(), + Attrs::new(), + ), + oneshot::channel().0, + ) + .unwrap(); // Allow some time for simnet to run RealClock.sleep(tokio::time::Duration::from_secs(1)).await; } diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index 84c572ecc..ccae72228 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -12,25 +12,17 @@ use std::error::Error; use std::fmt; use std::sync::LazyLock; use std::sync::Mutex; -use std::sync::OnceLock; use std::time::SystemTime; +use async_trait::async_trait; use futures::pin_mut; use hyperactor_telemetry::TelemetryClock; use serde::Deserialize; use serde::Serialize; -use crate::Mailbox; use crate::channel::ChannelAddr; -use crate::data::Named; -use crate::id; -use crate::mailbox::DeliveryError; -use crate::mailbox::MailboxSender; -use crate::mailbox::MessageEnvelope; -use crate::mailbox::Undeliverable; -use crate::mailbox::UndeliverableMailboxSender; -use crate::mailbox::monitored_return_handle; -use crate::simnet::SleepEvent; +use crate::simnet::Event; +use crate::simnet::SimNetError; use crate::simnet::simnet_handle; struct SimTime { @@ -183,6 +175,39 @@ impl ClockKind { } } +#[derive(Debug)] +struct SleepEvent { + done_tx: Option>, + duration_ms: u64, +} + +impl SleepEvent { + pub(crate) fn new(done_tx: tokio::sync::oneshot::Sender<()>, duration_ms: u64) -> Box { + Box::new(Self { + done_tx: Some(done_tx), + duration_ms, + }) + } +} + +#[async_trait] +impl Event for SleepEvent { + async fn handle(&mut self) -> Result<(), SimNetError> { + if self.done_tx.take().unwrap().send(()).is_err() { + tracing::error!("Failed to send wakeup event"); + } + Ok(()) + } + + fn duration_ms(&self) -> u64 { + self.duration_ms + } + + fn summary(&self) -> String { + format!("Sleeping for {} ms", self.duration_ms) + } +} + /// Clock to be used in simulator runs that allows the simnet to create a scheduled event for. /// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's /// time to the wakeup time and use the transmitter to wake up this green thread @@ -192,33 +217,25 @@ pub struct SimClock; impl Clock for SimClock { /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet async fn sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn non_advancing_sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_nonadvanceable_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_nonadvanceable_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn sleep_until(&self, deadline: tokio::time::Instant) { @@ -242,23 +259,18 @@ impl Clock for SimClock { where F: std::future::Future, { - let mailbox = SimClock::mailbox().clone(); - let (tx, deadline_rx) = mailbox.open_once_port::<()>(); + let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); let fut = f; pin_mut!(fut); tokio::select! { - _ = deadline_rx.recv() => { + _ = deadline_rx => { Err(TimeoutError) } res = &mut fut => Ok(res) @@ -267,28 +279,6 @@ impl Clock for SimClock { } impl SimClock { - // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox - // from upstack and handle undeliverable messages properly. - fn mailbox() -> &'static Mailbox { - static SIMCLOCK_MAILBOX: OnceLock = OnceLock::new(); - SIMCLOCK_MAILBOX.get_or_init(|| { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); - let (undeliverable_messages, mut rx) = - mailbox.open_port::>(); - undeliverable_messages.bind_to(Undeliverable::::port()); - tokio::spawn(async move { - while let Ok(Undeliverable(mut envelope)) = rx.recv().await { - envelope.try_set_error(DeliveryError::BrokenLink( - "message returned to undeliverable port".to_string(), - )); - UndeliverableMailboxSender - .post(envelope, /*unused */ monitored_return_handle()) - } - }); - mailbox - }) - } - /// Advance the sumulator's time to the specified instant pub fn advance_to(&self, millis: u64) { let mut guard = SIM_TIME.now.lock().unwrap(); diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index def1fc9f5..a9fcf9e43 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -26,11 +26,13 @@ use async_trait::async_trait; use dashmap::DashMap; use dashmap::DashSet; use enum_as_inner::EnumAsInner; +use ndslice::view::Point; +use rand::thread_rng; +use rand_distr::Distribution; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; use serde::Serializer; -use serde_with::serde_as; use tokio::sync::Mutex; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedReceiver; @@ -43,6 +45,7 @@ use tokio::time::interval; use crate::ActorId; use crate::Mailbox; use crate::OncePortRef; +use crate::ProcId; use crate::channel::ChannelAddr; use crate::clock::Clock; use crate::clock::RealClock; @@ -88,12 +91,12 @@ pub trait Event: Send + Sync + Debug { /// For a proc spawn, it will be creating the proc object and instantiating it. /// For any event that manipulates the network (like adding/removing nodes etc.) /// implement handle_network(). - async fn handle(&self) -> Result<(), SimNetError>; + async fn handle(&mut self) -> Result<(), SimNetError>; /// This is the method that will be called when the simulator fires the event /// Unless you need to make changes to the network, you do not have to implement this. /// Only implement handle() method for all non-simnet requirements. - async fn handle_network(&self, _phantom: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _phantom: &SimNet) -> Result<(), SimNetError> { self.handle().await } @@ -101,9 +104,6 @@ pub trait Event: Send + Sync + Debug { /// GPU work latency. fn duration_ms(&self) -> u64; - /// Read the simnet config and update self accordingly. - async fn read_simnet_config(&mut self, _topology: &Arc>) {} - /// A user-friendly summary of the event fn summary(&self) -> String; } @@ -117,12 +117,11 @@ struct NodeJoinEvent { #[async_trait] impl Event for NodeJoinEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, simnet: &SimNet) -> Result<(), SimNetError> { - simnet.bind(self.channel_addr.clone()).await; + async fn handle_network(&mut self, simnet: &SimNet) -> Result<(), SimNetError> { self.handle().await } @@ -135,46 +134,6 @@ impl Event for NodeJoinEvent { } } -#[derive(Debug)] -pub(crate) struct SleepEvent { - done_tx: OncePortRef<()>, - mailbox: Mailbox, - duration_ms: u64, -} - -impl SleepEvent { - pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box { - Box::new(Self { - done_tx, - mailbox, - duration_ms, - }) - } -} - -#[async_trait] -impl Event for SleepEvent { - async fn handle(&self) -> Result<(), SimNetError> { - Ok(()) - } - - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { - self.done_tx - .clone() - .send(&self.mailbox, ()) - .map_err(|_err| SimNetError::Closed("TODO".to_string()))?; - Ok(()) - } - - fn duration_ms(&self) -> u64 { - self.duration_ms - } - - fn summary(&self) -> String { - format!("Sleeping for {} ms", self.duration_ms) - } -} - #[derive(Debug)] /// A pytorch operation pub struct TorchOpEvent { @@ -188,11 +147,11 @@ pub struct TorchOpEvent { #[async_trait] impl Event for TorchOpEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> { self.done_tx .clone() .send(&self.mailbox, ()) @@ -255,21 +214,7 @@ pub(crate) struct ScheduledEvent { #[async_trait] pub trait Dispatcher { /// Send a raw data blob to the given target. - async fn send(&self, source: Option, target: A, data: Serialized) - -> Result<(), SimNetError>; -} - -#[derive(Hash, Eq, PartialEq, Debug)] -pub(crate) struct SimNetEdge { - pub(crate) src: ChannelAddr, - pub(crate) dst: ChannelAddr, -} - -#[serde_as] -#[derive(Debug, Serialize, Deserialize)] -pub(crate) struct SimNetEdgeInfo { - #[serde_as(as = "serde_with::DurationSeconds")] - pub(crate) latency: Duration, + async fn send(&self, target: A, data: Serialized) -> Result<(), SimNetError>; } /// SimNetError is used to indicate errors that occur during @@ -324,17 +269,65 @@ pub enum TrainingScriptState { Waiting, } +/// Configuration for latencies between distances for the simulator +pub struct LatencyConfig { + /// inter-region min and max latencies + pub inter_region: (/*min*/ u64, /*max*/ u64), + /// inter-data center min and max latencies + pub inter_dc: (/*min*/ u64, /*max*/ u64), + /// inter-zone min and max latencies + pub inter_zone: (/*min*/ u64, /*max*/ u64), + /// inter-rack min and max latencies + pub inter_rack: (/*min*/ u64, /*max*/ u64), + /// inter-host min and max latencies + pub inter_host: (/*min*/ u64, /*max*/ u64), + /// Beta distribution for sampling latencies + pub dist: rand_distr::Beta, +} + +impl LatencyConfig { + fn from_distance(&self, distance: &Distance) -> u64 { + let mut rng = thread_rng(); + let sample = self.dist.sample(&mut rng) as u64; + + let (min_millis, max_millis) = match distance { + Distance::Region => self.inter_region, + Distance::DataCenter => self.inter_dc, + Distance::Zone => self.inter_zone, + Distance::Rack => self.inter_rack, + Distance::Host => self.inter_host, + Distance::Same => (0, 0), + }; + + min_millis + sample * (max_millis - min_millis) + } +} + +impl Default for LatencyConfig { + fn default() -> Self { + Self { + inter_region: (500, 1000), + inter_dc: (50, 100), + inter_zone: (5, 10), + inter_rack: (2, 5), + inter_host: (1, 2), + dist: rand_distr::Beta::new(2.0, 1.0).unwrap(), + } + } +} + /// A handle to a running [`SimNet`] instance. pub struct SimNetHandle { join_handle: Mutex>>>, event_tx: UnboundedSender<(Box, bool, Option)>, - config: Arc>, pending_event_count: Arc, /// A receiver to receive simulator operational messages. /// The receiver can be moved out of the simnet handle. training_script_state_tx: tokio::sync::watch::Sender, /// Signal to stop the simnet loop stop_signal: Arc, + resources: DashMap, + latencies: LatencyConfig, } impl SimNetHandle { @@ -412,21 +405,6 @@ impl SimNetHandle { } } - /// Update the network configuration to SimNet. - pub async fn update_network_config(&self, config: NetworkConfig) -> Result<(), SimNetError> { - let guard = &self.config.lock().await.topology; - for edge in config.edges { - guard.insert( - SimNetEdge { - src: edge.src.clone(), - dst: edge.dst.clone(), - }, - edge.metadata, - ); - } - Ok(()) - } - /// Wait for all of the received events to be scheduled for flight. /// It ticks the simnet time till all of the scheduled events are processed. pub async fn flush(&self, timeout: Duration) -> Result<(), SimNetError> { @@ -445,24 +423,61 @@ impl SimNetHandle { "timeout waiting for received events to be scheduled".to_string(), )) } -} -pub(crate) type Topology = DashMap; + /// Register the location in resource space for a Proc + pub fn register_proc(&self, proc_id: ProcId, point: Point) { + self.resources.insert(proc_id, point); + } + + /// Sample a latency between two procs + pub fn sample_latency(&self, src: &ProcId, dest: &ProcId) -> u64 { + let distances = [ + Distance::Region, + Distance::DataCenter, + Distance::Zone, + Distance::Rack, + Distance::Host, + Distance::Same, + ]; + + let src_coords = self + .resources + .get(src) + .map(|point| point.coords().clone()) + .unwrap_or(distances.iter().map(|_| 0).collect::>()); + + let dest_coords = self + .resources + .get(dest) + .map(|point| point.coords().clone()) + .unwrap_or(distances.iter().map(|_| 0).collect::>()); + + for ((src, dest), distance) in src_coords.into_iter().zip(dest_coords).zip(distances) { + if src != dest { + return self.latencies.from_distance(&distance); + } + } + + self.latencies.from_distance(&Distance::Same) + } +} -/// Configure network topology for the simnet -pub struct SimNetConfig { - // For now, we assume the network is fully connected - // so as to avoid the complexity of maintaining a graph - // and determining the shortest path between two nodes. - pub(crate) topology: Topology, +#[derive(Debug)] +enum Distance { + Region, + DataCenter, + Zone, + Rack, + Host, + Same, } + /// SimNet defines a network of nodes. /// Each node is identified by a unique id. /// The network is represented as a graph of nodes. /// The graph is represented as a map of edges. /// The network also has a cloud of inflight messages pub struct SimNet { - config: Arc>, address_book: DashSet, state: State, max_latency: Duration, @@ -472,16 +487,16 @@ pub struct SimNet { } /// Starts a sim net. -/// Args: -/// max_duration_ms: an optional config to override default settings of the network latency pub fn start() { + start_with_config(LatencyConfig::default()) +} + +/// Starts a sim net with configured latencies between distances +pub fn start_with_config(config: LatencyConfig) { let max_duration_ms = 1000 * 10; // Construct a topology with one node: the default A. let address_book: DashSet = DashSet::new(); - let topology = DashMap::new(); - let config = Arc::new(Mutex::new(SimNetConfig { topology })); - let (training_script_state_tx, training_script_state_rx) = tokio::sync::watch::channel(TrainingScriptState::Waiting); let (event_tx, event_rx) = @@ -490,13 +505,11 @@ pub fn start() { let stop_signal = Arc::new(AtomicBool::new(false)); let join_handle = Mutex::new(Some({ - let config = config.clone(); let pending_event_count = pending_event_count.clone(); let stop_signal = stop_signal.clone(); tokio::spawn(async move { SimNet { - config, address_book, state: State { scheduled_events: BTreeMap::new(), @@ -514,52 +527,17 @@ pub fn start() { HANDLE.get_or_init(|| SimNetHandle { join_handle, event_tx, - config, pending_event_count, training_script_state_tx, stop_signal, + resources: DashMap::new(), + latencies: config, }); } impl SimNet { - /// Bind an address to a node id. If node id is not provided, then - /// randomly choose a node id. If the address is already bound to a node id, - /// then return the existing node id. - async fn bind(&self, address: ChannelAddr) { - // Add if not present. - if self.address_book.insert(address.clone()) { - // Add dummy latencies with all the other nodes. - for other in self.address_book.iter() { - let duration_ms = if other.key() == &address { - 1 - } else { - rand::random::() % self.max_latency.as_millis() as u64 + 1 - }; - let latency = Duration::from_millis(duration_ms); - let guard = &self.config.lock().await.topology; - guard.insert( - SimNetEdge { - src: address.clone(), - dst: other.clone(), - }, - SimNetEdgeInfo { latency }, - ); - if address != *other.key() { - guard.insert( - SimNetEdge { - src: other.clone(), - dst: address.clone(), - }, - SimNetEdgeInfo { latency }, - ); - } - } - } - } - - async fn create_scheduled_event(&mut self, mut event: Box) -> ScheduledEvent { + fn create_scheduled_event(&mut self, event: Box) -> ScheduledEvent { // Get latency - event.read_simnet_config(&self.config).await; ScheduledEvent { time: SimClock.millis_since_start( SimClock.now() + tokio::time::Duration::from_millis(event.duration_ms()), @@ -607,6 +585,12 @@ impl SimNet { let mut training_script_waiting_time: u64 = 0; // Duration elapsed while only non_advanceable_events has events let mut debounce_timer: Option = None; + + let debounce_duration = std::env::var("SIM_DEBOUNCE") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(1); + 'outer: loop { // Check if we should stop if stop_signal.load(Ordering::SeqCst) { @@ -614,7 +598,10 @@ impl SimNet { } while let Ok(Some((event, advanceable, time))) = RealClock - .timeout(tokio::time::Duration::from_millis(1), event_rx.recv()) + .timeout( + tokio::time::Duration::from_millis(debounce_duration), + event_rx.recv(), + ) .await { let scheduled_event = match time { @@ -622,7 +609,7 @@ impl SimNet { time: time + training_script_waiting_time, event, }, - None => self.create_scheduled_event(event).await, + None => self.create_scheduled_event(event), }; self.schedule_event(scheduled_event, advanceable); } @@ -696,7 +683,7 @@ impl SimNet { time: time + training_script_waiting_time, event, }, - None => self.create_scheduled_event(event).await, + None => self.create_scheduled_event(event), }; self.schedule_event(scheduled_event, advanceable); }, @@ -710,7 +697,7 @@ impl SimNet { training_script_waiting_time += advanced_time; } SimClock.advance_to(scheduled_time); - for scheduled_event in scheduled_events { + for mut scheduled_event in scheduled_events { self.pending_event_count .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if scheduled_event.event.handle_network(self).await.is_err() { @@ -754,38 +741,13 @@ pub struct SimulatorEventRecord { pub end_at: SimulatorTimeInstant, } -/// A configuration for the network topology. -#[derive(Debug, Serialize, Deserialize)] -pub struct NetworkConfig { - edges: Vec, -} - -/// A configuration for the network edge. -#[derive(Debug, Serialize, Deserialize)] -pub struct EdgeConfig { - #[serde(deserialize_with = "deserialize_channel_addr")] - src: ChannelAddr, - #[serde(deserialize_with = "deserialize_channel_addr")] - dst: ChannelAddr, - metadata: SimNetEdgeInfo, -} - -impl NetworkConfig { - /// Create a new configuration from a YAML string. - #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SimNetError`. - pub fn from_yaml(yaml: &str) -> Result { - let config: NetworkConfig = serde_yaml::from_str(yaml) - .map_err(|err| SimNetError::InvalidArg(format!("failed to parse config: {}", err)))?; - Ok(config) - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; + use ndslice::extent; use tokio::sync::Mutex; use super::*; @@ -811,14 +773,10 @@ mod tests { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), simnet::SimNetError> { + async fn handle(&mut self) -> Result<(), simnet::SimNetError> { if let Some(dispatcher) = &self.dispatcher { dispatcher - .send( - Some(self.src_addr.clone()), - self.dest_addr.clone(), - self.data.clone(), - ) + .send(self.dest_addr.clone(), self.data.clone()) .await?; } Ok(()) @@ -834,19 +792,6 @@ mod tests { self.dest_addr.addr().clone() ) } - - async fn read_simnet_config(&mut self, config: &Arc>) { - let edge = SimNetEdge { - src: self.src_addr.addr().clone(), - dst: self.dest_addr.addr().clone(), - }; - self.duration_ms = config - .lock() - .await - .topology - .get(&edge) - .map_or_else(|| 1, |v| v.latency.as_millis() as u64); - } } impl MessageDeliveryEvent { @@ -855,12 +800,13 @@ mod tests { dest_addr: SimAddr, data: Serialized, dispatcher: Option, + duration_ms: u64, ) -> Self { Self { src_addr, dest_addr, data, - duration_ms: 1, + duration_ms, dispatcher, } } @@ -881,12 +827,7 @@ mod tests { #[async_trait] impl Dispatcher for TestDispatcher { - async fn send( - &self, - _source: Option, - target: SimAddr, - data: Serialized, - ) -> Result<(), SimNetError> { + async fn send(&self, target: SimAddr, data: Serialized) -> Result<(), SimNetError> { let mut buf = self.mbuffers.lock().await; buf.entry(target).or_default().push(data); Ok(()) @@ -914,67 +855,39 @@ mod tests { #[tokio::test] async fn test_simnet_config() { - // Tests that we can create a simnet, config latency between two node and deliver - // the message with configured latency. - start(); - let alice = "local:1".parse::().unwrap(); - let bob = "local:2".parse::().unwrap(); - let latency = Duration::from_millis(1000); - let config = NetworkConfig { - edges: vec![EdgeConfig { - src: alice.clone(), - dst: bob.clone(), - metadata: SimNetEdgeInfo { latency }, - }], - }; - simnet_handle() - .unwrap() - .update_network_config(config) - .await - .unwrap(); + // Tests that we can create a simnet, config latency between distances and sample latencies between procs. + let ext = extent!(region = 1, dc = 1, zone = 1, rack = 4, host = 4, gpu = 8); - let alice = SimAddr::new(alice).unwrap(); - let bob = SimAddr::new(bob).unwrap(); - let msg = Box::new(MessageDeliveryEvent::new( - alice, - bob, - Serialized::serialize(&"123".to_string()).unwrap(), - None, - )); - simnet_handle().unwrap().send_event(msg).unwrap(); - simnet_handle() - .unwrap() - .flush(Duration::from_secs(30)) - .await - .unwrap(); - let records = simnet_handle().unwrap().close().await; - let expected_record = SimulatorEventRecord { - summary: "Sending message from local:1 to local:2".to_string(), - start_at: 0, - end_at: latency.as_millis() as u64, + let alice = id!(world[0]); + let bob = id!(world[1]); + let charlie = id!(world[2]); + + let config = LatencyConfig { + inter_host: (1000, 1000), + inter_rack: (2000, 2000), + ..Default::default() }; - assert!(records.as_ref().unwrap().len() == 1); - assert_eq!(records.unwrap().first().unwrap(), &expected_record); + start_with_config(config); + + let handle = simnet_handle().unwrap(); + handle.register_proc(alice.clone(), ext.point(vec![0, 0, 0, 0, 0, 0]).unwrap()); + handle.register_proc(bob.clone(), ext.point(vec![0, 0, 0, 0, 1, 0]).unwrap()); + handle.register_proc(charlie.clone(), ext.point(vec![0, 0, 0, 1, 0, 0]).unwrap()); + assert_eq!(handle.sample_latency(&alice, &bob), 1000); + assert_eq!(handle.sample_latency(&alice, &charlie), 2000); } #[tokio::test] async fn test_simnet_debounce() { - start(); + let config = LatencyConfig { + inter_host: (1000, 1000), + ..Default::default() + }; + start_with_config(config); let alice = "local:1".parse::().unwrap(); let bob = "local:2".parse::().unwrap(); let latency = Duration::from_millis(10000); - simnet_handle() - .unwrap() - .update_network_config(NetworkConfig { - edges: vec![EdgeConfig { - src: alice.clone(), - dst: bob.clone(), - metadata: SimNetEdgeInfo { latency }, - }], - }) - .await - .unwrap(); let alice = SimAddr::new(alice).unwrap(); let bob = SimAddr::new(bob).unwrap(); @@ -988,6 +901,7 @@ mod tests { bob.clone(), Serialized::serialize(&"123".to_string()).unwrap(), None, + 10000, ))) .unwrap(); RealClock @@ -1040,18 +954,21 @@ mod tests { addr_1.clone(), messages[0].clone(), sender.clone(), + 1, )); let two = Box::new(MessageDeliveryEvent::new( addr_2.clone(), addr_3.clone(), messages[1].clone(), sender.clone(), + 1, )); let three = Box::new(MessageDeliveryEvent::new( addr_0.clone(), addr_1.clone(), messages[2].clone(), sender.clone(), + 1, )); simnet_handle().unwrap().send_event(one).unwrap(); @@ -1079,54 +996,6 @@ mod tests { assert_eq!(buf[&addr_3][0], messages[1]); } - #[tokio::test] - async fn test_read_config_from_yaml() { - let yaml = r#" - edges: - - src: local:0 - dst: local:1 - metadata: - latency: 1 - - src: local:0 - dst: local:2 - metadata: - latency: 2 - - src: local:1 - dst: local:2 - metadata: - latency: 3 - "#; - let config = NetworkConfig::from_yaml(yaml).unwrap(); - assert_eq!(config.edges.len(), 3); - assert_eq!( - config.edges[0].src, - "local:0".parse::().unwrap() - ); - assert_eq!( - config.edges[0].dst, - "local:1".parse::().unwrap() - ); - assert_eq!(config.edges[0].metadata.latency, Duration::from_secs(1)); - assert_eq!( - config.edges[1].src, - "local:0".parse::().unwrap() - ); - assert_eq!( - config.edges[1].dst, - "local:2".parse::().unwrap() - ); - assert_eq!(config.edges[1].metadata.latency, Duration::from_secs(2)); - assert_eq!( - config.edges[2].src, - "local:1".parse::().unwrap() - ); - assert_eq!( - config.edges[2].dst, - "local:2".parse::().unwrap() - ); - assert_eq!(config.edges[2].metadata.latency, Duration::from_secs(3)); - } - #[tokio::test] async fn test_sim_sleep() { start(); diff --git a/hyperactor_mesh/src/alloc/sim.rs b/hyperactor_mesh/src/alloc/sim.rs index 03bf946bf..c212a3335 100644 --- a/hyperactor_mesh/src/alloc/sim.rs +++ b/hyperactor_mesh/src/alloc/sim.rs @@ -11,6 +11,7 @@ #![allow(dead_code)] // until it is used outside of testing use async_trait::async_trait; +use hyperactor::ProcId; use hyperactor::WorldId; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; @@ -61,12 +62,23 @@ pub struct SimAlloc { impl SimAlloc { fn new(spec: AllocSpec) -> Self { - Self { - inner: LocalAlloc::new_with_transport( - spec, - ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), - ), - } + let inner = LocalAlloc::new_with_transport( + spec, + ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), + ); + let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0); + + let ext = inner.extent(); + + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc( + client_proc_id.clone(), + ext.point(ext.sizes().iter().map(|_| 0).collect()) + .expect("should be valid point"), + ); + + Self { inner } } /// A chaos monkey that can be used to stop procs at random. pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static { @@ -90,7 +102,13 @@ impl SimAlloc { #[async_trait] impl Alloc for SimAlloc { async fn next(&mut self) -> Option { - self.inner.next().await + let proc_state = self.inner.next().await; + if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state { + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc(proc_id.clone(), point.clone()); + } + proc_state } fn extent(&self) -> &Extent { @@ -112,11 +130,66 @@ impl Alloc for SimAlloc { #[cfg(test)] mod tests { + use std::collections::HashMap; + + use hyperactor::simnet::LatencyConfig; + use ndslice::extent; + use super::*; + use crate::ProcMesh; + use crate::RootActorMesh; + use crate::actor_mesh::ActorMesh; + use crate::alloc::AllocConstraints; + use crate::alloc::test_utils::TestActor; #[tokio::test] async fn test_allocator_basic() { hyperactor::simnet::start(); crate::alloc::testing::test_allocator_basic(SimAllocator).await; } + + #[tokio::test] + async fn test_allocator_registers_resources() { + hyperactor::simnet::start_with_config(LatencyConfig { + inter_host: (999, 999), + ..Default::default() + }); + + let alloc = SimAllocator + .allocate(AllocSpec { + extent: extent!(region = 1, dc = 1, zone = 1, rack = 1, host = 10, gpu = 1), + constraints: AllocConstraints { + match_labels: HashMap::new(), + }, + }) + .await + .unwrap(); + + let proc_mesh = ProcMesh::allocate(alloc).await.unwrap(); + + let handle = hyperactor::simnet::simnet_handle().unwrap(); + let actor_mesh: RootActorMesh = proc_mesh.spawn("echo", &()).await.unwrap(); + let actors = actor_mesh.iter_actor_refs().collect::>(); + assert_eq!( + handle.sample_latency( + actors[0].actor_id().proc_id(), + actors[1].actor_id().proc_id() + ), + 999 + ); + assert_eq!( + handle.sample_latency( + actors[2].actor_id().proc_id(), + actors[9].actor_id().proc_id() + ), + 999 + ); + assert_eq!( + handle.sample_latency( + proc_mesh.client().actor_id().proc_id(), + actors[1].actor_id().proc_id() + ), + 999 + ); + } } diff --git a/hyperactor_multiprocess/src/ping_pong.rs b/hyperactor_multiprocess/src/ping_pong.rs index dd610dddf..1942f15c9 100644 --- a/hyperactor_multiprocess/src/ping_pong.rs +++ b/hyperactor_multiprocess/src/ping_pong.rs @@ -14,13 +14,11 @@ mod tests { use hyperactor::ActorRef; use hyperactor::Mailbox; use hyperactor::channel::ChannelAddr; - use hyperactor::channel::sim; use hyperactor::channel::sim::SimAddr; use hyperactor::id; use hyperactor::reference::Index; use hyperactor::reference::WorldId; use hyperactor::simnet; - use hyperactor::simnet::NetworkConfig; use hyperactor::test_utils::pingpong::PingPongActor; use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; @@ -65,29 +63,6 @@ mod tests { let pong_actor_ref = spawn_proc_actor(3, system_sim_addr, sys_mailbox.clone(), world_id.clone()).await; - // Configure the simulation network. - let simnet_config_yaml = r#" -edges: - - src: local!1 - dst: local!2 - metadata: - latency: 1 - - src: local!2 - dst: local!1 - metadata: - latency: 1 - - src: local!1 - dst: local!3 - metadata: - latency: 2 - - src: local!3 - dst: local!1 - metadata: - latency: 2 -"#; - let simnet_config = NetworkConfig::from_yaml(simnet_config_yaml).unwrap(); - sim::update_config(simnet_config).await.unwrap(); - // Kick start the ping pong game by sending a message to the ping actor. The message will ask the // ping actor to deliver a message to the pong actor with TTL - 1. The pong actor will then // deliver a message to the ping actor with TTL - 2. This will continue until the TTL reaches 0. diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 3cc48d743..770c0f1dd 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -16,5 +16,5 @@ use hyperactor::attrs::declare_attrs; // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor - pub attr SHARED_ASYNCIO_RUNTIME: bool = false; + pub attr SHARED_ASYNCIO_RUNTIME: bool = true; } diff --git a/python/monarch/_src/actor/proc_mesh.py b/python/monarch/_src/actor/proc_mesh.py index 9e55e0dbf..968eae410 100644 --- a/python/monarch/_src/actor/proc_mesh.py +++ b/python/monarch/_src/actor/proc_mesh.py @@ -486,8 +486,26 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: return _proc_mesh_from_allocator(allocator=LocalAllocator(), gpus=gpus, hosts=hosts) -def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: - return _proc_mesh_from_allocator(allocator=SimAllocator(), gpus=gpus, hosts=hosts) +def sim_proc_mesh( + *, + gpus: int = 1, + hosts: int = 1, + racks: int = 1, + zones: int = 1, + dcs: int = 1, + regions: int = 1, +) -> ProcMesh: + spec: AllocSpec = AllocSpec( + AllocConstraints(), + hosts=hosts, + gpus=gpus, + racks=racks, + zones=zones, + dcs=dcs, + regions=regions, + ) + alloc = SimAllocator().allocate(spec) + return ProcMesh.from_alloc(alloc, None, True) _BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"