Skip to content

Configurable debounce #854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyperactor/src/channel/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl MessageDeliveryEvent {

#[async_trait]
impl Event for MessageDeliveryEvent {
async fn handle(&self) -> Result<(), SimNetError> {
async fn handle(&mut self) -> Result<(), SimNetError> {
// Send the message to the correct receiver.
SENDER
.send(
Expand Down
104 changes: 47 additions & 57 deletions hyperactor/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,17 @@ use std::error::Error;
use std::fmt;
use std::sync::LazyLock;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::SystemTime;

use async_trait::async_trait;
use futures::pin_mut;
use hyperactor_telemetry::TelemetryClock;
use serde::Deserialize;
use serde::Serialize;

use crate::Mailbox;
use crate::channel::ChannelAddr;
use crate::data::Named;
use crate::id;
use crate::mailbox::DeliveryError;
use crate::mailbox::MailboxSender;
use crate::mailbox::MessageEnvelope;
use crate::mailbox::Undeliverable;
use crate::mailbox::UndeliverableMailboxSender;
use crate::mailbox::monitored_return_handle;
use crate::simnet::SleepEvent;
use crate::simnet::Event;
use crate::simnet::SimNetError;
use crate::simnet::simnet_handle;

struct SimTime {
Expand Down Expand Up @@ -183,6 +175,39 @@ impl ClockKind {
}
}

#[derive(Debug)]
/// Simnet event representing a pending sleep; when handled, it signals the
/// sleeper through a one-shot channel.
struct SleepEvent {
/// One-shot sender used to signal the sleeper. Held in an `Option` because
/// sending consumes the sender, while `handle` only has `&mut self`.
done_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Sleep duration in milliseconds, as reported by `duration_ms()` and `summary()`.
duration_ms: u64,
}

impl SleepEvent {
    /// Builds a boxed sleep event.
    ///
    /// `done_tx` is the one-shot sender signalled when the event is handled;
    /// `duration_ms` is the length of the sleep in milliseconds.
    pub(crate) fn new(done_tx: tokio::sync::oneshot::Sender<()>, duration_ms: u64) -> Box<Self> {
        let event = Self {
            duration_ms,
            done_tx: Some(done_tx),
        };
        Box::new(event)
    }
}

#[async_trait]
impl Event for SleepEvent {
async fn handle(&mut self) -> Result<(), SimNetError> {
if self.done_tx.take().unwrap().send(()).is_err() {
tracing::error!("Failed to send wakeup event");
}
Ok(())
}

fn duration_ms(&self) -> u64 {
self.duration_ms
}

fn summary(&self) -> String {
format!("Sleeping for {} ms", self.duration_ms)
}
}

/// Clock to be used in simulator runs; it lets the simnet create a scheduled wakeup event for the caller.
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance its
/// time to the wakeup time and use the transmitter to wake up this green thread.
Expand All @@ -192,33 +217,25 @@ pub struct SimClock;
impl Clock for SimClock {
/// Tell the simnet to wake up this green thread after the specified duration has passed on the simnet
async fn sleep(&self, duration: tokio::time::Duration) {
let mailbox = SimClock::mailbox().clone();
let (tx, rx) = mailbox.open_once_port::<()>();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

simnet_handle()
.unwrap()
.send_event(SleepEvent::new(
tx.bind(),
mailbox,
duration.as_millis() as u64,
))
.send_event(SleepEvent::new(tx, duration.as_millis() as u64))
.unwrap();
rx.recv().await.unwrap();

rx.await.unwrap();
}

async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
let mailbox = SimClock::mailbox().clone();
let (tx, rx) = mailbox.open_once_port::<()>();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

simnet_handle()
.unwrap()
.send_nonadvanceable_event(SleepEvent::new(
tx.bind(),
mailbox,
duration.as_millis() as u64,
))
.send_nonadvanceable_event(SleepEvent::new(tx, duration.as_millis() as u64))
.unwrap();
rx.recv().await.unwrap();

rx.await.unwrap();
}

async fn sleep_until(&self, deadline: tokio::time::Instant) {
Expand All @@ -242,23 +259,18 @@ impl Clock for SimClock {
where
F: std::future::Future<Output = T>,
{
let mailbox = SimClock::mailbox().clone();
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
let (tx, deadline_rx) = tokio::sync::oneshot::channel::<()>();

simnet_handle()
.unwrap()
.send_event(SleepEvent::new(
tx.bind(),
mailbox,
duration.as_millis() as u64,
))
.send_event(SleepEvent::new(tx, duration.as_millis() as u64))
.unwrap();

let fut = f;
pin_mut!(fut);

tokio::select! {
_ = deadline_rx.recv() => {
_ = deadline_rx => {
Err(TimeoutError)
}
res = &mut fut => Ok(res)
Expand All @@ -267,28 +279,6 @@ impl Clock for SimClock {
}

impl SimClock {
/// Returns the global detached mailbox used by `SimClock`, creating it on
/// first use.
///
/// On creation, a port for undeliverable messages is opened and bound, and a
/// background task is spawned that marks each returned envelope with a
/// `BrokenLink` error and forwards it to `UndeliverableMailboxSender`.
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
// from upstack and handle undeliverable messages properly.
fn mailbox() -> &'static Mailbox {
    static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();

    SIMCLOCK_MAILBOX.get_or_init(|| {
        let detached = Mailbox::new_detached(id!(proc[0].proc).clone());
        let (undeliverable_port, mut undeliverable_rx) =
            detached.open_port::<Undeliverable<MessageEnvelope>>();
        undeliverable_port.bind_to(Undeliverable::<MessageEnvelope>::port());

        tokio::spawn(async move {
            // Drain returned envelopes until the receiver reports an error.
            loop {
                let mut envelope = match undeliverable_rx.recv().await {
                    Ok(Undeliverable(envelope)) => envelope,
                    Err(_) => break,
                };
                envelope.try_set_error(DeliveryError::BrokenLink(
                    "message returned to undeliverable port".to_string(),
                ));
                UndeliverableMailboxSender
                    .post(envelope, /*unused */ monitored_return_handle());
            }
        });

        detached
    })
}

/// Advance the simulator's time to the specified instant
pub fn advance_to(&self, millis: u64) {
let mut guard = SIM_TIME.now.lock().unwrap();
Expand Down
67 changes: 18 additions & 49 deletions hyperactor/src/simnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ pub trait Event: Send + Sync + Debug {
/// For a proc spawn, it will be creating the proc object and instantiating it.
/// For any event that manipulates the network (like adding/removing nodes etc.)
/// implement handle_network().
async fn handle(&self) -> Result<(), SimNetError>;
async fn handle(&mut self) -> Result<(), SimNetError>;

/// This is the method that will be called when the simulator fires the event
/// Unless you need to make changes to the network, you do not have to implement this.
/// Only implement handle() method for all non-simnet requirements.
async fn handle_network(&self, _phantom: &SimNet) -> Result<(), SimNetError> {
async fn handle_network(&mut self, _phantom: &SimNet) -> Result<(), SimNetError> {
self.handle().await
}

Expand All @@ -117,11 +117,11 @@ struct NodeJoinEvent {

#[async_trait]
impl Event for NodeJoinEvent {
async fn handle(&self) -> Result<(), SimNetError> {
async fn handle(&mut self) -> Result<(), SimNetError> {
Ok(())
}

async fn handle_network(&self, simnet: &SimNet) -> Result<(), SimNetError> {
async fn handle_network(&mut self, simnet: &SimNet) -> Result<(), SimNetError> {
simnet.bind(self.channel_addr.clone()).await;
self.handle().await
}
Expand All @@ -135,46 +135,6 @@ impl Event for NodeJoinEvent {
}
}

#[derive(Debug)]
/// Simnet event representing a pending sleep; when handled, it sends `()` to
/// `done_tx` through `mailbox`.
pub(crate) struct SleepEvent {
/// Once-port reference that receives the `()` wakeup message.
done_tx: OncePortRef<()>,
/// Mailbox passed to `done_tx.send` when delivering the wakeup message.
mailbox: Mailbox,
/// Sleep duration in milliseconds, as reported by `duration_ms()` and `summary()`.
duration_ms: u64,
}

impl SleepEvent {
    /// Builds a boxed sleep event.
    ///
    /// `done_tx` is the once-port that receives the wakeup, `mailbox` is the
    /// mailbox used to send it, and `duration_ms` is the sleep length in
    /// milliseconds.
    pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box<Self> {
        let event = Self {
            duration_ms,
            mailbox,
            done_tx,
        };
        Box::new(event)
    }
}

#[async_trait]
impl Event for SleepEvent {
    /// No-op; the wakeup is delivered from `handle_network`.
    async fn handle(&mut self) -> Result<(), SimNetError> {
        Ok(())
    }

    /// Sends `()` to the wakeup port through this event's mailbox, mapping a
    /// failed send to `SimNetError::Closed`.
    async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> {
        let wakeup_port = self.done_tx.clone();
        match wakeup_port.send(&self.mailbox, ()) {
            Ok(_) => Ok(()),
            Err(_) => Err(SimNetError::Closed("TODO".to_string())),
        }
    }

    /// Returns the sleep duration in milliseconds.
    fn duration_ms(&self) -> u64 {
        self.duration_ms
    }

    /// Returns a human-readable one-line description of this event.
    fn summary(&self) -> String {
        let millis = self.duration_ms;
        format!("Sleeping for {} ms", millis)
    }
}

#[derive(Debug)]
/// A pytorch operation
pub struct TorchOpEvent {
Expand All @@ -188,11 +148,11 @@ pub struct TorchOpEvent {

#[async_trait]
impl Event for TorchOpEvent {
async fn handle(&self) -> Result<(), SimNetError> {
async fn handle(&mut self) -> Result<(), SimNetError> {
Ok(())
}

async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> {
async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> {
self.done_tx
.clone()
.send(&self.mailbox, ())
Expand Down Expand Up @@ -607,14 +567,23 @@ impl SimNet {
let mut training_script_waiting_time: u64 = 0;
// Duration elapsed while only non_advanceable_events has events
let mut debounce_timer: Option<tokio::time::Instant> = None;

let debounce_duration = std::env::var("SIM_DEBOUNCE")
.ok()
.and_then(|val| val.parse::<u64>().ok())
.unwrap_or(1);

'outer: loop {
// Check if we should stop
if stop_signal.load(Ordering::SeqCst) {
break 'outer self.records.clone();
}

while let Ok(Some((event, advanceable, time))) = RealClock
.timeout(tokio::time::Duration::from_millis(1), event_rx.recv())
.timeout(
tokio::time::Duration::from_millis(debounce_duration),
event_rx.recv(),
)
.await
{
let scheduled_event = match time {
Expand Down Expand Up @@ -710,7 +679,7 @@ impl SimNet {
training_script_waiting_time += advanced_time;
}
SimClock.advance_to(scheduled_time);
for scheduled_event in scheduled_events {
for mut scheduled_event in scheduled_events {
self.pending_event_count
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
if scheduled_event.event.handle_network(self).await.is_err() {
Expand Down Expand Up @@ -811,7 +780,7 @@ mod tests {

#[async_trait]
impl Event for MessageDeliveryEvent {
async fn handle(&self) -> Result<(), simnet::SimNetError> {
async fn handle(&mut self) -> Result<(), simnet::SimNetError> {
if let Some(dispatcher) = &self.dispatcher {
dispatcher
.send(
Expand Down
2 changes: 1 addition & 1 deletion monarch_hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ use hyperactor::attrs::declare_attrs;
// Declare monarch-specific configuration keys
declare_attrs! {
/// Use a single asyncio runtime for all Python actors, rather than one per actor
pub attr SHARED_ASYNCIO_RUNTIME: bool = false;
pub attr SHARED_ASYNCIO_RUNTIME: bool = true;
}
Loading