diff --git a/hyperactor/src/channel/sim.rs b/hyperactor/src/channel/sim.rs index 41a41487e..0ade2b644 100644 --- a/hyperactor/src/channel/sim.rs +++ b/hyperactor/src/channel/sim.rs @@ -146,7 +146,7 @@ impl MessageDeliveryEvent { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { // Send the message to the correct receiver. SENDER .send( diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index 84c572ecc..ccae72228 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -12,25 +12,17 @@ use std::error::Error; use std::fmt; use std::sync::LazyLock; use std::sync::Mutex; -use std::sync::OnceLock; use std::time::SystemTime; +use async_trait::async_trait; use futures::pin_mut; use hyperactor_telemetry::TelemetryClock; use serde::Deserialize; use serde::Serialize; -use crate::Mailbox; use crate::channel::ChannelAddr; -use crate::data::Named; -use crate::id; -use crate::mailbox::DeliveryError; -use crate::mailbox::MailboxSender; -use crate::mailbox::MessageEnvelope; -use crate::mailbox::Undeliverable; -use crate::mailbox::UndeliverableMailboxSender; -use crate::mailbox::monitored_return_handle; -use crate::simnet::SleepEvent; +use crate::simnet::Event; +use crate::simnet::SimNetError; use crate::simnet::simnet_handle; struct SimTime { @@ -183,6 +175,39 @@ impl ClockKind { } } +#[derive(Debug)] +struct SleepEvent { + done_tx: Option>, + duration_ms: u64, +} + +impl SleepEvent { + pub(crate) fn new(done_tx: tokio::sync::oneshot::Sender<()>, duration_ms: u64) -> Box { + Box::new(Self { + done_tx: Some(done_tx), + duration_ms, + }) + } +} + +#[async_trait] +impl Event for SleepEvent { + async fn handle(&mut self) -> Result<(), SimNetError> { + if self.done_tx.take().unwrap().send(()).is_err() { + tracing::error!("Failed to send wakeup event"); + } + Ok(()) + } + + fn duration_ms(&self) -> u64 { + self.duration_ms + } + + fn summary(&self) -> String { + format!("Sleeping for {} ms", self.duration_ms) + } +} + /// Clock to be used in simulator runs that allows the simnet to create a scheduled event for. /// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's /// time to the wakeup time and use the transmitter to wake up this green thread @@ -192,33 +217,25 @@ pub struct SimClock; impl Clock for SimClock { /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet async fn sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn non_advancing_sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_nonadvanceable_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_nonadvanceable_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn sleep_until(&self, deadline: tokio::time::Instant) { @@ -242,23 +259,18 @@ impl Clock for SimClock { where F: std::future::Future, { - let mailbox = SimClock::mailbox().clone(); - let (tx, deadline_rx) = mailbox.open_once_port::<()>(); + let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); let fut = f; pin_mut!(fut); tokio::select! { - _ = deadline_rx.recv() => { + _ = deadline_rx => { Err(TimeoutError) } res = &mut fut => Ok(res) @@ -267,28 +279,6 @@ impl Clock for SimClock { } impl SimClock { - // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox - // from upstack and handle undeliverable messages properly. - fn mailbox() -> &'static Mailbox { - static SIMCLOCK_MAILBOX: OnceLock = OnceLock::new(); - SIMCLOCK_MAILBOX.get_or_init(|| { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); - let (undeliverable_messages, mut rx) = - mailbox.open_port::>(); - undeliverable_messages.bind_to(Undeliverable::::port()); - tokio::spawn(async move { - while let Ok(Undeliverable(mut envelope)) = rx.recv().await { - envelope.try_set_error(DeliveryError::BrokenLink( - "message returned to undeliverable port".to_string(), - )); - UndeliverableMailboxSender - .post(envelope, /*unused */ monitored_return_handle()) - } - }); - mailbox - }) - } - /// Advance the sumulator's time to the specified instant pub fn advance_to(&self, millis: u64) { let mut guard = SIM_TIME.now.lock().unwrap(); diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index def1fc9f5..ad5d5a5c0 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use dashmap::DashMap; use dashmap::DashSet; use enum_as_inner::EnumAsInner; +use ndslice::view::Point; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; @@ -43,6 +44,7 @@ use tokio::time::interval; use crate::ActorId; use crate::Mailbox; use crate::OncePortRef; +use crate::ProcId; use crate::channel::ChannelAddr; use crate::clock::Clock; use crate::clock::RealClock; @@ -88,12 +90,12 @@ pub trait Event: Send + Sync + Debug { /// For a proc spawn, it will be creating the proc object and instantiating it. /// For any event that manipulates the network (like adding/removing nodes etc.) /// implement handle_network(). - async fn handle(&self) -> Result<(), SimNetError>; + async fn handle(&mut self) -> Result<(), SimNetError>; /// This is the method that will be called when the simulator fires the event /// Unless you need to make changes to the network, you do not have to implement this. /// Only implement handle() method for all non-simnet requirements. - async fn handle_network(&self, _phantom: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _phantom: &SimNet) -> Result<(), SimNetError> { self.handle().await } @@ -117,11 +119,11 @@ struct NodeJoinEvent { #[async_trait] impl Event for NodeJoinEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, simnet: &SimNet) -> Result<(), SimNetError> { simnet.bind(self.channel_addr.clone()).await; self.handle().await } @@ -135,46 +137,6 @@ impl Event for NodeJoinEvent { } } -#[derive(Debug)] -pub(crate) struct SleepEvent { - done_tx: OncePortRef<()>, - mailbox: Mailbox, - duration_ms: u64, -} - -impl SleepEvent { - pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box { - Box::new(Self { - done_tx, - mailbox, - duration_ms, - }) - } -} - -#[async_trait] -impl Event for SleepEvent { - async fn handle(&self) -> Result<(), SimNetError> { - Ok(()) - } - - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { - self.done_tx - .clone() - .send(&self.mailbox, ()) - .map_err(|_err| SimNetError::Closed("TODO".to_string()))?; - Ok(()) - } - - fn duration_ms(&self) -> u64 { - self.duration_ms - } - - fn summary(&self) -> String { - format!("Sleeping for {} ms", self.duration_ms) - } -} - #[derive(Debug)] /// A pytorch operation pub struct TorchOpEvent { @@ -188,11 +150,11 @@ pub struct TorchOpEvent { #[async_trait] impl Event for TorchOpEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> { self.done_tx .clone() .send(&self.mailbox, ()) @@ -335,6 +297,7 @@ pub struct SimNetHandle { training_script_state_tx: tokio::sync::watch::Sender, /// Signal to stop the simnet loop stop_signal: Arc, + resources: DashMap, } impl SimNetHandle { @@ -445,6 +408,11 @@ impl SimNetHandle { "timeout waiting for received events to be scheduled".to_string(), )) } + + /// Register the location in resource space for a Proc + pub fn register_proc(&self, proc_id: ProcId, point: Point) { + self.resources.insert(proc_id, point); + } } pub(crate) type Topology = DashMap; @@ -518,6 +486,7 @@ pub fn start() { pending_event_count, training_script_state_tx, stop_signal, + resources: DashMap::new(), }); } @@ -607,6 +576,12 @@ impl SimNet { let mut training_script_waiting_time: u64 = 0; // Duration elapsed while only non_advanceable_events has events let mut debounce_timer: Option = None; + + let debounce_duration = std::env::var("SIM_DEBOUNCE") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(1); + 'outer: loop { // Check if we should stop if stop_signal.load(Ordering::SeqCst) { @@ -614,7 +589,10 @@ impl SimNet { } while let Ok(Some((event, advanceable, time))) = RealClock - .timeout(tokio::time::Duration::from_millis(1), event_rx.recv()) + .timeout( + tokio::time::Duration::from_millis(debounce_duration), + event_rx.recv(), + ) .await { let scheduled_event = match time { @@ -710,7 +688,7 @@ impl SimNet { training_script_waiting_time += advanced_time; } SimClock.advance_to(scheduled_time); - for scheduled_event in scheduled_events { + for mut scheduled_event in scheduled_events { self.pending_event_count .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if scheduled_event.event.handle_network(self).await.is_err() { @@ -811,7 +789,7 @@ mod tests { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), simnet::SimNetError> { + async fn handle(&mut self) -> Result<(), simnet::SimNetError> { if let Some(dispatcher) = &self.dispatcher { dispatcher .send( diff --git a/hyperactor_mesh/src/alloc/sim.rs b/hyperactor_mesh/src/alloc/sim.rs index 03bf946bf..fbd7a54cd 100644 --- a/hyperactor_mesh/src/alloc/sim.rs +++ b/hyperactor_mesh/src/alloc/sim.rs @@ -11,6 +11,7 @@ #![allow(dead_code)] // until it is used outside of testing use async_trait::async_trait; +use hyperactor::ProcId; use hyperactor::WorldId; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; @@ -61,12 +62,23 @@ pub struct SimAlloc { impl SimAlloc { fn new(spec: AllocSpec) -> Self { - Self { - inner: LocalAlloc::new_with_transport( - spec, - ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), - ), - } + let inner = LocalAlloc::new_with_transport( + spec, + ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), + ); + let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0); + + let ext = inner.extent(); + + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc( + client_proc_id.clone(), + ext.point(ext.sizes().iter().map(|_| 0).collect()) + .expect("should be valid point"), + ); + + Self { inner } } /// A chaos monkey that can be used to stop procs at random. pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static { @@ -90,7 +102,13 @@ impl SimAlloc { #[async_trait] impl Alloc for SimAlloc { async fn next(&mut self) -> Option { - self.inner.next().await + let proc_state = self.inner.next().await; + if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state { + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc(proc_id.clone(), point.clone()); + } + proc_state } fn extent(&self) -> &Extent { diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 3cc48d743..770c0f1dd 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -16,5 +16,5 @@ use hyperactor::attrs::declare_attrs; // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor - pub attr SHARED_ASYNCIO_RUNTIME: bool = false; + pub attr SHARED_ASYNCIO_RUNTIME: bool = true; } diff --git a/python/monarch/_src/actor/proc_mesh.py b/python/monarch/_src/actor/proc_mesh.py index 9e55e0dbf..968eae410 100644 --- a/python/monarch/_src/actor/proc_mesh.py +++ b/python/monarch/_src/actor/proc_mesh.py @@ -486,8 +486,26 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: return _proc_mesh_from_allocator(allocator=LocalAllocator(), gpus=gpus, hosts=hosts) -def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: - return _proc_mesh_from_allocator(allocator=SimAllocator(), gpus=gpus, hosts=hosts) +def sim_proc_mesh( + *, + gpus: int = 1, + hosts: int = 1, + racks: int = 1, + zones: int = 1, + dcs: int = 1, + regions: int = 1, +) -> ProcMesh: + spec: AllocSpec = AllocSpec( + AllocConstraints(), + hosts=hosts, + gpus=gpus, + racks=racks, + zones=zones, + dcs=dcs, + regions=regions, + ) + alloc = SimAllocator().allocate(spec) + return ProcMesh.from_alloc(alloc, None, True) _BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"