diff --git a/hyperactor/src/channel/sim.rs b/hyperactor/src/channel/sim.rs index 41a41487e..0ade2b644 100644 --- a/hyperactor/src/channel/sim.rs +++ b/hyperactor/src/channel/sim.rs @@ -146,7 +146,7 @@ impl MessageDeliveryEvent { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { // Send the message to the correct receiver. SENDER .send( diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index 84c572ecc..ccae72228 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -12,25 +12,17 @@ use std::error::Error; use std::fmt; use std::sync::LazyLock; use std::sync::Mutex; -use std::sync::OnceLock; use std::time::SystemTime; +use async_trait::async_trait; use futures::pin_mut; use hyperactor_telemetry::TelemetryClock; use serde::Deserialize; use serde::Serialize; -use crate::Mailbox; use crate::channel::ChannelAddr; -use crate::data::Named; -use crate::id; -use crate::mailbox::DeliveryError; -use crate::mailbox::MailboxSender; -use crate::mailbox::MessageEnvelope; -use crate::mailbox::Undeliverable; -use crate::mailbox::UndeliverableMailboxSender; -use crate::mailbox::monitored_return_handle; -use crate::simnet::SleepEvent; +use crate::simnet::Event; +use crate::simnet::SimNetError; use crate::simnet::simnet_handle; struct SimTime { @@ -183,6 +175,39 @@ impl ClockKind { } } +#[derive(Debug)] +struct SleepEvent { + done_tx: Option>, + duration_ms: u64, +} + +impl SleepEvent { + pub(crate) fn new(done_tx: tokio::sync::oneshot::Sender<()>, duration_ms: u64) -> Box { + Box::new(Self { + done_tx: Some(done_tx), + duration_ms, + }) + } +} + +#[async_trait] +impl Event for SleepEvent { + async fn handle(&mut self) -> Result<(), SimNetError> { + if self.done_tx.take().unwrap().send(()).is_err() { + tracing::error!("Failed to send wakeup event"); + } + Ok(()) + } + + fn duration_ms(&self) -> u64 { + self.duration_ms + } + + fn summary(&self) -> String { + format!("Sleeping for {} ms", self.duration_ms) + } +} + /// Clock to be used in simulator runs that allows the simnet to create a scheduled event for. /// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's /// time to the wakeup time and use the transmitter to wake up this green thread @@ -192,33 +217,25 @@ pub struct SimClock; impl Clock for SimClock { /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet async fn sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn non_advancing_sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_nonadvanceable_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_nonadvanceable_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn sleep_until(&self, deadline: tokio::time::Instant) { @@ -242,23 +259,18 @@ impl Clock for SimClock { where F: std::future::Future, { - let mailbox = SimClock::mailbox().clone(); - let (tx, deadline_rx) = mailbox.open_once_port::<()>(); + let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx, duration.as_millis() as u64)) .unwrap(); let fut = f; pin_mut!(fut); tokio::select! { - _ = deadline_rx.recv() => { + _ = deadline_rx => { Err(TimeoutError) } res = &mut fut => Ok(res) @@ -267,28 +279,6 @@ impl Clock for SimClock { } impl SimClock { - // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox - // from upstack and handle undeliverable messages properly. - fn mailbox() -> &'static Mailbox { - static SIMCLOCK_MAILBOX: OnceLock = OnceLock::new(); - SIMCLOCK_MAILBOX.get_or_init(|| { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); - let (undeliverable_messages, mut rx) = - mailbox.open_port::>(); - undeliverable_messages.bind_to(Undeliverable::::port()); - tokio::spawn(async move { - while let Ok(Undeliverable(mut envelope)) = rx.recv().await { - envelope.try_set_error(DeliveryError::BrokenLink( - "message returned to undeliverable port".to_string(), - )); - UndeliverableMailboxSender - .post(envelope, /*unused */ monitored_return_handle()) - } - }); - mailbox - }) - } - /// Advance the sumulator's time to the specified instant pub fn advance_to(&self, millis: u64) { let mut guard = SIM_TIME.now.lock().unwrap(); diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index def1fc9f5..80dbaa818 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -88,12 +88,12 @@ pub trait Event: Send + Sync + Debug { /// For a proc spawn, it will be creating the proc object and instantiating it. /// For any event that manipulates the network (like adding/removing nodes etc.) /// implement handle_network(). - async fn handle(&self) -> Result<(), SimNetError>; + async fn handle(&mut self) -> Result<(), SimNetError>; /// This is the method that will be called when the simulator fires the event /// Unless you need to make changes to the network, you do not have to implement this. /// Only implement handle() method for all non-simnet requirements. - async fn handle_network(&self, _phantom: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _phantom: &SimNet) -> Result<(), SimNetError> { self.handle().await } @@ -117,11 +117,11 @@ struct NodeJoinEvent { #[async_trait] impl Event for NodeJoinEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, simnet: &SimNet) -> Result<(), SimNetError> { simnet.bind(self.channel_addr.clone()).await; self.handle().await } @@ -135,46 +135,6 @@ impl Event for NodeJoinEvent { } } -#[derive(Debug)] -pub(crate) struct SleepEvent { - done_tx: OncePortRef<()>, - mailbox: Mailbox, - duration_ms: u64, -} - -impl SleepEvent { - pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box { - Box::new(Self { - done_tx, - mailbox, - duration_ms, - }) - } -} - -#[async_trait] -impl Event for SleepEvent { - async fn handle(&self) -> Result<(), SimNetError> { - Ok(()) - } - - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { - self.done_tx - .clone() - .send(&self.mailbox, ()) - .map_err(|_err| SimNetError::Closed("TODO".to_string()))?; - Ok(()) - } - - fn duration_ms(&self) -> u64 { - self.duration_ms - } - - fn summary(&self) -> String { - format!("Sleeping for {} ms", self.duration_ms) - } -} - #[derive(Debug)] /// A pytorch operation pub struct TorchOpEvent { @@ -188,11 +148,11 @@ pub struct TorchOpEvent { #[async_trait] impl Event for TorchOpEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> { self.done_tx .clone() .send(&self.mailbox, ()) @@ -607,6 +567,12 @@ impl SimNet { let mut training_script_waiting_time: u64 = 0; // Duration elapsed while only non_advanceable_events has events let mut debounce_timer: Option = None; + + let debounce_duration = std::env::var("SIM_DEBOUNCE") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(1); + 'outer: loop { // Check if we should stop if stop_signal.load(Ordering::SeqCst) { @@ -614,7 +580,10 @@ impl SimNet { } while let Ok(Some((event, advanceable, time))) = RealClock - .timeout(tokio::time::Duration::from_millis(1), event_rx.recv()) + .timeout( + tokio::time::Duration::from_millis(debounce_duration), + event_rx.recv(), + ) .await { let scheduled_event = match time { @@ -710,7 +679,7 @@ impl SimNet { training_script_waiting_time += advanced_time; } SimClock.advance_to(scheduled_time); - for scheduled_event in scheduled_events { + for mut scheduled_event in scheduled_events { self.pending_event_count .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if scheduled_event.event.handle_network(self).await.is_err() { @@ -811,7 +780,7 @@ mod tests { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), simnet::SimNetError> { + async fn handle(&mut self) -> Result<(), simnet::SimNetError> { if let Some(dispatcher) = &self.dispatcher { dispatcher .send( diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 3cc48d743..770c0f1dd 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -16,5 +16,5 @@ use hyperactor::attrs::declare_attrs; // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor - pub attr SHARED_ASYNCIO_RUNTIME: bool = false; + pub attr SHARED_ASYNCIO_RUNTIME: bool = true; }